[RFC,2/3] lib/hash: integrate RCU QSBR

Message ID 20190901065810.15137-3-dharmik.thakkar@arm.com (mailing list archive)
State Superseded, archived
Delegated to: David Marchand
Headers
Series RCU integration with hash library |

Checks

Context Check Description
ci/checkpatch warning coding style issues
ci/Intel-compilation success Compilation OK

Commit Message

Dharmik Thakkar Sept. 1, 2019, 6:58 a.m. UTC
  Integrate RCU QSBR process.
(Refer to RCU documentation to understand various aspects of
integrating RCU library into other libraries.)

Suggested-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
Signed-off-by: Dharmik Thakkar <dharmik.thakkar@arm.com>
---
 lib/librte_hash/Makefile             |   2 +-
 lib/librte_hash/meson.build          |   2 +
 lib/librte_hash/rte_cuckoo_hash.c    | 352 +++++++++++++++++++++++++--
 lib/librte_hash/rte_cuckoo_hash.h    |   3 +
 lib/librte_hash/rte_hash.h           |  38 ++-
 lib/librte_hash/rte_hash_version.map |   2 +-
 6 files changed, 363 insertions(+), 36 deletions(-)
  

Patch

diff --git a/lib/librte_hash/Makefile b/lib/librte_hash/Makefile
index 5669d83f454f..3edc9c96beaa 100644
--- a/lib/librte_hash/Makefile
+++ b/lib/librte_hash/Makefile
@@ -8,7 +8,7 @@  LIB = librte_hash.a
 
 CFLAGS += -O3 -DALLOW_EXPERIMENTAL_API
 CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR)
-LDLIBS += -lrte_eal -lrte_ring
+LDLIBS += -lrte_eal -lrte_ring -lrte_rcu
 
 EXPORT_MAP := rte_hash_version.map
 
diff --git a/lib/librte_hash/meson.build b/lib/librte_hash/meson.build
index ebf70de89014..468668ea2488 100644
--- a/lib/librte_hash/meson.build
+++ b/lib/librte_hash/meson.build
@@ -2,6 +2,7 @@ 
 # Copyright(c) 2017 Intel Corporation
 
 version = 2
+allow_experimental_apis = true
 headers = files('rte_cmp_arm64.h',
 	'rte_cmp_x86.h',
 	'rte_crc_arm64.h',
@@ -14,6 +15,7 @@  headers = files('rte_cmp_arm64.h',
 
 sources = files('rte_cuckoo_hash.c', 'rte_fbk_hash.c')
 deps += ['ring']
+deps += ['rcu']
 
 # rte ring reset is not yet part of stable API
 allow_experimental_apis = true
diff --git a/lib/librte_hash/rte_cuckoo_hash.c b/lib/librte_hash/rte_cuckoo_hash.c
index 87a4c01f2f9e..851c73328c8f 100644
--- a/lib/librte_hash/rte_cuckoo_hash.c
+++ b/lib/librte_hash/rte_cuckoo_hash.c
@@ -28,6 +28,7 @@ 
 #include <rte_compat.h>
 #include <rte_vect.h>
 #include <rte_tailq.h>
+#include <rte_rcu_qsbr.h>
 
 #include "rte_hash.h"
 #include "rte_cuckoo_hash.h"
@@ -44,6 +45,11 @@  static struct rte_tailq_elem rte_hash_tailq = {
 };
 EAL_REGISTER_TAILQ(rte_hash_tailq)
 
+struct __rte_hash_qs_item {
+	uint64_t token;	/**< QSBR token.*/
+	uint64_t index;	/**< key_idx and ext_bkt_idx pair*/
+};
+
 struct rte_hash *
 rte_hash_find_existing(const char *name)
 {
@@ -121,6 +127,68 @@  get_alt_bucket_index(const struct rte_hash *h,
 	return (cur_bkt_idx ^ sig) & h->bucket_bitmask;
 }
 
+/* Add an item into FIFO.
+ * return: 0 - success
+ */
+static int
+__rte_hash_rcu_qsbr_fifo_push(struct rte_ring *fifo,
+	struct __rte_hash_qs_item *item)
+{
+	if (rte_ring_sp_enqueue(fifo, (void *)(uintptr_t)item->token) != 0) {
+		rte_errno = ENOSPC;
+		return 1;
+	}
+	if (rte_ring_sp_enqueue(fifo, (void *)(uintptr_t)item->index) != 0) {
+		void *obj;
+		/* token needs to be dequeued when index enqueue fails */
+		rte_ring_sc_dequeue(fifo, &obj);
+		rte_errno = ENOSPC;
+		return 1;
+	}
+
+	return 0;
+}
+
+/* Remove item from FIFO.
+ * Used when data observed by rte_ring_peek.
+ */
+static void
+__rte_hash_rcu_qsbr_fifo_pop(struct rte_ring *fifo,
+	struct __rte_hash_qs_item *item)
+{
+	void *obj_token = NULL;
+	void *obj_index = NULL;
+
+	(void)rte_ring_sc_dequeue(fifo, &obj_token);
+	(void)rte_ring_sc_dequeue(fifo, &obj_index);
+
+	if (item) {
+		item->token = (uint64_t)((uintptr_t)obj_token);
+		item->index = (uint32_t)((uintptr_t)obj_index);
+	}
+}
+
+/* RCU clean up
+ * This only cleans up data associated with keys (pdata)
+ * Assumption: No reader is using the hash table
+ */
+static void
+__rte_hash_rcu_qsbr_clean_up(const struct rte_hash *h)
+{
+		struct __rte_hash_qs_item qs_item;
+		uint32_t key_to_free;
+		void *hash_data = NULL;
+		struct rte_hash_key *k = NULL;
+		while (!rte_ring_empty(h->qs_fifo)) {
+			__rte_hash_rcu_qsbr_fifo_pop(h->qs_fifo, &qs_item);
+			key_to_free = (uint32_t)(qs_item.index);
+			k = (struct rte_hash_key *) ((char *)h->key_store +
+				key_to_free * h->key_entry_size);
+			hash_data = k->pdata;
+			h->free_key_data_func(hash_data);
+		}
+}
+
 struct rte_hash *
 rte_hash_create(const struct rte_hash_parameters *params)
 {
@@ -193,11 +261,8 @@  rte_hash_create(const struct rte_hash_parameters *params)
 	if (params->extra_flag & RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL)
 		no_free_on_del = 1;
 
-	if (params->extra_flag & RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF) {
+	if (params->extra_flag & RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF)
 		readwrite_concur_lf_support = 1;
-		/* Enable not freeing internal memory/index on delete */
-		no_free_on_del = 1;
-	}
 
 	/* Store all keys and leave the first entry as a dummy entry for lookup_bulk */
 	if (use_local_cache)
@@ -484,6 +549,13 @@  rte_hash_free(struct rte_hash *h)
 
 	rte_mcfg_tailq_write_unlock();
 
+	/* RCU clean up
+	 * This only cleans up data associated with keys (pdata)
+	 * Assumption: No reader is using the hash table
+	 */
+	if (h->qsv && h->free_key_data_func)
+		__rte_hash_rcu_qsbr_clean_up(h);
+
 	if (h->use_local_cache)
 		rte_free(h->local_free_slots);
 	if (h->writer_takes_lock)
@@ -495,6 +567,7 @@  rte_hash_free(struct rte_hash *h)
 	rte_free(h->buckets_ext);
 	rte_free(h->tbl_chng_cnt);
 	rte_free(h->ext_bkt_to_free);
+	rte_ring_free(h->qs_fifo);
 	rte_free(h);
 	rte_free(te);
 }
@@ -567,6 +640,106 @@  __hash_rw_reader_unlock(const struct rte_hash *h)
 		rte_rwlock_read_unlock(h->readwrite_lock);
 }
 
+/* Max number of indexes to reclaim at one time. */
+#define RCU_QSBR_RECLAIM_SIZE	8
+
+/* When RCU QSBR FIFO usage is above 1/(2^RCU_QSBR_RECLAIM_LEVEL),
+ * reclaim will be triggered.
+ */
+#define RCU_QSBR_RECLAIM_LEVEL	3
+
+/* Reclaim some key/ext-bkt indexes based on quiescent state check.
+ * RCU_QSBR_RECLAIM_SIZE groups will be reclaimed at max.
+ * return: 0 - success, -EINVAL - no index reclaimed.
+ */
+static int
+__rte_hash_rcu_qsbr_reclaim_chunk(const struct rte_hash *h, uint32_t *index)
+{
+	struct __rte_hash_qs_item qs_item;
+	void *obj_token;
+	uint32_t cnt = 0;
+	uint32_t key_to_free;
+	uint32_t ext_bkt_to_free;
+	void *hash_data = NULL;
+	struct rte_hash_key *keys, *k;
+	if (h->qsv == NULL)
+		return -ENOSPC;
+
+	unsigned int lcore_id, n_slots;
+	struct lcore_cache *cached_free_slots;
+	keys = h->key_store;
+
+	/* Check reader threads quiescent state and
+	 * reclaim as much as possible.
+	 */
+	while ((cnt < RCU_QSBR_RECLAIM_SIZE) &&
+		(rte_ring_peek(h->qs_fifo, &obj_token) == 0) &&
+		(rte_rcu_qsbr_check(h->qsv, (uint64_t)((uintptr_t)obj_token),
+					false) == 1)) {
+		__rte_hash_rcu_qsbr_fifo_pop(h->qs_fifo, &qs_item);
+
+		key_to_free = (uint32_t)(qs_item.index);
+		ext_bkt_to_free = (uint32_t)(qs_item.index >> 32);
+		k = (struct rte_hash_key *) ((char *)keys +
+					key_to_free * h->key_entry_size);
+		hash_data = k->pdata;
+		h->free_key_data_func(hash_data);
+		if (h->ext_table_support) {
+			if (ext_bkt_to_free)
+				/* Recycle empty ext bkt to free list. */
+				rte_ring_sp_enqueue(h->free_ext_bkts,
+					(void *)(uintptr_t)ext_bkt_to_free);
+		}
+		/* Store key_idx for the new key in index*/
+		if (index != NULL  && *index != EMPTY_SLOT)
+			*index = key_to_free;
+
+		/* Return rest of the key indexes to free slot ring */
+		else if (h->use_local_cache) {
+			lcore_id = rte_lcore_id();
+			cached_free_slots = &h->local_free_slots[lcore_id];
+			/* Cache full, need to free it. */
+			if (cached_free_slots->len == LCORE_CACHE_SIZE) {
+				/* Need to enqueue the free slots in global ring. */
+				n_slots = rte_ring_mp_enqueue_burst(h->free_slots,
+							cached_free_slots->objs,
+							LCORE_CACHE_SIZE, NULL);
+				RETURN_IF_TRUE((n_slots == 0), -EFAULT);
+				cached_free_slots->len -= n_slots;
+			}
+			/* Put index of new free slot in cache. */
+			cached_free_slots->objs[cached_free_slots->len] =
+						(void *)((uintptr_t)key_to_free);
+			cached_free_slots->len++;
+		} else {
+			rte_ring_sp_enqueue(h->free_slots,
+					(void *)((uintptr_t)key_to_free));
+		}
+
+		cnt++;
+	}
+
+	if (cnt)
+		return 0;
+	return -ENOSPC;
+}
+
+/* Trigger reclaim when necessary.
+ * Reclaim happens when RCU QSBR queue usage is over 12.5%.
+ */
+static void
+__rte_hash_rcu_qsbr_try_reclaim(const struct rte_hash *h)
+{
+	if (h->qsv == NULL)
+		return;
+
+	if (rte_ring_count(h->qs_fifo) <
+		(rte_ring_get_capacity(h->qs_fifo) >> RCU_QSBR_RECLAIM_LEVEL))
+		return;
+
+	(void)__rte_hash_rcu_qsbr_reclaim_chunk(h, NULL);
+}
+
 void
 rte_hash_reset(struct rte_hash *h)
 {
@@ -576,6 +749,14 @@  rte_hash_reset(struct rte_hash *h)
 		return;
 
 	__hash_rw_writer_lock(h);
+
+	/* RCU clean up
+	 * This only cleans up data associated with keys (pdata)
+	 * Assumption: No reader is using the hash table
+	 */
+	if (h->qsv && h->free_key_data_func)
+		__rte_hash_rcu_qsbr_clean_up(h);
+
 	memset(h->buckets, 0, h->num_buckets * sizeof(struct rte_hash_bucket));
 	memset(h->key_store, 0, h->key_entry_size * (h->entries + 1));
 	*h->tbl_chng_cnt = 0;
@@ -612,6 +793,10 @@  rte_hash_reset(struct rte_hash *h)
 		for (i = 0; i < RTE_MAX_LCORE; i++)
 			h->local_free_slots[i].len = 0;
 	}
+
+	if (h->qs_fifo)
+		rte_ring_reset(h->qs_fifo);
+
 	__hash_rw_writer_unlock(h);
 }
 
@@ -915,6 +1100,19 @@  rte_hash_cuckoo_make_space_mw(const struct rte_hash *h,
 	return -ENOSPC;
 }
 
+static inline int32_t
+get_free_slot(const struct rte_hash *h)
+{
+	uint32_t key_to_free;
+	__hash_rw_writer_lock(h);
+	if (__rte_hash_rcu_qsbr_reclaim_chunk(h, &key_to_free) < 0) {
+		__hash_rw_writer_unlock(h);
+		return -ENOSPC;
+	}
+	__hash_rw_writer_unlock(h);
+	return key_to_free;
+}
+
 static inline int32_t
 __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,
 						hash_sig_t sig, void *data)
@@ -972,7 +1170,7 @@  __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,
 					cached_free_slots->objs,
 					LCORE_CACHE_SIZE, NULL);
 			if (n_slots == 0) {
-				return -ENOSPC;
+				goto rcu_check;
 			}
 
 			cached_free_slots->len += n_slots;
@@ -982,11 +1180,19 @@  __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,
 		cached_free_slots->len--;
 		slot_id = cached_free_slots->objs[cached_free_slots->len];
 	} else {
-		if (rte_ring_sc_dequeue(h->free_slots, &slot_id) != 0) {
-			return -ENOSPC;
-		}
+		if (rte_ring_sc_dequeue(h->free_slots, &slot_id) != 0)
+			goto rcu_check;
 	}
+	goto new_slot_found;
 
+rcu_check:
+	ret = get_free_slot(h);
+	if (ret < 0)
+		return -ENOSPC;
+	else
+		slot_id = (void *)((uintptr_t)ret);
+
+new_slot_found:
 	new_k = RTE_PTR_ADD(keys, (uintptr_t)slot_id * h->key_entry_size);
 	new_idx = (uint32_t)((uintptr_t) slot_id);
 	/* The store to application data (by the application) at *data should
@@ -1454,10 +1660,11 @@  search_and_remove(const struct rte_hash *h, const void *key,
 					key_idx * h->key_entry_size);
 			if (rte_hash_cmp_eq(key, k->key, h) == 0) {
 				bkt->sig_current[i] = NULL_SIGNATURE;
-				/* Free the key store index if
-				 * no_free_on_del is disabled.
+				/* Free the key store index if no_free_on_del
+				 * and readwrite_concur_lf_support is disabled.
 				 */
-				if (!h->no_free_on_del)
+				if (!h->readwrite_concur_lf_support &&
+					!h->no_free_on_del)
 					remove_entry(h, bkt, i);
 
 				__atomic_store_n(&bkt->key_idx[i],
@@ -1486,6 +1693,9 @@  __rte_hash_del_key_with_hash(const struct rte_hash *h, const void *key,
 	int pos;
 	int32_t ret, i;
 	uint16_t short_sig;
+	uint32_t ext_bkt_to_free = 0;
+	uint32_t key_to_free = 0;
+	struct __rte_hash_qs_item qs_item;
 
 	short_sig = get_short_sig(sig);
 	prim_bucket_idx = get_prim_bucket_index(h, sig);
@@ -1520,10 +1730,9 @@  __rte_hash_del_key_with_hash(const struct rte_hash *h, const void *key,
 
 /* Search last bucket to see if empty to be recycled */
 return_bkt:
-	if (!last_bkt) {
-		__hash_rw_writer_unlock(h);
-		return ret;
-	}
+	if (!last_bkt)
+		goto return_key;
+
 	while (last_bkt->next) {
 		prev_bkt = last_bkt;
 		last_bkt = last_bkt->next;
@@ -1538,9 +1747,9 @@  __rte_hash_del_key_with_hash(const struct rte_hash *h, const void *key,
 		prev_bkt->next = NULL;
 		uint32_t index = last_bkt - h->buckets_ext + 1;
 		/* Recycle the empty bkt if
-		 * no_free_on_del is disabled.
+		 * readwrite_concur_lf_support is disabled.
 		 */
-		if (h->no_free_on_del)
+		if (h->readwrite_concur_lf_support) {
 			/* Store index of an empty ext bkt to be recycled
 			 * on calling rte_hash_del_xxx APIs.
 			 * When lock free read-write concurrency is enabled,
@@ -1548,10 +1757,34 @@  __rte_hash_del_key_with_hash(const struct rte_hash *h, const void *key,
 			 * immediately (as readers might be using it still).
 			 * Hence freeing of the ext bkt is piggy-backed to
 			 * freeing of the key index.
+			 * If using external RCU, store this index in an array.
 			 */
-			h->ext_bkt_to_free[ret] = index;
-		else
-			rte_ring_sp_enqueue(h->free_ext_bkts, (void *)(uintptr_t)index);
+			if (h->qsv == NULL)
+				h->ext_bkt_to_free[ret] = index;
+			/* Enqueue this index to RCU fifo if using internal RCU
+			 * and no_free_on_del is disabled.
+			 */
+			else if (!h->no_free_on_del)
+				ext_bkt_to_free = index;
+		} else
+			rte_ring_sp_enqueue(h->free_ext_bkts,
+					(void *)(uintptr_t)index);
+	}
+
+return_key:
+	if (h->readwrite_concur_lf_support && h->qsv && !h->no_free_on_del) {
+		qs_item.token = rte_rcu_qsbr_start(h->qsv);
+		/* Key index where key is stored, adding the first dummy index */
+		key_to_free = ret + 1;
+		qs_item.index = ((uint64_t)ext_bkt_to_free << 32) | key_to_free;
+		/* Push into QSBR FIFO. */
+		if (__rte_hash_rcu_qsbr_fifo_push(h->qs_fifo, &qs_item) != 0)
+			RTE_LOG(ERR, HASH, "Failed to push QSBR FIFO\n");
+
+		/* Speculatively reclaim 'deleted' indexes.
+		 * Help spread the reclaim work load across multiple calls.
+		 */
+		__rte_hash_rcu_qsbr_try_reclaim(h);
 	}
 	__hash_rw_writer_unlock(h);
 	return ret;
@@ -1610,12 +1843,42 @@  rte_hash_free_key_with_position(const struct rte_hash *h,
 	/* Out of bounds */
 	if (key_idx >= total_entries)
 		return -EINVAL;
-	if (h->ext_table_support && h->readwrite_concur_lf_support) {
-		uint32_t index = h->ext_bkt_to_free[position];
-		if (index) {
-			/* Recycle empty ext bkt to free list. */
-			rte_ring_sp_enqueue(h->free_ext_bkts, (void *)(uintptr_t)index);
-			h->ext_bkt_to_free[position] = 0;
+	if (h->readwrite_concur_lf_support) {
+		if (!h->qsv) {
+			if (h->ext_table_support) {
+				__hash_rw_writer_lock(h);
+				uint32_t index = h->ext_bkt_to_free[position];
+				if (index) {
+					/* Recycle empty ext bkt to free list. */
+					rte_ring_sp_enqueue(h->free_ext_bkts,
+						(void *)(uintptr_t)index);
+					h->ext_bkt_to_free[position] = 0;
+				}
+				__hash_rw_writer_unlock(h);
+			}
+		} else if (h->no_free_on_del) {
+			/* Push into QSBR FIFO */
+			struct __rte_hash_qs_item qs_item;
+			uint32_t ext_bkt_to_free = 0;
+			uint32_t key_to_free = 0;
+			qs_item.token = rte_rcu_qsbr_start(h->qsv);
+			key_to_free = key_idx;
+			if (h->ext_table_support)
+				ext_bkt_to_free = h->ext_bkt_to_free[position];
+			qs_item.index = ((uint64_t)ext_bkt_to_free << 32) |
+						key_to_free;
+			__hash_rw_writer_lock(h);
+			/* Push into QSBR FIFO. */
+			if (__rte_hash_rcu_qsbr_fifo_push(h->qs_fifo, &qs_item)
+				!= 0)
+				RTE_LOG(ERR, HASH, "Failed to push QSBR FIFO\n");
+
+			/* Speculatively reclaim 'deleted' indexes.
+			 * Help spread the reclaim work load across multiple calls.
+			 */
+			__rte_hash_rcu_qsbr_try_reclaim(h);
+			__hash_rw_writer_unlock(h);
+			return 0;
 		}
 	}
 
@@ -2225,3 +2488,40 @@  rte_hash_iterate(const struct rte_hash *h, const void **key, void **data, uint32
 	(*next)++;
 	return position - 1;
 }
+
+__rte_experimental
+int rte_hash_rcu_qsbr_add(struct rte_hash *h, struct rte_rcu_qsbr *v,
+				rte_hash_free_key_data free_key_data_func)
+{
+	uint32_t qs_fifo_size;
+	char rcu_ring_name[RTE_RING_NAMESIZE];
+
+	if ((h == NULL) || (v == NULL)) {
+		rte_errno = EINVAL;
+		return 1;
+	}
+
+	if (h->qsv) {
+		rte_errno = EEXIST;
+		return 1;
+	}
+
+	/* round up qs_fifo_size to next power of two that is not less than
+	 * total hash table entries. Will store 'token' and 'index'.
+	 */
+	qs_fifo_size = 2 * rte_align32pow2(h->entries);
+
+	/* Init QSBR reclaiming FIFO. */
+	snprintf(rcu_ring_name, sizeof(rcu_ring_name), "HT_RCU_%s", h->name);
+	h->qs_fifo = rte_ring_create(rcu_ring_name, rte_align32pow2(qs_fifo_size),
+			SOCKET_ID_ANY, 0);
+	if (h->qs_fifo == NULL) {
+		RTE_LOG(ERR, HASH, "Hash QS FIFO memory allocation failed\n");
+		rte_errno = ENOMEM;
+		return 1;
+	}
+	h->qsv = v;
+	h->free_key_data_func = free_key_data_func;
+
+	return 0;
+}
diff --git a/lib/librte_hash/rte_cuckoo_hash.h b/lib/librte_hash/rte_cuckoo_hash.h
index fb19bb27dfef..c64494ee8b85 100644
--- a/lib/librte_hash/rte_cuckoo_hash.h
+++ b/lib/librte_hash/rte_cuckoo_hash.h
@@ -168,6 +168,9 @@  struct rte_hash {
 	struct lcore_cache *local_free_slots;
 	/**< Local cache per lcore, storing some indexes of the free slots */
 
+	struct rte_rcu_qsbr *qsv;
+	struct rte_ring *qs_fifo;
+	rte_hash_free_key_data free_key_data_func;
 	/* Fields used in lookup */
 
 	uint32_t key_len __rte_cache_aligned;
diff --git a/lib/librte_hash/rte_hash.h b/lib/librte_hash/rte_hash.h
index 0d73370dc483..20db00784d0b 100644
--- a/lib/librte_hash/rte_hash.h
+++ b/lib/librte_hash/rte_hash.h
@@ -15,6 +15,7 @@ 
 #include <stddef.h>
 
 #include <rte_compat.h>
+#include <rte_rcu_qsbr.h>
 
 #ifdef __cplusplus
 extern "C" {
@@ -44,8 +45,6 @@  extern "C" {
 
 /** Flag to disable freeing of key index on hash delete.
  * Refer to rte_hash_del_xxx APIs for more details.
- * This is enabled by default when RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF
- * is enabled.
  */
 #define RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL 0x10
 
@@ -69,6 +68,13 @@  typedef uint32_t (*rte_hash_function)(const void *key, uint32_t key_len,
 /** Type of function used to compare the hash key. */
 typedef int (*rte_hash_cmp_eq_t)(const void *key1, const void *key2, size_t key_len);
 
+/**
+ * Type of function used to free data stored in the key.
+ * Required when using internal RCU to allow appication to free key-data once
+ * the key is returned to the the ring of free key-slots.
+ */
+typedef void (*rte_hash_free_key_data)(void *key_data);
+
 /**
  * Parameters used when creating the hash table.
  */
@@ -267,8 +273,7 @@  rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key, hash_sig_t
  * and should only be called from one thread by default.
  * Thread safety can be enabled by setting flag during
  * table creation.
- * If RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL or
- * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled,
+ * If RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL is enabled,
  * the key index returned by rte_hash_add_key_xxx APIs will not be
  * freed by this API. rte_hash_free_key_with_position API must be called
  * additionally to free the index associated with the key.
@@ -296,8 +301,7 @@  rte_hash_del_key(const struct rte_hash *h, const void *key);
  * and should only be called from one thread by default.
  * Thread safety can be enabled by setting flag during
  * table creation.
- * If RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL or
- * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled,
+ * If RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL is enabled,
  * the key index returned by rte_hash_add_key_xxx APIs will not be
  * freed by this API. rte_hash_free_key_with_position API must be called
  * additionally to free the index associated with the key.
@@ -350,8 +354,7 @@  rte_hash_get_key_with_position(const struct rte_hash *h, const int32_t position,
  * of the key. This operation is not multi-thread safe and should
  * only be called from one thread by default. Thread safety
  * can be enabled by setting flag during table creation.
- * If RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL or
- * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled,
+ * If RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL is enabled,
  * the key index returned by rte_hash_del_key_xxx APIs must be freed
  * using this API. This API should be called after all the readers
  * have stopped referencing the entry corresponding to this key.
@@ -545,6 +548,25 @@  rte_hash_lookup_bulk(const struct rte_hash *h, const void **keys,
  */
 int32_t
 rte_hash_iterate(const struct rte_hash *h, const void **key, void **data, uint32_t *next);
+
+/**
+ * Configure the RCU QSBR variable to use in hash structure.
+ *
+ * @param h
+ *   Hash table
+ * @param v
+ *   RCU QSBR variable
+ * @return
+ *   On success - 0
+ *   On error - 1 with error code set in rte_errno.
+ *   Possible rte_errno codes are:
+ *   - EINVAL if invalid pointer
+ *   - EEXIST if already added QSBR
+ *   - ENOMEM if memory allocation failure
+ */
+__rte_experimental
+int rte_hash_rcu_qsbr_add(struct rte_hash *h, struct rte_rcu_qsbr *v,
+			rte_hash_free_key_data free_key_data_func);
 #ifdef __cplusplus
 }
 #endif
diff --git a/lib/librte_hash/rte_hash_version.map b/lib/librte_hash/rte_hash_version.map
index 734ae28b0408..8c3b21891de0 100644
--- a/lib/librte_hash/rte_hash_version.map
+++ b/lib/librte_hash/rte_hash_version.map
@@ -58,5 +58,5 @@  EXPERIMENTAL {
 	global:
 
 	rte_hash_free_key_with_position;
-
+	rte_hash_rcu_qsbr_add;
 };