[RFC,v2] lib/hash: integrate RCU QSBR

Message ID 20200819040537.1792-1-dharmik.thakkar@arm.com (mailing list archive)
State Superseded, archived
Delegated to: David Marchand
Series [RFC,v2] lib/hash: integrate RCU QSBR

Checks

Context Check Description
ci/checkpatch warning coding style issues
ci/Intel-compilation fail Compilation issues

Commit Message

Dharmik Thakkar Aug. 19, 2020, 4:05 a.m. UTC
  Integrate RCU QSBR to make it easier for applications to use lock-free
algorithms.

The resource reclamation implementation was split from the original
series and has since become part of the RCU library. Rework the series
to base the hash integration on the RCU reclamation APIs.

Refer to 'Resource reclamation framework for DPDK', available at [1],
to understand various aspects of integrating the RCU library
into other libraries.

[1] https://doc.dpdk.org/guides/prog_guide/rcu_lib.html

Introduce a new API, rte_hash_rcu_qsbr_add, for the application to
register an RCU variable that the hash library will use.
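
A rough usage sketch (the callback, reader count, and error handling
below are illustrative, not part of this patch):

#include <rte_hash.h>
#include <rte_rcu_qsbr.h>
#include <rte_malloc.h>

#define MAX_READER_THREADS 4

/* Hypothetical application callback to free the key-data. */
static void
app_free_key_data(void *p, void *key_data)
{
	RTE_SET_USED(p);
	rte_free(key_data);
}

static struct rte_hash *
app_hash_create_with_rcu(const struct rte_hash_parameters *params)
{
	struct rte_hash *h;
	struct rte_rcu_qsbr *qsv = NULL;
	struct rte_hash_rcu_config rcu_cfg = {0};

	/* params->extra_flag should include
	 * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF.
	 */
	h = rte_hash_create(params);
	if (h == NULL)
		return NULL;

	/* Allocate and initialize the QSBR variable shared with readers
	 * (alignment and NUMA placement omitted for brevity).
	 */
	qsv = rte_zmalloc(NULL,
			rte_rcu_qsbr_get_memsize(MAX_READER_THREADS), 0);
	if (qsv == NULL || rte_rcu_qsbr_init(qsv, MAX_READER_THREADS) != 0)
		goto fail;

	rcu_cfg.v = qsv;
	rcu_cfg.mode = RTE_HASH_QSBR_MODE_DQ;	/* default: defer queue */
	rcu_cfg.free_key_data_func = app_free_key_data;

	/* Register the RCU variable with the hash table. */
	if (rte_hash_rcu_qsbr_add(h, &rcu_cfg) != 0)
		goto fail;

	return h;

fail:
	rte_free(qsv);
	rte_hash_free(h);
	return NULL;
}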

Suggested-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
Signed-off-by: Dharmik Thakkar <dharmik.thakkar@arm.com>
Reviewed-by: Ruifeng Wang <ruifeng.wang@arm.com>
---
v2:
 - Remove defer queue related functions and use resource reclamation
   APIs from the RCU QSBR library instead

 - Remove patch (net/ixgbe: avoid multiple definitions of 'bool')
   from the series as it is already accepted

---
 lib/Makefile                         |   2 +-
 lib/librte_hash/Makefile             |   2 +-
 lib/librte_hash/meson.build          |   1 +
 lib/librte_hash/rte_cuckoo_hash.c    | 291 +++++++++++++++++++++------
 lib/librte_hash/rte_cuckoo_hash.h    |   8 +
 lib/librte_hash/rte_hash.h           |  75 ++++++-
 lib/librte_hash/rte_hash_version.map |   2 +-
 7 files changed, 308 insertions(+), 73 deletions(-)
  

Comments

Dharmik Thakkar Aug. 25, 2020, 7:59 p.m. UTC | #1
CI has reported some compilation issues for this patch. I will resolve these issues once the RFC patch is approved.

Thank you!

> On Aug 18, 2020, at 11:05 PM, Dharmik Thakkar <Dharmik.Thakkar@arm.com> wrote:
> 
> Integrate RCU QSBR to make it easier for applications to use lock-free
> algorithms.
> 
> The resource reclamation implementation was split from the original
> series and has since become part of the RCU library. Rework the series
> to base the hash integration on the RCU reclamation APIs.
> 
> Refer to 'Resource reclamation framework for DPDK', available at [1],
> to understand various aspects of integrating the RCU library
> into other libraries.
> 
> [1] https://doc.dpdk.org/guides/prog_guide/rcu_lib.html
> 
> Introduce a new API, rte_hash_rcu_qsbr_add, for the application to
> register an RCU variable that the hash library will use.
> 
> Suggested-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
> Signed-off-by: Dharmik Thakkar <dharmik.thakkar@arm.com>
> Reviewed-by: Ruifeng Wang <ruifeng.wang@arm.com>
> ---
> v2:
> - Remove defer queue related functions and use resource reclamation
>   APIs from the RCU QSBR library instead
> 
> - Remove patch (net/ixgbe: avoid multiple definitions of 'bool')
>   from the series as it is already accepted
> 
> ---
> lib/Makefile                         |   2 +-
> lib/librte_hash/Makefile             |   2 +-
> lib/librte_hash/meson.build          |   1 +
> lib/librte_hash/rte_cuckoo_hash.c    | 291 +++++++++++++++++++++------
> lib/librte_hash/rte_cuckoo_hash.h    |   8 +
> lib/librte_hash/rte_hash.h           |  75 ++++++-
> lib/librte_hash/rte_hash_version.map |   2 +-
> 7 files changed, 308 insertions(+), 73 deletions(-)
> 
> diff --git a/lib/Makefile b/lib/Makefile
> index 8f5b68a2d469..dffe31c829f0 100644
> --- a/lib/Makefile
> +++ b/lib/Makefile
> @@ -51,7 +51,7 @@ DIRS-$(CONFIG_RTE_LIBRTE_VHOST) += librte_vhost
> DEPDIRS-librte_vhost := librte_eal librte_mempool librte_mbuf librte_ethdev \
> 			librte_net librte_hash librte_cryptodev
> DIRS-$(CONFIG_RTE_LIBRTE_HASH) += librte_hash
> -DEPDIRS-librte_hash := librte_eal librte_ring
> +DEPDIRS-librte_hash := librte_eal librte_ring librte_rcu
> DIRS-$(CONFIG_RTE_LIBRTE_EFD) += librte_efd
> DEPDIRS-librte_efd := librte_eal librte_ring librte_hash
> DIRS-$(CONFIG_RTE_LIBRTE_RIB) += librte_rib
> diff --git a/lib/librte_hash/Makefile b/lib/librte_hash/Makefile
> index ec9f86499262..10e697f48652 100644
> --- a/lib/librte_hash/Makefile
> +++ b/lib/librte_hash/Makefile
> @@ -8,7 +8,7 @@ LIB = librte_hash.a
> 
> CFLAGS += -O3
> CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR)
> -LDLIBS += -lrte_eal -lrte_ring
> +LDLIBS += -lrte_eal -lrte_ring -lrte_rcu
> 
> EXPORT_MAP := rte_hash_version.map
> 
> diff --git a/lib/librte_hash/meson.build b/lib/librte_hash/meson.build
> index 6ab46ae9d768..0977a63fd279 100644
> --- a/lib/librte_hash/meson.build
> +++ b/lib/librte_hash/meson.build
> @@ -10,3 +10,4 @@ headers = files('rte_crc_arm64.h',
> 
> sources = files('rte_cuckoo_hash.c', 'rte_fbk_hash.c')
> deps += ['ring']
> +deps += ['rcu']
> diff --git a/lib/librte_hash/rte_cuckoo_hash.c b/lib/librte_hash/rte_cuckoo_hash.c
> index 0a6d474713a2..01c2cbe0e38b 100644
> --- a/lib/librte_hash/rte_cuckoo_hash.c
> +++ b/lib/librte_hash/rte_cuckoo_hash.c
> @@ -52,6 +52,11 @@ static struct rte_tailq_elem rte_hash_tailq = {
> };
> EAL_REGISTER_TAILQ(rte_hash_tailq)
> 
> +struct __rte_hash_rcu_dq_entry {
> +	uint32_t key_idx;
> +	uint32_t ext_bkt_idx; /**< Extended bkt index */
> +};
> +
> struct rte_hash *
> rte_hash_find_existing(const char *name)
> {
> @@ -210,7 +215,10 @@ rte_hash_create(const struct rte_hash_parameters *params)
> 
> 	if (params->extra_flag & RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF) {
> 		readwrite_concur_lf_support = 1;
> -		/* Enable not freeing internal memory/index on delete */
> +		/* Enable not freeing internal memory/index on delete.
> +		 * If internal RCU is enabled, freeing of internal memory/index
> +		 * is done on delete
> +		 */
> 		no_free_on_del = 1;
> 	}
> 
> @@ -505,6 +513,10 @@ rte_hash_free(struct rte_hash *h)
> 
> 	rte_mcfg_tailq_write_unlock();
> 
> +	/* RCU clean up. */
> +	if (h->dq)
> +		rte_rcu_qsbr_dq_delete(h->dq);
> +
> 	if (h->use_local_cache)
> 		rte_free(h->local_free_slots);
> 	if (h->writer_takes_lock)
> @@ -612,6 +624,16 @@ rte_hash_reset(struct rte_hash *h)
> 		return;
> 
> 	__hash_rw_writer_lock(h);
> +
> +	/* RCU clean up. */
> +	if (h->hash_rcu_cfg->v) {
> +		rte_rcu_qsbr_dq_delete(h->dq);
> +		h->dq = NULL;
> +		if (rte_hash_rcu_qsbr_add(h, h->hash_rcu_cfg) < 0) {
> +			RTE_LOG(ERR, HASH, "RCU init failed\n");
> +			return;
> +		}
> +	}
> 	memset(h->buckets, 0, h->num_buckets * sizeof(struct rte_hash_bucket));
> 	memset(h->key_store, 0, h->key_entry_size * (h->entries + 1));
> 	*h->tbl_chng_cnt = 0;
> @@ -952,6 +974,37 @@ rte_hash_cuckoo_make_space_mw(const struct rte_hash *h,
> 	return -ENOSPC;
> }
> 
> +static inline uint32_t
> +alloc_slot(const struct rte_hash *h, struct lcore_cache *cached_free_slots)
> +{
> +	unsigned int  n_slots;
> +	uint32_t slot_id;
> +	if (h->use_local_cache) {
> +		/* Try to get a free slot from the local cache */
> +		if (cached_free_slots->len == 0) {
> +			/* Need to get another burst of free slots from global ring */
> +			n_slots = rte_ring_mc_dequeue_burst_elem(h->free_slots,
> +					cached_free_slots->objs,
> +					sizeof(uint32_t),
> +					LCORE_CACHE_SIZE, NULL);
> +			if (n_slots == 0)
> +				return EMPTY_SLOT;
> +
> +			cached_free_slots->len += n_slots;
> +		}
> +
> +		/* Get a free slot from the local cache */
> +		cached_free_slots->len--;
> +		slot_id = cached_free_slots->objs[cached_free_slots->len];
> +	} else {
> +		if (rte_ring_sc_dequeue_elem(h->free_slots, &slot_id,
> +						sizeof(uint32_t)) != 0)
> +			return EMPTY_SLOT;
> +	}
> +
> +	return slot_id;
> +}
> +
> static inline int32_t
> __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,
> 						hash_sig_t sig, void *data)
> @@ -963,7 +1016,6 @@ __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,
> 	uint32_t ext_bkt_id = 0;
> 	uint32_t slot_id;
> 	int ret;
> -	unsigned n_slots;
> 	unsigned lcore_id;
> 	unsigned int i;
> 	struct lcore_cache *cached_free_slots = NULL;
> @@ -1001,28 +1053,19 @@ __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,
> 	if (h->use_local_cache) {
> 		lcore_id = rte_lcore_id();
> 		cached_free_slots = &h->local_free_slots[lcore_id];
> -		/* Try to get a free slot from the local cache */
> -		if (cached_free_slots->len == 0) {
> -			/* Need to get another burst of free slots from global ring */
> -			n_slots = rte_ring_mc_dequeue_burst_elem(h->free_slots,
> -					cached_free_slots->objs,
> -					sizeof(uint32_t),
> -					LCORE_CACHE_SIZE, NULL);
> -			if (n_slots == 0) {
> -				return -ENOSPC;
> +	}
> +	slot_id = alloc_slot(h, cached_free_slots);
> +	if (slot_id == EMPTY_SLOT) {
> +		if (h->hash_rcu_cfg->v) {
> +			if (rte_rcu_qsbr_dq_reclaim(h->dq,
> +						    h->hash_rcu_cfg->reclaim_max,
> +						    NULL, NULL, NULL)
> +					== 0) {
> +				slot_id = alloc_slot(h, cached_free_slots);
> 			}
> -
> -			cached_free_slots->len += n_slots;
> 		}
> -
> -		/* Get a free slot from the local cache */
> -		cached_free_slots->len--;
> -		slot_id = cached_free_slots->objs[cached_free_slots->len];
> -	} else {
> -		if (rte_ring_sc_dequeue_elem(h->free_slots, &slot_id,
> -						sizeof(uint32_t)) != 0) {
> +		if (slot_id == EMPTY_SLOT)
> 			return -ENOSPC;
> -		}
> 	}
> 
> 	new_k = RTE_PTR_ADD(keys, slot_id * h->key_entry_size);
> @@ -1118,8 +1161,20 @@ __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,
> 	if (rte_ring_sc_dequeue_elem(h->free_ext_bkts, &ext_bkt_id,
> 						sizeof(uint32_t)) != 0 ||
> 					ext_bkt_id == 0) {
> -		ret = -ENOSPC;
> -		goto failure;
> +		if (h->hash_rcu_cfg->v) {
> +			if (rte_rcu_qsbr_dq_reclaim(h->dq,
> +						    h->hash_rcu_cfg->reclaim_max,
> +						    NULL, NULL, NULL)
> +					== 0) {
> +				rte_ring_sc_dequeue_elem(h->free_ext_bkts,
> +							 &ext_bkt_id,
> +							 sizeof(uint32_t));
> +			}
> +		}
> +		if (ext_bkt_id == 0) {
> +			ret = -ENOSPC;
> +			goto failure;
> +		}
> 	}
> 
> 	/* Use the first location of the new bucket */
> @@ -1395,12 +1450,12 @@ rte_hash_lookup_data(const struct rte_hash *h, const void *key, void **data)
> 	return __rte_hash_lookup_with_hash(h, key, rte_hash_hash(h, key), data);
> }
> 
> -static inline void
> -remove_entry(const struct rte_hash *h, struct rte_hash_bucket *bkt, unsigned i)
> +static int
> +free_slot(const struct rte_hash *h, uint32_t slot_id)
> {
> 	unsigned lcore_id, n_slots;
> 	struct lcore_cache *cached_free_slots;
> -
> +	/* Return key indexes to free slot ring */
> 	if (h->use_local_cache) {
> 		lcore_id = rte_lcore_id();
> 		cached_free_slots = &h->local_free_slots[lcore_id];
> @@ -1411,18 +1466,122 @@ remove_entry(const struct rte_hash *h, struct rte_hash_bucket *bkt, unsigned i)
> 						cached_free_slots->objs,
> 						sizeof(uint32_t),
> 						LCORE_CACHE_SIZE, NULL);
> -			ERR_IF_TRUE((n_slots == 0),
> -				"%s: could not enqueue free slots in global ring\n",
> -				__func__);
> +			RETURN_IF_TRUE((n_slots == 0), -EFAULT);
> 			cached_free_slots->len -= n_slots;
> 		}
> -		/* Put index of new free slot in cache. */
> -		cached_free_slots->objs[cached_free_slots->len] =
> -							bkt->key_idx[i];
> -		cached_free_slots->len++;
> +	}
> +
> +	enqueue_slot_back(h, cached_free_slots, slot_id);
> +	return 0;
> +}
> +
> +static void
> +__hash_rcu_qsbr_free_resource(void *p, void *e, unsigned int n)
> +{
> +	void *key_data = NULL;
> +	int ret;
> +	struct rte_hash_key *keys, *k;
> +	struct rte_hash *h = (struct rte_hash *)p;
> +	struct __rte_hash_rcu_dq_entry rcu_dq_entry =
> +			*((struct __rte_hash_rcu_dq_entry *)e);
> +
> +	RTE_SET_USED(n);
> +	keys = h->key_store;
> +
> +	k = (struct rte_hash_key *) ((char *)keys +
> +				rcu_dq_entry.key_idx * h->key_entry_size);
> +	key_data = k->pdata;
> +	if (h->hash_rcu_cfg->free_key_data_func)
> +		h->hash_rcu_cfg->free_key_data_func(h->hash_rcu_cfg->key_data_ptr,
> +						    key_data);
> +
> +	if (h->ext_table_support && rcu_dq_entry.ext_bkt_idx != EMPTY_SLOT)
> +		/* Recycle empty ext bkt to free list. */
> +		rte_ring_sp_enqueue_elem(h->free_ext_bkts,
> +			&rcu_dq_entry.ext_bkt_idx, sizeof(uint32_t));
> +
> +	/* Return key indexes to free slot ring */
> +	ret = free_slot(h, rcu_dq_entry.key_idx);
> +	if (ret < 0) {
> +		RTE_LOG(ERR, HASH,
> +			"%s: could not enqueue free slots in global ring\n",
> +				__func__);
> +	}
> +}
> +
> +int
> +rte_hash_rcu_qsbr_add(struct rte_hash *h,
> +				struct rte_hash_rcu_config *cfg)
> +{
> +	struct rte_rcu_qsbr_dq_parameters params = {0};
> +	char rcu_dq_name[RTE_RCU_QSBR_DQ_NAMESIZE];
> +	struct rte_hash_rcu_config *hash_rcu_cfg = NULL;
> +
> +	const uint32_t total_entries = h->use_local_cache ?
> +		h->entries + (RTE_MAX_LCORE - 1) * (LCORE_CACHE_SIZE - 1) + 1
> +							: h->entries + 1;
> +
> +	if ((h == NULL) || cfg == NULL) {
> +		rte_errno = EINVAL;
> +		return 1;
> +	}
> +
> +	hash_rcu_cfg = rte_zmalloc(NULL, sizeof(struct rte_hash_rcu_config), 0);
> +	if (hash_rcu_cfg == NULL) {
> +		RTE_LOG(ERR, HASH, "memory allocation failed\n");
> +		return 1;
> +	}
> +
> +	if (cfg->mode == RTE_HASH_QSBR_MODE_SYNC) {
> +		/* No other things to do. */
> +	} else if (cfg->mode == RTE_HASH_QSBR_MODE_DQ) {
> +		/* Init QSBR defer queue. */
> +		snprintf(rcu_dq_name, sizeof(rcu_dq_name),
> +					"HASH_RCU_%s", h->name);
> +		params.name = rcu_dq_name;
> +		params.size = cfg->dq_size;
> +		if (params.size == 0)
> +			params.size = total_entries;
> +		params.trigger_reclaim_limit = cfg->reclaim_thd;
> +		if (params.max_reclaim_size == 0)
> +			params.max_reclaim_size = RTE_HASH_RCU_DQ_RECLAIM_MAX;
> +		params.esize = sizeof(struct __rte_hash_rcu_dq_entry);
> +		params.free_fn = __hash_rcu_qsbr_free_resource;
> +		params.p = h;
> +		params.v = cfg->v;
> +		h->dq = rte_rcu_qsbr_dq_create(&params);
> +		if (h->dq == NULL) {
> +			rte_free(hash_rcu_cfg);
> +			RTE_LOG(ERR, HASH, "HASH defer queue creation failed\n");
> +			return 1;
> +		}
> 	} else {
> -		rte_ring_sp_enqueue_elem(h->free_slots,
> -				&bkt->key_idx[i], sizeof(uint32_t));
> +		rte_free(hash_rcu_cfg);
> +		rte_errno = EINVAL;
> +		return 1;
> +	}
> +
> +	hash_rcu_cfg->v = cfg->v;
> +	hash_rcu_cfg->mode = cfg->mode;
> +	hash_rcu_cfg->dq_size = cfg->dq_size;
> +	hash_rcu_cfg->reclaim_thd = cfg->reclaim_thd;
> +	hash_rcu_cfg->reclaim_max = cfg->reclaim_max;
> +	hash_rcu_cfg->free_key_data_func = cfg->free_key_data_func;
> +	hash_rcu_cfg->key_data_ptr = cfg->key_data_ptr;
> +
> +	h->hash_rcu_cfg = hash_rcu_cfg;
> +
> +	return 0;
> +}
> +
> +static inline void
> +remove_entry(const struct rte_hash *h, struct rte_hash_bucket *bkt, unsigned i)
> +{
> +	int ret = free_slot(h, bkt->key_idx[i]);
> +	if (ret < 0) {
> +		RTE_LOG(ERR, HASH,
> +			"%s: could not enqueue free slots in global ring\n",
> +				__func__);
> 	}
> }
> 
> @@ -1521,6 +1680,8 @@ __rte_hash_del_key_with_hash(const struct rte_hash *h, const void *key,
> 	int pos;
> 	int32_t ret, i;
> 	uint16_t short_sig;
> +	uint32_t index = EMPTY_SLOT;
> +	struct __rte_hash_rcu_dq_entry rcu_dq_entry;
> 
> 	short_sig = get_short_sig(sig);
> 	prim_bucket_idx = get_prim_bucket_index(h, sig);
> @@ -1555,10 +1716,9 @@ __rte_hash_del_key_with_hash(const struct rte_hash *h, const void *key,
> 
> /* Search last bucket to see if empty to be recycled */
> return_bkt:
> -	if (!last_bkt) {
> -		__hash_rw_writer_unlock(h);
> -		return ret;
> -	}
> +	if (!last_bkt)
> +		goto return_key;
> +
> 	while (last_bkt->next) {
> 		prev_bkt = last_bkt;
> 		last_bkt = last_bkt->next;
> @@ -1571,11 +1731,11 @@ __rte_hash_del_key_with_hash(const struct rte_hash *h, const void *key,
> 	/* found empty bucket and recycle */
> 	if (i == RTE_HASH_BUCKET_ENTRIES) {
> 		prev_bkt->next = NULL;
> -		uint32_t index = last_bkt - h->buckets_ext + 1;
> +		index = last_bkt - h->buckets_ext + 1;
> 		/* Recycle the empty bkt if
> 		 * no_free_on_del is disabled.
> 		 */
> -		if (h->no_free_on_del)
> +		if (h->no_free_on_del) {
> 			/* Store index of an empty ext bkt to be recycled
> 			 * on calling rte_hash_del_xxx APIs.
> 			 * When lock free read-write concurrency is enabled,
> @@ -1583,12 +1743,32 @@ __rte_hash_del_key_with_hash(const struct rte_hash *h, const void *key,
> 			 * immediately (as readers might be using it still).
> 			 * Hence freeing of the ext bkt is piggy-backed to
> 			 * freeing of the key index.
> +			 * If using external RCU, store this index in an array.
> 			 */
> -			h->ext_bkt_to_free[ret] = index;
> -		else
> +			if (h->hash_rcu_cfg->v == NULL)
> +				h->ext_bkt_to_free[ret] = index;
> +		} else
> 			rte_ring_sp_enqueue_elem(h->free_ext_bkts, &index,
> 							sizeof(uint32_t));
> 	}
> +
> +return_key:
> +	/* Using internal RCU QSBR */
> +	if (h->hash_rcu_cfg->v) {
> +		/* Key index where key is stored, adding the first dummy index */
> +		rcu_dq_entry.key_idx = ret + 1;
> +		rcu_dq_entry.ext_bkt_idx = index;
> +		if (h->hash_rcu_cfg->mode == RTE_HASH_QSBR_MODE_SYNC) {
> +			/* Wait for quiescent state change. */
> +			rte_rcu_qsbr_synchronize(h->hash_rcu_cfg->v,
> +						 RTE_QSBR_THRID_INVALID);
> +			__hash_rcu_qsbr_free_resource((void *)((uintptr_t)h),
> +						      &rcu_dq_entry, 1);
> +		} else if (h->hash_rcu_cfg->mode == RTE_HASH_QSBR_MODE_DQ)
> +			/* Push into QSBR FIFO. */
> +			if (rte_rcu_qsbr_dq_enqueue(h->dq, &rcu_dq_entry) != 0)
> +				RTE_LOG(ERR, HASH, "Failed to push QSBR FIFO\n");
> +	}
> 	__hash_rw_writer_unlock(h);
> 	return ret;
> }
> @@ -1637,8 +1817,6 @@ rte_hash_free_key_with_position(const struct rte_hash *h,
> 
> 	RETURN_IF_TRUE(((h == NULL) || (key_idx == EMPTY_SLOT)), -EINVAL);
> 
> -	unsigned int lcore_id, n_slots;
> -	struct lcore_cache *cached_free_slots;
> 	const uint32_t total_entries = h->use_local_cache ?
> 		h->entries + (RTE_MAX_LCORE - 1) * (LCORE_CACHE_SIZE - 1) + 1
> 							: h->entries + 1;
> @@ -1656,28 +1834,9 @@ rte_hash_free_key_with_position(const struct rte_hash *h,
> 		}
> 	}
> 
> -	if (h->use_local_cache) {
> -		lcore_id = rte_lcore_id();
> -		cached_free_slots = &h->local_free_slots[lcore_id];
> -		/* Cache full, need to free it. */
> -		if (cached_free_slots->len == LCORE_CACHE_SIZE) {
> -			/* Need to enqueue the free slots in global ring. */
> -			n_slots = rte_ring_mp_enqueue_burst_elem(h->free_slots,
> -						cached_free_slots->objs,
> -						sizeof(uint32_t),
> -						LCORE_CACHE_SIZE, NULL);
> -			RETURN_IF_TRUE((n_slots == 0), -EFAULT);
> -			cached_free_slots->len -= n_slots;
> -		}
> -		/* Put index of new free slot in cache. */
> -		cached_free_slots->objs[cached_free_slots->len] = key_idx;
> -		cached_free_slots->len++;
> -	} else {
> -		rte_ring_sp_enqueue_elem(h->free_slots, &key_idx,
> -						sizeof(uint32_t));
> -	}
> +	/* Enqueue slot to cache/ring of free slots. */
> +	return free_slot(h, key_idx);
> 
> -	return 0;
> }
> 
> static inline void
> diff --git a/lib/librte_hash/rte_cuckoo_hash.h b/lib/librte_hash/rte_cuckoo_hash.h
> index 345de6bf9cfd..85be49d3bbe7 100644
> --- a/lib/librte_hash/rte_cuckoo_hash.h
> +++ b/lib/librte_hash/rte_cuckoo_hash.h
> @@ -168,6 +168,11 @@ struct rte_hash {
> 	struct lcore_cache *local_free_slots;
> 	/**< Local cache per lcore, storing some indexes of the free slots */
> 
> +	/* RCU config */
> +	struct rte_hash_rcu_config *hash_rcu_cfg;
> +	/**< HASH RCU QSBR configuration structure */
> +	struct rte_rcu_qsbr_dq *dq;	/**< RCU QSBR defer queue. */
> +
> 	/* Fields used in lookup */
> 
> 	uint32_t key_len __rte_cache_aligned;
> @@ -230,4 +235,7 @@ struct queue_node {
> 	int prev_slot;               /* Parent(slot) in search path */
> };
> 
> +/** @internal Default RCU defer queue entries to reclaim in one go. */
> +#define RTE_HASH_RCU_DQ_RECLAIM_MAX	16
> +
> #endif
> diff --git a/lib/librte_hash/rte_hash.h b/lib/librte_hash/rte_hash.h
> index bff40251bc98..5431bcf4aeb1 100644
> --- a/lib/librte_hash/rte_hash.h
> +++ b/lib/librte_hash/rte_hash.h
> @@ -15,6 +15,7 @@
> #include <stddef.h>
> 
> #include <rte_compat.h>
> +#include <rte_rcu_qsbr.h>
> 
> #ifdef __cplusplus
> extern "C" {
> @@ -45,7 +46,8 @@ extern "C" {
> /** Flag to disable freeing of key index on hash delete.
>  * Refer to rte_hash_del_xxx APIs for more details.
>  * This is enabled by default when RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF
> - * is enabled.
> + * is enabled. However, if internal RCU is enabled, freeing of internal
> + * memory/index is done on delete
>  */
> #define RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL 0x10
> 
> @@ -67,6 +69,13 @@ typedef uint32_t (*rte_hash_function)(const void *key, uint32_t key_len,
> /** Type of function used to compare the hash key. */
> typedef int (*rte_hash_cmp_eq_t)(const void *key1, const void *key2, size_t key_len);
> 
> +/**
> + * Type of function used to free data stored in the key.
> + * Required when using internal RCU to allow application to free key-data once
> + * the key is returned to the ring of free key-slots.
> + */
> +typedef void (*rte_hash_free_key_data)(void *p, void *key_data);
> +
> /**
>  * Parameters used when creating the hash table.
>  */
> @@ -81,6 +90,39 @@ struct rte_hash_parameters {
> 	uint8_t extra_flag;		/**< Indicate if additional parameters are present. */
> };
> 
> +/** RCU reclamation modes */
> +enum rte_hash_qsbr_mode {
> +	/** Create defer queue for reclaim. */
> +	RTE_HASH_QSBR_MODE_DQ = 0,
> +	/** Use blocking mode reclaim. No defer queue created. */
> +	RTE_HASH_QSBR_MODE_SYNC
> +};
> +
> +/** HASH RCU QSBR configuration structure. */
> +struct rte_hash_rcu_config {
> +	struct rte_rcu_qsbr *v;		/**< RCU QSBR variable. */
> +	enum rte_hash_qsbr_mode mode;
> +	/**< Mode of RCU QSBR. RTE_HASH_QSBR_MODE_xxx
> +	 * '0' for default: create defer queue for reclaim.
> +	 */
> +	uint32_t dq_size;
> +	/**< RCU defer queue size.
> +	 * default: total hash table entries.
> +	 */
> +	uint32_t reclaim_thd;	/**< Threshold to trigger auto reclaim. */
> +	uint32_t reclaim_max;
> +	/**< Max entries to reclaim in one go.
> +	 * default: RTE_HASH_RCU_DQ_RECLAIM_MAX.
> +	 */
> +	void *key_data_ptr;
> +	/**< Pointer passed to the free function. Typically, this is the
> +	 * pointer to the data structure to which the resource to free
> +	 * (key-data) belongs. This can be NULL.
> +	 */
> +	rte_hash_free_key_data free_key_data_func;
> +	/**< Function to call to free the resource (key-data). */
> +};
> +
> /** @internal A hash table structure. */
> struct rte_hash;
> 
> @@ -287,7 +329,8 @@ rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key, hash_sig_t
>  * Thread safety can be enabled by setting flag during
>  * table creation.
>  * If RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL or
> - * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled,
> + * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled and
> + * internal RCU is NOT enabled,
>  * the key index returned by rte_hash_add_key_xxx APIs will not be
>  * freed by this API. rte_hash_free_key_with_position API must be called
>  * additionally to free the index associated with the key.
> @@ -316,7 +359,8 @@ rte_hash_del_key(const struct rte_hash *h, const void *key);
>  * Thread safety can be enabled by setting flag during
>  * table creation.
>  * If RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL or
> - * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled,
> + * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled and
> + * internal RCU is NOT enabled,
>  * the key index returned by rte_hash_add_key_xxx APIs will not be
>  * freed by this API. rte_hash_free_key_with_position API must be called
>  * additionally to free the index associated with the key.
> @@ -370,7 +414,8 @@ rte_hash_get_key_with_position(const struct rte_hash *h, const int32_t position,
>  * only be called from one thread by default. Thread safety
>  * can be enabled by setting flag during table creation.
>  * If RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL or
> - * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled,
> + * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled and
> + * internal RCU is NOT enabled,
>  * the key index returned by rte_hash_del_key_xxx APIs must be freed
>  * using this API. This API should be called after all the readers
>  * have stopped referencing the entry corresponding to this key.
> @@ -625,6 +670,28 @@ rte_hash_lookup_bulk(const struct rte_hash *h, const void **keys,
>  */
> int32_t
> rte_hash_iterate(const struct rte_hash *h, const void **key, void **data, uint32_t *next);
> +
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Associate RCU QSBR variable with a hash object.
> + *
> + * @param h
> + *   the hash object to add RCU QSBR
> + * @param cfg
> + *   RCU QSBR configuration
> + * @return
> + *   On success - 0
> + *   On error - 1 with error code set in rte_errno.
> + *   Possible rte_errno codes are:
> + *   - EINVAL - invalid pointer
> + *   - EEXIST - already added QSBR
> + *   - ENOMEM - memory allocation failure
> + */
> +__rte_experimental
> +int rte_hash_rcu_qsbr_add(struct rte_hash *h,
> +				struct rte_hash_rcu_config *cfg);
> #ifdef __cplusplus
> }
> #endif
> diff --git a/lib/librte_hash/rte_hash_version.map b/lib/librte_hash/rte_hash_version.map
> index c0db81014ff9..c6d73080f478 100644
> --- a/lib/librte_hash/rte_hash_version.map
> +++ b/lib/librte_hash/rte_hash_version.map
> @@ -36,5 +36,5 @@ EXPERIMENTAL {
> 	rte_hash_lookup_with_hash_bulk;
> 	rte_hash_lookup_with_hash_bulk_data;
> 	rte_hash_max_key_id;
> -
> +	rte_hash_rcu_qsbr_add;
> };
> -- 
> 2.17.1
>
  
Wang, Yipeng1 Aug. 31, 2020, 8:47 p.m. UTC | #2
> -----Original Message-----
> From: Dharmik Thakkar <dharmik.thakkar@arm.com>
> Sent: Tuesday, August 18, 2020 9:06 PM
> To: Wang, Yipeng1 <yipeng1.wang@intel.com>; Gobriel, Sameh
> <sameh.gobriel@intel.com>; Richardson, Bruce <bruce.richardson@intel.com>;
> Ray Kinsella <mdr@ashroe.eu>; Neil Horman <nhorman@tuxdriver.com>
> Cc: dev@dpdk.org; nd@arm.com; Dharmik Thakkar
> <dharmik.thakkar@arm.com>
> Subject: [RFC v2] lib/hash: integrate RCU QSBR
> 
> Integrate RCU QSBR to make it easier for applications to use lock-free
> algorithms.
> 
> The resource reclamation implementation was split from the original series
> and has since become part of the RCU library. Rework the series to base the
> hash integration on the RCU reclamation APIs.
> 
> Refer to 'Resource reclamation framework for DPDK', available at [1], to
> understand various aspects of integrating the RCU library into other
> libraries.
> 
> [1] https://doc.dpdk.org/guides/prog_guide/rcu_lib.html
> 
> Introduce a new API, rte_hash_rcu_qsbr_add, for the application to register
> an RCU variable that the hash library will use.
> 
> Suggested-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
> Signed-off-by: Dharmik Thakkar <dharmik.thakkar@arm.com>
> Reviewed-by: Ruifeng Wang <ruifeng.wang@arm.com>
> ---
> v2:
>  - Remove defer queue related functions and use resource reclamation
>    APIs from the RCU QSBR library instead
> 
>  - Remove patch (net/ixgbe: avoid multiple definitions of 'bool')
>    from the series as it is already accepted
> 
> ---
>  lib/Makefile                         |   2 +-
>  lib/librte_hash/Makefile             |   2 +-
>  lib/librte_hash/meson.build          |   1 +
>  lib/librte_hash/rte_cuckoo_hash.c    | 291 +++++++++++++++++++++------
>  lib/librte_hash/rte_cuckoo_hash.h    |   8 +
>  lib/librte_hash/rte_hash.h           |  75 ++++++-
>  lib/librte_hash/rte_hash_version.map |   2 +-
>  7 files changed, 308 insertions(+), 73 deletions(-)
> 

<......>


> +/** HASH RCU QSBR configuration structure. */ struct
> +rte_hash_rcu_config {
> +	struct rte_rcu_qsbr *v;		/**< RCU QSBR variable. */
> +	enum rte_hash_qsbr_mode mode;
> +	/**< Mode of RCU QSBR. RTE_HASH_QSBR_MODE_xxx
> +	 * '0' for default: create defer queue for reclaim.
> +	 */
> +	uint32_t dq_size;
> +	/**< RCU defer queue size.
> +	 * default: total hash table entries.
> +	 */
> +	uint32_t reclaim_thd;	/**< Threshold to trigger auto reclaim. */
> +	uint32_t reclaim_max;
> +	/**< Max entries to reclaim in one go.
> +	 * default: RTE_HASH_RCU_DQ_RECLAIM_MAX.
> +	 */
> +	void *key_data_ptr;
> +	/**< Pointer passed to the free function. Typically, this is the
> +	 * pointer to the data structure to which the resource to free
> +	 * (key-data) belongs. This can be NULL.
> +	 */
> +	rte_hash_free_key_data free_key_data_func;
> +	/**< Function to call to free the resource (key-data). */ };
> +
[Wang, Yipeng] 
I guess this is mostly a wrapper of rte_rcu_qsbr_dq_parameters.
Personally, I'm inclined to use variable names that match the existing qsbr parameters better.
For example, you could rename reclaim_thd to reclaim_limit, and _max to _size.
Thus, people who are already familiar with qsbr can match the meanings better.
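
As a side note for readers of this archive, here is a minimal sketch of
selecting the two reclamation modes through this structure, assuming qsv
is an already-initialized rte_rcu_qsbr variable:

	/* DQ mode (the default, mode == 0): deleted entries are pushed
	 * to a defer queue and reclaimed later, so the delete path does
	 * not block on readers.
	 */
	struct rte_hash_rcu_config cfg_dq = {
		.v = qsv,
		.mode = RTE_HASH_QSBR_MODE_DQ,
	};

	/* SYNC mode: the delete path calls rte_rcu_qsbr_synchronize()
	 * and frees the resources inline. No defer queue is created,
	 * but the writer blocks until all registered readers report a
	 * quiescent state.
	 */
	struct rte_hash_rcu_config cfg_sync = {
		.v = qsv,
		.mode = RTE_HASH_QSBR_MODE_SYNC,
	};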


>  /** @internal A hash table structure. */  struct rte_hash;
> 
> @@ -287,7 +329,8 @@ rte_hash_add_key_with_hash(const struct rte_hash *h,
> const void *key, hash_sig_t
>   * Thread safety can be enabled by setting flag during
>   * table creation.
>   * If RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL or
> - * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled,
> + * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled and
> + * internal RCU is NOT enabled,
>   * the key index returned by rte_hash_add_key_xxx APIs will not be
>   * freed by this API. rte_hash_free_key_with_position API must be called
>   * additionally to free the index associated with the key.
> @@ -316,7 +359,8 @@ rte_hash_del_key(const struct rte_hash *h, const void
> *key);
>   * Thread safety can be enabled by setting flag during
>   * table creation.
>   * If RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL or
> - * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled,
> + * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled and
> + * internal RCU is NOT enabled,
>   * the key index returned by rte_hash_add_key_xxx APIs will not be
>   * freed by this API. rte_hash_free_key_with_position API must be called
>   * additionally to free the index associated with the key.
> @@ -370,7 +414,8 @@ rte_hash_get_key_with_position(const struct rte_hash
> *h, const int32_t position,
>   * only be called from one thread by default. Thread safety
>   * can be enabled by setting flag during table creation.
>   * If RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL or
> - * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled,
> + * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled and
> + * internal RCU is NOT enabled,
>   * the key index returned by rte_hash_del_key_xxx APIs must be freed
>   * using this API. This API should be called after all the readers
>   * have stopped referencing the entry corresponding to this key.
> @@ -625,6 +670,28 @@ rte_hash_lookup_bulk(const struct rte_hash *h, const
> void **keys,
>   */
>  int32_t
>  rte_hash_iterate(const struct rte_hash *h, const void **key, void **data,
> uint32_t *next);
> +
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Associate RCU QSBR variable with a hash object.
[Wang, Yipeng] To enable RCU we need to call this func.
I think you can be more explicit, e.g. "This API should be called to enable the RCU support"

> + *
> + * @param h
> + *   the hash object to add RCU QSBR
> + * @param cfg
> + *   RCU QSBR configuration
> + * @return
> + *   On success - 0
> + *   On error - 1 with error code set in rte_errno.
> + *   Possible rte_errno codes are:
> + *   - EINVAL - invalid pointer
> + *   - EEXIST - already added QSBR
> + *   - ENOMEM - memory allocation failure
> + */
[Wang, Yipeng] Is there any further requirement for when to call this API? 
E.g. you could say "this API should be called immediately after rte_hash_create()"
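
For context, a reader thread in this scheme would look roughly like the
sketch below; h, qsv, key, thread_id, and the loop condition are
illustrative:

	/* One-time setup per reader thread. */
	rte_rcu_qsbr_thread_register(qsv, thread_id);
	rte_rcu_qsbr_thread_online(qsv, thread_id);

	while (keep_running) {
		void *data;

		if (rte_hash_lookup_data(h, key, &data) >= 0)
			process(data); /* reclamation deferred while in use */

		/* Report quiescent state once no references are held. */
		rte_rcu_qsbr_quiescent(qsv, thread_id);
	}

	rte_rcu_qsbr_thread_offline(qsv, thread_id);
	rte_rcu_qsbr_thread_unregister(qsv, thread_id);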
 
> +__rte_experimental
> +int rte_hash_rcu_qsbr_add(struct rte_hash *h,
> +				struct rte_hash_rcu_config *cfg);
>  #ifdef __cplusplus
>  }
>  #endif
> diff --git a/lib/librte_hash/rte_hash_version.map
> b/lib/librte_hash/rte_hash_version.map
> index c0db81014ff9..c6d73080f478 100644
> --- a/lib/librte_hash/rte_hash_version.map
> +++ b/lib/librte_hash/rte_hash_version.map
> @@ -36,5 +36,5 @@ EXPERIMENTAL {
>  	rte_hash_lookup_with_hash_bulk;
>  	rte_hash_lookup_with_hash_bulk_data;
>  	rte_hash_max_key_id;
> -
> +	rte_hash_rcu_qsbr_add;
>  };
> --
> 2.17.1
[Wang, Yipeng] 
Hi, Dharmik,
Thanks for the patch. It generally looks good to me. 
I guess you will revise documentation and the unit test as well after the RFC.
That is helpful for users to understand how to use hash appropriately with the RCU lib.
  
Dharmik Thakkar Sept. 1, 2020, 10:01 p.m. UTC | #3
Hi Yipeng,
Thank you for the comments!

> On Aug 31, 2020, at 3:47 PM, Wang, Yipeng1 <yipeng1.wang@intel.com> wrote:
> 
>> -----Original Message-----
>> From: Dharmik Thakkar <dharmik.thakkar@arm.com>
>> Sent: Tuesday, August 18, 2020 9:06 PM
>> To: Wang, Yipeng1 <yipeng1.wang@intel.com>; Gobriel, Sameh
>> <sameh.gobriel@intel.com>; Richardson, Bruce <bruce.richardson@intel.com>;
>> Ray Kinsella <mdr@ashroe.eu>; Neil Horman <nhorman@tuxdriver.com>
>> Cc: dev@dpdk.org; nd@arm.com; Dharmik Thakkar
>> <dharmik.thakkar@arm.com>
>> Subject: [RFC v2] lib/hash: integrate RCU QSBR
>> 
>> Integrate RCU QSBR to make it easier for applications to use lock-free
>> algorithms.
>> 
>> The resource reclamation implementation was split from the original series
>> and has since become part of the RCU library. Rework the series to base the
>> hash integration on the RCU reclamation APIs.
>> 
>> Refer to 'Resource reclamation framework for DPDK', available at [1], to
>> understand various aspects of integrating the RCU library into other
>> libraries.
>> 
>> [1] https://doc.dpdk.org/guides/prog_guide/rcu_lib.html
>> 
>> Introduce a new API, rte_hash_rcu_qsbr_add, for the application to register
>> an RCU variable that the hash library will use.
>> 
>> Suggested-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
>> Signed-off-by: Dharmik Thakkar <dharmik.thakkar@arm.com>
>> Reviewed-by: Ruifeng Wang <ruifeng.wang@arm.com>
>> ---
>> v2:
>> - Remove defer queue related functions and use resource reclamation
>>   APIs from the RCU QSBR library instead
>> 
>> - Remove patch (net/ixgbe: avoid multiple definitions of 'bool')
>>   from the series as it is already accepted
>> 
>> ---
>> lib/Makefile                         |   2 +-
>> lib/librte_hash/Makefile             |   2 +-
>> lib/librte_hash/meson.build          |   1 +
>> lib/librte_hash/rte_cuckoo_hash.c    | 291 +++++++++++++++++++++------
>> lib/librte_hash/rte_cuckoo_hash.h    |   8 +
>> lib/librte_hash/rte_hash.h           |  75 ++++++-
>> lib/librte_hash/rte_hash_version.map |   2 +-
>> 7 files changed, 308 insertions(+), 73 deletions(-)
>> 
> 
> <......>
> 
> 
>> +/** HASH RCU QSBR configuration structure. */ struct
>> +rte_hash_rcu_config {
>> +	struct rte_rcu_qsbr *v;		/**< RCU QSBR variable. */
>> +	enum rte_hash_qsbr_mode mode;
>> +	/**< Mode of RCU QSBR. RTE_HASH_QSBR_MODE_xxx
>> +	 * '0' for default: create defer queue for reclaim.
>> +	 */
>> +	uint32_t dq_size;
>> +	/**< RCU defer queue size.
>> +	 * default: total hash table entries.
>> +	 */
>> +	uint32_t reclaim_thd;	/**< Threshold to trigger auto reclaim. */
>> +	uint32_t reclaim_max;
>> +	/**< Max entries to reclaim in one go.
>> +	 * default: RTE_HASH_RCU_DQ_RECLAIM_MAX.
>> +	 */
>> +	void *key_data_ptr;
>> +	/**< Pointer passed to the free function. Typically, this is the
>> +	 * pointer to the data structure to which the resource to free
>> +	 * (key-data) belongs. This can be NULL.
>> +	 */
>> +	rte_hash_free_key_data free_key_data_func;
>> +	/**< Function to call to free the resource (key-data). */ };
>> +
> [Wang, Yipeng] 
> I guess this is mostly a wrapper of rte_rcu_qsbr_dq_parameters.
> Personally, I'm inclined to use variable names that match the existing qsbr parameters better.
> For example, you could rename reclaim_thd to reclaim_limit, and _max to _size.
> Thus, people who are already familiar with qsbr can match the meanings better.
> 

Makes sense. I will update it.

> 
>> /** @internal A hash table structure. */  struct rte_hash;
>> 
>> @@ -287,7 +329,8 @@ rte_hash_add_key_with_hash(const struct rte_hash *h,
>> const void *key, hash_sig_t
>>  * Thread safety can be enabled by setting flag during
>>  * table creation.
>>  * If RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL or
>> - * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled,
>> + * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled and
>> + * internal RCU is NOT enabled,
>>  * the key index returned by rte_hash_add_key_xxx APIs will not be
>>  * freed by this API. rte_hash_free_key_with_position API must be called
>>  * additionally to free the index associated with the key.
>> @@ -316,7 +359,8 @@ rte_hash_del_key(const struct rte_hash *h, const void
>> *key);
>>  * Thread safety can be enabled by setting flag during
>>  * table creation.
>>  * If RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL or
>> - * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled,
>> + * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled and
>> + * internal RCU is NOT enabled,
>>  * the key index returned by rte_hash_add_key_xxx APIs will not be
>>  * freed by this API. rte_hash_free_key_with_position API must be called
>>  * additionally to free the index associated with the key.
>> @@ -370,7 +414,8 @@ rte_hash_get_key_with_position(const struct rte_hash
>> *h, const int32_t position,
>>  * only be called from one thread by default. Thread safety
>>  * can be enabled by setting flag during table creation.
>>  * If RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL or
>> - * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled,
>> + * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled and
>> + * internal RCU is NOT enabled,
>>  * the key index returned by rte_hash_del_key_xxx APIs must be freed
>>  * using this API. This API should be called after all the readers
>>  * have stopped referencing the entry corresponding to this key.
>> @@ -625,6 +670,28 @@ rte_hash_lookup_bulk(const struct rte_hash *h, const
>> void **keys,
>>  */
>> int32_t
>> rte_hash_iterate(const struct rte_hash *h, const void **key, void **data,
>> uint32_t *next);
>> +
>> +/**
>> + * @warning
>> + * @b EXPERIMENTAL: this API may change without prior notice
>> + *
>> + * Associate RCU QSBR variable with a hash object.
> [Wang, Yipeng] To enable RCU we need to call this func.
> I think you can be more explicit, e.g. "This API should be called to enable the RCU support"
> 

Yes.

>> + *
>> + * @param h
>> + *   the hash object to add RCU QSBR
>> + * @param cfg
>> + *   RCU QSBR configuration
>> + * @return
>> + *   On success - 0
>> + *   On error - 1 with error code set in rte_errno.
>> + *   Possible rte_errno codes are:
>> + *   - EINVAL - invalid pointer
>> + *   - EEXIST - already added QSBR
>> + *   - ENOMEM - memory allocation failure
>> + */
> [Wang, Yipeng] Is there any further requirement for when to call this API? 
> E.g. you could say "this API should be called immediately after rte_hash_create()"
> 

Sure, I will add further guidelines/requirements.

>> +__rte_experimental
>> +int rte_hash_rcu_qsbr_add(struct rte_hash *h,
>> +				struct rte_hash_rcu_config *cfg);
>> #ifdef __cplusplus
>> }
>> #endif
>> diff --git a/lib/librte_hash/rte_hash_version.map
>> b/lib/librte_hash/rte_hash_version.map
>> index c0db81014ff9..c6d73080f478 100644
>> --- a/lib/librte_hash/rte_hash_version.map
>> +++ b/lib/librte_hash/rte_hash_version.map
>> @@ -36,5 +36,5 @@ EXPERIMENTAL {
>> 	rte_hash_lookup_with_hash_bulk;
>> 	rte_hash_lookup_with_hash_bulk_data;
>> 	rte_hash_max_key_id;
>> -
>> +	rte_hash_rcu_qsbr_add;
>> };
>> --
>> 2.17.1
> [Wang, Yipeng] 
> Hi, Dharmik,
> Thanks for the patch. It generally looks good to me. 

That’s great. I will convert it to a patch.

> I guess you will revise documentation and the unit test as well after the RFC.
> That is helpful for users to understand how to use hash appropriately with the RCU lib.

Yes, I will add the documentation and unit test patches.
  
David Marchand Oct. 16, 2020, 1:53 p.m. UTC | #4
Hello Dharmik,

On Wed, Sep 2, 2020 at 12:01 AM Dharmik Thakkar <Dharmik.Thakkar@arm.com> wrote:
> > [Wang, Yipeng]
> > Hi, Dharmik,
> > Thanks for the patch. It generally looks good to me.
>
> That’s great. I will convert it to a patch.
>
> > I guess you will revise documentation and the unit test as well after the RFC.
> > That is helpful for users to understand how to use hash appropriately with the RCU lib.
>
> Yes, I will add the documentation and unit test patches.

I did not see a non RFC patch.
Is this still targeting 20.11?
  
Dharmik Thakkar Oct. 16, 2020, 3:04 p.m. UTC | #5
Hi David,

> On Oct 16, 2020, at 8:53 AM, David Marchand <david.marchand@redhat.com> wrote:
> 
> Hello Dharmik,
> 
> On Wed, Sep 2, 2020 at 12:01 AM Dharmik Thakkar <Dharmik.Thakkar@arm.com> wrote:
>>> [Wang, Yipeng]
>>> Hi, Dharmik,
>>> Thanks for the patch. It generally looks good to me.
>> 
>> That’s great. I will convert it to a patch.
>> 
>>> I guess you will revise documentation and the unit test as well after the RFC.
>>> That is helpful for users to understand how to use hash appropriately with the RCU lib.
>> 
>> Yes, I will add the documentation and unit test patches.
> 
> I did not see a non RFC patch.
> Is this still targeting 20.11?
> 

Yes, I will send out the patch series today. Sorry, it got delayed!

> 
> -- 
> David Marchand
>
  

Patch

diff --git a/lib/Makefile b/lib/Makefile
index 8f5b68a2d469..dffe31c829f0 100644
--- a/lib/Makefile
+++ b/lib/Makefile
@@ -51,7 +51,7 @@  DIRS-$(CONFIG_RTE_LIBRTE_VHOST) += librte_vhost
 DEPDIRS-librte_vhost := librte_eal librte_mempool librte_mbuf librte_ethdev \
 			librte_net librte_hash librte_cryptodev
 DIRS-$(CONFIG_RTE_LIBRTE_HASH) += librte_hash
-DEPDIRS-librte_hash := librte_eal librte_ring
+DEPDIRS-librte_hash := librte_eal librte_ring librte_rcu
 DIRS-$(CONFIG_RTE_LIBRTE_EFD) += librte_efd
 DEPDIRS-librte_efd := librte_eal librte_ring librte_hash
 DIRS-$(CONFIG_RTE_LIBRTE_RIB) += librte_rib
diff --git a/lib/librte_hash/Makefile b/lib/librte_hash/Makefile
index ec9f86499262..10e697f48652 100644
--- a/lib/librte_hash/Makefile
+++ b/lib/librte_hash/Makefile
@@ -8,7 +8,7 @@  LIB = librte_hash.a
 
 CFLAGS += -O3
 CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR)
-LDLIBS += -lrte_eal -lrte_ring
+LDLIBS += -lrte_eal -lrte_ring -lrte_rcu
 
 EXPORT_MAP := rte_hash_version.map
 
diff --git a/lib/librte_hash/meson.build b/lib/librte_hash/meson.build
index 6ab46ae9d768..0977a63fd279 100644
--- a/lib/librte_hash/meson.build
+++ b/lib/librte_hash/meson.build
@@ -10,3 +10,4 @@  headers = files('rte_crc_arm64.h',
 
 sources = files('rte_cuckoo_hash.c', 'rte_fbk_hash.c')
 deps += ['ring']
+deps += ['rcu']
diff --git a/lib/librte_hash/rte_cuckoo_hash.c b/lib/librte_hash/rte_cuckoo_hash.c
index 0a6d474713a2..01c2cbe0e38b 100644
--- a/lib/librte_hash/rte_cuckoo_hash.c
+++ b/lib/librte_hash/rte_cuckoo_hash.c
@@ -52,6 +52,11 @@  static struct rte_tailq_elem rte_hash_tailq = {
 };
 EAL_REGISTER_TAILQ(rte_hash_tailq)
 
+struct __rte_hash_rcu_dq_entry {
+	uint32_t key_idx;
+	uint32_t ext_bkt_idx; /**< Extended bkt index */
+};
+
 struct rte_hash *
 rte_hash_find_existing(const char *name)
 {
@@ -210,7 +215,10 @@  rte_hash_create(const struct rte_hash_parameters *params)
 
 	if (params->extra_flag & RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF) {
 		readwrite_concur_lf_support = 1;
-		/* Enable not freeing internal memory/index on delete */
+		/* Enable not freeing internal memory/index on delete.
+		 * If internal RCU is enabled, freeing of internal memory/index
+		 * is done on delete
+		 */
 		no_free_on_del = 1;
 	}
 
@@ -505,6 +513,10 @@  rte_hash_free(struct rte_hash *h)
 
 	rte_mcfg_tailq_write_unlock();
 
+	/* RCU clean up. */
+	if (h->dq)
+		rte_rcu_qsbr_dq_delete(h->dq);
+
 	if (h->use_local_cache)
 		rte_free(h->local_free_slots);
 	if (h->writer_takes_lock)
@@ -612,6 +624,16 @@  rte_hash_reset(struct rte_hash *h)
 		return;
 
 	__hash_rw_writer_lock(h);
+
+	/* RCU clean up. */
+	if (h->hash_rcu_cfg->v) {
+		rte_rcu_qsbr_dq_delete(h->dq);
+		h->dq = NULL;
+		if (rte_hash_rcu_qsbr_add(h, h->hash_rcu_cfg) < 0) {
+			RTE_LOG(ERR, HASH, "RCU init failed\n");
+			return;
+		}
+	}
 	memset(h->buckets, 0, h->num_buckets * sizeof(struct rte_hash_bucket));
 	memset(h->key_store, 0, h->key_entry_size * (h->entries + 1));
 	*h->tbl_chng_cnt = 0;
@@ -952,6 +974,37 @@  rte_hash_cuckoo_make_space_mw(const struct rte_hash *h,
 	return -ENOSPC;
 }
 
+static inline uint32_t
+alloc_slot(const struct rte_hash *h, struct lcore_cache *cached_free_slots)
+{
+	unsigned int  n_slots;
+	uint32_t slot_id;
+	if (h->use_local_cache) {
+		/* Try to get a free slot from the local cache */
+		if (cached_free_slots->len == 0) {
+			/* Need to get another burst of free slots from global ring */
+			n_slots = rte_ring_mc_dequeue_burst_elem(h->free_slots,
+					cached_free_slots->objs,
+					sizeof(uint32_t),
+					LCORE_CACHE_SIZE, NULL);
+			if (n_slots == 0)
+				return EMPTY_SLOT;
+
+			cached_free_slots->len += n_slots;
+		}
+
+		/* Get a free slot from the local cache */
+		cached_free_slots->len--;
+		slot_id = cached_free_slots->objs[cached_free_slots->len];
+	} else {
+		if (rte_ring_sc_dequeue_elem(h->free_slots, &slot_id,
+						sizeof(uint32_t)) != 0)
+			return EMPTY_SLOT;
+	}
+
+	return slot_id;
+}
+
 static inline int32_t
 __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,
 						hash_sig_t sig, void *data)
@@ -963,7 +1016,6 @@  __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,
 	uint32_t ext_bkt_id = 0;
 	uint32_t slot_id;
 	int ret;
-	unsigned n_slots;
 	unsigned lcore_id;
 	unsigned int i;
 	struct lcore_cache *cached_free_slots = NULL;
@@ -1001,28 +1053,19 @@  __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,
 	if (h->use_local_cache) {
 		lcore_id = rte_lcore_id();
 		cached_free_slots = &h->local_free_slots[lcore_id];
-		/* Try to get a free slot from the local cache */
-		if (cached_free_slots->len == 0) {
-			/* Need to get another burst of free slots from global ring */
-			n_slots = rte_ring_mc_dequeue_burst_elem(h->free_slots,
-					cached_free_slots->objs,
-					sizeof(uint32_t),
-					LCORE_CACHE_SIZE, NULL);
-			if (n_slots == 0) {
-				return -ENOSPC;
+	}
+	slot_id = alloc_slot(h, cached_free_slots);
+	if (slot_id == EMPTY_SLOT) {
+		if (h->hash_rcu_cfg->v) {
+			if (rte_rcu_qsbr_dq_reclaim(h->dq,
+						    h->hash_rcu_cfg->reclaim_max,
+						    NULL, NULL, NULL)
+					== 0) {
+				slot_id = alloc_slot(h, cached_free_slots);
 			}
-
-			cached_free_slots->len += n_slots;
 		}
-
-		/* Get a free slot from the local cache */
-		cached_free_slots->len--;
-		slot_id = cached_free_slots->objs[cached_free_slots->len];
-	} else {
-		if (rte_ring_sc_dequeue_elem(h->free_slots, &slot_id,
-						sizeof(uint32_t)) != 0) {
+		if (slot_id == EMPTY_SLOT)
 			return -ENOSPC;
-		}
 	}
 
 	new_k = RTE_PTR_ADD(keys, slot_id * h->key_entry_size);
@@ -1118,8 +1161,20 @@  __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,
 	if (rte_ring_sc_dequeue_elem(h->free_ext_bkts, &ext_bkt_id,
 						sizeof(uint32_t)) != 0 ||
 					ext_bkt_id == 0) {
-		ret = -ENOSPC;
-		goto failure;
+		if (h->hash_rcu_cfg->v) {
+			if (rte_rcu_qsbr_dq_reclaim(h->dq,
+						    h->hash_rcu_cfg->reclaim_max,
+						    NULL, NULL, NULL)
+					== 0) {
+				rte_ring_sc_dequeue_elem(h->free_ext_bkts,
+							 &ext_bkt_id,
+							 sizeof(uint32_t));
+			}
+		}
+		if (ext_bkt_id == 0) {
+			ret = -ENOSPC;
+			goto failure;
+		}
 	}
 
 	/* Use the first location of the new bucket */
@@ -1395,12 +1450,12 @@  rte_hash_lookup_data(const struct rte_hash *h, const void *key, void **data)
 	return __rte_hash_lookup_with_hash(h, key, rte_hash_hash(h, key), data);
 }
 
-static inline void
-remove_entry(const struct rte_hash *h, struct rte_hash_bucket *bkt, unsigned i)
+static int
+free_slot(const struct rte_hash *h, uint32_t slot_id)
 {
 	unsigned lcore_id, n_slots;
 	struct lcore_cache *cached_free_slots;
-
+	/* Return key indexes to free slot ring */
 	if (h->use_local_cache) {
 		lcore_id = rte_lcore_id();
 		cached_free_slots = &h->local_free_slots[lcore_id];
@@ -1411,18 +1466,122 @@  remove_entry(const struct rte_hash *h, struct rte_hash_bucket *bkt, unsigned i)
 						cached_free_slots->objs,
 						sizeof(uint32_t),
 						LCORE_CACHE_SIZE, NULL);
-			ERR_IF_TRUE((n_slots == 0),
-				"%s: could not enqueue free slots in global ring\n",
-				__func__);
+			RETURN_IF_TRUE((n_slots == 0), -EFAULT);
 			cached_free_slots->len -= n_slots;
 		}
-		/* Put index of new free slot in cache. */
-		cached_free_slots->objs[cached_free_slots->len] =
-							bkt->key_idx[i];
-		cached_free_slots->len++;
+	}
+
+	enqueue_slot_back(h, cached_free_slots, slot_id);
+	return 0;
+}
+
+static void
+__hash_rcu_qsbr_free_resource(void *p, void *e, unsigned int n)
+{
+	void *key_data = NULL;
+	int ret;
+	struct rte_hash_key *keys, *k;
+	struct rte_hash *h = (struct rte_hash *)p;
+	struct __rte_hash_rcu_dq_entry rcu_dq_entry =
+			*((struct __rte_hash_rcu_dq_entry *)e);
+
+	RTE_SET_USED(n);
+	keys = h->key_store;
+
+	k = (struct rte_hash_key *) ((char *)keys +
+				rcu_dq_entry.key_idx * h->key_entry_size);
+	key_data = k->pdata;
+	if (h->hash_rcu_cfg->free_key_data_func)
+		h->hash_rcu_cfg->free_key_data_func(h->hash_rcu_cfg->key_data_ptr,
+						    key_data);
+
+	if (h->ext_table_support && rcu_dq_entry.ext_bkt_idx != EMPTY_SLOT)
+		/* Recycle empty ext bkt to free list. */
+		rte_ring_sp_enqueue_elem(h->free_ext_bkts,
+			&rcu_dq_entry.ext_bkt_idx, sizeof(uint32_t));
+
+	/* Return key indexes to free slot ring */
+	ret = free_slot(h, rcu_dq_entry.key_idx);
+	if (ret < 0) {
+		RTE_LOG(ERR, HASH,
+			"%s: could not enqueue free slots in global ring\n",
+				__func__);
+	}
+}
+
+int
+rte_hash_rcu_qsbr_add(struct rte_hash *h,
+				struct rte_hash_rcu_config *cfg)
+{
+	struct rte_rcu_qsbr_dq_parameters params = {0};
+	char rcu_dq_name[RTE_RCU_QSBR_DQ_NAMESIZE];
+	struct rte_hash_rcu_config *hash_rcu_cfg = NULL;
+
+	const uint32_t total_entries = h->use_local_cache ?
+		h->entries + (RTE_MAX_LCORE - 1) * (LCORE_CACHE_SIZE - 1) + 1
+							: h->entries + 1;
+
+	if ((h == NULL) || cfg == NULL) {
+		rte_errno = EINVAL;
+		return 1;
+	}
+
+	hash_rcu_cfg = rte_zmalloc(NULL, sizeof(struct rte_hash_rcu_config), 0);
+	if (hash_rcu_cfg == NULL) {
+		RTE_LOG(ERR, HASH, "memory allocation failed\n");
+		return 1;
+	}
+
+	if (cfg->mode == RTE_HASH_QSBR_MODE_SYNC) {
+		/* No other things to do. */
+	} else if (cfg->mode == RTE_HASH_QSBR_MODE_DQ) {
+		/* Init QSBR defer queue. */
+		snprintf(rcu_dq_name, sizeof(rcu_dq_name),
+					"HASH_RCU_%s", h->name);
+		params.name = rcu_dq_name;
+		params.size = cfg->dq_size;
+		if (params.size == 0)
+			params.size = total_entries;
+		params.trigger_reclaim_limit = cfg->reclaim_thd;
+		if (params.max_reclaim_size == 0)
+			params.max_reclaim_size = RTE_HASH_RCU_DQ_RECLAIM_MAX;
+		params.esize = sizeof(struct __rte_hash_rcu_dq_entry);
+		params.free_fn = __hash_rcu_qsbr_free_resource;
+		params.p = h;
+		params.v = cfg->v;
+		h->dq = rte_rcu_qsbr_dq_create(&params);
+		if (h->dq == NULL) {
+			rte_free(hash_rcu_cfg);
+			RTE_LOG(ERR, HASH, "HASH defer queue creation failed\n");
+			return 1;
+		}
 	} else {
-		rte_ring_sp_enqueue_elem(h->free_slots,
-				&bkt->key_idx[i], sizeof(uint32_t));
+		rte_free(hash_rcu_cfg);
+		rte_errno = EINVAL;
+		return 1;
+	}
+
+	hash_rcu_cfg->v = cfg->v;
+	hash_rcu_cfg->mode = cfg->mode;
+	hash_rcu_cfg->dq_size = cfg->dq_size;
+	hash_rcu_cfg->reclaim_thd = cfg->reclaim_thd;
+	hash_rcu_cfg->reclaim_max = cfg->reclaim_max;
+	hash_rcu_cfg->free_key_data_func = cfg->free_key_data_func;
+	hash_rcu_cfg->key_data_ptr = cfg->key_data_ptr;
+
+	h->hash_rcu_cfg = hash_rcu_cfg;
+
+	return 0;
+}
+
+static inline void
+remove_entry(const struct rte_hash *h, struct rte_hash_bucket *bkt, unsigned i)
+{
+	int ret = free_slot(h, bkt->key_idx[i]);
+	if (ret < 0) {
+		RTE_LOG(ERR, HASH,
+			"%s: could not enqueue free slots in global ring\n",
+				__func__);
 	}
 }
 
@@ -1521,6 +1680,8 @@  __rte_hash_del_key_with_hash(const struct rte_hash *h, const void *key,
 	int pos;
 	int32_t ret, i;
 	uint16_t short_sig;
+	uint32_t index = EMPTY_SLOT;
+	struct __rte_hash_rcu_dq_entry rcu_dq_entry;
 
 	short_sig = get_short_sig(sig);
 	prim_bucket_idx = get_prim_bucket_index(h, sig);
@@ -1555,10 +1716,9 @@  __rte_hash_del_key_with_hash(const struct rte_hash *h, const void *key,
 
 /* Search last bucket to see if empty to be recycled */
 return_bkt:
-	if (!last_bkt) {
-		__hash_rw_writer_unlock(h);
-		return ret;
-	}
+	if (!last_bkt)
+		goto return_key;
+
 	while (last_bkt->next) {
 		prev_bkt = last_bkt;
 		last_bkt = last_bkt->next;
@@ -1571,11 +1731,11 @@  __rte_hash_del_key_with_hash(const struct rte_hash *h, const void *key,
 	/* found empty bucket and recycle */
 	if (i == RTE_HASH_BUCKET_ENTRIES) {
 		prev_bkt->next = NULL;
-		uint32_t index = last_bkt - h->buckets_ext + 1;
+		index = last_bkt - h->buckets_ext + 1;
 		/* Recycle the empty bkt if
 		 * no_free_on_del is disabled.
 		 */
-		if (h->no_free_on_del)
+		if (h->no_free_on_del) {
 			/* Store index of an empty ext bkt to be recycled
 			 * on calling rte_hash_del_xxx APIs.
 			 * When lock free read-write concurrency is enabled,
@@ -1583,12 +1743,32 @@  __rte_hash_del_key_with_hash(const struct rte_hash *h, const void *key,
 			 * immediately (as readers might be using it still).
 			 * Hence freeing of the ext bkt is piggy-backed to
 			 * freeing of the key index.
+			 * If using external RCU, store this index in an array.
 			 */
-			h->ext_bkt_to_free[ret] = index;
-		else
+			if (h->hash_rcu_cfg->v == NULL)
+				h->ext_bkt_to_free[ret] = index;
+		} else
 			rte_ring_sp_enqueue_elem(h->free_ext_bkts, &index,
 							sizeof(uint32_t));
 	}
+
+return_key:
+	/* Using internal RCU QSBR */
+	if (h->hash_rcu_cfg->v) {
+		/* Key index where key is stored, adding the first dummy index */
+		rcu_dq_entry.key_idx = ret + 1;
+		rcu_dq_entry.ext_bkt_idx = index;
+		if (h->hash_rcu_cfg->mode == RTE_HASH_QSBR_MODE_SYNC) {
+			/* Wait for quiescent state change. */
+			rte_rcu_qsbr_synchronize(h->hash_rcu_cfg->v,
+						 RTE_QSBR_THRID_INVALID);
+			__hash_rcu_qsbr_free_resource((void *)((uintptr_t)h),
+						      &rcu_dq_entry, 1);
+		} else if (h->hash_rcu_cfg->mode == RTE_HASH_QSBR_MODE_DQ)
+			/* Push into QSBR FIFO. */
+			if (rte_rcu_qsbr_dq_enqueue(h->dq, &rcu_dq_entry) != 0)
+				RTE_LOG(ERR, HASH, "Failed to push QSBR FIFO\n");
+	}
 	__hash_rw_writer_unlock(h);
 	return ret;
 }
@@ -1637,8 +1817,6 @@  rte_hash_free_key_with_position(const struct rte_hash *h,
 
 	RETURN_IF_TRUE(((h == NULL) || (key_idx == EMPTY_SLOT)), -EINVAL);
 
-	unsigned int lcore_id, n_slots;
-	struct lcore_cache *cached_free_slots;
 	const uint32_t total_entries = h->use_local_cache ?
 		h->entries + (RTE_MAX_LCORE - 1) * (LCORE_CACHE_SIZE - 1) + 1
 							: h->entries + 1;
@@ -1656,28 +1834,9 @@  rte_hash_free_key_with_position(const struct rte_hash *h,
 		}
 	}
 
-	if (h->use_local_cache) {
-		lcore_id = rte_lcore_id();
-		cached_free_slots = &h->local_free_slots[lcore_id];
-		/* Cache full, need to free it. */
-		if (cached_free_slots->len == LCORE_CACHE_SIZE) {
-			/* Need to enqueue the free slots in global ring. */
-			n_slots = rte_ring_mp_enqueue_burst_elem(h->free_slots,
-						cached_free_slots->objs,
-						sizeof(uint32_t),
-						LCORE_CACHE_SIZE, NULL);
-			RETURN_IF_TRUE((n_slots == 0), -EFAULT);
-			cached_free_slots->len -= n_slots;
-		}
-		/* Put index of new free slot in cache. */
-		cached_free_slots->objs[cached_free_slots->len] = key_idx;
-		cached_free_slots->len++;
-	} else {
-		rte_ring_sp_enqueue_elem(h->free_slots, &key_idx,
-						sizeof(uint32_t));
-	}
+	/* Enqueue slot to cache/ring of free slots. */
+	return free_slot(h, key_idx);
 
-	return 0;
 }
 
 static inline void
diff --git a/lib/librte_hash/rte_cuckoo_hash.h b/lib/librte_hash/rte_cuckoo_hash.h
index 345de6bf9cfd..85be49d3bbe7 100644
--- a/lib/librte_hash/rte_cuckoo_hash.h
+++ b/lib/librte_hash/rte_cuckoo_hash.h
@@ -168,6 +168,11 @@  struct rte_hash {
 	struct lcore_cache *local_free_slots;
 	/**< Local cache per lcore, storing some indexes of the free slots */
 
+	/* RCU config */
+	struct rte_hash_rcu_config *hash_rcu_cfg;
+	/**< HASH RCU QSBR configuration structure */
+	struct rte_rcu_qsbr_dq *dq;	/**< RCU QSBR defer queue. */
+
 	/* Fields used in lookup */
 
 	uint32_t key_len __rte_cache_aligned;
@@ -230,4 +235,7 @@  struct queue_node {
 	int prev_slot;               /* Parent(slot) in search path */
 };
 
+/** @internal Default RCU defer queue entries to reclaim in one go. */
+#define RTE_HASH_RCU_DQ_RECLAIM_MAX	16
+
 #endif
diff --git a/lib/librte_hash/rte_hash.h b/lib/librte_hash/rte_hash.h
index bff40251bc98..5431bcf4aeb1 100644
--- a/lib/librte_hash/rte_hash.h
+++ b/lib/librte_hash/rte_hash.h
@@ -15,6 +15,7 @@ 
 #include <stddef.h>
 
 #include <rte_compat.h>
+#include <rte_rcu_qsbr.h>
 
 #ifdef __cplusplus
 extern "C" {
@@ -45,7 +46,8 @@  extern "C" {
 /** Flag to disable freeing of key index on hash delete.
  * Refer to rte_hash_del_xxx APIs for more details.
  * This is enabled by default when RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF
- * is enabled.
+ * is enabled. However, if internal RCU is enabled, freeing of internal
+ * memory/index is done on delete
  */
 #define RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL 0x10
 
@@ -67,6 +69,13 @@  typedef uint32_t (*rte_hash_function)(const void *key, uint32_t key_len,
 /** Type of function used to compare the hash key. */
 typedef int (*rte_hash_cmp_eq_t)(const void *key1, const void *key2, size_t key_len);
 
+/**
+ * Type of function used to free data stored in the key.
+ * Required when using internal RCU to allow application to free key-data once
+ * the key is returned to the ring of free key-slots.
+ */
+typedef void (*rte_hash_free_key_data)(void *p, void *key_data);
+
 /**
  * Parameters used when creating the hash table.
  */
@@ -81,6 +90,39 @@  struct rte_hash_parameters {
 	uint8_t extra_flag;		/**< Indicate if additional parameters are present. */
 };
 
+/** RCU reclamation modes */
+enum rte_hash_qsbr_mode {
+	/** Create defer queue for reclaim. */
+	RTE_HASH_QSBR_MODE_DQ = 0,
+	/** Use blocking mode reclaim. No defer queue created. */
+	RTE_HASH_QSBR_MODE_SYNC
+};
+
+/** HASH RCU QSBR configuration structure. */
+struct rte_hash_rcu_config {
+	struct rte_rcu_qsbr *v;		/**< RCU QSBR variable. */
+	enum rte_hash_qsbr_mode mode;
+	/**< Mode of RCU QSBR. RTE_HASH_QSBR_MODE_xxx
+	 * '0' for default: create defer queue for reclaim.
+	 */
+	uint32_t dq_size;
+	/**< RCU defer queue size.
+	 * default: total hash table entries.
+	 */
+	uint32_t reclaim_thd;	/**< Threshold to trigger auto reclaim. */
+	uint32_t reclaim_max;
+	/**< Max entries to reclaim in one go.
+	 * default: RTE_HASH_RCU_DQ_RECLAIM_MAX.
+	 */
+	void *key_data_ptr;
+	/**< Pointer passed to the free function. Typically, this is the
+	 * pointer to the data structure to which the resource to free
+	 * (key-data) belongs. This can be NULL.
+	 */
+	rte_hash_free_key_data free_key_data_func;
+	/**< Function to call to free the resource (key-data). */
+};
+
 /** @internal A hash table structure. */
 struct rte_hash;
 
@@ -287,7 +329,8 @@  rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key, hash_sig_t
  * Thread safety can be enabled by setting flag during
  * table creation.
  * If RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL or
- * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled,
+ * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled and
+ * internal RCU is NOT enabled,
  * the key index returned by rte_hash_add_key_xxx APIs will not be
  * freed by this API. rte_hash_free_key_with_position API must be called
  * additionally to free the index associated with the key.
@@ -316,7 +359,8 @@  rte_hash_del_key(const struct rte_hash *h, const void *key);
  * Thread safety can be enabled by setting flag during
  * table creation.
  * If RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL or
- * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled,
+ * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled and
+ * internal RCU is NOT enabled,
  * the key index returned by rte_hash_add_key_xxx APIs will not be
  * freed by this API. rte_hash_free_key_with_position API must be called
  * additionally to free the index associated with the key.
@@ -370,7 +414,8 @@  rte_hash_get_key_with_position(const struct rte_hash *h, const int32_t position,
  * only be called from one thread by default. Thread safety
  * can be enabled by setting flag during table creation.
  * If RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL or
- * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled,
+ * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled and
+ * internal RCU is NOT enabled,
  * the key index returned by rte_hash_del_key_xxx APIs must be freed
  * using this API. This API should be called after all the readers
  * have stopped referencing the entry corresponding to this key.
@@ -625,6 +670,28 @@  rte_hash_lookup_bulk(const struct rte_hash *h, const void **keys,
  */
 int32_t
 rte_hash_iterate(const struct rte_hash *h, const void **key, void **data, uint32_t *next);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Associate RCU QSBR variable with a hash object.
+ *
+ * @param h
+ *   the hash object to add RCU QSBR
+ * @param cfg
+ *   RCU QSBR configuration
+ * @return
+ *   On success - 0
+ *   On error - 1 with error code set in rte_errno.
+ *   Possible rte_errno codes are:
+ *   - EINVAL - invalid pointer
+ *   - EEXIST - already added QSBR
+ *   - ENOMEM - memory allocation failure
+ */
+__rte_experimental
+int rte_hash_rcu_qsbr_add(struct rte_hash *h,
+				struct rte_hash_rcu_config *cfg);
 #ifdef __cplusplus
 }
 #endif
diff --git a/lib/librte_hash/rte_hash_version.map b/lib/librte_hash/rte_hash_version.map
index c0db81014ff9..c6d73080f478 100644
--- a/lib/librte_hash/rte_hash_version.map
+++ b/lib/librte_hash/rte_hash_version.map
@@ -36,5 +36,5 @@  EXPERIMENTAL {
 	rte_hash_lookup_with_hash_bulk;
 	rte_hash_lookup_with_hash_bulk_data;
 	rte_hash_max_key_id;
-
+	rte_hash_rcu_qsbr_add;
 };