[v4,1/4] lib/rcu: add resource reclamation APIs

Message ID 20200403184142.7729-2-honnappa.nagarahalli@arm.com (mailing list archive)
State Superseded, archived
Delegated to: David Marchand
Headers
Series Add RCU reclamation APIs |

Checks

Context Check Description
ci/checkpatch success coding style OK
ci/Performance-Testing fail build patch failure
ci/Intel-compilation fail Compilation issues

Commit Message

Honnappa Nagarahalli April 3, 2020, 6:41 p.m. UTC
  Add resource reclamation APIs to make it simple for applications
and libraries to integrate rte_rcu library.

Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
Reviewed-by: Ola Liljedhal <ola.liljedhal@arm.com>
Reviewed-by: Ruifeng Wang <ruifeng.wang@arm.com>
---
 lib/librte_rcu/Makefile            |   2 +-
 lib/librte_rcu/meson.build         |   2 +
 lib/librte_rcu/rcu_qsbr_pvt.h      |  57 +++++++
 lib/librte_rcu/rte_rcu_qsbr.c      | 243 ++++++++++++++++++++++++++++-
 lib/librte_rcu/rte_rcu_qsbr.h      | 188 ++++++++++++++++++++++
 lib/librte_rcu/rte_rcu_version.map |   4 +
 lib/meson.build                    |   6 +-
 7 files changed, 498 insertions(+), 4 deletions(-)
 create mode 100644 lib/librte_rcu/rcu_qsbr_pvt.h
  

Comments

Ananyev, Konstantin April 7, 2020, 5:39 p.m. UTC | #1
> Add resource reclamation APIs to make it simple for applications
> and libraries to integrate rte_rcu library.

Few nits, thoughts, please see below.
Apart from that - LGTM.
Acked-by: Konstantin Ananyev <konstantin.ananyev@intel.com>

> 
> Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
> Reviewed-by: Ola Liljedhal <ola.liljedhal@arm.com>
> Reviewed-by: Ruifeng Wang <ruifeng.wang@arm.com>
> ---
>  lib/librte_rcu/Makefile            |   2 +-
>  lib/librte_rcu/meson.build         |   2 +
>  lib/librte_rcu/rcu_qsbr_pvt.h      |  57 +++++++
>  lib/librte_rcu/rte_rcu_qsbr.c      | 243 ++++++++++++++++++++++++++++-
>  lib/librte_rcu/rte_rcu_qsbr.h      | 188 ++++++++++++++++++++++
>  lib/librte_rcu/rte_rcu_version.map |   4 +
>  lib/meson.build                    |   6 +-
>  7 files changed, 498 insertions(+), 4 deletions(-)
>  create mode 100644 lib/librte_rcu/rcu_qsbr_pvt.h
> 
> diff --git a/lib/librte_rcu/Makefile b/lib/librte_rcu/Makefile
> index c4bb28d77..95f8a57e2 100644
> --- a/lib/librte_rcu/Makefile
> +++ b/lib/librte_rcu/Makefile
> @@ -8,7 +8,7 @@ LIB = librte_rcu.a
> 
>  CFLAGS += -DALLOW_EXPERIMENTAL_API
>  CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3
> -LDLIBS += -lrte_eal
> +LDLIBS += -lrte_eal -lrte_ring
> 
>  EXPORT_MAP := rte_rcu_version.map
> 
> diff --git a/lib/librte_rcu/meson.build b/lib/librte_rcu/meson.build
> index 62920ba02..e280b29c1 100644
> --- a/lib/librte_rcu/meson.build
> +++ b/lib/librte_rcu/meson.build
> @@ -10,3 +10,5 @@ headers = files('rte_rcu_qsbr.h')
>  if cc.get_id() == 'clang' and dpdk_conf.get('RTE_ARCH_64') == false
>  	ext_deps += cc.find_library('atomic')
>  endif
> +
> +deps += ['ring']
> diff --git a/lib/librte_rcu/rcu_qsbr_pvt.h b/lib/librte_rcu/rcu_qsbr_pvt.h
> new file mode 100644
> index 000000000..413f28587
> --- /dev/null
> +++ b/lib/librte_rcu/rcu_qsbr_pvt.h
> @@ -0,0 +1,57 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + * Copyright (c) 2019 Arm Limited
> + */
> +
> +#ifndef _RTE_RCU_QSBR_PVT_H_
> +#define _RTE_RCU_QSBR_PVT_H_
> +
> +/**
> + * This file is private to the RCU library. It should not be included
> + * by the user of this library.
> + */
> +
> +#ifdef __cplusplus
> +extern "C" {
> +#endif
> +
> +#include <rte_ring.h>
> +#include <rte_ring_elem.h>
> +
> +#include "rte_rcu_qsbr.h"
> +
> +/* RTE defer queue structure.
> + * This structure holds the defer queue. The defer queue is used to
> + * hold the deleted entries from the data structure that are not
> + * yet freed.
> + */
> +struct rte_rcu_qsbr_dq {
> +	struct rte_rcu_qsbr *v; /**< RCU QSBR variable used by this queue.*/
> +	struct rte_ring *r;     /**< RCU QSBR defer queue. */
> +	uint32_t size;
> +	/**< Number of elements in the defer queue */
> +	uint32_t esize;
> +	/**< Size (in bytes) of data, including the token, stored on the
> +	 *   defer queue.
> +	 */
> +	uint32_t trigger_reclaim_limit;
> +	/**< Trigger automatic reclamation after the defer queue
> +	 *   has atleast these many resources waiting.
> +	 */
> +	uint32_t max_reclaim_size;
> +	/**< Reclaim at the max these many resources during auto
> +	 *   reclamation.
> +	 */
> +	rte_rcu_qsbr_free_resource_t free_fn;
> +	/**< Function to call to free the resource. */
> +	void *p;
> +	/**< Pointer passed to the free function. Typically, this is the
> +	 *   pointer to the data structure to which the resource to free
> +	 *   belongs.
> +	 */
> +};
> +
> +#ifdef __cplusplus
> +}
> +#endif
> +
> +#endif /* _RTE_RCU_QSBR_PVT_H_ */
> diff --git a/lib/librte_rcu/rte_rcu_qsbr.c b/lib/librte_rcu/rte_rcu_qsbr.c
> index 2f3fad776..e8c1e386f 100644
> --- a/lib/librte_rcu/rte_rcu_qsbr.c
> +++ b/lib/librte_rcu/rte_rcu_qsbr.c
> @@ -1,6 +1,6 @@
>  /* SPDX-License-Identifier: BSD-3-Clause
>   *
> - * Copyright (c) 2018 Arm Limited
> + * Copyright (c) 2018-2019 Arm Limited
>   */
> 
>  #include <stdio.h>
> @@ -18,8 +18,10 @@
>  #include <rte_per_lcore.h>
>  #include <rte_lcore.h>
>  #include <rte_errno.h>
> +#include <rte_ring_elem.h>
> 
>  #include "rte_rcu_qsbr.h"
> +#include "rcu_qsbr_pvt.h"
> 
>  /* Get the memory size of QSBR variable */
>  size_t
> @@ -270,6 +272,245 @@ rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v)
>  	return 0;
>  }
> 
> +/* Create a queue used to store the data structure elements that can
> + * be freed later. This queue is referred to as 'defer queue'.
> + */
> +struct rte_rcu_qsbr_dq *
> +rte_rcu_qsbr_dq_create(const struct rte_rcu_qsbr_dq_parameters *params)
> +{
> +	struct rte_rcu_qsbr_dq *dq;
> +	uint32_t qs_fifo_size;
> +	unsigned int flags;
> +
> +	if (params == NULL || params->free_fn == NULL ||
> +		params->v == NULL || params->name == NULL ||
> +		params->size == 0 || params->esize == 0 ||
> +		(params->esize % 4 != 0)) {
> +		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
> +			"%s(): Invalid input parameter\n", __func__);
> +		rte_errno = EINVAL;
> +
> +		return NULL;
> +	}
> +	/* If auto reclamation is configured, reclaim limit
> +	 * should be a valid value.
> +	 */
> +	if ((params->trigger_reclaim_limit <= params->size) &&
> +	    (params->max_reclaim_size == 0)) {
> +		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
> +			"%s(): Invalid input parameter, size = %u, trigger_reclaim_limit = %u, max_reclaim_size = %u\n",
> +			__func__, params->size, params->trigger_reclaim_limit,
> +			params->max_reclaim_size);
> +		rte_errno = EINVAL;
> +
> +		return NULL;
> +	}
> +
> +	dq = rte_zmalloc(NULL, sizeof(struct rte_rcu_qsbr_dq),
> +			 RTE_CACHE_LINE_SIZE);
> +	if (dq == NULL) {
> +		rte_errno = ENOMEM;
> +
> +		return NULL;
> +	}
> +
> +	/* Decide the flags for the ring.
> +	 * If MT safety is requested, use RTS for ring enqueue as most
> +	 * use cases involve dq-enqueue happening on the control plane.
> +	 * Ring dequeue is always HTS due to the possibility of revert.
> +	 */
> +	flags = RING_F_MP_RTS_ENQ;
> +	if (params->flags & RTE_RCU_QSBR_DQ_MT_UNSAFE)
> +		flags = RING_F_SP_ENQ;
> +	flags |= RING_F_MC_HTS_DEQ;
> +	/* round up qs_fifo_size to next power of two that is not less than
> +	 * max_size.
> +	 */
> +	qs_fifo_size = rte_align32pow2(params->size + 1);
> +	/* Add token size to ring element size */
> +	dq->r = rte_ring_create_elem(params->name,
> +			__RTE_QSBR_TOKEN_SIZE + params->esize,
> +			qs_fifo_size, SOCKET_ID_ANY, flags);
> +	if (dq->r == NULL) {
> +		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
> +			"%s(): defer queue create failed\n", __func__);
> +		rte_free(dq);
> +		return NULL;
> +	}
> +
> +	dq->v = params->v;
> +	dq->size = params->size;
> +	dq->esize = __RTE_QSBR_TOKEN_SIZE + params->esize;
> +	dq->trigger_reclaim_limit = params->trigger_reclaim_limit;
> +	dq->max_reclaim_size = params->max_reclaim_size;
> +	dq->free_fn = params->free_fn;
> +	dq->p = params->p;
> +
> +	return dq;
> +}
> +
> +/* Enqueue one resource to the defer queue to free after the grace
> + * period is over.
> + */
> +int rte_rcu_qsbr_dq_enqueue(struct rte_rcu_qsbr_dq *dq, void *e)
> +{
> +	uint64_t token;
> +	uint32_t cur_size, free_size;
> +
> +	if (dq == NULL || e == NULL) {
> +		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
> +			"%s(): Invalid input parameter\n", __func__);
> +		rte_errno = EINVAL;
> +
> +		return 1;
> +	}
> +
> +	/* Start the grace period */
> +	token = rte_rcu_qsbr_start(dq->v);
> +
> +	/* Reclaim resources if the queue is 1/8th full. This helps

Comment about 1/8 is probably left from older version?
As I understand now it is configurable parameter.

> +	 * the queue from growing too large and allows time for reader
> +	 * threads to report their quiescent state.
> +	 */
> +	cur_size = rte_ring_count(dq->r);
> +	if (cur_size > dq->trigger_reclaim_limit) {
> +		rte_log(RTE_LOG_INFO, rte_rcu_log_type,
> +			"%s(): Triggering reclamation\n", __func__);
> +		rte_rcu_qsbr_dq_reclaim(dq, dq->max_reclaim_size, NULL, NULL);
> +	}
> +
> +	/* Check if there is space for atleast 1 resource */
> +	free_size = rte_ring_free_count(dq->r);
> +	if (!free_size) {

Is there any point to do this check at all?
You are doing enqueue below and handle situation with
not enough space in the ring anyway.

> +		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
> +			"%s(): Defer queue is full\n", __func__);
> +		/* Note that the token generated above is not used.
> +		 * Other than wasting tokens, it should not cause any
> +		 * other issues.
> +		 */
> +		rte_log(RTE_LOG_INFO, rte_rcu_log_type,
> +			"%s(): Skipped enqueuing token = %"PRIu64"\n",
> +			__func__, token);
> +
> +		rte_errno = ENOSPC;
> +		return 1;
> +	}
> +
> +	/* Enqueue the token and resource. Generating the token
> +	 * and enqueuing (token + resource) on the queue is not an
> +	 * atomic operation. This might result in tokens enqueued
> +	 * out of order on the queue. So, some tokens might wait
> +	 * longer than they are required to be reclaimed.
> +	 */
> +	char data[dq->esize];
> +	memcpy(data, &token, __RTE_QSBR_TOKEN_SIZE);
> +	memcpy(data + __RTE_QSBR_TOKEN_SIZE, e,
> +		dq->esize - __RTE_QSBR_TOKEN_SIZE);
> +	/* Check the status as enqueue might fail since the other thread
> +	 * might have used up the freed space.
> +	 * Enqueue uses the configured flags when the DQ was created.
> +	 */
> +	if (rte_ring_enqueue_elem(dq->r, data, dq->esize) != 0) {
> +		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
> +			"%s(): Enqueue failed\n", __func__);
> +		/* Note that the token generated above is not used.
> +		 * Other than wasting tokens, it should not cause any
> +		 * other issues.
> +		 */
> +		rte_log(RTE_LOG_INFO, rte_rcu_log_type,
> +			"%s(): Skipped enqueuing token = %"PRIu64"\n",
> +			__func__, token);
> +
> +		rte_errno = ENOSPC;
> +		return 1;
> +	}


Just as a thought: in theory if we'll use MP_HTS(/SP) ring we can avoid
wasting RCU tokens:

if (rte_ring_enqueue_elem_bulk_start(dq->r, 1, NULL) != 0) {
	token = rte_rcu_qsbr_start(dq->v);
	memcpy(data, &token, __RTE_QSBR_TOKEN_SIZE);
	rte_ring_enqueue_elem_finish(dq->r, data, dq->esize, 1);
}

Though it might slowdown things if we'll have a lot of
parallel dq_enqueue. 
So not sure is it worth it or not.

> +
> +	rte_log(RTE_LOG_INFO, rte_rcu_log_type,
> +		"%s(): Enqueued token = %"PRIu64"\n", __func__, token);
> +
> +	return 0;
> +}
> +
> +/* Reclaim resources from the defer queue. */
> +int
> +rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq, unsigned int n,
> +				unsigned int *freed, unsigned int *pending)
> +{
> +	uint32_t cnt;
> +	uint64_t token;
> +
> +	if (dq == NULL || n == 0) {
> +		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
> +			"%s(): Invalid input parameter\n", __func__);
> +		rte_errno = EINVAL;
> +
> +		return 1;
> +	}
> +
> +	cnt = 0;
> +
> +	char e[dq->esize];
> +	/* Check reader threads quiescent state and reclaim resources */
> +	while ((cnt < n) &&
> +		(rte_ring_dequeue_bulk_elem_start(dq->r, e,
> +					dq->esize, 1, NULL) != 0)) {

Another thought - any point to use burst_elem_start() here to retrieve more
then 1 elem in one go? Something like:
char e[32][dq->size]; 
while ((cnt < n) {
	k = RTE_MAX(32, cnt - n);
	k = rte_ring_dequeue_burst_elem_start(dq->r, e, dq->esize, k, NULL);
	if (k = 0)
		break;
	for (i = 0; i != k; i++) {
		memcpy(&token, e[i], sizeof(uint64_t));
		if (rte_rcu_qsbr_check(dq->v, token, false) != 1)
			break;
	}
	k = i;
	rte_ring_dequeue_elem_finish(dq->r, k);
	for (i = 0; i != k; i++)
		dq->free_fn(dq->p, e[i] + __RTE_QSBR_TOKEN_SIZE);
	n += k;
	if (k == 0)
		break;

?
Also if at enqueue we guarantee strict ordrer (via enqueue_start/enqueue_finish),
then here we probably can do _check_ from the last retrieved token here?
In theory that might help to minimize number of checks.
I.E. do:
for (i = k; i-- !=0; )  {
	memcpy(&token, e[i], sizeof(uint64_t));
	if (rte_rcu_qsbr_check(dq->v, token, false) != 1)   
		break;
}
k = i + 1;
...

> +		memcpy(&token, e, sizeof(uint64_t));
> +
> +		/* Reclaim the resource */
> +		if (rte_rcu_qsbr_check(dq->v, token, false) != 1) {
> +			rte_ring_dequeue_finish(dq->r, 0);
> +			break;
> +		}
> +		rte_ring_dequeue_finish(dq->r, 1);
> +
> +		rte_log(RTE_LOG_INFO, rte_rcu_log_type,
> +			"%s(): Reclaimed token = %"PRIu64"\n",
> +			__func__, *(uint64_t *)e);
> +
> +		dq->free_fn(dq->p, e + __RTE_QSBR_TOKEN_SIZE);
> +
> +		cnt++;
> +	}
> +
> +	rte_log(RTE_LOG_INFO, rte_rcu_log_type,
> +		"%s(): Reclaimed %u resources\n", __func__, cnt);
> +
> +	if (freed != NULL)
> +		*freed = cnt;
> +	if (pending != NULL)
> +		*pending = rte_ring_count(dq->r);
> +
> +	return 0;
> +}
> +
> +/* Delete a defer queue. */
> +int
> +rte_rcu_qsbr_dq_delete(struct rte_rcu_qsbr_dq *dq)
> +{
> +	unsigned int pending;
> +
> +	if (dq == NULL) {
> +		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
> +			"%s(): Invalid input parameter\n", __func__);
> +		rte_errno = EINVAL;
> +
> +		return 1;
> +	}
> +
> +	/* Reclaim all the resources */
> +	rte_rcu_qsbr_dq_reclaim(dq, ~0, NULL, &pending);
> +	if (pending != 0) {
> +		rte_errno = EAGAIN;
> +
> +		return 1;
> +	}
> +
> +	rte_ring_free(dq->r);
> +	rte_free(dq);
> +
> +	return 0;
> +}
> +
>  int rte_rcu_log_type;
> 
>  RTE_INIT(rte_rcu_register)
> diff --git a/lib/librte_rcu/rte_rcu_qsbr.h b/lib/librte_rcu/rte_rcu_qsbr.h
> index 0b5585925..213f9b029 100644
> --- a/lib/librte_rcu/rte_rcu_qsbr.h
> +++ b/lib/librte_rcu/rte_rcu_qsbr.h
> @@ -34,6 +34,7 @@ extern "C" {
>  #include <rte_lcore.h>
>  #include <rte_debug.h>
>  #include <rte_atomic.h>
> +#include <rte_ring.h>
> 
>  extern int rte_rcu_log_type;
> 
> @@ -84,6 +85,7 @@ struct rte_rcu_qsbr_cnt {
>  #define __RTE_QSBR_CNT_THR_OFFLINE 0
>  #define __RTE_QSBR_CNT_INIT 1
>  #define __RTE_QSBR_CNT_MAX ((uint64_t)~0)
> +#define __RTE_QSBR_TOKEN_SIZE sizeof(uint64_t)
> 
>  /* RTE Quiescent State variable structure.
>   * This structure has two elements that vary in size based on the
> @@ -114,6 +116,84 @@ struct rte_rcu_qsbr {
>  	 */
>  } __rte_cache_aligned;
> 
> +/**
> + * Call back function called to free the resources.
> + *
> + * @param p
> + *   Pointer provided while creating the defer queue
> + * @param e
> + *   Pointer to the resource data stored on the defer queue
> + *
> + * @return
> + *   None
> + */
> +typedef void (*rte_rcu_qsbr_free_resource_t)(void *p, void *e);
> +
> +#define RTE_RCU_QSBR_DQ_NAMESIZE RTE_RING_NAMESIZE
> +
> +/**
> + * Various flags supported.
> + */
> +/**< Enqueue and reclaim operations are multi-thread safe by default.
> + *   The call back functions registered to free the resources are
> + *   assumed to be multi-thread safe.
> + *   Set this flag is multi-thread safety is not required.
> + */
> +#define RTE_RCU_QSBR_DQ_MT_UNSAFE 1
> +
> +/**
> + * Parameters used when creating the defer queue.
> + */
> +struct rte_rcu_qsbr_dq_parameters {
> +	const char *name;
> +	/**< Name of the queue. */
> +	uint32_t flags;
> +	/**< Flags to control API behaviors */
> +	uint32_t size;
> +	/**< Number of entries in queue. Typically, this will be
> +	 *   the same as the maximum number of entries supported in the
> +	 *   lock free data structure.
> +	 *   Data structures with unbounded number of entries is not
> +	 *   supported currently.
> +	 */
> +	uint32_t esize;
> +	/**< Size (in bytes) of each element in the defer queue.
> +	 *   This has to be multiple of 4B.
> +	 */
> +	uint32_t trigger_reclaim_limit;
> +	/**< Trigger automatic reclamation after the defer queue
> +	 *   has atleast these many resources waiting. This auto
> +	 *   reclamation is triggered in rte_rcu_qsbr_dq_enqueue API
> +	 *   call.
> +	 *   If this is greater than 'size', auto reclamation is
> +	 *   not triggered.
> +	 *   If this is set to 0, auto reclamation is triggered
> +	 *   in every call to rte_rcu_qsbr_dq_enqueue API.
> +	 */
> +	uint32_t max_reclaim_size;
> +	/**< When automatic reclamation is enabled, reclaim at the max
> +	 *   these many resources. This should contain a valid value, if
> +	 *   auto reclamation is on. Setting this to 'size' or greater will
> +	 *   reclaim all possible resources currently on the defer queue.
> +	 */
> +	rte_rcu_qsbr_free_resource_t free_fn;
> +	/**< Function to call to free the resource. */
> +	void *p;
> +	/**< Pointer passed to the free function. Typically, this is the
> +	 *   pointer to the data structure to which the resource to free
> +	 *   belongs. This can be NULL.
> +	 */
> +	struct rte_rcu_qsbr *v;
> +	/**< RCU QSBR variable to use for this defer queue */
> +};
> +
> +/* RTE defer queue structure.
> + * This structure holds the defer queue. The defer queue is used to
> + * hold the deleted entries from the data structure that are not
> + * yet freed.
> + */
> +struct rte_rcu_qsbr_dq;
> +
>  /**
>   * @warning
>   * @b EXPERIMENTAL: this API may change without prior notice
> @@ -692,6 +772,114 @@ __rte_experimental
>  int
>  rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v);
> 
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Create a queue used to store the data structure elements that can
> + * be freed later. This queue is referred to as 'defer queue'.
> + *
> + * @param params
> + *   Parameters to create a defer queue.
> + * @return
> + *   On success - Valid pointer to defer queue
> + *   On error - NULL
> + *   Possible rte_errno codes are:
> + *   - EINVAL - NULL parameters are passed
> + *   - ENOMEM - Not enough memory
> + */
> +__rte_experimental
> +struct rte_rcu_qsbr_dq *
> +rte_rcu_qsbr_dq_create(const struct rte_rcu_qsbr_dq_parameters *params);
> +
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Enqueue one resource to the defer queue and start the grace period.
> + * The resource will be freed later after at least one grace period
> + * is over.
> + *
> + * If the defer queue is full, it will attempt to reclaim resources.
> + * It will also reclaim resources at regular intervals to avoid
> + * the defer queue from growing too big.
> + *
> + * Multi-thread safety is provided as the defer queue configuration.
> + * When multi-thread safety is requested, it is possible that the
> + * resources are not stored in their order of deletion. This results
> + * in resources being held in the defer queue longer than they should.
> + *
> + * @param dq
> + *   Defer queue to allocate an entry from.
> + * @param e
> + *   Pointer to resource data to copy to the defer queue. The size of
> + *   the data to copy is equal to the element size provided when the
> + *   defer queue was created.
> + * @return
> + *   On success - 0
> + *   On error - 1 with rte_errno set to
> + *   - EINVAL - NULL parameters are passed
> + *   - ENOSPC - Defer queue is full. This condition can not happen
> + *		if the defer queue size is equal (or larger) than the
> + *		number of elements in the data structure.
> + */
> +__rte_experimental
> +int
> +rte_rcu_qsbr_dq_enqueue(struct rte_rcu_qsbr_dq *dq, void *e);
> +
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Free quesed resources from the defer queue.
> + *
> + * This API is multi-thread safe.
> + *
> + * @param dq
> + *   Defer queue to free an entry from.
> + * @param n
> + *   Maximum number of resources to free.
> + * @param freed
> + *   Number of resources that were freed.
> + * @param pending
> + *   Number of resources pending on the defer queue. This number might not
> + *   be acurate if multi-thread safety is configured.
> + * @return
> + *   On successful reclamation of at least 1 resource - 0
> + *   On error - 1 with rte_errno set to
> + *   - EINVAL - NULL parameters are passed
> + */
> +__rte_experimental
> +int
> +rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq, unsigned int n,
> +				unsigned int *freed, unsigned int *pending);
> +
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Delete a defer queue.
> + *
> + * It tries to reclaim all the resources on the defer queue.
> + * If any of the resources have not completed the grace period
> + * the reclamation stops and returns immediately. The rest of
> + * the resources are not reclaimed and the defer queue is not
> + * freed.
> + *
> + * @param dq
> + *   Defer queue to delete.
> + * @return
> + *   On success - 0
> + *   On error - 1
> + *   Possible rte_errno codes are:
> + *   - EINVAL - NULL parameters are passed
> + *   - EAGAIN - Some of the resources have not completed at least 1 grace
> + *		period, try again.
> + */
> +__rte_experimental
> +int
> +rte_rcu_qsbr_dq_delete(struct rte_rcu_qsbr_dq *dq);
> +
>  #ifdef __cplusplus
>  }
>  #endif
> diff --git a/lib/librte_rcu/rte_rcu_version.map b/lib/librte_rcu/rte_rcu_version.map
> index f8b9ef2ab..dfac88a37 100644
> --- a/lib/librte_rcu/rte_rcu_version.map
> +++ b/lib/librte_rcu/rte_rcu_version.map
> @@ -8,6 +8,10 @@ EXPERIMENTAL {
>  	rte_rcu_qsbr_synchronize;
>  	rte_rcu_qsbr_thread_register;
>  	rte_rcu_qsbr_thread_unregister;
> +	rte_rcu_qsbr_dq_create;
> +	rte_rcu_qsbr_dq_enqueue;
> +	rte_rcu_qsbr_dq_reclaim;
> +	rte_rcu_qsbr_dq_delete;
> 
>  	local: *;
>  };
> diff --git a/lib/meson.build b/lib/meson.build
> index 9c3cc55d5..15e91a303 100644
> --- a/lib/meson.build
> +++ b/lib/meson.build
> @@ -11,7 +11,9 @@
>  libraries = [
>  	'kvargs', # eal depends on kvargs
>  	'eal', # everything depends on eal
> -	'ring', 'mempool', 'mbuf', 'net', 'meter', 'ethdev', 'pci', # core
> +	'ring',
> +	'rcu', # rcu depends on ring
> +	'mempool', 'mbuf', 'net', 'meter', 'ethdev', 'pci', # core
>  	'cmdline',
>  	'metrics', # bitrate/latency stats depends on this
>  	'hash',    # efd depends on this
> @@ -22,7 +24,7 @@ libraries = [
>  	'gro', 'gso', 'ip_frag', 'jobstats',
>  	'kni', 'latencystats', 'lpm', 'member',
>  	'power', 'pdump', 'rawdev',
> -	'rcu', 'rib', 'reorder', 'sched', 'security', 'stack', 'vhost',
> +	'rib', 'reorder', 'sched', 'security', 'stack', 'vhost',
>  	# ipsec lib depends on net, crypto and security
>  	'ipsec',
>  	#fib lib depends on rib
> --
> 2.17.1
  
Honnappa Nagarahalli April 19, 2020, 11:22 p.m. UTC | #2
<snip>

> 
> > Add resource reclamation APIs to make it simple for applications and
> > libraries to integrate rte_rcu library.
> 
> Few nits, thoughts, please see below.
> Apart from that - LGTM.
> Acked-by: Konstantin Ananyev <konstantin.ananyev@intel.com>
> 
> >
> > Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
> > Reviewed-by: Ola Liljedhal <ola.liljedhal@arm.com>
> > Reviewed-by: Ruifeng Wang <ruifeng.wang@arm.com>
> > ---
> >  lib/librte_rcu/Makefile            |   2 +-
> >  lib/librte_rcu/meson.build         |   2 +
> >  lib/librte_rcu/rcu_qsbr_pvt.h      |  57 +++++++
> >  lib/librte_rcu/rte_rcu_qsbr.c      | 243 ++++++++++++++++++++++++++++-
> >  lib/librte_rcu/rte_rcu_qsbr.h      | 188 ++++++++++++++++++++++
> >  lib/librte_rcu/rte_rcu_version.map |   4 +
> >  lib/meson.build                    |   6 +-
> >  7 files changed, 498 insertions(+), 4 deletions(-)  create mode
> > 100644 lib/librte_rcu/rcu_qsbr_pvt.h
> >
> > diff --git a/lib/librte_rcu/Makefile b/lib/librte_rcu/Makefile index
> > c4bb28d77..95f8a57e2 100644
> > --- a/lib/librte_rcu/Makefile
> > +++ b/lib/librte_rcu/Makefile
> > @@ -8,7 +8,7 @@ LIB = librte_rcu.a
> >
> >  CFLAGS += -DALLOW_EXPERIMENTAL_API
> >  CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3 -LDLIBS += -lrte_eal
> > +LDLIBS += -lrte_eal -lrte_ring
> >
> >  EXPORT_MAP := rte_rcu_version.map
> >
> > diff --git a/lib/librte_rcu/meson.build b/lib/librte_rcu/meson.build
> > index 62920ba02..e280b29c1 100644
> > --- a/lib/librte_rcu/meson.build
> > +++ b/lib/librte_rcu/meson.build
> > @@ -10,3 +10,5 @@ headers = files('rte_rcu_qsbr.h')  if cc.get_id() ==
> > 'clang' and dpdk_conf.get('RTE_ARCH_64') == false
> >  	ext_deps += cc.find_library('atomic')  endif
> > +
> > +deps += ['ring']
> > diff --git a/lib/librte_rcu/rcu_qsbr_pvt.h
> > b/lib/librte_rcu/rcu_qsbr_pvt.h new file mode 100644 index
> > 000000000..413f28587
> > --- /dev/null
> > +++ b/lib/librte_rcu/rcu_qsbr_pvt.h
> > @@ -0,0 +1,57 @@
> > +/* SPDX-License-Identifier: BSD-3-Clause
> > + * Copyright (c) 2019 Arm Limited
> > + */
> > +
> > +#ifndef _RTE_RCU_QSBR_PVT_H_
> > +#define _RTE_RCU_QSBR_PVT_H_
> > +
> > +/**
> > + * This file is private to the RCU library. It should not be included
> > + * by the user of this library.
> > + */
> > +
> > +#ifdef __cplusplus
> > +extern "C" {
> > +#endif
> > +
> > +#include <rte_ring.h>
> > +#include <rte_ring_elem.h>
> > +
> > +#include "rte_rcu_qsbr.h"
> > +
> > +/* RTE defer queue structure.
> > + * This structure holds the defer queue. The defer queue is used to
> > + * hold the deleted entries from the data structure that are not
> > + * yet freed.
> > + */
> > +struct rte_rcu_qsbr_dq {
> > +	struct rte_rcu_qsbr *v; /**< RCU QSBR variable used by this queue.*/
> > +	struct rte_ring *r;     /**< RCU QSBR defer queue. */
> > +	uint32_t size;
> > +	/**< Number of elements in the defer queue */
> > +	uint32_t esize;
> > +	/**< Size (in bytes) of data, including the token, stored on the
> > +	 *   defer queue.
> > +	 */
> > +	uint32_t trigger_reclaim_limit;
> > +	/**< Trigger automatic reclamation after the defer queue
> > +	 *   has atleast these many resources waiting.
> > +	 */
> > +	uint32_t max_reclaim_size;
> > +	/**< Reclaim at the max these many resources during auto
> > +	 *   reclamation.
> > +	 */
> > +	rte_rcu_qsbr_free_resource_t free_fn;
> > +	/**< Function to call to free the resource. */
> > +	void *p;
> > +	/**< Pointer passed to the free function. Typically, this is the
> > +	 *   pointer to the data structure to which the resource to free
> > +	 *   belongs.
> > +	 */
> > +};
> > +
> > +#ifdef __cplusplus
> > +}
> > +#endif
> > +
> > +#endif /* _RTE_RCU_QSBR_PVT_H_ */
> > diff --git a/lib/librte_rcu/rte_rcu_qsbr.c
> > b/lib/librte_rcu/rte_rcu_qsbr.c index 2f3fad776..e8c1e386f 100644
> > --- a/lib/librte_rcu/rte_rcu_qsbr.c
> > +++ b/lib/librte_rcu/rte_rcu_qsbr.c
> > @@ -1,6 +1,6 @@
> >  /* SPDX-License-Identifier: BSD-3-Clause
> >   *
> > - * Copyright (c) 2018 Arm Limited
> > + * Copyright (c) 2018-2019 Arm Limited
> >   */
> >
> >  #include <stdio.h>
> > @@ -18,8 +18,10 @@
> >  #include <rte_per_lcore.h>
> >  #include <rte_lcore.h>
> >  #include <rte_errno.h>
> > +#include <rte_ring_elem.h>
> >
> >  #include "rte_rcu_qsbr.h"
> > +#include "rcu_qsbr_pvt.h"
> >
> >  /* Get the memory size of QSBR variable */  size_t @@ -270,6 +272,245
> > @@ rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v)
> >  	return 0;
> >  }
> >
> > +/* Create a queue used to store the data structure elements that can
> > + * be freed later. This queue is referred to as 'defer queue'.
> > + */
> > +struct rte_rcu_qsbr_dq *
> > +rte_rcu_qsbr_dq_create(const struct rte_rcu_qsbr_dq_parameters
> > +*params) {
> > +	struct rte_rcu_qsbr_dq *dq;
> > +	uint32_t qs_fifo_size;
> > +	unsigned int flags;
> > +
> > +	if (params == NULL || params->free_fn == NULL ||
> > +		params->v == NULL || params->name == NULL ||
> > +		params->size == 0 || params->esize == 0 ||
> > +		(params->esize % 4 != 0)) {
> > +		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
> > +			"%s(): Invalid input parameter\n", __func__);
> > +		rte_errno = EINVAL;
> > +
> > +		return NULL;
> > +	}
> > +	/* If auto reclamation is configured, reclaim limit
> > +	 * should be a valid value.
> > +	 */
> > +	if ((params->trigger_reclaim_limit <= params->size) &&
> > +	    (params->max_reclaim_size == 0)) {
> > +		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
> > +			"%s(): Invalid input parameter, size = %u,
> trigger_reclaim_limit = %u, max_reclaim_size = %u\n",
> > +			__func__, params->size, params-
> >trigger_reclaim_limit,
> > +			params->max_reclaim_size);
> > +		rte_errno = EINVAL;
> > +
> > +		return NULL;
> > +	}
> > +
> > +	dq = rte_zmalloc(NULL, sizeof(struct rte_rcu_qsbr_dq),
> > +			 RTE_CACHE_LINE_SIZE);
> > +	if (dq == NULL) {
> > +		rte_errno = ENOMEM;
> > +
> > +		return NULL;
> > +	}
> > +
> > +	/* Decide the flags for the ring.
> > +	 * If MT safety is requested, use RTS for ring enqueue as most
> > +	 * use cases involve dq-enqueue happening on the control plane.
> > +	 * Ring dequeue is always HTS due to the possibility of revert.
> > +	 */
> > +	flags = RING_F_MP_RTS_ENQ;
> > +	if (params->flags & RTE_RCU_QSBR_DQ_MT_UNSAFE)
> > +		flags = RING_F_SP_ENQ;
> > +	flags |= RING_F_MC_HTS_DEQ;
> > +	/* round up qs_fifo_size to next power of two that is not less than
> > +	 * max_size.
> > +	 */
> > +	qs_fifo_size = rte_align32pow2(params->size + 1);
> > +	/* Add token size to ring element size */
> > +	dq->r = rte_ring_create_elem(params->name,
> > +			__RTE_QSBR_TOKEN_SIZE + params->esize,
> > +			qs_fifo_size, SOCKET_ID_ANY, flags);
> > +	if (dq->r == NULL) {
> > +		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
> > +			"%s(): defer queue create failed\n", __func__);
> > +		rte_free(dq);
> > +		return NULL;
> > +	}
> > +
> > +	dq->v = params->v;
> > +	dq->size = params->size;
> > +	dq->esize = __RTE_QSBR_TOKEN_SIZE + params->esize;
> > +	dq->trigger_reclaim_limit = params->trigger_reclaim_limit;
> > +	dq->max_reclaim_size = params->max_reclaim_size;
> > +	dq->free_fn = params->free_fn;
> > +	dq->p = params->p;
> > +
> > +	return dq;
> > +}
> > +
> > +/* Enqueue one resource to the defer queue to free after the grace
> > + * period is over.
> > + */
> > +int rte_rcu_qsbr_dq_enqueue(struct rte_rcu_qsbr_dq *dq, void *e) {
> > +	uint64_t token;
> > +	uint32_t cur_size, free_size;
> > +
> > +	if (dq == NULL || e == NULL) {
> > +		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
> > +			"%s(): Invalid input parameter\n", __func__);
> > +		rte_errno = EINVAL;
> > +
> > +		return 1;
> > +	}
> > +
> > +	/* Start the grace period */
> > +	token = rte_rcu_qsbr_start(dq->v);
> > +
> > +	/* Reclaim resources if the queue is 1/8th full. This helps
> 
> Comment about 1/8 is probably left from older version?
> As I understand now it is configurable parameter.
Ack, will correct this.

> 
> > +	 * the queue from growing too large and allows time for reader
> > +	 * threads to report their quiescent state.
> > +	 */
> > +	cur_size = rte_ring_count(dq->r);
> > +	if (cur_size > dq->trigger_reclaim_limit) {
> > +		rte_log(RTE_LOG_INFO, rte_rcu_log_type,
> > +			"%s(): Triggering reclamation\n", __func__);
> > +		rte_rcu_qsbr_dq_reclaim(dq, dq->max_reclaim_size, NULL,
> NULL);
> > +	}
> > +
> > +	/* Check if there is space for atleast 1 resource */
> > +	free_size = rte_ring_free_count(dq->r);
> > +	if (!free_size) {
> 
> Is there any point to do this check at all?
> You are doing enqueue below and handle situation with not enough space in
> the ring anyway.
Ack

> 
> > +		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
> > +			"%s(): Defer queue is full\n", __func__);
> > +		/* Note that the token generated above is not used.
> > +		 * Other than wasting tokens, it should not cause any
> > +		 * other issues.
> > +		 */
> > +		rte_log(RTE_LOG_INFO, rte_rcu_log_type,
> > +			"%s(): Skipped enqueuing token = %"PRIu64"\n",
> > +			__func__, token);
> > +
> > +		rte_errno = ENOSPC;
> > +		return 1;
> > +	}
> > +
> > +	/* Enqueue the token and resource. Generating the token
> > +	 * and enqueuing (token + resource) on the queue is not an
> > +	 * atomic operation. This might result in tokens enqueued
> > +	 * out of order on the queue. So, some tokens might wait
> > +	 * longer than they are required to be reclaimed.
> > +	 */
> > +	char data[dq->esize];
> > +	memcpy(data, &token, __RTE_QSBR_TOKEN_SIZE);
> > +	memcpy(data + __RTE_QSBR_TOKEN_SIZE, e,
> > +		dq->esize - __RTE_QSBR_TOKEN_SIZE);
> > +	/* Check the status as enqueue might fail since the other thread
> > +	 * might have used up the freed space.
> > +	 * Enqueue uses the configured flags when the DQ was created.
> > +	 */
> > +	if (rte_ring_enqueue_elem(dq->r, data, dq->esize) != 0) {
> > +		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
> > +			"%s(): Enqueue failed\n", __func__);
> > +		/* Note that the token generated above is not used.
> > +		 * Other than wasting tokens, it should not cause any
> > +		 * other issues.
> > +		 */
> > +		rte_log(RTE_LOG_INFO, rte_rcu_log_type,
> > +			"%s(): Skipped enqueuing token = %"PRIu64"\n",
> > +			__func__, token);
> > +
> > +		rte_errno = ENOSPC;
> > +		return 1;
> > +	}
> 
> 
> Just as a thought: in theory if we'll use MP_HTS(/SP) ring we can avoid
> wasting RCU tokens:
> 
> if (rte_ring_enqueue_elem_bulk_start(dq->r, 1, NULL) != 0) {
> 	token = rte_rcu_qsbr_start(dq->v);
> 	memcpy(data, &token, __RTE_QSBR_TOKEN_SIZE);
> 	rte_ring_enqueue_elem_finish(dq->r, data, dq->esize, 1); }
> 
> Though it might slowdown things if we'll have a lot of parallel dq_enqueue.
> So not sure is it worth it or not.
Adding peek APIs for RTS would be better. That should take care of the parallel dw_enqueue. Not sure if I gave you the comment. My ring patch supported these APIs.

> 
> > +
> > +	rte_log(RTE_LOG_INFO, rte_rcu_log_type,
> > +		"%s(): Enqueued token = %"PRIu64"\n", __func__, token);
> > +
> > +	return 0;
> > +}
> > +
> > +/* Reclaim resources from the defer queue. */ int
> > +rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq, unsigned int n,
> > +				unsigned int *freed, unsigned int *pending) {
> > +	uint32_t cnt;
> > +	uint64_t token;
> > +
> > +	if (dq == NULL || n == 0) {
> > +		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
> > +			"%s(): Invalid input parameter\n", __func__);
> > +		rte_errno = EINVAL;
> > +
> > +		return 1;
> > +	}
> > +
> > +	cnt = 0;
> > +
> > +	char e[dq->esize];
> > +	/* Check reader threads quiescent state and reclaim resources */
> > +	while ((cnt < n) &&
> > +		(rte_ring_dequeue_bulk_elem_start(dq->r, e,
> > +					dq->esize, 1, NULL) != 0)) {
> 
> Another thought - any point to use burst_elem_start() here to retrieve more
> then 1 elem in one go? Something like:
I think it makes sense.

> char e[32][dq->size];
> while ((cnt < n) {
> 	k = RTE_MAX(32, cnt - n);
> 	k = rte_ring_dequeue_burst_elem_start(dq->r, e, dq->esize, k, NULL);
> 	if (k = 0)
> 		break;
> 	for (i = 0; i != k; i++) {
> 		memcpy(&token, e[i], sizeof(uint64_t));
> 		if (rte_rcu_qsbr_check(dq->v, token, false) != 1)
> 			break;
> 	}
> 	k = i;
> 	rte_ring_dequeue_elem_finish(dq->r, k);
> 	for (i = 0; i != k; i++)
> 		dq->free_fn(dq->p, e[i] + __RTE_QSBR_TOKEN_SIZE);
I think it also makes sense to change the free_fn to take 'n' number of tokens.

> 	n += k;
> 	if (k == 0)
> 		break;
> 
> ?
> Also if at enqueue we guarantee strict ordrer (via
> enqueue_start/enqueue_finish), then here we probably can do _check_ from
> the last retrieved token here?
> In theory that might help to minimize number of checks.
> I.E. do:
> for (i = k; i-- !=0; )  {
> 	memcpy(&token, e[i], sizeof(uint64_t));
> 	if (rte_rcu_qsbr_check(dq->v, token, false) != 1)
There is a higher chance that later tokens are not acked. This introduces more polling of the counters.
The rte_rcu_qsbr_check has an optimization. While acking the current token, it will also caches the greatest token acked. It uses the cached token for the subsequent calls. I think this provides a better optimization.

> 		break;
> }
> k = i + 1;
> ...
> 
> > +		memcpy(&token, e, sizeof(uint64_t));
> > +
> > +		/* Reclaim the resource */
> > +		if (rte_rcu_qsbr_check(dq->v, token, false) != 1) {
> > +			rte_ring_dequeue_finish(dq->r, 0);
> > +			break;
> > +		}
> > +		rte_ring_dequeue_finish(dq->r, 1);
> > +
> > +		rte_log(RTE_LOG_INFO, rte_rcu_log_type,
> > +			"%s(): Reclaimed token = %"PRIu64"\n",
> > +			__func__, *(uint64_t *)e);
> > +
> > +		dq->free_fn(dq->p, e + __RTE_QSBR_TOKEN_SIZE);
> > +
> > +		cnt++;
> > +	}
> > +
> > +	rte_log(RTE_LOG_INFO, rte_rcu_log_type,
> > +		"%s(): Reclaimed %u resources\n", __func__, cnt);
> > +
> > +	if (freed != NULL)
> > +		*freed = cnt;
> > +	if (pending != NULL)
> > +		*pending = rte_ring_count(dq->r);
> > +
> > +	return 0;
> > +}
> > +
> > +/* Delete a defer queue. */
> > +int
> > +rte_rcu_qsbr_dq_delete(struct rte_rcu_qsbr_dq *dq) {
> > +	unsigned int pending;
> > +
> > +	if (dq == NULL) {
> > +		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
> > +			"%s(): Invalid input parameter\n", __func__);
> > +		rte_errno = EINVAL;
> > +
> > +		return 1;
> > +	}
> > +
> > +	/* Reclaim all the resources */
> > +	rte_rcu_qsbr_dq_reclaim(dq, ~0, NULL, &pending);
> > +	if (pending != 0) {
> > +		rte_errno = EAGAIN;
> > +
> > +		return 1;
> > +	}
> > +
> > +	rte_ring_free(dq->r);
> > +	rte_free(dq);
> > +
> > +	return 0;
> > +}
> > +
> >  int rte_rcu_log_type;
> >
> >  RTE_INIT(rte_rcu_register)
> > diff --git a/lib/librte_rcu/rte_rcu_qsbr.h
> > b/lib/librte_rcu/rte_rcu_qsbr.h index 0b5585925..213f9b029 100644
> > --- a/lib/librte_rcu/rte_rcu_qsbr.h
> > +++ b/lib/librte_rcu/rte_rcu_qsbr.h
> > @@ -34,6 +34,7 @@ extern "C" {
> >  #include <rte_lcore.h>
> >  #include <rte_debug.h>
> >  #include <rte_atomic.h>
> > +#include <rte_ring.h>
> >
> >  extern int rte_rcu_log_type;
> >
> > @@ -84,6 +85,7 @@ struct rte_rcu_qsbr_cnt {  #define
> > __RTE_QSBR_CNT_THR_OFFLINE 0  #define __RTE_QSBR_CNT_INIT 1
> #define
> > __RTE_QSBR_CNT_MAX ((uint64_t)~0)
> > +#define __RTE_QSBR_TOKEN_SIZE sizeof(uint64_t)
> >
> >  /* RTE Quiescent State variable structure.
> >   * This structure has two elements that vary in size based on the @@
> > -114,6 +116,84 @@ struct rte_rcu_qsbr {
> >  	 */
> >  } __rte_cache_aligned;
> >
> > +/**
> > + * Call back function called to free the resources.
> > + *
> > + * @param p
> > + *   Pointer provided while creating the defer queue
> > + * @param e
> > + *   Pointer to the resource data stored on the defer queue
> > + *
> > + * @return
> > + *   None
> > + */
> > +typedef void (*rte_rcu_qsbr_free_resource_t)(void *p, void *e);
> > +
> > +#define RTE_RCU_QSBR_DQ_NAMESIZE RTE_RING_NAMESIZE
> > +
> > +/**
> > + * Various flags supported.
> > + */
> > +/**< Enqueue and reclaim operations are multi-thread safe by default.
> > + *   The call back functions registered to free the resources are
> > + *   assumed to be multi-thread safe.
> > + *   Set this flag is multi-thread safety is not required.
> > + */
> > +#define RTE_RCU_QSBR_DQ_MT_UNSAFE 1
> > +
> > +/**
> > + * Parameters used when creating the defer queue.
> > + */
> > +struct rte_rcu_qsbr_dq_parameters {
> > +	const char *name;
> > +	/**< Name of the queue. */
> > +	uint32_t flags;
> > +	/**< Flags to control API behaviors */
> > +	uint32_t size;
> > +	/**< Number of entries in queue. Typically, this will be
> > +	 *   the same as the maximum number of entries supported in the
> > +	 *   lock free data structure.
> > +	 *   Data structures with unbounded number of entries is not
> > +	 *   supported currently.
> > +	 */
> > +	uint32_t esize;
> > +	/**< Size (in bytes) of each element in the defer queue.
> > +	 *   This has to be multiple of 4B.
> > +	 */
> > +	uint32_t trigger_reclaim_limit;
> > +	/**< Trigger automatic reclamation after the defer queue
> > +	 *   has atleast these many resources waiting. This auto
> > +	 *   reclamation is triggered in rte_rcu_qsbr_dq_enqueue API
> > +	 *   call.
> > +	 *   If this is greater than 'size', auto reclamation is
> > +	 *   not triggered.
> > +	 *   If this is set to 0, auto reclamation is triggered
> > +	 *   in every call to rte_rcu_qsbr_dq_enqueue API.
> > +	 */
> > +	uint32_t max_reclaim_size;
> > +	/**< When automatic reclamation is enabled, reclaim at the max
> > +	 *   these many resources. This should contain a valid value, if
> > +	 *   auto reclamation is on. Setting this to 'size' or greater will
> > +	 *   reclaim all possible resources currently on the defer queue.
> > +	 */
> > +	rte_rcu_qsbr_free_resource_t free_fn;
> > +	/**< Function to call to free the resource. */
> > +	void *p;
> > +	/**< Pointer passed to the free function. Typically, this is the
> > +	 *   pointer to the data structure to which the resource to free
> > +	 *   belongs. This can be NULL.
> > +	 */
> > +	struct rte_rcu_qsbr *v;
> > +	/**< RCU QSBR variable to use for this defer queue */ };
> > +
> > +/* RTE defer queue structure.
> > + * This structure holds the defer queue. The defer queue is used to
> > + * hold the deleted entries from the data structure that are not
> > + * yet freed.
> > + */
> > +struct rte_rcu_qsbr_dq;
> > +
> >  /**
> >   * @warning
> >   * @b EXPERIMENTAL: this API may change without prior notice @@
> > -692,6 +772,114 @@ __rte_experimental  int  rte_rcu_qsbr_dump(FILE *f,
> > struct rte_rcu_qsbr *v);
> >
> > +/**
> > + * @warning
> > + * @b EXPERIMENTAL: this API may change without prior notice
> > + *
> > + * Create a queue used to store the data structure elements that can
> > + * be freed later. This queue is referred to as 'defer queue'.
> > + *
> > + * @param params
> > + *   Parameters to create a defer queue.
> > + * @return
> > + *   On success - Valid pointer to defer queue
> > + *   On error - NULL
> > + *   Possible rte_errno codes are:
> > + *   - EINVAL - NULL parameters are passed
> > + *   - ENOMEM - Not enough memory
> > + */
> > +__rte_experimental
> > +struct rte_rcu_qsbr_dq *
> > +rte_rcu_qsbr_dq_create(const struct rte_rcu_qsbr_dq_parameters
> > +*params);
> > +
> > +/**
> > + * @warning
> > + * @b EXPERIMENTAL: this API may change without prior notice
> > + *
> > + * Enqueue one resource to the defer queue and start the grace period.
> > + * The resource will be freed later after at least one grace period
> > + * is over.
> > + *
> > + * If the defer queue is full, it will attempt to reclaim resources.
> > + * It will also reclaim resources at regular intervals to avoid
> > + * the defer queue from growing too big.
> > + *
> > + * Multi-thread safety is provided as the defer queue configuration.
> > + * When multi-thread safety is requested, it is possible that the
> > + * resources are not stored in their order of deletion. This results
> > + * in resources being held in the defer queue longer than they should.
> > + *
> > + * @param dq
> > + *   Defer queue to allocate an entry from.
> > + * @param e
> > + *   Pointer to resource data to copy to the defer queue. The size of
> > + *   the data to copy is equal to the element size provided when the
> > + *   defer queue was created.
> > + * @return
> > + *   On success - 0
> > + *   On error - 1 with rte_errno set to
> > + *   - EINVAL - NULL parameters are passed
> > + *   - ENOSPC - Defer queue is full. This condition can not happen
> > + *		if the defer queue size is equal (or larger) than the
> > + *		number of elements in the data structure.
> > + */
> > +__rte_experimental
> > +int
> > +rte_rcu_qsbr_dq_enqueue(struct rte_rcu_qsbr_dq *dq, void *e);
> > +
> > +/**
> > + * @warning
> > + * @b EXPERIMENTAL: this API may change without prior notice
> > + *
> > + * Free quesed resources from the defer queue.
> > + *
> > + * This API is multi-thread safe.
> > + *
> > + * @param dq
> > + *   Defer queue to free an entry from.
> > + * @param n
> > + *   Maximum number of resources to free.
> > + * @param freed
> > + *   Number of resources that were freed.
> > + * @param pending
> > + *   Number of resources pending on the defer queue. This number might
> not
> > + *   be acurate if multi-thread safety is configured.
> > + * @return
> > + *   On successful reclamation of at least 1 resource - 0
> > + *   On error - 1 with rte_errno set to
> > + *   - EINVAL - NULL parameters are passed
> > + */
> > +__rte_experimental
> > +int
> > +rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq, unsigned int n,
> > +				unsigned int *freed, unsigned int *pending);
> > +
> > +/**
> > + * @warning
> > + * @b EXPERIMENTAL: this API may change without prior notice
> > + *
> > + * Delete a defer queue.
> > + *
> > + * It tries to reclaim all the resources on the defer queue.
> > + * If any of the resources have not completed the grace period
> > + * the reclamation stops and returns immediately. The rest of
> > + * the resources are not reclaimed and the defer queue is not
> > + * freed.
> > + *
> > + * @param dq
> > + *   Defer queue to delete.
> > + * @return
> > + *   On success - 0
> > + *   On error - 1
> > + *   Possible rte_errno codes are:
> > + *   - EINVAL - NULL parameters are passed
> > + *   - EAGAIN - Some of the resources have not completed at least 1 grace
> > + *		period, try again.
> > + */
> > +__rte_experimental
> > +int
> > +rte_rcu_qsbr_dq_delete(struct rte_rcu_qsbr_dq *dq);
> > +
> >  #ifdef __cplusplus
> >  }
> >  #endif
> > diff --git a/lib/librte_rcu/rte_rcu_version.map
> > b/lib/librte_rcu/rte_rcu_version.map
> > index f8b9ef2ab..dfac88a37 100644
> > --- a/lib/librte_rcu/rte_rcu_version.map
> > +++ b/lib/librte_rcu/rte_rcu_version.map
> > @@ -8,6 +8,10 @@ EXPERIMENTAL {
> >  	rte_rcu_qsbr_synchronize;
> >  	rte_rcu_qsbr_thread_register;
> >  	rte_rcu_qsbr_thread_unregister;
> > +	rte_rcu_qsbr_dq_create;
> > +	rte_rcu_qsbr_dq_enqueue;
> > +	rte_rcu_qsbr_dq_reclaim;
> > +	rte_rcu_qsbr_dq_delete;
> >
> >  	local: *;
> >  };
> > diff --git a/lib/meson.build b/lib/meson.build index
> > 9c3cc55d5..15e91a303 100644
> > --- a/lib/meson.build
> > +++ b/lib/meson.build
> > @@ -11,7 +11,9 @@
> >  libraries = [
> >  	'kvargs', # eal depends on kvargs
> >  	'eal', # everything depends on eal
> > -	'ring', 'mempool', 'mbuf', 'net', 'meter', 'ethdev', 'pci', # core
> > +	'ring',
> > +	'rcu', # rcu depends on ring
> > +	'mempool', 'mbuf', 'net', 'meter', 'ethdev', 'pci', # core
> >  	'cmdline',
> >  	'metrics', # bitrate/latency stats depends on this
> >  	'hash',    # efd depends on this
> > @@ -22,7 +24,7 @@ libraries = [
> >  	'gro', 'gso', 'ip_frag', 'jobstats',
> >  	'kni', 'latencystats', 'lpm', 'member',
> >  	'power', 'pdump', 'rawdev',
> > -	'rcu', 'rib', 'reorder', 'sched', 'security', 'stack', 'vhost',
> > +	'rib', 'reorder', 'sched', 'security', 'stack', 'vhost',
> >  	# ipsec lib depends on net, crypto and security
> >  	'ipsec',
> >  	#fib lib depends on rib
> > --
> > 2.17.1
  
Ananyev, Konstantin April 20, 2020, 8:19 a.m. UTC | #3
> > > +
> > > +	/* Enqueue the token and resource. Generating the token
> > > +	 * and enqueuing (token + resource) on the queue is not an
> > > +	 * atomic operation. This might result in tokens enqueued
> > > +	 * out of order on the queue. So, some tokens might wait
> > > +	 * longer than they are required to be reclaimed.
> > > +	 */
> > > +	char data[dq->esize];
> > > +	memcpy(data, &token, __RTE_QSBR_TOKEN_SIZE);
> > > +	memcpy(data + __RTE_QSBR_TOKEN_SIZE, e,
> > > +		dq->esize - __RTE_QSBR_TOKEN_SIZE);
> > > +	/* Check the status as enqueue might fail since the other thread
> > > +	 * might have used up the freed space.
> > > +	 * Enqueue uses the configured flags when the DQ was created.
> > > +	 */
> > > +	if (rte_ring_enqueue_elem(dq->r, data, dq->esize) != 0) {
> > > +		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
> > > +			"%s(): Enqueue failed\n", __func__);
> > > +		/* Note that the token generated above is not used.
> > > +		 * Other than wasting tokens, it should not cause any
> > > +		 * other issues.
> > > +		 */
> > > +		rte_log(RTE_LOG_INFO, rte_rcu_log_type,
> > > +			"%s(): Skipped enqueuing token = %"PRIu64"\n",
> > > +			__func__, token);
> > > +
> > > +		rte_errno = ENOSPC;
> > > +		return 1;
> > > +	}
> >
> >
> > Just as a thought: in theory if we'll use MP_HTS(/SP) ring we can avoid
> > wasting RCU tokens:
> >
> > if (rte_ring_enqueue_elem_bulk_start(dq->r, 1, NULL) != 0) {
> > 	token = rte_rcu_qsbr_start(dq->v);
> > 	memcpy(data, &token, __RTE_QSBR_TOKEN_SIZE);
> > 	rte_ring_enqueue_elem_finish(dq->r, data, dq->esize, 1); }
> >
> > Though it might slowdown things if we'll have a lot of parallel dq_enqueue.
> > So not sure is it worth it or not.
> Adding peek APIs for RTS would be better. That should take care of the parallel dw_enqueue. Not sure if I gave you the comment. My ring
> patch supported these APIs.

AFAIK, peek API is not possible for RTS mode.
Probably you are talking about Scatter-Gather API introduced in your RFC
(_reserve_; update ring entries manually; _commit_)?
Anyway, if there is no much value in my idea above, then feel free to drop it.

> 
> >
> > > +
> > > +	rte_log(RTE_LOG_INFO, rte_rcu_log_type,
> > > +		"%s(): Enqueued token = %"PRIu64"\n", __func__, token);
> > > +
> > > +	return 0;
> > > +}
> > > +
> > > +/* Reclaim resources from the defer queue. */ int
> > > +rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq, unsigned int n,
> > > +				unsigned int *freed, unsigned int *pending) {
> > > +	uint32_t cnt;
> > > +	uint64_t token;
> > > +
> > > +	if (dq == NULL || n == 0) {
> > > +		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
> > > +			"%s(): Invalid input parameter\n", __func__);
> > > +		rte_errno = EINVAL;
> > > +
> > > +		return 1;
> > > +	}
> > > +
> > > +	cnt = 0;
> > > +
> > > +	char e[dq->esize];
> > > +	/* Check reader threads quiescent state and reclaim resources */
> > > +	while ((cnt < n) &&
> > > +		(rte_ring_dequeue_bulk_elem_start(dq->r, e,
> > > +					dq->esize, 1, NULL) != 0)) {
> >
> > Another thought - any point to use burst_elem_start() here to retrieve more
> > then 1 elem in one go? Something like:
> I think it makes sense.
> 
> > char e[32][dq->size];
> > while ((cnt < n) {
> > 	k = RTE_MAX(32, cnt - n);
> > 	k = rte_ring_dequeue_burst_elem_start(dq->r, e, dq->esize, k, NULL);
> > 	if (k = 0)
> > 		break;
> > 	for (i = 0; i != k; i++) {
> > 		memcpy(&token, e[i], sizeof(uint64_t));
> > 		if (rte_rcu_qsbr_check(dq->v, token, false) != 1)
> > 			break;
> > 	}
> > 	k = i;
> > 	rte_ring_dequeue_elem_finish(dq->r, k);
> > 	for (i = 0; i != k; i++)
> > 		dq->free_fn(dq->p, e[i] + __RTE_QSBR_TOKEN_SIZE);
> I think it also makes sense to change the free_fn to take 'n' number of tokens.
> 
> > 	n += k;
> > 	if (k == 0)
> > 		break;
> >
> > ?
> > Also if at enqueue we guarantee strict ordrer (via
> > enqueue_start/enqueue_finish), then here we probably can do _check_ from
> > the last retrieved token here?
> > In theory that might help to minimize number of checks.
> > I.E. do:
> > for (i = k; i-- !=0; )  {
> > 	memcpy(&token, e[i], sizeof(uint64_t));
> > 	if (rte_rcu_qsbr_check(dq->v, token, false) != 1)
> There is a higher chance that later tokens are not acked. This introduces more polling of the counters.
> The rte_rcu_qsbr_check has an optimization. While acking the current token, it will also caches the greatest token acked. It uses the cached
> token for the subsequent calls. I think this provides a better optimization.

Ok.
  

Patch

diff --git a/lib/librte_rcu/Makefile b/lib/librte_rcu/Makefile
index c4bb28d77..95f8a57e2 100644
--- a/lib/librte_rcu/Makefile
+++ b/lib/librte_rcu/Makefile
@@ -8,7 +8,7 @@  LIB = librte_rcu.a
 
 CFLAGS += -DALLOW_EXPERIMENTAL_API
 CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3
-LDLIBS += -lrte_eal
+LDLIBS += -lrte_eal -lrte_ring
 
 EXPORT_MAP := rte_rcu_version.map
 
diff --git a/lib/librte_rcu/meson.build b/lib/librte_rcu/meson.build
index 62920ba02..e280b29c1 100644
--- a/lib/librte_rcu/meson.build
+++ b/lib/librte_rcu/meson.build
@@ -10,3 +10,5 @@  headers = files('rte_rcu_qsbr.h')
 if cc.get_id() == 'clang' and dpdk_conf.get('RTE_ARCH_64') == false
 	ext_deps += cc.find_library('atomic')
 endif
+
+deps += ['ring']
diff --git a/lib/librte_rcu/rcu_qsbr_pvt.h b/lib/librte_rcu/rcu_qsbr_pvt.h
new file mode 100644
index 000000000..413f28587
--- /dev/null
+++ b/lib/librte_rcu/rcu_qsbr_pvt.h
@@ -0,0 +1,57 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright (c) 2019 Arm Limited
+ */
+
+#ifndef _RTE_RCU_QSBR_PVT_H_
+#define _RTE_RCU_QSBR_PVT_H_
+
+/**
+ * This file is private to the RCU library. It should not be included
+ * by the user of this library.
+ */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <rte_ring.h>
+#include <rte_ring_elem.h>
+
+#include "rte_rcu_qsbr.h"
+
+/* RTE defer queue structure.
+ * This structure holds the defer queue. The defer queue is used to
+ * hold the deleted entries from the data structure that are not
+ * yet freed.
+ */
+struct rte_rcu_qsbr_dq {
+	struct rte_rcu_qsbr *v; /**< RCU QSBR variable used by this queue.*/
+	struct rte_ring *r;     /**< RCU QSBR defer queue. */
+	uint32_t size;
+	/**< Number of elements in the defer queue */
+	uint32_t esize;
+	/**< Size (in bytes) of data, including the token, stored on the
+	 *   defer queue.
+	 */
+	uint32_t trigger_reclaim_limit;
+	/**< Trigger automatic reclamation after the defer queue
+	 *   has atleast these many resources waiting.
+	 */
+	uint32_t max_reclaim_size;
+	/**< Reclaim at the max these many resources during auto
+	 *   reclamation.
+	 */
+	rte_rcu_qsbr_free_resource_t free_fn;
+	/**< Function to call to free the resource. */
+	void *p;
+	/**< Pointer passed to the free function. Typically, this is the
+	 *   pointer to the data structure to which the resource to free
+	 *   belongs.
+	 */
+};
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _RTE_RCU_QSBR_PVT_H_ */
diff --git a/lib/librte_rcu/rte_rcu_qsbr.c b/lib/librte_rcu/rte_rcu_qsbr.c
index 2f3fad776..e8c1e386f 100644
--- a/lib/librte_rcu/rte_rcu_qsbr.c
+++ b/lib/librte_rcu/rte_rcu_qsbr.c
@@ -1,6 +1,6 @@ 
 /* SPDX-License-Identifier: BSD-3-Clause
  *
- * Copyright (c) 2018 Arm Limited
+ * Copyright (c) 2018-2019 Arm Limited
  */
 
 #include <stdio.h>
@@ -18,8 +18,10 @@ 
 #include <rte_per_lcore.h>
 #include <rte_lcore.h>
 #include <rte_errno.h>
+#include <rte_ring_elem.h>
 
 #include "rte_rcu_qsbr.h"
+#include "rcu_qsbr_pvt.h"
 
 /* Get the memory size of QSBR variable */
 size_t
@@ -270,6 +272,245 @@  rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v)
 	return 0;
 }
 
+/* Create a queue used to store the data structure elements that can
+ * be freed later. This queue is referred to as 'defer queue'.
+ */
+struct rte_rcu_qsbr_dq *
+rte_rcu_qsbr_dq_create(const struct rte_rcu_qsbr_dq_parameters *params)
+{
+	struct rte_rcu_qsbr_dq *dq;
+	uint32_t qs_fifo_size;
+	unsigned int flags;
+
+	if (params == NULL || params->free_fn == NULL ||
+		params->v == NULL || params->name == NULL ||
+		params->size == 0 || params->esize == 0 ||
+		(params->esize % 4 != 0)) {
+		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
+			"%s(): Invalid input parameter\n", __func__);
+		rte_errno = EINVAL;
+
+		return NULL;
+	}
+	/* If auto reclamation is configured, reclaim limit
+	 * should be a valid value.
+	 */
+	if ((params->trigger_reclaim_limit <= params->size) &&
+	    (params->max_reclaim_size == 0)) {
+		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
+			"%s(): Invalid input parameter, size = %u, trigger_reclaim_limit = %u, max_reclaim_size = %u\n",
+			__func__, params->size, params->trigger_reclaim_limit,
+			params->max_reclaim_size);
+		rte_errno = EINVAL;
+
+		return NULL;
+	}
+
+	dq = rte_zmalloc(NULL, sizeof(struct rte_rcu_qsbr_dq),
+			 RTE_CACHE_LINE_SIZE);
+	if (dq == NULL) {
+		rte_errno = ENOMEM;
+
+		return NULL;
+	}
+
+	/* Decide the flags for the ring.
+	 * If MT safety is requested, use RTS for ring enqueue as most
+	 * use cases involve dq-enqueue happening on the control plane.
+	 * Ring dequeue is always HTS due to the possibility of revert.
+	 */
+	flags = RING_F_MP_RTS_ENQ;
+	if (params->flags & RTE_RCU_QSBR_DQ_MT_UNSAFE)
+		flags = RING_F_SP_ENQ;
+	flags |= RING_F_MC_HTS_DEQ;
+	/* round up qs_fifo_size to next power of two that is not less than
+	 * max_size.
+	 */
+	qs_fifo_size = rte_align32pow2(params->size + 1);
+	/* Add token size to ring element size */
+	dq->r = rte_ring_create_elem(params->name,
+			__RTE_QSBR_TOKEN_SIZE + params->esize,
+			qs_fifo_size, SOCKET_ID_ANY, flags);
+	if (dq->r == NULL) {
+		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
+			"%s(): defer queue create failed\n", __func__);
+		rte_free(dq);
+		return NULL;
+	}
+
+	dq->v = params->v;
+	dq->size = params->size;
+	dq->esize = __RTE_QSBR_TOKEN_SIZE + params->esize;
+	dq->trigger_reclaim_limit = params->trigger_reclaim_limit;
+	dq->max_reclaim_size = params->max_reclaim_size;
+	dq->free_fn = params->free_fn;
+	dq->p = params->p;
+
+	return dq;
+}
+
+/* Enqueue one resource to the defer queue to free after the grace
+ * period is over.
+ */
+int rte_rcu_qsbr_dq_enqueue(struct rte_rcu_qsbr_dq *dq, void *e)
+{
+	uint64_t token;
+	uint32_t cur_size, free_size;
+
+	if (dq == NULL || e == NULL) {
+		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
+			"%s(): Invalid input parameter\n", __func__);
+		rte_errno = EINVAL;
+
+		return 1;
+	}
+
+	/* Start the grace period */
+	token = rte_rcu_qsbr_start(dq->v);
+
+	/* Reclaim resources if the queue is 1/8th full. This helps
+	 * the queue from growing too large and allows time for reader
+	 * threads to report their quiescent state.
+	 */
+	cur_size = rte_ring_count(dq->r);
+	if (cur_size > dq->trigger_reclaim_limit) {
+		rte_log(RTE_LOG_INFO, rte_rcu_log_type,
+			"%s(): Triggering reclamation\n", __func__);
+		rte_rcu_qsbr_dq_reclaim(dq, dq->max_reclaim_size, NULL, NULL);
+	}
+
+	/* Check if there is space for atleast 1 resource */
+	free_size = rte_ring_free_count(dq->r);
+	if (!free_size) {
+		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
+			"%s(): Defer queue is full\n", __func__);
+		/* Note that the token generated above is not used.
+		 * Other than wasting tokens, it should not cause any
+		 * other issues.
+		 */
+		rte_log(RTE_LOG_INFO, rte_rcu_log_type,
+			"%s(): Skipped enqueuing token = %"PRIu64"\n",
+			__func__, token);
+
+		rte_errno = ENOSPC;
+		return 1;
+	}
+
+	/* Enqueue the token and resource. Generating the token
+	 * and enqueuing (token + resource) on the queue is not an
+	 * atomic operation. This might result in tokens enqueued
+	 * out of order on the queue. So, some tokens might wait
+	 * longer than they are required to be reclaimed.
+	 */
+	char data[dq->esize];
+	memcpy(data, &token, __RTE_QSBR_TOKEN_SIZE);
+	memcpy(data + __RTE_QSBR_TOKEN_SIZE, e,
+		dq->esize - __RTE_QSBR_TOKEN_SIZE);
+	/* Check the status as enqueue might fail since the other thread
+	 * might have used up the freed space.
+	 * Enqueue uses the configured flags when the DQ was created.
+	 */
+	if (rte_ring_enqueue_elem(dq->r, data, dq->esize) != 0) {
+		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
+			"%s(): Enqueue failed\n", __func__);
+		/* Note that the token generated above is not used.
+		 * Other than wasting tokens, it should not cause any
+		 * other issues.
+		 */
+		rte_log(RTE_LOG_INFO, rte_rcu_log_type,
+			"%s(): Skipped enqueuing token = %"PRIu64"\n",
+			__func__, token);
+
+		rte_errno = ENOSPC;
+		return 1;
+	}
+
+	rte_log(RTE_LOG_INFO, rte_rcu_log_type,
+		"%s(): Enqueued token = %"PRIu64"\n", __func__, token);
+
+	return 0;
+}
+
+/* Reclaim resources from the defer queue. */
+int
+rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq, unsigned int n,
+				unsigned int *freed, unsigned int *pending)
+{
+	uint32_t cnt;
+	uint64_t token;
+
+	if (dq == NULL || n == 0) {
+		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
+			"%s(): Invalid input parameter\n", __func__);
+		rte_errno = EINVAL;
+
+		return 1;
+	}
+
+	cnt = 0;
+
+	char e[dq->esize];
+	/* Check reader threads quiescent state and reclaim resources */
+	while ((cnt < n) &&
+		(rte_ring_dequeue_bulk_elem_start(dq->r, e,
+					dq->esize, 1, NULL) != 0)) {
+		memcpy(&token, e, sizeof(uint64_t));
+
+		/* Reclaim the resource */
+		if (rte_rcu_qsbr_check(dq->v, token, false) != 1) {
+			rte_ring_dequeue_finish(dq->r, 0);
+			break;
+		}
+		rte_ring_dequeue_finish(dq->r, 1);
+
+		rte_log(RTE_LOG_INFO, rte_rcu_log_type,
+			"%s(): Reclaimed token = %"PRIu64"\n",
+			__func__, *(uint64_t *)e);
+
+		dq->free_fn(dq->p, e + __RTE_QSBR_TOKEN_SIZE);
+
+		cnt++;
+	}
+
+	rte_log(RTE_LOG_INFO, rte_rcu_log_type,
+		"%s(): Reclaimed %u resources\n", __func__, cnt);
+
+	if (freed != NULL)
+		*freed = cnt;
+	if (pending != NULL)
+		*pending = rte_ring_count(dq->r);
+
+	return 0;
+}
+
+/* Delete a defer queue. */
+int
+rte_rcu_qsbr_dq_delete(struct rte_rcu_qsbr_dq *dq)
+{
+	unsigned int pending;
+
+	if (dq == NULL) {
+		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
+			"%s(): Invalid input parameter\n", __func__);
+		rte_errno = EINVAL;
+
+		return 1;
+	}
+
+	/* Reclaim all the resources */
+	rte_rcu_qsbr_dq_reclaim(dq, ~0, NULL, &pending);
+	if (pending != 0) {
+		rte_errno = EAGAIN;
+
+		return 1;
+	}
+
+	rte_ring_free(dq->r);
+	rte_free(dq);
+
+	return 0;
+}
+
 int rte_rcu_log_type;
 
 RTE_INIT(rte_rcu_register)
diff --git a/lib/librte_rcu/rte_rcu_qsbr.h b/lib/librte_rcu/rte_rcu_qsbr.h
index 0b5585925..213f9b029 100644
--- a/lib/librte_rcu/rte_rcu_qsbr.h
+++ b/lib/librte_rcu/rte_rcu_qsbr.h
@@ -34,6 +34,7 @@  extern "C" {
 #include <rte_lcore.h>
 #include <rte_debug.h>
 #include <rte_atomic.h>
+#include <rte_ring.h>
 
 extern int rte_rcu_log_type;
 
@@ -84,6 +85,7 @@  struct rte_rcu_qsbr_cnt {
 #define __RTE_QSBR_CNT_THR_OFFLINE 0
 #define __RTE_QSBR_CNT_INIT 1
 #define __RTE_QSBR_CNT_MAX ((uint64_t)~0)
+#define __RTE_QSBR_TOKEN_SIZE sizeof(uint64_t)
 
 /* RTE Quiescent State variable structure.
  * This structure has two elements that vary in size based on the
@@ -114,6 +116,84 @@  struct rte_rcu_qsbr {
 	 */
 } __rte_cache_aligned;
 
+/**
+ * Call back function called to free the resources.
+ *
+ * @param p
+ *   Pointer provided while creating the defer queue
+ * @param e
+ *   Pointer to the resource data stored on the defer queue
+ *
+ * @return
+ *   None
+ */
+typedef void (*rte_rcu_qsbr_free_resource_t)(void *p, void *e);
+
+#define RTE_RCU_QSBR_DQ_NAMESIZE RTE_RING_NAMESIZE
+
+/**
+ * Various flags supported.
+ */
+/**< Enqueue and reclaim operations are multi-thread safe by default.
+ *   The call back functions registered to free the resources are
+ *   assumed to be multi-thread safe.
+ *   Set this flag is multi-thread safety is not required.
+ */
+#define RTE_RCU_QSBR_DQ_MT_UNSAFE 1
+
+/**
+ * Parameters used when creating the defer queue.
+ */
+struct rte_rcu_qsbr_dq_parameters {
+	const char *name;
+	/**< Name of the queue. */
+	uint32_t flags;
+	/**< Flags to control API behaviors */
+	uint32_t size;
+	/**< Number of entries in queue. Typically, this will be
+	 *   the same as the maximum number of entries supported in the
+	 *   lock free data structure.
+	 *   Data structures with unbounded number of entries is not
+	 *   supported currently.
+	 */
+	uint32_t esize;
+	/**< Size (in bytes) of each element in the defer queue.
+	 *   This has to be multiple of 4B.
+	 */
+	uint32_t trigger_reclaim_limit;
+	/**< Trigger automatic reclamation after the defer queue
+	 *   has atleast these many resources waiting. This auto
+	 *   reclamation is triggered in rte_rcu_qsbr_dq_enqueue API
+	 *   call.
+	 *   If this is greater than 'size', auto reclamation is
+	 *   not triggered.
+	 *   If this is set to 0, auto reclamation is triggered
+	 *   in every call to rte_rcu_qsbr_dq_enqueue API.
+	 */
+	uint32_t max_reclaim_size;
+	/**< When automatic reclamation is enabled, reclaim at the max
+	 *   these many resources. This should contain a valid value, if
+	 *   auto reclamation is on. Setting this to 'size' or greater will
+	 *   reclaim all possible resources currently on the defer queue.
+	 */
+	rte_rcu_qsbr_free_resource_t free_fn;
+	/**< Function to call to free the resource. */
+	void *p;
+	/**< Pointer passed to the free function. Typically, this is the
+	 *   pointer to the data structure to which the resource to free
+	 *   belongs. This can be NULL.
+	 */
+	struct rte_rcu_qsbr *v;
+	/**< RCU QSBR variable to use for this defer queue */
+};
+
+/* RTE defer queue structure.
+ * This structure holds the defer queue. The defer queue is used to
+ * hold the deleted entries from the data structure that are not
+ * yet freed.
+ */
+struct rte_rcu_qsbr_dq;
+
 /**
  * @warning
  * @b EXPERIMENTAL: this API may change without prior notice
@@ -692,6 +772,114 @@  __rte_experimental
 int
 rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v);
 
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Create a queue used to store the data structure elements that can
+ * be freed later. This queue is referred to as 'defer queue'.
+ *
+ * @param params
+ *   Parameters to create a defer queue.
+ * @return
+ *   On success - Valid pointer to defer queue
+ *   On error - NULL
+ *   Possible rte_errno codes are:
+ *   - EINVAL - NULL parameters are passed
+ *   - ENOMEM - Not enough memory
+ */
+__rte_experimental
+struct rte_rcu_qsbr_dq *
+rte_rcu_qsbr_dq_create(const struct rte_rcu_qsbr_dq_parameters *params);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Enqueue one resource to the defer queue and start the grace period.
+ * The resource will be freed later after at least one grace period
+ * is over.
+ *
+ * If the defer queue is full, it will attempt to reclaim resources.
+ * It will also reclaim resources at regular intervals to avoid
+ * the defer queue from growing too big.
+ *
+ * Multi-thread safety is provided as the defer queue configuration.
+ * When multi-thread safety is requested, it is possible that the
+ * resources are not stored in their order of deletion. This results
+ * in resources being held in the defer queue longer than they should.
+ *
+ * @param dq
+ *   Defer queue to allocate an entry from.
+ * @param e
+ *   Pointer to resource data to copy to the defer queue. The size of
+ *   the data to copy is equal to the element size provided when the
+ *   defer queue was created.
+ * @return
+ *   On success - 0
+ *   On error - 1 with rte_errno set to
+ *   - EINVAL - NULL parameters are passed
+ *   - ENOSPC - Defer queue is full. This condition can not happen
+ *		if the defer queue size is equal (or larger) than the
+ *		number of elements in the data structure.
+ */
+__rte_experimental
+int
+rte_rcu_qsbr_dq_enqueue(struct rte_rcu_qsbr_dq *dq, void *e);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Free quesed resources from the defer queue.
+ *
+ * This API is multi-thread safe.
+ *
+ * @param dq
+ *   Defer queue to free an entry from.
+ * @param n
+ *   Maximum number of resources to free.
+ * @param freed
+ *   Number of resources that were freed.
+ * @param pending
+ *   Number of resources pending on the defer queue. This number might not
+ *   be acurate if multi-thread safety is configured.
+ * @return
+ *   On successful reclamation of at least 1 resource - 0
+ *   On error - 1 with rte_errno set to
+ *   - EINVAL - NULL parameters are passed
+ */
+__rte_experimental
+int
+rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq, unsigned int n,
+				unsigned int *freed, unsigned int *pending);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Delete a defer queue.
+ *
+ * It tries to reclaim all the resources on the defer queue.
+ * If any of the resources have not completed the grace period
+ * the reclamation stops and returns immediately. The rest of
+ * the resources are not reclaimed and the defer queue is not
+ * freed.
+ *
+ * @param dq
+ *   Defer queue to delete.
+ * @return
+ *   On success - 0
+ *   On error - 1
+ *   Possible rte_errno codes are:
+ *   - EINVAL - NULL parameters are passed
+ *   - EAGAIN - Some of the resources have not completed at least 1 grace
+ *		period, try again.
+ */
+__rte_experimental
+int
+rte_rcu_qsbr_dq_delete(struct rte_rcu_qsbr_dq *dq);
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/lib/librte_rcu/rte_rcu_version.map b/lib/librte_rcu/rte_rcu_version.map
index f8b9ef2ab..dfac88a37 100644
--- a/lib/librte_rcu/rte_rcu_version.map
+++ b/lib/librte_rcu/rte_rcu_version.map
@@ -8,6 +8,10 @@  EXPERIMENTAL {
 	rte_rcu_qsbr_synchronize;
 	rte_rcu_qsbr_thread_register;
 	rte_rcu_qsbr_thread_unregister;
+	rte_rcu_qsbr_dq_create;
+	rte_rcu_qsbr_dq_enqueue;
+	rte_rcu_qsbr_dq_reclaim;
+	rte_rcu_qsbr_dq_delete;
 
 	local: *;
 };
diff --git a/lib/meson.build b/lib/meson.build
index 9c3cc55d5..15e91a303 100644
--- a/lib/meson.build
+++ b/lib/meson.build
@@ -11,7 +11,9 @@ 
 libraries = [
 	'kvargs', # eal depends on kvargs
 	'eal', # everything depends on eal
-	'ring', 'mempool', 'mbuf', 'net', 'meter', 'ethdev', 'pci', # core
+	'ring',
+	'rcu', # rcu depends on ring
+	'mempool', 'mbuf', 'net', 'meter', 'ethdev', 'pci', # core
 	'cmdline',
 	'metrics', # bitrate/latency stats depends on this
 	'hash',    # efd depends on this
@@ -22,7 +24,7 @@  libraries = [
 	'gro', 'gso', 'ip_frag', 'jobstats',
 	'kni', 'latencystats', 'lpm', 'member',
 	'power', 'pdump', 'rawdev',
-	'rcu', 'rib', 'reorder', 'sched', 'security', 'stack', 'vhost',
+	'rib', 'reorder', 'sched', 'security', 'stack', 'vhost',
 	# ipsec lib depends on net, crypto and security
 	'ipsec',
 	#fib lib depends on rib