diff mbox series

[v5,5/8] hash: add read and write concurrency support

Message ID 1531242001-381104-6-git-send-email-yipeng1.wang@intel.com (mailing list archive)
State Accepted, archived
Headers show
Series Add read-write concurrency to rte_hash library | expand

Checks

Context Check Description
ci/checkpatch success coding style OK
ci/Intel-compilation success Compilation OK

Commit Message

Wang, Yipeng1 July 10, 2018, 4:59 p.m. UTC
The existing implementation of librte_hash does not support read-write
concurrency. This commit implements read-write safety using rte_rwlock,
and the rte_rwlock TM version if hardware transactional memory is
available.

Both multi-writer and read-write concurrency are now protected by
rte_rwlock. The x86-specific header file is removed, since its
x86-specific RTM functions are no longer called directly by rte_hash.

Signed-off-by: Yipeng Wang <yipeng1.wang@intel.com>
Acked-by: Pablo de Lara <pablo.de.lara.guarch@intel.com>
---
 lib/librte_hash/meson.build           |   1 -
 lib/librte_hash/rte_cuckoo_hash.c     | 520 ++++++++++++++++++++++------------
 lib/librte_hash/rte_cuckoo_hash.h     |  18 +-
 lib/librte_hash/rte_cuckoo_hash_x86.h | 167 -----------
 lib/librte_hash/rte_hash.h            |   3 +
 5 files changed, 348 insertions(+), 361 deletions(-)
 delete mode 100644 lib/librte_hash/rte_cuckoo_hash_x86.h

Comments

Stephen Hemminger July 11, 2018, 8:49 p.m. UTC | #1
On Tue, 10 Jul 2018 09:59:58 -0700
Yipeng Wang <yipeng1.wang@intel.com> wrote:

> +
> +static inline void
> +__hash_rw_reader_lock(const struct rte_hash *h)
> +{
> +	if (h->readwrite_concur_support && h->hw_trans_mem_support)
> +		rte_rwlock_read_lock_tm(h->readwrite_lock);
> +	else if (h->readwrite_concur_support)
> +		rte_rwlock_read_lock(h->readwrite_lock);
> +}
> +
> +static inline void
> +__hash_rw_writer_unlock(const struct rte_hash *h)
> +{
> +	if (h->multi_writer_support && h->hw_trans_mem_support)
> +		rte_rwlock_write_unlock_tm(h->readwrite_lock);
> +	else if (h->multi_writer_support)
> +		rte_rwlock_write_unlock(h->readwrite_lock);
> +}
> +
> +static inline void
> +__hash_rw_reader_unlock(const struct rte_hash *h)
> +{
> +	if (h->readwrite_concur_support && h->hw_trans_mem_support)
> +		rte_rwlock_read_unlock_tm(h->readwrite_lock);
> +	else if (h->readwrite_concur_support)
> +		rte_rwlock_read_unlock(h->readwrite_lock);
> +}
> +

For small windows, reader-writer locks are slower than a spin lock
because there are more cache bounces.
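A toy counter-based reader acquire (an illustration only, not the actual rte_rwlock source) shows where those bounces come from: every reader performs an atomic read-modify-write on the same shared counter, so the counter's cache line ping-pongs between reader cores even when no writer is present.

/* Toy counter-based rwlock reader acquire (illustration only). */
#include <stdatomic.h>

struct toy_rwlock {
	atomic_int cnt;                /* < 0: writer held, > 0: #readers */
};

static void
toy_read_lock(struct toy_rwlock *rwl)
{
	int x;

	for (;;) {
		x = atomic_load_explicit(&rwl->cnt, memory_order_relaxed);
		if (x < 0)
			continue;                /* writer holds the lock */
		/* The contended step: all readers CAS one shared word. */
		if (atomic_compare_exchange_weak_explicit(&rwl->cnt, &x, x + 1,
				memory_order_acquire, memory_order_relaxed))
			return;
	}
}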
Wang, Yipeng1 July 12, 2018, 1:22 a.m. UTC | #2
Hi, Stephen,

You are correct, and we understand that a spinlock might be slightly faster than a counter-based rwlock in this case. However, the counter-based rwlock is only the exception path, taken when TSX fails.

If the performance of this exception path is a big concern, a more optimal read-write lock scheme (e.g. TLRW) should be introduced into rte_rwlock in the future.

Thanks
Yipeng

>-----Original Message-----
>From: Stephen Hemminger [mailto:stephen@networkplumber.org]
>
>For small windows, reader-writer locks are slower than a spin lock
>because there are more cache bounces.
Thomas Monjalon July 12, 2018, 8:30 p.m. UTC | #3
12/07/2018 03:22, Wang, Yipeng1:
> From: Stephen Hemminger [mailto:stephen@networkplumber.org]
> 
> > For small windows, reader-writer locks are slower than a spin lock
> > because there are more cache bounces.
> 
> Hi, Stephen,
> 
> You are correct, and we understand that a spinlock might be slightly faster than a counter-based rwlock in this case. However, the counter-based rwlock is only the exception path, taken when TSX fails.
> 
> If the performance of this exception path is a big concern, a more optimal read-write lock scheme (e.g. TLRW) should be introduced into rte_rwlock in the future.

Something like this?
	eal/rwlocks: Try read/write and relock write to read locks added
	https://patches.dpdk.org/patch/40254/
Wang, Yipeng1 July 13, 2018, 1:55 a.m. UTC | #4
Thanks for pointing me to this patch.

I guess the try-locks still do not solve the overhead of multiple readers contending on the counter. They just provide a non-blocking version of the same algorithm.

The relock function looks interesting, but it would be helpful if a specific use case or example were given. In particular, who should call this function, the reader or the writer? Leonid, could you provide more context?

The TLRW example I gave is potentially a better rwlock algorithm (paper reference: D. Dice and N. Shavit, "TLRW: return of the read-write lock"). In such a scheme, readers do not contend on a single shared reader counter, which is the source of the heavy cache bouncing Stephen mentioned. Maybe we should introduce a similar algorithm into the rte_rwlock library.
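A rough sketch of the TLRW "byte-lock" idea under discussion (an illustration of the paper's approach, not a complete or production implementation): each reader marks its own slot in an array of bytes instead of incrementing one shared counter, and a writer raises a flag and waits for the reader bytes to drain. Padding each slot to its own cache line would remove reader-reader false sharing entirely.

/* Rough TLRW-style byte-lock sketch (illustration only). */
#include <stdatomic.h>
#include <stdbool.h>

#define RD_SLOTS 64                        /* e.g. one slot per lcore */

struct byte_rwlock {
	atomic_bool wlock;                 /* writer intent flag      */
	atomic_char rd[RD_SLOTS];          /* one byte per reader     */
};

static void
byte_read_lock(struct byte_rwlock *l, unsigned int slot)
{
	for (;;) {
		atomic_store(&l->rd[slot], 1);   /* announce this reader */
		if (!atomic_load(&l->wlock))
			return;                  /* no writer: we hold it */
		atomic_store(&l->rd[slot], 0);   /* back off for writer  */
		while (atomic_load(&l->wlock))
			;                        /* wait the writer out  */
	}
}

static void
byte_read_unlock(struct byte_rwlock *l, unsigned int slot)
{
	atomic_store(&l->rd[slot], 0);
}

static void
byte_write_lock(struct byte_rwlock *l)
{
	bool expected = false;

	while (!atomic_compare_exchange_weak(&l->wlock, &expected, true))
		expected = false;                /* one writer at a time */
	for (unsigned int i = 0; i < RD_SLOTS; i++)
		while (atomic_load(&l->rd[i]))
			;                        /* drain active readers */
}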

Leonid Myravjev Aug. 17, 2018, 12:51 p.m. UTC | #5
> I guess the try-locks still do not solve the overhead of multiple readers contending on the counter. They just provide a non-blocking version of the same algorithm.

The DPDK project does not use rwlocks to solve any multiple-reader
overhead problem (at least I did not find such a use). For
non-critical sections it is often simply easier to write the code
with an rwlock. The try-locks, relock-to-read and the other additions
exist only to simplify such non-critical code even further. For
example, if a process needs to change data (write lock) and then
reread the contents (read lock), it could release the write lock and
take the read lock, but relocking to read is easier: the process in
the write section switches to a read context and hands control to a
reader function, as the sketch after this paragraph shows.
I looked for a third-party rwlock solution and decided that
rte_rwlock is the best choice, especially if it gains the relock to
read and the try-locks.
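A minimal sketch of that write-then-reread pattern. The relock primitive and the table helpers are hypothetical names for illustration; the exact API proposed in patch 40254 is not shown in this thread. rte_rwlock_write_lock() and rte_rwlock_read_unlock() are the existing rte_rwlock calls.

#include <rte_rwlock.h>

/* Hypothetical helpers, for illustration only. */
struct table;
extern void table_set(struct table *t, int key, int val);
extern int table_get(const struct table *t, int key);
/* Hypothetical stand-in for the relock primitive from
 * https://patches.dpdk.org/patch/40254/ */
extern void rwlock_write_relock_read(rte_rwlock_t *rwl);

static void
update_then_verify(rte_rwlock_t *rwl, struct table *t, int key, int val)
{
	rte_rwlock_write_lock(rwl);       /* writer: modify the data   */
	table_set(t, key, val);
	rwlock_write_relock_read(rwl);    /* downgrade to a read lock
	                                   * with no unlock/lock gap
	                                   * for another writer to
	                                   * sneak into                */
	(void)table_get(t, key);          /* reread under read lock    */
	rte_rwlock_read_unlock(rwl);
}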

P.S. To solve the overhead problem of multiple readers (if using
locks is _extremely_ required), best practice may be an MCS lock, and
an rwlock built on top of MCS locks; a minimal sketch follows.
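A minimal MCS (Mellor-Crummey and Scott) spinlock sketch, as an illustration of that queue-based idea (not DPDK code): each waiter spins on a flag in its own queue node, so a lock hand-off touches only one remote cache line instead of bouncing a shared word among all waiters.

/* Minimal MCS queue lock sketch (illustration only). */
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

struct mcs_node {
	_Atomic(struct mcs_node *) next;
	atomic_bool locked;
};

typedef _Atomic(struct mcs_node *) mcs_lock_t;     /* queue tail */

static void
mcs_lock(mcs_lock_t *lock, struct mcs_node *me)
{
	struct mcs_node *prev;

	atomic_store_explicit(&me->next, NULL, memory_order_relaxed);
	atomic_store_explicit(&me->locked, true, memory_order_relaxed);
	prev = atomic_exchange_explicit(lock, me, memory_order_acq_rel);
	if (prev == NULL)
		return;                            /* lock was free   */
	atomic_store_explicit(&prev->next, me, memory_order_release);
	while (atomic_load_explicit(&me->locked, memory_order_acquire))
		;                                  /* spin on own node */
}

static void
mcs_unlock(mcs_lock_t *lock, struct mcs_node *me)
{
	struct mcs_node *succ =
		atomic_load_explicit(&me->next, memory_order_acquire);

	if (succ == NULL) {
		struct mcs_node *expected = me;
		if (atomic_compare_exchange_strong_explicit(lock, &expected,
				NULL, memory_order_acq_rel,
				memory_order_relaxed))
			return;                    /* no waiter queued */
		do {                               /* waiter is racing */
			succ = atomic_load_explicit(&me->next,
					memory_order_acquire);
		} while (succ == NULL);
	}
	atomic_store_explicit(&succ->locked, false, memory_order_release);
}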




---
Best regards,
Leonid Myravjev

diff mbox series

Patch

diff --git a/lib/librte_hash/meson.build b/lib/librte_hash/meson.build
index e139e1d..efc06ed 100644
--- a/lib/librte_hash/meson.build
+++ b/lib/librte_hash/meson.build
@@ -6,7 +6,6 @@  headers = files('rte_cmp_arm64.h',
 	'rte_cmp_x86.h',
 	'rte_crc_arm64.h',
 	'rte_cuckoo_hash.h',
-	'rte_cuckoo_hash_x86.h',
 	'rte_fbk_hash.h',
 	'rte_hash_crc.h',
 	'rte_hash.h',
diff --git a/lib/librte_hash/rte_cuckoo_hash.c b/lib/librte_hash/rte_cuckoo_hash.c
index b812f33..35631cc 100644
--- a/lib/librte_hash/rte_cuckoo_hash.c
+++ b/lib/librte_hash/rte_cuckoo_hash.c
@@ -31,9 +31,6 @@ 
 #include "rte_hash.h"
 #include "rte_cuckoo_hash.h"
 
-#if defined(RTE_ARCH_X86)
-#include "rte_cuckoo_hash_x86.h"
-#endif
 
 TAILQ_HEAD(rte_hash_list, rte_tailq_entry);
 
@@ -93,8 +90,10 @@  rte_hash_create(const struct rte_hash_parameters *params)
 	void *buckets = NULL;
 	char ring_name[RTE_RING_NAMESIZE];
 	unsigned num_key_slots;
-	unsigned hw_trans_mem_support = 0;
 	unsigned i;
+	unsigned int hw_trans_mem_support = 0, multi_writer_support = 0;
+	unsigned int readwrite_concur_support = 0;
+
 	rte_hash_function default_hash_func = (rte_hash_function)rte_jhash;
 
 	hash_list = RTE_TAILQ_CAST(rte_hash_tailq.head, rte_hash_list);
@@ -118,8 +117,16 @@  rte_hash_create(const struct rte_hash_parameters *params)
 	if (params->extra_flag & RTE_HASH_EXTRA_FLAGS_TRANS_MEM_SUPPORT)
 		hw_trans_mem_support = 1;
 
+	if (params->extra_flag & RTE_HASH_EXTRA_FLAGS_MULTI_WRITER_ADD)
+		multi_writer_support = 1;
+
+	if (params->extra_flag & RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY) {
+		readwrite_concur_support = 1;
+		multi_writer_support = 1;
+	}
+
 	/* Store all keys and leave the first entry as a dummy entry for lookup_bulk */
-	if (hw_trans_mem_support)
+	if (multi_writer_support)
 		/*
 		 * Increase number of slots by total number of indices
 		 * that can be stored in the lcore caches
@@ -233,7 +240,7 @@  rte_hash_create(const struct rte_hash_parameters *params)
 	h->cmp_jump_table_idx = KEY_OTHER_BYTES;
 #endif
 
-	if (hw_trans_mem_support) {
+	if (multi_writer_support) {
 		h->local_free_slots = rte_zmalloc_socket(NULL,
 				sizeof(struct lcore_cache) * RTE_MAX_LCORE,
 				RTE_CACHE_LINE_SIZE, params->socket_id);
@@ -261,6 +268,8 @@  rte_hash_create(const struct rte_hash_parameters *params)
 	h->key_store = k;
 	h->free_slots = r;
 	h->hw_trans_mem_support = hw_trans_mem_support;
+	h->multi_writer_support = multi_writer_support;
+	h->readwrite_concur_support = readwrite_concur_support;
 
 #if defined(RTE_ARCH_X86)
 	if (rte_cpu_get_flag_enabled(RTE_CPUFLAG_AVX2))
@@ -271,24 +280,17 @@  rte_hash_create(const struct rte_hash_parameters *params)
 #endif
 		h->sig_cmp_fn = RTE_HASH_COMPARE_SCALAR;
 
-	/* Turn on multi-writer only with explicit flat from user and TM
+	/* Turn on multi-writer only with explicit flag from user and TM
 	 * support.
 	 */
-	if (params->extra_flag & RTE_HASH_EXTRA_FLAGS_MULTI_WRITER_ADD) {
-		if (h->hw_trans_mem_support) {
-			h->add_key = ADD_KEY_MULTIWRITER_TM;
-		} else {
-			h->add_key = ADD_KEY_MULTIWRITER;
-			h->multiwriter_lock = rte_malloc(NULL,
-							sizeof(rte_spinlock_t),
-							RTE_CACHE_LINE_SIZE);
-			if (h->multiwriter_lock == NULL)
-				goto err_unlock;
-
-			rte_spinlock_init(h->multiwriter_lock);
-		}
-	} else
-		h->add_key = ADD_KEY_SINGLEWRITER;
+	if (h->multi_writer_support) {
+		h->readwrite_lock = rte_malloc(NULL, sizeof(rte_rwlock_t),
+						RTE_CACHE_LINE_SIZE);
+		if (h->readwrite_lock == NULL)
+			goto err_unlock;
+
+		rte_rwlock_init(h->readwrite_lock);
+	}
 
 	/* Populate free slots ring. Entry zero is reserved for key misses. */
 	for (i = 1; i < num_key_slots; i++)
@@ -338,11 +340,10 @@  rte_hash_free(struct rte_hash *h)
 
 	rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
 
-	if (h->hw_trans_mem_support)
+	if (h->multi_writer_support) {
 		rte_free(h->local_free_slots);
-
-	if (h->add_key == ADD_KEY_MULTIWRITER)
-		rte_free(h->multiwriter_lock);
+		rte_free(h->readwrite_lock);
+	}
 	rte_ring_free(h->free_slots);
 	rte_free(h->key_store);
 	rte_free(h->buckets);
@@ -369,6 +370,44 @@  rte_hash_secondary_hash(const hash_sig_t primary_hash)
 	return primary_hash ^ ((tag + 1) * alt_bits_xor);
 }
 
+/* Read write locks implemented using rte_rwlock */
+static inline void
+__hash_rw_writer_lock(const struct rte_hash *h)
+{
+	if (h->multi_writer_support && h->hw_trans_mem_support)
+		rte_rwlock_write_lock_tm(h->readwrite_lock);
+	else if (h->multi_writer_support)
+		rte_rwlock_write_lock(h->readwrite_lock);
+}
+
+
+static inline void
+__hash_rw_reader_lock(const struct rte_hash *h)
+{
+	if (h->readwrite_concur_support && h->hw_trans_mem_support)
+		rte_rwlock_read_lock_tm(h->readwrite_lock);
+	else if (h->readwrite_concur_support)
+		rte_rwlock_read_lock(h->readwrite_lock);
+}
+
+static inline void
+__hash_rw_writer_unlock(const struct rte_hash *h)
+{
+	if (h->multi_writer_support && h->hw_trans_mem_support)
+		rte_rwlock_write_unlock_tm(h->readwrite_lock);
+	else if (h->multi_writer_support)
+		rte_rwlock_write_unlock(h->readwrite_lock);
+}
+
+static inline void
+__hash_rw_reader_unlock(const struct rte_hash *h)
+{
+	if (h->readwrite_concur_support && h->hw_trans_mem_support)
+		rte_rwlock_read_unlock_tm(h->readwrite_lock);
+	else if (h->readwrite_concur_support)
+		rte_rwlock_read_unlock(h->readwrite_lock);
+}
+
 void
 rte_hash_reset(struct rte_hash *h)
 {
@@ -378,6 +417,7 @@  rte_hash_reset(struct rte_hash *h)
 	if (h == NULL)
 		return;
 
+	__hash_rw_writer_lock(h);
 	memset(h->buckets, 0, h->num_buckets * sizeof(struct rte_hash_bucket));
 	memset(h->key_store, 0, h->key_entry_size * (h->entries + 1));
 
@@ -386,7 +426,7 @@  rte_hash_reset(struct rte_hash *h)
 		rte_pause();
 
 	/* Repopulate the free slots ring. Entry zero is reserved for key misses */
-	if (h->hw_trans_mem_support)
+	if (h->multi_writer_support)
 		tot_ring_cnt = h->entries + (RTE_MAX_LCORE - 1) *
 					(LCORE_CACHE_SIZE - 1);
 	else
@@ -395,77 +435,12 @@  rte_hash_reset(struct rte_hash *h)
 	for (i = 1; i < tot_ring_cnt + 1; i++)
 		rte_ring_sp_enqueue(h->free_slots, (void *)((uintptr_t) i));
 
-	if (h->hw_trans_mem_support) {
+	if (h->multi_writer_support) {
 		/* Reset local caches per lcore */
 		for (i = 0; i < RTE_MAX_LCORE; i++)
 			h->local_free_slots[i].len = 0;
 	}
-}
-
-/* Search for an entry that can be pushed to its alternative location */
-static inline int
-make_space_bucket(const struct rte_hash *h, struct rte_hash_bucket *bkt,
-		unsigned int *nr_pushes)
-{
-	unsigned i, j;
-	int ret;
-	uint32_t next_bucket_idx;
-	struct rte_hash_bucket *next_bkt[RTE_HASH_BUCKET_ENTRIES];
-
-	/*
-	 * Push existing item (search for bucket with space in
-	 * alternative locations) to its alternative location
-	 */
-	for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) {
-		/* Search for space in alternative locations */
-		next_bucket_idx = bkt->sig_alt[i] & h->bucket_bitmask;
-		next_bkt[i] = &h->buckets[next_bucket_idx];
-		for (j = 0; j < RTE_HASH_BUCKET_ENTRIES; j++) {
-			if (next_bkt[i]->key_idx[j] == EMPTY_SLOT)
-				break;
-		}
-
-		if (j != RTE_HASH_BUCKET_ENTRIES)
-			break;
-	}
-
-	/* Alternative location has spare room (end of recursive function) */
-	if (i != RTE_HASH_BUCKET_ENTRIES) {
-		next_bkt[i]->sig_alt[j] = bkt->sig_current[i];
-		next_bkt[i]->sig_current[j] = bkt->sig_alt[i];
-		next_bkt[i]->key_idx[j] = bkt->key_idx[i];
-		return i;
-	}
-
-	/* Pick entry that has not been pushed yet */
-	for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++)
-		if (bkt->flag[i] == 0)
-			break;
-
-	/* All entries have been pushed, so entry cannot be added */
-	if (i == RTE_HASH_BUCKET_ENTRIES || ++(*nr_pushes) > RTE_HASH_MAX_PUSHES)
-		return -ENOSPC;
-
-	/* Set flag to indicate that this entry is going to be pushed */
-	bkt->flag[i] = 1;
-
-	/* Need room in alternative bucket to insert the pushed entry */
-	ret = make_space_bucket(h, next_bkt[i], nr_pushes);
-	/*
-	 * After recursive function.
-	 * Clear flags and insert the pushed entry
-	 * in its alternative location if successful,
-	 * or return error
-	 */
-	bkt->flag[i] = 0;
-	if (ret >= 0) {
-		next_bkt[i]->sig_alt[ret] = bkt->sig_current[i];
-		next_bkt[i]->sig_current[ret] = bkt->sig_alt[i];
-		next_bkt[i]->key_idx[ret] = bkt->key_idx[i];
-		return i;
-	} else
-		return ret;
-
+	__hash_rw_writer_unlock(h);
 }
 
 /*
@@ -478,7 +453,7 @@  enqueue_slot_back(const struct rte_hash *h,
 		struct lcore_cache *cached_free_slots,
 		void *slot_id)
 {
-	if (h->hw_trans_mem_support) {
+	if (h->multi_writer_support) {
 		cached_free_slots->objs[cached_free_slots->len] = slot_id;
 		cached_free_slots->len++;
 	} else
@@ -512,13 +487,207 @@  search_and_update(const struct rte_hash *h, void *data, const void *key,
 	return -1;
 }
 
+/* Only tries to insert at one bucket (@prim_bkt) without trying to push
+ * buckets around.
+ * return 1 if matching existing key, return 0 if succeeds, return -1 for no
+ * empty entry.
+ */
+static inline int32_t
+rte_hash_cuckoo_insert_mw(const struct rte_hash *h,
+		struct rte_hash_bucket *prim_bkt,
+		struct rte_hash_bucket *sec_bkt,
+		const struct rte_hash_key *key, void *data,
+		hash_sig_t sig, hash_sig_t alt_hash, uint32_t new_idx,
+		int32_t *ret_val)
+{
+	unsigned int i;
+	struct rte_hash_bucket *cur_bkt = prim_bkt;
+	int32_t ret;
+
+	__hash_rw_writer_lock(h);
+	/* Check if key was inserted after last check but before this
+	 * protected region in case of inserting duplicated keys.
+	 */
+	ret = search_and_update(h, data, key, cur_bkt, sig, alt_hash);
+	if (ret != -1) {
+		__hash_rw_writer_unlock(h);
+		*ret_val = ret;
+		return 1;
+	}
+	ret = search_and_update(h, data, key, sec_bkt, alt_hash, sig);
+	if (ret != -1) {
+		__hash_rw_writer_unlock(h);
+		*ret_val = ret;
+		return 1;
+	}
+
+	/* Insert new entry if there is room in the primary
+	 * bucket.
+	 */
+	for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) {
+		/* Check if slot is available */
+		if (likely(prim_bkt->key_idx[i] == EMPTY_SLOT)) {
+			prim_bkt->sig_current[i] = sig;
+			prim_bkt->sig_alt[i] = alt_hash;
+			prim_bkt->key_idx[i] = new_idx;
+			break;
+		}
+	}
+	__hash_rw_writer_unlock(h);
+
+	if (i != RTE_HASH_BUCKET_ENTRIES)
+		return 0;
+
+	/* no empty entry */
+	return -1;
+}
+
+/* Shift buckets along provided cuckoo_path (@leaf and @leaf_slot) and fill
+ * the path head with new entry (sig, alt_hash, new_idx)
+ * return 1 if matched key found, return -1 if cuckoo path invalided and fail,
+ * return 0 if succeeds.
+ */
+static inline int
+rte_hash_cuckoo_move_insert_mw(const struct rte_hash *h,
+			struct rte_hash_bucket *bkt,
+			struct rte_hash_bucket *alt_bkt,
+			const struct rte_hash_key *key, void *data,
+			struct queue_node *leaf, uint32_t leaf_slot,
+			hash_sig_t sig, hash_sig_t alt_hash, uint32_t new_idx,
+			int32_t *ret_val)
+{
+	uint32_t prev_alt_bkt_idx;
+	struct rte_hash_bucket *cur_bkt = bkt;
+	struct queue_node *prev_node, *curr_node = leaf;
+	struct rte_hash_bucket *prev_bkt, *curr_bkt = leaf->bkt;
+	uint32_t prev_slot, curr_slot = leaf_slot;
+	int32_t ret;
+
+	__hash_rw_writer_lock(h);
+
+	/* In case empty slot was gone before entering protected region */
+	if (curr_bkt->key_idx[curr_slot] != EMPTY_SLOT) {
+		__hash_rw_writer_unlock(h);
+		return -1;
+	}
+
+	/* Check if key was inserted after last check but before this
+	 * protected region.
+	 */
+	ret = search_and_update(h, data, key, cur_bkt, sig, alt_hash);
+	if (ret != -1) {
+		__hash_rw_writer_unlock(h);
+		*ret_val = ret;
+		return 1;
+	}
+
+	ret = search_and_update(h, data, key, alt_bkt, alt_hash, sig);
+	if (ret != -1) {
+		__hash_rw_writer_unlock(h);
+		*ret_val = ret;
+		return 1;
+	}
+
+	while (likely(curr_node->prev != NULL)) {
+		prev_node = curr_node->prev;
+		prev_bkt = prev_node->bkt;
+		prev_slot = curr_node->prev_slot;
+
+		prev_alt_bkt_idx =
+			prev_bkt->sig_alt[prev_slot] & h->bucket_bitmask;
+
+		if (unlikely(&h->buckets[prev_alt_bkt_idx]
+				!= curr_bkt)) {
+			/* revert it to empty, otherwise duplicated keys */
+			curr_bkt->key_idx[curr_slot] = EMPTY_SLOT;
+			__hash_rw_writer_unlock(h);
+			return -1;
+		}
+
+		/* Need to swap current/alt sig to allow later
+		 * Cuckoo insert to move elements back to its
+		 * primary bucket if available
+		 */
+		curr_bkt->sig_alt[curr_slot] =
+			 prev_bkt->sig_current[prev_slot];
+		curr_bkt->sig_current[curr_slot] =
+			prev_bkt->sig_alt[prev_slot];
+		curr_bkt->key_idx[curr_slot] =
+			prev_bkt->key_idx[prev_slot];
+
+		curr_slot = prev_slot;
+		curr_node = prev_node;
+		curr_bkt = curr_node->bkt;
+	}
+
+	curr_bkt->sig_current[curr_slot] = sig;
+	curr_bkt->sig_alt[curr_slot] = alt_hash;
+	curr_bkt->key_idx[curr_slot] = new_idx;
+
+	__hash_rw_writer_unlock(h);
+
+	return 0;
+
+}
+
+/*
+ * Make space for new key, using bfs Cuckoo Search and Multi-Writer safe
+ * Cuckoo
+ */
+static inline int
+rte_hash_cuckoo_make_space_mw(const struct rte_hash *h,
+			struct rte_hash_bucket *bkt,
+			struct rte_hash_bucket *sec_bkt,
+			const struct rte_hash_key *key, void *data,
+			hash_sig_t sig, hash_sig_t alt_hash,
+			uint32_t new_idx, int32_t *ret_val)
+{
+	unsigned int i;
+	struct queue_node queue[RTE_HASH_BFS_QUEUE_MAX_LEN];
+	struct queue_node *tail, *head;
+	struct rte_hash_bucket *curr_bkt, *alt_bkt;
+
+	tail = queue;
+	head = queue + 1;
+	tail->bkt = bkt;
+	tail->prev = NULL;
+	tail->prev_slot = -1;
+
+	/* Cuckoo bfs Search */
+	while (likely(tail != head && head <
+					queue + RTE_HASH_BFS_QUEUE_MAX_LEN -
+					RTE_HASH_BUCKET_ENTRIES)) {
+		curr_bkt = tail->bkt;
+		for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) {
+			if (curr_bkt->key_idx[i] == EMPTY_SLOT) {
+				int32_t ret = rte_hash_cuckoo_move_insert_mw(h,
+						bkt, sec_bkt, key, data,
+						tail, i, sig, alt_hash,
+						new_idx, ret_val);
+				if (likely(ret != -1))
+					return ret;
+			}
+
+			/* Enqueue new node and keep prev node info */
+			alt_bkt = &(h->buckets[curr_bkt->sig_alt[i]
+						    & h->bucket_bitmask]);
+			head->bkt = alt_bkt;
+			head->prev = tail;
+			head->prev_slot = i;
+			head++;
+		}
+		tail++;
+	}
+
+	return -ENOSPC;
+}
+
 static inline int32_t
 __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,
 						hash_sig_t sig, void *data)
 {
 	hash_sig_t alt_hash;
 	uint32_t prim_bucket_idx, sec_bucket_idx;
-	unsigned i;
 	struct rte_hash_bucket *prim_bkt, *sec_bkt;
 	struct rte_hash_key *new_k, *keys = h->key_store;
 	void *slot_id = NULL;
@@ -527,10 +696,7 @@  __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,
 	unsigned n_slots;
 	unsigned lcore_id;
 	struct lcore_cache *cached_free_slots = NULL;
-	unsigned int nr_pushes = 0;
-
-	if (h->add_key == ADD_KEY_MULTIWRITER)
-		rte_spinlock_lock(h->multiwriter_lock);
+	int32_t ret_val;
 
 	prim_bucket_idx = sig & h->bucket_bitmask;
 	prim_bkt = &h->buckets[prim_bucket_idx];
@@ -541,8 +707,24 @@  __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,
 	sec_bkt = &h->buckets[sec_bucket_idx];
 	rte_prefetch0(sec_bkt);
 
-	/* Get a new slot for storing the new key */
-	if (h->hw_trans_mem_support) {
+	/* Check if key is already inserted in primary location */
+	__hash_rw_writer_lock(h);
+	ret = search_and_update(h, data, key, prim_bkt, sig, alt_hash);
+	if (ret != -1) {
+		__hash_rw_writer_unlock(h);
+		return ret;
+	}
+
+	/* Check if key is already inserted in secondary location */
+	ret = search_and_update(h, data, key, sec_bkt, alt_hash, sig);
+	if (ret != -1) {
+		__hash_rw_writer_unlock(h);
+		return ret;
+	}
+	__hash_rw_writer_unlock(h);
+
+	/* Did not find a match, so get a new slot for storing the new key */
+	if (h->multi_writer_support) {
 		lcore_id = rte_lcore_id();
 		cached_free_slots = &h->local_free_slots[lcore_id];
 		/* Try to get a free slot from the local cache */
@@ -552,8 +734,7 @@  __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,
 					cached_free_slots->objs,
 					LCORE_CACHE_SIZE, NULL);
 			if (n_slots == 0) {
-				ret = -ENOSPC;
-				goto failure;
+				return -ENOSPC;
 			}
 
 			cached_free_slots->len += n_slots;
@@ -564,92 +745,50 @@  __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,
 		slot_id = cached_free_slots->objs[cached_free_slots->len];
 	} else {
 		if (rte_ring_sc_dequeue(h->free_slots, &slot_id) != 0) {
-			ret = -ENOSPC;
-			goto failure;
+			return -ENOSPC;
 		}
 	}
 
 	new_k = RTE_PTR_ADD(keys, (uintptr_t)slot_id * h->key_entry_size);
-	rte_prefetch0(new_k);
 	new_idx = (uint32_t)((uintptr_t) slot_id);
-
-	/* Check if key is already inserted in primary location */
-	ret = search_and_update(h, data, key, prim_bkt, sig, alt_hash);
-	if (ret != -1)
-		goto failure;
-
-	/* Check if key is already inserted in secondary location */
-	ret = search_and_update(h, data, key, sec_bkt, alt_hash, sig);
-	if (ret != -1)
-		goto failure;
-
 	/* Copy key */
 	rte_memcpy(new_k->key, key, h->key_len);
 	new_k->pdata = data;
 
-#if defined(RTE_ARCH_X86) /* currently only x86 support HTM */
-	if (h->add_key == ADD_KEY_MULTIWRITER_TM) {
-		ret = rte_hash_cuckoo_insert_mw_tm(prim_bkt,
-				sig, alt_hash, new_idx);
-		if (ret >= 0)
-			return new_idx - 1;
 
-		/* Primary bucket full, need to make space for new entry */
-		ret = rte_hash_cuckoo_make_space_mw_tm(h, prim_bkt, sig,
-							alt_hash, new_idx);
+	/* Find an empty slot and insert */
+	ret = rte_hash_cuckoo_insert_mw(h, prim_bkt, sec_bkt, key, data,
+					sig, alt_hash, new_idx, &ret_val);
+	if (ret == 0)
+		return new_idx - 1;
+	else if (ret == 1) {
+		enqueue_slot_back(h, cached_free_slots, slot_id);
+		return ret_val;
+	}
 
-		if (ret >= 0)
-			return new_idx - 1;
+	/* Primary bucket full, need to make space for new entry */
+	ret = rte_hash_cuckoo_make_space_mw(h, prim_bkt, sec_bkt, key, data,
+					sig, alt_hash, new_idx, &ret_val);
+	if (ret == 0)
+		return new_idx - 1;
+	else if (ret == 1) {
+		enqueue_slot_back(h, cached_free_slots, slot_id);
+		return ret_val;
+	}
 
-		/* Also search secondary bucket to get better occupancy */
-		ret = rte_hash_cuckoo_make_space_mw_tm(h, sec_bkt, sig,
-							alt_hash, new_idx);
+	/* Also search secondary bucket to get better occupancy */
+	ret = rte_hash_cuckoo_make_space_mw(h, sec_bkt, prim_bkt, key, data,
+					alt_hash, sig, new_idx, &ret_val);
 
-		if (ret >= 0)
-			return new_idx - 1;
+	if (ret == 0)
+		return new_idx - 1;
+	else if (ret == 1) {
+		enqueue_slot_back(h, cached_free_slots, slot_id);
+		return ret_val;
 	} else {
-#endif
-		for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) {
-			/* Check if slot is available */
-			if (likely(prim_bkt->key_idx[i] == EMPTY_SLOT)) {
-				prim_bkt->sig_current[i] = sig;
-				prim_bkt->sig_alt[i] = alt_hash;
-				prim_bkt->key_idx[i] = new_idx;
-				break;
-			}
-		}
-
-		if (i != RTE_HASH_BUCKET_ENTRIES) {
-			if (h->add_key == ADD_KEY_MULTIWRITER)
-				rte_spinlock_unlock(h->multiwriter_lock);
-			return new_idx - 1;
-		}
-
-		/* Primary bucket full, need to make space for new entry
-		 * After recursive function.
-		 * Insert the new entry in the position of the pushed entry
-		 * if successful or return error and
-		 * store the new slot back in the ring
-		 */
-		ret = make_space_bucket(h, prim_bkt, &nr_pushes);
-		if (ret >= 0) {
-			prim_bkt->sig_current[ret] = sig;
-			prim_bkt->sig_alt[ret] = alt_hash;
-			prim_bkt->key_idx[ret] = new_idx;
-			if (h->add_key == ADD_KEY_MULTIWRITER)
-				rte_spinlock_unlock(h->multiwriter_lock);
-			return new_idx - 1;
-		}
-#if defined(RTE_ARCH_X86)
+		enqueue_slot_back(h, cached_free_slots, slot_id);
+		return ret;
 	}
-#endif
-	/* Error in addition, store new slot back in the ring and return error */
-	enqueue_slot_back(h, cached_free_slots, (void *)((uintptr_t) new_idx));
-
-failure:
-	if (h->add_key == ADD_KEY_MULTIWRITER)
-		rte_spinlock_unlock(h->multiwriter_lock);
-	return ret;
 }
 
 int32_t
@@ -734,12 +873,14 @@  __rte_hash_lookup_with_hash(const struct rte_hash *h, const void *key,
 	bucket_idx = sig & h->bucket_bitmask;
 	bkt = &h->buckets[bucket_idx];
 
+	__hash_rw_reader_lock(h);
 
 	/* Check if key is in primary location */
 	ret = search_one_bucket(h, key, sig, data, bkt);
-	if (ret != -1)
+	if (ret != -1) {
+		__hash_rw_reader_unlock(h);
 		return ret;
-
+	}
 	/* Calculate secondary hash */
 	alt_hash = rte_hash_secondary_hash(sig);
 	bucket_idx = alt_hash & h->bucket_bitmask;
@@ -747,9 +888,11 @@  __rte_hash_lookup_with_hash(const struct rte_hash *h, const void *key,
 
 	/* Check if key is in secondary location */
 	ret = search_one_bucket(h, key, alt_hash, data, bkt);
-	if (ret != -1)
+	if (ret != -1) {
+		__hash_rw_reader_unlock(h);
 		return ret;
-
+	}
+	__hash_rw_reader_unlock(h);
 	return -ENOENT;
 }
 
@@ -791,7 +934,7 @@  remove_entry(const struct rte_hash *h, struct rte_hash_bucket *bkt, unsigned i)
 
 	bkt->sig_current[i] = NULL_SIGNATURE;
 	bkt->sig_alt[i] = NULL_SIGNATURE;
-	if (h->hw_trans_mem_support) {
+	if (h->multi_writer_support) {
 		lcore_id = rte_lcore_id();
 		cached_free_slots = &h->local_free_slots[lcore_id];
 		/* Cache full, need to free it. */
@@ -855,10 +998,13 @@  __rte_hash_del_key_with_hash(const struct rte_hash *h, const void *key,
 	bucket_idx = sig & h->bucket_bitmask;
 	bkt = &h->buckets[bucket_idx];
 
+	__hash_rw_writer_lock(h);
 	/* look for key in primary bucket */
 	ret = search_and_remove(h, key, bkt, sig);
-	if (ret != -1)
+	if (ret != -1) {
+		__hash_rw_writer_unlock(h);
 		return ret;
+	}
 
 	/* Calculate secondary hash */
 	alt_hash = rte_hash_secondary_hash(sig);
@@ -867,9 +1013,12 @@  __rte_hash_del_key_with_hash(const struct rte_hash *h, const void *key,
 
 	/* look for key in secondary bucket */
 	ret = search_and_remove(h, key, bkt, alt_hash);
-	if (ret != -1)
+	if (ret != -1) {
+		__hash_rw_writer_unlock(h);
 		return ret;
+	}
 
+	__hash_rw_writer_unlock(h);
 	return -ENOENT;
 }
 
@@ -1011,6 +1160,7 @@  __rte_hash_lookup_bulk(const struct rte_hash *h, const void **keys,
 		rte_prefetch0(secondary_bkt[i]);
 	}
 
+	__hash_rw_reader_lock(h);
 	/* Compare signatures and prefetch key slot of first hit */
 	for (i = 0; i < num_keys; i++) {
 		compare_signatures(&prim_hitmask[i], &sec_hitmask[i],
@@ -1093,6 +1243,8 @@  __rte_hash_lookup_bulk(const struct rte_hash *h, const void **keys,
 		continue;
 	}
 
+	__hash_rw_reader_unlock(h);
+
 	if (hit_mask != NULL)
 		*hit_mask = hits;
 }
@@ -1151,7 +1303,7 @@  rte_hash_iterate(const struct rte_hash *h, const void **key, void **data, uint32
 		bucket_idx = *next / RTE_HASH_BUCKET_ENTRIES;
 		idx = *next % RTE_HASH_BUCKET_ENTRIES;
 	}
-
+	__hash_rw_reader_lock(h);
 	/* Get position of entry in key table */
 	position = h->buckets[bucket_idx].key_idx[idx];
 	next_key = (struct rte_hash_key *) ((char *)h->key_store +
@@ -1160,6 +1312,8 @@  rte_hash_iterate(const struct rte_hash *h, const void **key, void **data, uint32
 	*key = next_key->key;
 	*data = next_key->pdata;
 
+	__hash_rw_reader_unlock(h);
+
 	/* Increment iterator */
 	(*next)++;
 
diff --git a/lib/librte_hash/rte_cuckoo_hash.h b/lib/librte_hash/rte_cuckoo_hash.h
index 7a54e55..db4d1a0 100644
--- a/lib/librte_hash/rte_cuckoo_hash.h
+++ b/lib/librte_hash/rte_cuckoo_hash.h
@@ -88,11 +88,6 @@  const rte_hash_cmp_eq_t cmp_jump_table[NUM_KEY_CMP_CASES] = {
 
 #endif
 
-enum add_key_case {
-	ADD_KEY_SINGLEWRITER = 0,
-	ADD_KEY_MULTIWRITER,
-	ADD_KEY_MULTIWRITER_TM,
-};
 
 /** Number of items per bucket. */
 #define RTE_HASH_BUCKET_ENTRIES		8
@@ -155,18 +150,20 @@  struct rte_hash {
 
 	struct rte_ring *free_slots;
 	/**< Ring that stores all indexes of the free slots in the key table */
-	uint8_t hw_trans_mem_support;
-	/**< Hardware transactional memory support */
+
 	struct lcore_cache *local_free_slots;
 	/**< Local cache per lcore, storing some indexes of the free slots */
-	enum add_key_case add_key; /**< Multi-writer hash add behavior */
-
-	rte_spinlock_t *multiwriter_lock; /**< Multi-writer spinlock for w/o TM */
 
 	/* Fields used in lookup */
 
 	uint32_t key_len __rte_cache_aligned;
 	/**< Length of hash key. */
+	uint8_t hw_trans_mem_support;
+	/**< If hardware transactional memory is used. */
+	uint8_t multi_writer_support;
+	/**< If multi-writer support is enabled. */
+	uint8_t readwrite_concur_support;
+	/**< If read-write concurrency support is enabled */
 	rte_hash_function hash_func;    /**< Function used to calculate hash. */
 	uint32_t hash_func_init_val;    /**< Init value used by hash_func. */
 	rte_hash_cmp_eq_t rte_hash_custom_cmp_eq;
@@ -184,6 +181,7 @@  struct rte_hash {
 	/**< Table with buckets storing all the	hash values and key indexes
 	 * to the key table.
 	 */
+	rte_rwlock_t *readwrite_lock; /**< Read-write lock thread-safety. */
 } __rte_cache_aligned;
 
 struct queue_node {
diff --git a/lib/librte_hash/rte_cuckoo_hash_x86.h b/lib/librte_hash/rte_cuckoo_hash_x86.h
deleted file mode 100644
index 981d7bd..0000000
--- a/lib/librte_hash/rte_cuckoo_hash_x86.h
+++ /dev/null
@@ -1,167 +0,0 @@ 
-/* SPDX-License-Identifier: BSD-3-Clause
- * Copyright(c) 2016 Intel Corporation
- */
-
-/* rte_cuckoo_hash_x86.h
- * This file holds all x86 specific Cuckoo Hash functions
- */
-
-/* Only tries to insert at one bucket (@prim_bkt) without trying to push
- * buckets around
- */
-static inline unsigned
-rte_hash_cuckoo_insert_mw_tm(struct rte_hash_bucket *prim_bkt,
-		hash_sig_t sig, hash_sig_t alt_hash, uint32_t new_idx)
-{
-	unsigned i, status;
-	unsigned try = 0;
-
-	while (try < RTE_HASH_TSX_MAX_RETRY) {
-		status = rte_xbegin();
-		if (likely(status == RTE_XBEGIN_STARTED)) {
-			/* Insert new entry if there is room in the primary
-			* bucket.
-			*/
-			for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) {
-				/* Check if slot is available */
-				if (likely(prim_bkt->key_idx[i] == EMPTY_SLOT)) {
-					prim_bkt->sig_current[i] = sig;
-					prim_bkt->sig_alt[i] = alt_hash;
-					prim_bkt->key_idx[i] = new_idx;
-					break;
-				}
-			}
-			rte_xend();
-
-			if (i != RTE_HASH_BUCKET_ENTRIES)
-				return 0;
-
-			break; /* break off try loop if transaction commits */
-		} else {
-			/* If we abort we give up this cuckoo path. */
-			try++;
-			rte_pause();
-		}
-	}
-
-	return -1;
-}
-
-/* Shift buckets along provided cuckoo_path (@leaf and @leaf_slot) and fill
- * the path head with new entry (sig, alt_hash, new_idx)
- */
-static inline int
-rte_hash_cuckoo_move_insert_mw_tm(const struct rte_hash *h,
-			struct queue_node *leaf, uint32_t leaf_slot,
-			hash_sig_t sig, hash_sig_t alt_hash, uint32_t new_idx)
-{
-	unsigned try = 0;
-	unsigned status;
-	uint32_t prev_alt_bkt_idx;
-
-	struct queue_node *prev_node, *curr_node = leaf;
-	struct rte_hash_bucket *prev_bkt, *curr_bkt = leaf->bkt;
-	uint32_t prev_slot, curr_slot = leaf_slot;
-
-	while (try < RTE_HASH_TSX_MAX_RETRY) {
-		status = rte_xbegin();
-		if (likely(status == RTE_XBEGIN_STARTED)) {
-			/* In case empty slot was gone before entering TSX */
-			if (curr_bkt->key_idx[curr_slot] != EMPTY_SLOT)
-				rte_xabort(RTE_XABORT_CUCKOO_PATH_INVALIDED);
-			while (likely(curr_node->prev != NULL)) {
-				prev_node = curr_node->prev;
-				prev_bkt = prev_node->bkt;
-				prev_slot = curr_node->prev_slot;
-
-				prev_alt_bkt_idx
-					= prev_bkt->sig_alt[prev_slot]
-					    & h->bucket_bitmask;
-
-				if (unlikely(&h->buckets[prev_alt_bkt_idx]
-					     != curr_bkt)) {
-					rte_xabort(RTE_XABORT_CUCKOO_PATH_INVALIDED);
-				}
-
-				/* Need to swap current/alt sig to allow later
-				 * Cuckoo insert to move elements back to its
-				 * primary bucket if available
-				 */
-				curr_bkt->sig_alt[curr_slot] =
-				    prev_bkt->sig_current[prev_slot];
-				curr_bkt->sig_current[curr_slot] =
-				    prev_bkt->sig_alt[prev_slot];
-				curr_bkt->key_idx[curr_slot]
-				    = prev_bkt->key_idx[prev_slot];
-
-				curr_slot = prev_slot;
-				curr_node = prev_node;
-				curr_bkt = curr_node->bkt;
-			}
-
-			curr_bkt->sig_current[curr_slot] = sig;
-			curr_bkt->sig_alt[curr_slot] = alt_hash;
-			curr_bkt->key_idx[curr_slot] = new_idx;
-
-			rte_xend();
-
-			return 0;
-		}
-
-		/* If we abort we give up this cuckoo path, since most likely it's
-		 * no longer valid as TSX detected data conflict
-		 */
-		try++;
-		rte_pause();
-	}
-
-	return -1;
-}
-
-/*
- * Make space for new key, using bfs Cuckoo Search and Multi-Writer safe
- * Cuckoo
- */
-static inline int
-rte_hash_cuckoo_make_space_mw_tm(const struct rte_hash *h,
-			struct rte_hash_bucket *bkt,
-			hash_sig_t sig, hash_sig_t alt_hash,
-			uint32_t new_idx)
-{
-	unsigned i;
-	struct queue_node queue[RTE_HASH_BFS_QUEUE_MAX_LEN];
-	struct queue_node *tail, *head;
-	struct rte_hash_bucket *curr_bkt, *alt_bkt;
-
-	tail = queue;
-	head = queue + 1;
-	tail->bkt = bkt;
-	tail->prev = NULL;
-	tail->prev_slot = -1;
-
-	/* Cuckoo bfs Search */
-	while (likely(tail != head && head <
-					queue + RTE_HASH_BFS_QUEUE_MAX_LEN -
-					RTE_HASH_BUCKET_ENTRIES)) {
-		curr_bkt = tail->bkt;
-		for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) {
-			if (curr_bkt->key_idx[i] == EMPTY_SLOT) {
-				if (likely(rte_hash_cuckoo_move_insert_mw_tm(h,
-						tail, i, sig,
-						alt_hash, new_idx) == 0))
-					return 0;
-			}
-
-			/* Enqueue new node and keep prev node info */
-			alt_bkt = &(h->buckets[curr_bkt->sig_alt[i]
-						    & h->bucket_bitmask]);
-			head->bkt = alt_bkt;
-			head->prev = tail;
-			head->prev_slot = i;
-			head++;
-		}
-		tail++;
-	}
-
-	return -ENOSPC;
-}
diff --git a/lib/librte_hash/rte_hash.h b/lib/librte_hash/rte_hash.h
index f71ca9f..ecb49e4 100644
--- a/lib/librte_hash/rte_hash.h
+++ b/lib/librte_hash/rte_hash.h
@@ -34,6 +34,9 @@  extern "C" {
 /** Default behavior of insertion, single writer/multi writer */
 #define RTE_HASH_EXTRA_FLAGS_MULTI_WRITER_ADD 0x02
 
+/** Flag to support reader writer concurrency */
+#define RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY 0x04
+
 /** Signature of key that is stored internally. */
 typedef uint32_t hash_sig_t;
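For reference, a minimal usage sketch of the new flag added by this patch. The field names follow the public rte_hash_parameters API; the table name and sizes are placeholders. RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY implies multi-writer support (see rte_hash_create above), and combining it with RTE_HASH_EXTRA_FLAGS_TRANS_MEM_SUPPORT lets the lock helpers take the rte_rwlock TM path on HTM-capable CPUs.

#include <stdint.h>
#include <rte_hash.h>
#include <rte_jhash.h>

static struct rte_hash *
create_rw_concurrent_hash(int socket_id)
{
	struct rte_hash_parameters params = {
		.name = "rw_hash",              /* placeholder name */
		.entries = 1024,                /* placeholder size */
		.key_len = sizeof(uint32_t),
		.hash_func = rte_jhash,
		.hash_func_init_val = 0,
		.socket_id = socket_id,
		/* Readers and writers may now run concurrently; the
		 * TM flag selects the HTM lock path when available. */
		.extra_flag = RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY |
			      RTE_HASH_EXTRA_FLAGS_TRANS_MEM_SUPPORT,
	};

	return rte_hash_create(&params);
}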