[v4,1/4] lib/rcu: add resource reclamation APIs
Checks
Commit Message
Add resource reclamation APIs to make it simple for applications
and libraries to integrate rte_rcu library.
Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
Reviewed-by: Ola Liljedhal <ola.liljedhal@arm.com>
Reviewed-by: Ruifeng Wang <ruifeng.wang@arm.com>
---
lib/librte_rcu/Makefile | 2 +-
lib/librte_rcu/meson.build | 2 +
lib/librte_rcu/rcu_qsbr_pvt.h | 57 +++++++
lib/librte_rcu/rte_rcu_qsbr.c | 243 ++++++++++++++++++++++++++++-
lib/librte_rcu/rte_rcu_qsbr.h | 188 ++++++++++++++++++++++
lib/librte_rcu/rte_rcu_version.map | 4 +
lib/meson.build | 6 +-
7 files changed, 498 insertions(+), 4 deletions(-)
create mode 100644 lib/librte_rcu/rcu_qsbr_pvt.h
Comments
> Add resource reclamation APIs to make it simple for applications
> and libraries to integrate rte_rcu library.
A few nits and thoughts; please see below.
Apart from that - LGTM.
Acked-by: Konstantin Ananyev <konstantin.ananyev@intel.com>
>
> Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
> Reviewed-by: Ola Liljedhal <ola.liljedhal@arm.com>
> Reviewed-by: Ruifeng Wang <ruifeng.wang@arm.com>
> ---
> lib/librte_rcu/Makefile | 2 +-
> lib/librte_rcu/meson.build | 2 +
> lib/librte_rcu/rcu_qsbr_pvt.h | 57 +++++++
> lib/librte_rcu/rte_rcu_qsbr.c | 243 ++++++++++++++++++++++++++++-
> lib/librte_rcu/rte_rcu_qsbr.h | 188 ++++++++++++++++++++++
> lib/librte_rcu/rte_rcu_version.map | 4 +
> lib/meson.build | 6 +-
> 7 files changed, 498 insertions(+), 4 deletions(-)
> create mode 100644 lib/librte_rcu/rcu_qsbr_pvt.h
>
> diff --git a/lib/librte_rcu/Makefile b/lib/librte_rcu/Makefile
> index c4bb28d77..95f8a57e2 100644
> --- a/lib/librte_rcu/Makefile
> +++ b/lib/librte_rcu/Makefile
> @@ -8,7 +8,7 @@ LIB = librte_rcu.a
>
> CFLAGS += -DALLOW_EXPERIMENTAL_API
> CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3
> -LDLIBS += -lrte_eal
> +LDLIBS += -lrte_eal -lrte_ring
>
> EXPORT_MAP := rte_rcu_version.map
>
> diff --git a/lib/librte_rcu/meson.build b/lib/librte_rcu/meson.build
> index 62920ba02..e280b29c1 100644
> --- a/lib/librte_rcu/meson.build
> +++ b/lib/librte_rcu/meson.build
> @@ -10,3 +10,5 @@ headers = files('rte_rcu_qsbr.h')
> if cc.get_id() == 'clang' and dpdk_conf.get('RTE_ARCH_64') == false
> ext_deps += cc.find_library('atomic')
> endif
> +
> +deps += ['ring']
> diff --git a/lib/librte_rcu/rcu_qsbr_pvt.h b/lib/librte_rcu/rcu_qsbr_pvt.h
> new file mode 100644
> index 000000000..413f28587
> --- /dev/null
> +++ b/lib/librte_rcu/rcu_qsbr_pvt.h
> @@ -0,0 +1,57 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + * Copyright (c) 2019 Arm Limited
> + */
> +
> +#ifndef _RTE_RCU_QSBR_PVT_H_
> +#define _RTE_RCU_QSBR_PVT_H_
> +
> +/**
> + * This file is private to the RCU library. It should not be included
> + * by the user of this library.
> + */
> +
> +#ifdef __cplusplus
> +extern "C" {
> +#endif
> +
> +#include <rte_ring.h>
> +#include <rte_ring_elem.h>
> +
> +#include "rte_rcu_qsbr.h"
> +
> +/* RTE defer queue structure.
> + * This structure holds the defer queue. The defer queue is used to
> + * hold the deleted entries from the data structure that are not
> + * yet freed.
> + */
> +struct rte_rcu_qsbr_dq {
> + struct rte_rcu_qsbr *v; /**< RCU QSBR variable used by this queue.*/
> + struct rte_ring *r; /**< RCU QSBR defer queue. */
> + uint32_t size;
> + /**< Number of elements in the defer queue */
> + uint32_t esize;
> + /**< Size (in bytes) of data, including the token, stored on the
> + * defer queue.
> + */
> + uint32_t trigger_reclaim_limit;
> + /**< Trigger automatic reclamation after the defer queue
> + * has atleast these many resources waiting.
> + */
> + uint32_t max_reclaim_size;
> + /**< Reclaim at the max these many resources during auto
> + * reclamation.
> + */
> + rte_rcu_qsbr_free_resource_t free_fn;
> + /**< Function to call to free the resource. */
> + void *p;
> + /**< Pointer passed to the free function. Typically, this is the
> + * pointer to the data structure to which the resource to free
> + * belongs.
> + */
> +};
> +
> +#ifdef __cplusplus
> +}
> +#endif
> +
> +#endif /* _RTE_RCU_QSBR_PVT_H_ */
> diff --git a/lib/librte_rcu/rte_rcu_qsbr.c b/lib/librte_rcu/rte_rcu_qsbr.c
> index 2f3fad776..e8c1e386f 100644
> --- a/lib/librte_rcu/rte_rcu_qsbr.c
> +++ b/lib/librte_rcu/rte_rcu_qsbr.c
> @@ -1,6 +1,6 @@
> /* SPDX-License-Identifier: BSD-3-Clause
> *
> - * Copyright (c) 2018 Arm Limited
> + * Copyright (c) 2018-2019 Arm Limited
> */
>
> #include <stdio.h>
> @@ -18,8 +18,10 @@
> #include <rte_per_lcore.h>
> #include <rte_lcore.h>
> #include <rte_errno.h>
> +#include <rte_ring_elem.h>
>
> #include "rte_rcu_qsbr.h"
> +#include "rcu_qsbr_pvt.h"
>
> /* Get the memory size of QSBR variable */
> size_t
> @@ -270,6 +272,245 @@ rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v)
> return 0;
> }
>
> +/* Create a queue used to store the data structure elements that can
> + * be freed later. This queue is referred to as 'defer queue'.
> + */
> +struct rte_rcu_qsbr_dq *
> +rte_rcu_qsbr_dq_create(const struct rte_rcu_qsbr_dq_parameters *params)
> +{
> + struct rte_rcu_qsbr_dq *dq;
> + uint32_t qs_fifo_size;
> + unsigned int flags;
> +
> + if (params == NULL || params->free_fn == NULL ||
> + params->v == NULL || params->name == NULL ||
> + params->size == 0 || params->esize == 0 ||
> + (params->esize % 4 != 0)) {
> + rte_log(RTE_LOG_ERR, rte_rcu_log_type,
> + "%s(): Invalid input parameter\n", __func__);
> + rte_errno = EINVAL;
> +
> + return NULL;
> + }
> + /* If auto reclamation is configured, reclaim limit
> + * should be a valid value.
> + */
> + if ((params->trigger_reclaim_limit <= params->size) &&
> + (params->max_reclaim_size == 0)) {
> + rte_log(RTE_LOG_ERR, rte_rcu_log_type,
> + "%s(): Invalid input parameter, size = %u, trigger_reclaim_limit = %u, max_reclaim_size = %u\n",
> + __func__, params->size, params->trigger_reclaim_limit,
> + params->max_reclaim_size);
> + rte_errno = EINVAL;
> +
> + return NULL;
> + }
> +
> + dq = rte_zmalloc(NULL, sizeof(struct rte_rcu_qsbr_dq),
> + RTE_CACHE_LINE_SIZE);
> + if (dq == NULL) {
> + rte_errno = ENOMEM;
> +
> + return NULL;
> + }
> +
> + /* Decide the flags for the ring.
> + * If MT safety is requested, use RTS for ring enqueue as most
> + * use cases involve dq-enqueue happening on the control plane.
> + * Ring dequeue is always HTS due to the possibility of revert.
> + */
> + flags = RING_F_MP_RTS_ENQ;
> + if (params->flags & RTE_RCU_QSBR_DQ_MT_UNSAFE)
> + flags = RING_F_SP_ENQ;
> + flags |= RING_F_MC_HTS_DEQ;
> + /* round up qs_fifo_size to next power of two that is not less than
> + * max_size.
> + */
> + qs_fifo_size = rte_align32pow2(params->size + 1);
> + /* Add token size to ring element size */
> + dq->r = rte_ring_create_elem(params->name,
> + __RTE_QSBR_TOKEN_SIZE + params->esize,
> + qs_fifo_size, SOCKET_ID_ANY, flags);
> + if (dq->r == NULL) {
> + rte_log(RTE_LOG_ERR, rte_rcu_log_type,
> + "%s(): defer queue create failed\n", __func__);
> + rte_free(dq);
> + return NULL;
> + }
> +
> + dq->v = params->v;
> + dq->size = params->size;
> + dq->esize = __RTE_QSBR_TOKEN_SIZE + params->esize;
> + dq->trigger_reclaim_limit = params->trigger_reclaim_limit;
> + dq->max_reclaim_size = params->max_reclaim_size;
> + dq->free_fn = params->free_fn;
> + dq->p = params->p;
> +
> + return dq;
> +}
> +
> +/* Enqueue one resource to the defer queue to free after the grace
> + * period is over.
> + */
> +int rte_rcu_qsbr_dq_enqueue(struct rte_rcu_qsbr_dq *dq, void *e)
> +{
> + uint64_t token;
> + uint32_t cur_size, free_size;
> +
> + if (dq == NULL || e == NULL) {
> + rte_log(RTE_LOG_ERR, rte_rcu_log_type,
> + "%s(): Invalid input parameter\n", __func__);
> + rte_errno = EINVAL;
> +
> + return 1;
> + }
> +
> + /* Start the grace period */
> + token = rte_rcu_qsbr_start(dq->v);
> +
> + /* Reclaim resources if the queue is 1/8th full. This helps
The comment about 1/8th is probably left over from an older version?
As I understand it, this is now a configurable parameter.
> + * the queue from growing too large and allows time for reader
> + * threads to report their quiescent state.
> + */
> + cur_size = rte_ring_count(dq->r);
> + if (cur_size > dq->trigger_reclaim_limit) {
> + rte_log(RTE_LOG_INFO, rte_rcu_log_type,
> + "%s(): Triggering reclamation\n", __func__);
> + rte_rcu_qsbr_dq_reclaim(dq, dq->max_reclaim_size, NULL, NULL);
> + }
> +
> + /* Check if there is space for atleast 1 resource */
> + free_size = rte_ring_free_count(dq->r);
> + if (!free_size) {
Is there any point in doing this check at all?
The enqueue below already handles the situation where there is
not enough space in the ring.
> + rte_log(RTE_LOG_ERR, rte_rcu_log_type,
> + "%s(): Defer queue is full\n", __func__);
> + /* Note that the token generated above is not used.
> + * Other than wasting tokens, it should not cause any
> + * other issues.
> + */
> + rte_log(RTE_LOG_INFO, rte_rcu_log_type,
> + "%s(): Skipped enqueuing token = %"PRIu64"\n",
> + __func__, token);
> +
> + rte_errno = ENOSPC;
> + return 1;
> + }
> +
> + /* Enqueue the token and resource. Generating the token
> + * and enqueuing (token + resource) on the queue is not an
> + * atomic operation. This might result in tokens enqueued
> + * out of order on the queue. So, some tokens might wait
> + * longer than they are required to be reclaimed.
> + */
> + char data[dq->esize];
> + memcpy(data, &token, __RTE_QSBR_TOKEN_SIZE);
> + memcpy(data + __RTE_QSBR_TOKEN_SIZE, e,
> + dq->esize - __RTE_QSBR_TOKEN_SIZE);
> + /* Check the status as enqueue might fail since the other thread
> + * might have used up the freed space.
> + * Enqueue uses the configured flags when the DQ was created.
> + */
> + if (rte_ring_enqueue_elem(dq->r, data, dq->esize) != 0) {
> + rte_log(RTE_LOG_ERR, rte_rcu_log_type,
> + "%s(): Enqueue failed\n", __func__);
> + /* Note that the token generated above is not used.
> + * Other than wasting tokens, it should not cause any
> + * other issues.
> + */
> + rte_log(RTE_LOG_INFO, rte_rcu_log_type,
> + "%s(): Skipped enqueuing token = %"PRIu64"\n",
> + __func__, token);
> +
> + rte_errno = ENOSPC;
> + return 1;
> + }
Just a thought: in theory, if we use an MP_HTS (or SP) ring, we can avoid
wasting RCU tokens:
if (rte_ring_enqueue_elem_bulk_start(dq->r, 1, NULL) != 0) {
token = rte_rcu_qsbr_start(dq->v);
memcpy(data, &token, __RTE_QSBR_TOKEN_SIZE);
rte_ring_enqueue_elem_finish(dq->r, data, dq->esize, 1);
}
Though it might slow things down if we have a lot of
parallel dq_enqueue calls.
So I am not sure whether it is worth it or not.
> +
> + rte_log(RTE_LOG_INFO, rte_rcu_log_type,
> + "%s(): Enqueued token = %"PRIu64"\n", __func__, token);
> +
> + return 0;
> +}
> +
> +/* Reclaim resources from the defer queue. */
> +int
> +rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq, unsigned int n,
> + unsigned int *freed, unsigned int *pending)
> +{
> + uint32_t cnt;
> + uint64_t token;
> +
> + if (dq == NULL || n == 0) {
> + rte_log(RTE_LOG_ERR, rte_rcu_log_type,
> + "%s(): Invalid input parameter\n", __func__);
> + rte_errno = EINVAL;
> +
> + return 1;
> + }
> +
> + cnt = 0;
> +
> + char e[dq->esize];
> + /* Check reader threads quiescent state and reclaim resources */
> + while ((cnt < n) &&
> + (rte_ring_dequeue_bulk_elem_start(dq->r, e,
> + dq->esize, 1, NULL) != 0)) {
Another thought - is there any point in using burst_elem_start() here to retrieve more
than 1 elem in one go? Something like:
char e[32][dq->size];
while ((cnt < n) {
k = RTE_MAX(32, cnt - n);
k = rte_ring_dequeue_burst_elem_start(dq->r, e, dq->esize, k, NULL);
if (k = 0)
break;
for (i = 0; i != k; i++) {
memcpy(&token, e[i], sizeof(uint64_t));
if (rte_rcu_qsbr_check(dq->v, token, false) != 1)
break;
}
k = i;
rte_ring_dequeue_elem_finish(dq->r, k);
for (i = 0; i != k; i++)
dq->free_fn(dq->p, e[i] + __RTE_QSBR_TOKEN_SIZE);
n += k;
if (k == 0)
break;
?
Also, if at enqueue we guarantee strict order (via enqueue_start/enqueue_finish),
then here we can probably start the _check_ from the last retrieved token?
In theory that might help to minimize the number of checks.
I.e. do:
for (i = k; i-- !=0; ) {
memcpy(&token, e[i], sizeof(uint64_t));
if (rte_rcu_qsbr_check(dq->v, token, false) != 1)
break;
}
k = i + 1;
...
> + memcpy(&token, e, sizeof(uint64_t));
> +
> + /* Reclaim the resource */
> + if (rte_rcu_qsbr_check(dq->v, token, false) != 1) {
> + rte_ring_dequeue_finish(dq->r, 0);
> + break;
> + }
> + rte_ring_dequeue_finish(dq->r, 1);
> +
> + rte_log(RTE_LOG_INFO, rte_rcu_log_type,
> + "%s(): Reclaimed token = %"PRIu64"\n",
> + __func__, *(uint64_t *)e);
> +
> + dq->free_fn(dq->p, e + __RTE_QSBR_TOKEN_SIZE);
> +
> + cnt++;
> + }
> +
> + rte_log(RTE_LOG_INFO, rte_rcu_log_type,
> + "%s(): Reclaimed %u resources\n", __func__, cnt);
> +
> + if (freed != NULL)
> + *freed = cnt;
> + if (pending != NULL)
> + *pending = rte_ring_count(dq->r);
> +
> + return 0;
> +}
> +
> +/* Delete a defer queue. */
> +int
> +rte_rcu_qsbr_dq_delete(struct rte_rcu_qsbr_dq *dq)
> +{
> + unsigned int pending;
> +
> + if (dq == NULL) {
> + rte_log(RTE_LOG_ERR, rte_rcu_log_type,
> + "%s(): Invalid input parameter\n", __func__);
> + rte_errno = EINVAL;
> +
> + return 1;
> + }
> +
> + /* Reclaim all the resources */
> + rte_rcu_qsbr_dq_reclaim(dq, ~0, NULL, &pending);
> + if (pending != 0) {
> + rte_errno = EAGAIN;
> +
> + return 1;
> + }
> +
> + rte_ring_free(dq->r);
> + rte_free(dq);
> +
> + return 0;
> +}
> +
> int rte_rcu_log_type;
>
> RTE_INIT(rte_rcu_register)
> diff --git a/lib/librte_rcu/rte_rcu_qsbr.h b/lib/librte_rcu/rte_rcu_qsbr.h
> index 0b5585925..213f9b029 100644
> --- a/lib/librte_rcu/rte_rcu_qsbr.h
> +++ b/lib/librte_rcu/rte_rcu_qsbr.h
> @@ -34,6 +34,7 @@ extern "C" {
> #include <rte_lcore.h>
> #include <rte_debug.h>
> #include <rte_atomic.h>
> +#include <rte_ring.h>
>
> extern int rte_rcu_log_type;
>
> @@ -84,6 +85,7 @@ struct rte_rcu_qsbr_cnt {
> #define __RTE_QSBR_CNT_THR_OFFLINE 0
> #define __RTE_QSBR_CNT_INIT 1
> #define __RTE_QSBR_CNT_MAX ((uint64_t)~0)
> +#define __RTE_QSBR_TOKEN_SIZE sizeof(uint64_t)
>
> /* RTE Quiescent State variable structure.
> * This structure has two elements that vary in size based on the
> @@ -114,6 +116,84 @@ struct rte_rcu_qsbr {
> */
> } __rte_cache_aligned;
>
> +/**
> + * Call back function called to free the resources.
> + *
> + * @param p
> + * Pointer provided while creating the defer queue
> + * @param e
> + * Pointer to the resource data stored on the defer queue
> + *
> + * @return
> + * None
> + */
> +typedef void (*rte_rcu_qsbr_free_resource_t)(void *p, void *e);
> +
> +#define RTE_RCU_QSBR_DQ_NAMESIZE RTE_RING_NAMESIZE
> +
> +/**
> + * Various flags supported.
> + */
> +/**< Enqueue and reclaim operations are multi-thread safe by default.
> + * The call back functions registered to free the resources are
> + * assumed to be multi-thread safe.
> + * Set this flag if multi-thread safety is not required.
> + */
> +#define RTE_RCU_QSBR_DQ_MT_UNSAFE 1
> +
> +/**
> + * Parameters used when creating the defer queue.
> + */
> +struct rte_rcu_qsbr_dq_parameters {
> + const char *name;
> + /**< Name of the queue. */
> + uint32_t flags;
> + /**< Flags to control API behaviors */
> + uint32_t size;
> + /**< Number of entries in queue. Typically, this will be
> + * the same as the maximum number of entries supported in the
> + * lock free data structure.
> + * Data structures with unbounded number of entries is not
> + * supported currently.
> + */
> + uint32_t esize;
> + /**< Size (in bytes) of each element in the defer queue.
> + * This has to be multiple of 4B.
> + */
> + uint32_t trigger_reclaim_limit;
> + /**< Trigger automatic reclamation after the defer queue
> + * has atleast these many resources waiting. This auto
> + * reclamation is triggered in rte_rcu_qsbr_dq_enqueue API
> + * call.
> + * If this is greater than 'size', auto reclamation is
> + * not triggered.
> + * If this is set to 0, auto reclamation is triggered
> + * in every call to rte_rcu_qsbr_dq_enqueue API.
> + */
> + uint32_t max_reclaim_size;
> + /**< When automatic reclamation is enabled, reclaim at the max
> + * these many resources. This should contain a valid value, if
> + * auto reclamation is on. Setting this to 'size' or greater will
> + * reclaim all possible resources currently on the defer queue.
> + */
> + rte_rcu_qsbr_free_resource_t free_fn;
> + /**< Function to call to free the resource. */
> + void *p;
> + /**< Pointer passed to the free function. Typically, this is the
> + * pointer to the data structure to which the resource to free
> + * belongs. This can be NULL.
> + */
> + struct rte_rcu_qsbr *v;
> + /**< RCU QSBR variable to use for this defer queue */
> +};
> +
> +/* RTE defer queue structure.
> + * This structure holds the defer queue. The defer queue is used to
> + * hold the deleted entries from the data structure that are not
> + * yet freed.
> + */
> +struct rte_rcu_qsbr_dq;
> +
> /**
> * @warning
> * @b EXPERIMENTAL: this API may change without prior notice
> @@ -692,6 +772,114 @@ __rte_experimental
> int
> rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v);
>
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Create a queue used to store the data structure elements that can
> + * be freed later. This queue is referred to as 'defer queue'.
> + *
> + * @param params
> + * Parameters to create a defer queue.
> + * @return
> + * On success - Valid pointer to defer queue
> + * On error - NULL
> + * Possible rte_errno codes are:
> + * - EINVAL - NULL parameters are passed
> + * - ENOMEM - Not enough memory
> + */
> +__rte_experimental
> +struct rte_rcu_qsbr_dq *
> +rte_rcu_qsbr_dq_create(const struct rte_rcu_qsbr_dq_parameters *params);
> +
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Enqueue one resource to the defer queue and start the grace period.
> + * The resource will be freed later after at least one grace period
> + * is over.
> + *
> + * If the defer queue is full, it will attempt to reclaim resources.
> + * It will also reclaim resources at regular intervals to avoid
> + * the defer queue from growing too big.
> + *
> + * Multi-thread safety is provided as the defer queue configuration.
> + * When multi-thread safety is requested, it is possible that the
> + * resources are not stored in their order of deletion. This results
> + * in resources being held in the defer queue longer than they should.
> + *
> + * @param dq
> + * Defer queue to allocate an entry from.
> + * @param e
> + * Pointer to resource data to copy to the defer queue. The size of
> + * the data to copy is equal to the element size provided when the
> + * defer queue was created.
> + * @return
> + * On success - 0
> + * On error - 1 with rte_errno set to
> + * - EINVAL - NULL parameters are passed
> + * - ENOSPC - Defer queue is full. This condition can not happen
> + * if the defer queue size is equal (or larger) than the
> + * number of elements in the data structure.
> + */
> +__rte_experimental
> +int
> +rte_rcu_qsbr_dq_enqueue(struct rte_rcu_qsbr_dq *dq, void *e);
> +
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Free queued resources from the defer queue.
> + *
> + * This API is multi-thread safe.
> + *
> + * @param dq
> + * Defer queue to free an entry from.
> + * @param n
> + * Maximum number of resources to free.
> + * @param freed
> + * Number of resources that were freed.
> + * @param pending
> + * Number of resources pending on the defer queue. This number might not
> + * be accurate if multi-thread safety is configured.
> + * @return
> + * On successful reclamation of at least 1 resource - 0
> + * On error - 1 with rte_errno set to
> + * - EINVAL - NULL parameters are passed
> + */
> +__rte_experimental
> +int
> +rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq, unsigned int n,
> + unsigned int *freed, unsigned int *pending);
> +
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Delete a defer queue.
> + *
> + * It tries to reclaim all the resources on the defer queue.
> + * If any of the resources have not completed the grace period
> + * the reclamation stops and returns immediately. The rest of
> + * the resources are not reclaimed and the defer queue is not
> + * freed.
> + *
> + * @param dq
> + * Defer queue to delete.
> + * @return
> + * On success - 0
> + * On error - 1
> + * Possible rte_errno codes are:
> + * - EINVAL - NULL parameters are passed
> + * - EAGAIN - Some of the resources have not completed at least 1 grace
> + * period, try again.
> + */
> +__rte_experimental
> +int
> +rte_rcu_qsbr_dq_delete(struct rte_rcu_qsbr_dq *dq);
> +
> #ifdef __cplusplus
> }
> #endif
> diff --git a/lib/librte_rcu/rte_rcu_version.map b/lib/librte_rcu/rte_rcu_version.map
> index f8b9ef2ab..dfac88a37 100644
> --- a/lib/librte_rcu/rte_rcu_version.map
> +++ b/lib/librte_rcu/rte_rcu_version.map
> @@ -8,6 +8,10 @@ EXPERIMENTAL {
> rte_rcu_qsbr_synchronize;
> rte_rcu_qsbr_thread_register;
> rte_rcu_qsbr_thread_unregister;
> + rte_rcu_qsbr_dq_create;
> + rte_rcu_qsbr_dq_enqueue;
> + rte_rcu_qsbr_dq_reclaim;
> + rte_rcu_qsbr_dq_delete;
>
> local: *;
> };
> diff --git a/lib/meson.build b/lib/meson.build
> index 9c3cc55d5..15e91a303 100644
> --- a/lib/meson.build
> +++ b/lib/meson.build
> @@ -11,7 +11,9 @@
> libraries = [
> 'kvargs', # eal depends on kvargs
> 'eal', # everything depends on eal
> - 'ring', 'mempool', 'mbuf', 'net', 'meter', 'ethdev', 'pci', # core
> + 'ring',
> + 'rcu', # rcu depends on ring
> + 'mempool', 'mbuf', 'net', 'meter', 'ethdev', 'pci', # core
> 'cmdline',
> 'metrics', # bitrate/latency stats depends on this
> 'hash', # efd depends on this
> @@ -22,7 +24,7 @@ libraries = [
> 'gro', 'gso', 'ip_frag', 'jobstats',
> 'kni', 'latencystats', 'lpm', 'member',
> 'power', 'pdump', 'rawdev',
> - 'rcu', 'rib', 'reorder', 'sched', 'security', 'stack', 'vhost',
> + 'rib', 'reorder', 'sched', 'security', 'stack', 'vhost',
> # ipsec lib depends on net, crypto and security
> 'ipsec',
> #fib lib depends on rib
> --
> 2.17.1
<snip>
>
> > Add resource reclamation APIs to make it simple for applications and
> > libraries to integrate rte_rcu library.
>
> Few nits, thoughts, please see below.
> Apart from that - LGTM.
> Acked-by: Konstantin Ananyev <konstantin.ananyev@intel.com>
>
> >
> > Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
> > Reviewed-by: Ola Liljedhal <ola.liljedhal@arm.com>
> > Reviewed-by: Ruifeng Wang <ruifeng.wang@arm.com>
> > ---
> > lib/librte_rcu/Makefile | 2 +-
> > lib/librte_rcu/meson.build | 2 +
> > lib/librte_rcu/rcu_qsbr_pvt.h | 57 +++++++
> > lib/librte_rcu/rte_rcu_qsbr.c | 243 ++++++++++++++++++++++++++++-
> > lib/librte_rcu/rte_rcu_qsbr.h | 188 ++++++++++++++++++++++
> > lib/librte_rcu/rte_rcu_version.map | 4 +
> > lib/meson.build | 6 +-
> > 7 files changed, 498 insertions(+), 4 deletions(-) create mode
> > 100644 lib/librte_rcu/rcu_qsbr_pvt.h
> >
> > diff --git a/lib/librte_rcu/Makefile b/lib/librte_rcu/Makefile index
> > c4bb28d77..95f8a57e2 100644
> > --- a/lib/librte_rcu/Makefile
> > +++ b/lib/librte_rcu/Makefile
> > @@ -8,7 +8,7 @@ LIB = librte_rcu.a
> >
> > CFLAGS += -DALLOW_EXPERIMENTAL_API
> > CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3 -LDLIBS += -lrte_eal
> > +LDLIBS += -lrte_eal -lrte_ring
> >
> > EXPORT_MAP := rte_rcu_version.map
> >
> > diff --git a/lib/librte_rcu/meson.build b/lib/librte_rcu/meson.build
> > index 62920ba02..e280b29c1 100644
> > --- a/lib/librte_rcu/meson.build
> > +++ b/lib/librte_rcu/meson.build
> > @@ -10,3 +10,5 @@ headers = files('rte_rcu_qsbr.h') if cc.get_id() ==
> > 'clang' and dpdk_conf.get('RTE_ARCH_64') == false
> > ext_deps += cc.find_library('atomic') endif
> > +
> > +deps += ['ring']
> > diff --git a/lib/librte_rcu/rcu_qsbr_pvt.h
> > b/lib/librte_rcu/rcu_qsbr_pvt.h new file mode 100644 index
> > 000000000..413f28587
> > --- /dev/null
> > +++ b/lib/librte_rcu/rcu_qsbr_pvt.h
> > @@ -0,0 +1,57 @@
> > +/* SPDX-License-Identifier: BSD-3-Clause
> > + * Copyright (c) 2019 Arm Limited
> > + */
> > +
> > +#ifndef _RTE_RCU_QSBR_PVT_H_
> > +#define _RTE_RCU_QSBR_PVT_H_
> > +
> > +/**
> > + * This file is private to the RCU library. It should not be included
> > + * by the user of this library.
> > + */
> > +
> > +#ifdef __cplusplus
> > +extern "C" {
> > +#endif
> > +
> > +#include <rte_ring.h>
> > +#include <rte_ring_elem.h>
> > +
> > +#include "rte_rcu_qsbr.h"
> > +
> > +/* RTE defer queue structure.
> > + * This structure holds the defer queue. The defer queue is used to
> > + * hold the deleted entries from the data structure that are not
> > + * yet freed.
> > + */
> > +struct rte_rcu_qsbr_dq {
> > + struct rte_rcu_qsbr *v; /**< RCU QSBR variable used by this queue.*/
> > + struct rte_ring *r; /**< RCU QSBR defer queue. */
> > + uint32_t size;
> > + /**< Number of elements in the defer queue */
> > + uint32_t esize;
> > + /**< Size (in bytes) of data, including the token, stored on the
> > + * defer queue.
> > + */
> > + uint32_t trigger_reclaim_limit;
> > + /**< Trigger automatic reclamation after the defer queue
> > + * has atleast these many resources waiting.
> > + */
> > + uint32_t max_reclaim_size;
> > + /**< Reclaim at the max these many resources during auto
> > + * reclamation.
> > + */
> > + rte_rcu_qsbr_free_resource_t free_fn;
> > + /**< Function to call to free the resource. */
> > + void *p;
> > + /**< Pointer passed to the free function. Typically, this is the
> > + * pointer to the data structure to which the resource to free
> > + * belongs.
> > + */
> > +};
> > +
> > +#ifdef __cplusplus
> > +}
> > +#endif
> > +
> > +#endif /* _RTE_RCU_QSBR_PVT_H_ */
> > diff --git a/lib/librte_rcu/rte_rcu_qsbr.c
> > b/lib/librte_rcu/rte_rcu_qsbr.c index 2f3fad776..e8c1e386f 100644
> > --- a/lib/librte_rcu/rte_rcu_qsbr.c
> > +++ b/lib/librte_rcu/rte_rcu_qsbr.c
> > @@ -1,6 +1,6 @@
> > /* SPDX-License-Identifier: BSD-3-Clause
> > *
> > - * Copyright (c) 2018 Arm Limited
> > + * Copyright (c) 2018-2019 Arm Limited
> > */
> >
> > #include <stdio.h>
> > @@ -18,8 +18,10 @@
> > #include <rte_per_lcore.h>
> > #include <rte_lcore.h>
> > #include <rte_errno.h>
> > +#include <rte_ring_elem.h>
> >
> > #include "rte_rcu_qsbr.h"
> > +#include "rcu_qsbr_pvt.h"
> >
> > /* Get the memory size of QSBR variable */ size_t @@ -270,6 +272,245
> > @@ rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v)
> > return 0;
> > }
> >
> > +/* Create a queue used to store the data structure elements that can
> > + * be freed later. This queue is referred to as 'defer queue'.
> > + */
> > +struct rte_rcu_qsbr_dq *
> > +rte_rcu_qsbr_dq_create(const struct rte_rcu_qsbr_dq_parameters
> > +*params) {
> > + struct rte_rcu_qsbr_dq *dq;
> > + uint32_t qs_fifo_size;
> > + unsigned int flags;
> > +
> > + if (params == NULL || params->free_fn == NULL ||
> > + params->v == NULL || params->name == NULL ||
> > + params->size == 0 || params->esize == 0 ||
> > + (params->esize % 4 != 0)) {
> > + rte_log(RTE_LOG_ERR, rte_rcu_log_type,
> > + "%s(): Invalid input parameter\n", __func__);
> > + rte_errno = EINVAL;
> > +
> > + return NULL;
> > + }
> > + /* If auto reclamation is configured, reclaim limit
> > + * should be a valid value.
> > + */
> > + if ((params->trigger_reclaim_limit <= params->size) &&
> > + (params->max_reclaim_size == 0)) {
> > + rte_log(RTE_LOG_ERR, rte_rcu_log_type,
> > + "%s(): Invalid input parameter, size = %u,
> trigger_reclaim_limit = %u, max_reclaim_size = %u\n",
> > + __func__, params->size, params-
> >trigger_reclaim_limit,
> > + params->max_reclaim_size);
> > + rte_errno = EINVAL;
> > +
> > + return NULL;
> > + }
> > +
> > + dq = rte_zmalloc(NULL, sizeof(struct rte_rcu_qsbr_dq),
> > + RTE_CACHE_LINE_SIZE);
> > + if (dq == NULL) {
> > + rte_errno = ENOMEM;
> > +
> > + return NULL;
> > + }
> > +
> > + /* Decide the flags for the ring.
> > + * If MT safety is requested, use RTS for ring enqueue as most
> > + * use cases involve dq-enqueue happening on the control plane.
> > + * Ring dequeue is always HTS due to the possibility of revert.
> > + */
> > + flags = RING_F_MP_RTS_ENQ;
> > + if (params->flags & RTE_RCU_QSBR_DQ_MT_UNSAFE)
> > + flags = RING_F_SP_ENQ;
> > + flags |= RING_F_MC_HTS_DEQ;
> > + /* round up qs_fifo_size to next power of two that is not less than
> > + * max_size.
> > + */
> > + qs_fifo_size = rte_align32pow2(params->size + 1);
> > + /* Add token size to ring element size */
> > + dq->r = rte_ring_create_elem(params->name,
> > + __RTE_QSBR_TOKEN_SIZE + params->esize,
> > + qs_fifo_size, SOCKET_ID_ANY, flags);
> > + if (dq->r == NULL) {
> > + rte_log(RTE_LOG_ERR, rte_rcu_log_type,
> > + "%s(): defer queue create failed\n", __func__);
> > + rte_free(dq);
> > + return NULL;
> > + }
> > +
> > + dq->v = params->v;
> > + dq->size = params->size;
> > + dq->esize = __RTE_QSBR_TOKEN_SIZE + params->esize;
> > + dq->trigger_reclaim_limit = params->trigger_reclaim_limit;
> > + dq->max_reclaim_size = params->max_reclaim_size;
> > + dq->free_fn = params->free_fn;
> > + dq->p = params->p;
> > +
> > + return dq;
> > +}
> > +
> > +/* Enqueue one resource to the defer queue to free after the grace
> > + * period is over.
> > + */
> > +int rte_rcu_qsbr_dq_enqueue(struct rte_rcu_qsbr_dq *dq, void *e) {
> > + uint64_t token;
> > + uint32_t cur_size, free_size;
> > +
> > + if (dq == NULL || e == NULL) {
> > + rte_log(RTE_LOG_ERR, rte_rcu_log_type,
> > + "%s(): Invalid input parameter\n", __func__);
> > + rte_errno = EINVAL;
> > +
> > + return 1;
> > + }
> > +
> > + /* Start the grace period */
> > + token = rte_rcu_qsbr_start(dq->v);
> > +
> > + /* Reclaim resources if the queue is 1/8th full. This helps
>
> Comment about 1/8 is probably left from older version?
> As I understand now it is configurable parameter.
Ack, will correct this.
>
> > + * the queue from growing too large and allows time for reader
> > + * threads to report their quiescent state.
> > + */
> > + cur_size = rte_ring_count(dq->r);
> > + if (cur_size > dq->trigger_reclaim_limit) {
> > + rte_log(RTE_LOG_INFO, rte_rcu_log_type,
> > + "%s(): Triggering reclamation\n", __func__);
> > + rte_rcu_qsbr_dq_reclaim(dq, dq->max_reclaim_size, NULL,
> NULL);
> > + }
> > +
> > + /* Check if there is space for atleast 1 resource */
> > + free_size = rte_ring_free_count(dq->r);
> > + if (!free_size) {
>
> Is there any point to do this check at all?
> You are doing enqueue below and handle situation with not enough space in
> the ring anyway.
Ack
>
> > + rte_log(RTE_LOG_ERR, rte_rcu_log_type,
> > + "%s(): Defer queue is full\n", __func__);
> > + /* Note that the token generated above is not used.
> > + * Other than wasting tokens, it should not cause any
> > + * other issues.
> > + */
> > + rte_log(RTE_LOG_INFO, rte_rcu_log_type,
> > + "%s(): Skipped enqueuing token = %"PRIu64"\n",
> > + __func__, token);
> > +
> > + rte_errno = ENOSPC;
> > + return 1;
> > + }
> > +
> > + /* Enqueue the token and resource. Generating the token
> > + * and enqueuing (token + resource) on the queue is not an
> > + * atomic operation. This might result in tokens enqueued
> > + * out of order on the queue. So, some tokens might wait
> > + * longer than they are required to be reclaimed.
> > + */
> > + char data[dq->esize];
> > + memcpy(data, &token, __RTE_QSBR_TOKEN_SIZE);
> > + memcpy(data + __RTE_QSBR_TOKEN_SIZE, e,
> > + dq->esize - __RTE_QSBR_TOKEN_SIZE);
> > + /* Check the status as enqueue might fail since the other thread
> > + * might have used up the freed space.
> > + * Enqueue uses the configured flags when the DQ was created.
> > + */
> > + if (rte_ring_enqueue_elem(dq->r, data, dq->esize) != 0) {
> > + rte_log(RTE_LOG_ERR, rte_rcu_log_type,
> > + "%s(): Enqueue failed\n", __func__);
> > + /* Note that the token generated above is not used.
> > + * Other than wasting tokens, it should not cause any
> > + * other issues.
> > + */
> > + rte_log(RTE_LOG_INFO, rte_rcu_log_type,
> > + "%s(): Skipped enqueuing token = %"PRIu64"\n",
> > + __func__, token);
> > +
> > + rte_errno = ENOSPC;
> > + return 1;
> > + }
>
>
> Just as a thought: in theory if we'll use MP_HTS(/SP) ring we can avoid
> wasting RCU tokens:
>
> if (rte_ring_enqueue_elem_bulk_start(dq->r, 1, NULL) != 0) {
> token = rte_rcu_qsbr_start(dq->v);
> memcpy(data, &token, __RTE_QSBR_TOKEN_SIZE);
> rte_ring_enqueue_elem_finish(dq->r, data, dq->esize, 1); }
>
> Though it might slow things down if we have a lot of parallel dq_enqueue calls.
> So I'm not sure whether it is worth it or not.
Adding peek APIs for RTS would be better. That should take care of the parallel dq_enqueue. Not sure if I gave you the comment. My ring patch supported these APIs.
>
> > +
> > + rte_log(RTE_LOG_INFO, rte_rcu_log_type,
> > + "%s(): Enqueued token = %"PRIu64"\n", __func__, token);
> > +
> > + return 0;
> > +}
> > +
> > +/* Reclaim resources from the defer queue. */ int
> > +rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq, unsigned int n,
> > + unsigned int *freed, unsigned int *pending) {
> > + uint32_t cnt;
> > + uint64_t token;
> > +
> > + if (dq == NULL || n == 0) {
> > + rte_log(RTE_LOG_ERR, rte_rcu_log_type,
> > + "%s(): Invalid input parameter\n", __func__);
> > + rte_errno = EINVAL;
> > +
> > + return 1;
> > + }
> > +
> > + cnt = 0;
> > +
> > + char e[dq->esize];
> > + /* Check reader threads quiescent state and reclaim resources */
> > + while ((cnt < n) &&
> > + (rte_ring_dequeue_bulk_elem_start(dq->r, e,
> > + dq->esize, 1, NULL) != 0)) {
>
> Another thought - is there any point in using burst_elem_start() here to retrieve more
> than 1 elem in one go? Something like:
I think it makes sense.
> char e[32][dq->esize];
> while (cnt < n) {
> k = RTE_MIN(32, n - cnt);
> k = rte_ring_dequeue_burst_elem_start(dq->r, e, dq->esize, k, NULL);
> if (k == 0)
> break;
> for (i = 0; i != k; i++) {
> memcpy(&token, e[i], sizeof(uint64_t));
> if (rte_rcu_qsbr_check(dq->v, token, false) != 1)
> break;
> }
> k = i;
> rte_ring_dequeue_elem_finish(dq->r, k);
> for (i = 0; i != k; i++)
> dq->free_fn(dq->p, e[i] + __RTE_QSBR_TOKEN_SIZE);
I think it also makes sense to change the free_fn to take 'n' number of tokens.
> cnt += k;
> if (k == 0)
> break;
>
> ?
> Also if at enqueue we guarantee strict order (via
> enqueue_start/enqueue_finish), then we can probably do the _check_ starting from
> the last retrieved token here?
> In theory that might help to minimize the number of checks.
> I.E. do:
> for (i = k; i-- !=0; ) {
> memcpy(&token, e[i], sizeof(uint64_t));
> if (rte_rcu_qsbr_check(dq->v, token, false) != 1)
There is a higher chance that later tokens are not acked. This introduces more polling of the counters.
rte_rcu_qsbr_check has an optimization: while acking the current token, it also caches the greatest token acked, and it uses the cached token for subsequent calls. I think this provides a better optimization.
> break;
> }
> k = i + 1;
> ...
>
> > + memcpy(&token, e, sizeof(uint64_t));
> > +
> > + /* Reclaim the resource */
> > + if (rte_rcu_qsbr_check(dq->v, token, false) != 1) {
> > + rte_ring_dequeue_finish(dq->r, 0);
> > + break;
> > + }
> > + rte_ring_dequeue_finish(dq->r, 1);
> > +
> > + rte_log(RTE_LOG_INFO, rte_rcu_log_type,
> > + "%s(): Reclaimed token = %"PRIu64"\n",
> > + __func__, *(uint64_t *)e);
> > +
> > + dq->free_fn(dq->p, e + __RTE_QSBR_TOKEN_SIZE);
> > +
> > + cnt++;
> > + }
> > +
> > + rte_log(RTE_LOG_INFO, rte_rcu_log_type,
> > + "%s(): Reclaimed %u resources\n", __func__, cnt);
> > +
> > + if (freed != NULL)
> > + *freed = cnt;
> > + if (pending != NULL)
> > + *pending = rte_ring_count(dq->r);
> > +
> > + return 0;
> > +}
> > +
> > +/* Delete a defer queue. */
> > +int
> > +rte_rcu_qsbr_dq_delete(struct rte_rcu_qsbr_dq *dq) {
> > + unsigned int pending;
> > +
> > + if (dq == NULL) {
> > + rte_log(RTE_LOG_ERR, rte_rcu_log_type,
> > + "%s(): Invalid input parameter\n", __func__);
> > + rte_errno = EINVAL;
> > +
> > + return 1;
> > + }
> > +
> > + /* Reclaim all the resources */
> > + rte_rcu_qsbr_dq_reclaim(dq, ~0, NULL, &pending);
> > + if (pending != 0) {
> > + rte_errno = EAGAIN;
> > +
> > + return 1;
> > + }
> > +
> > + rte_ring_free(dq->r);
> > + rte_free(dq);
> > +
> > + return 0;
> > +}
> > +
> > int rte_rcu_log_type;
> >
> > RTE_INIT(rte_rcu_register)
> > diff --git a/lib/librte_rcu/rte_rcu_qsbr.h
> > b/lib/librte_rcu/rte_rcu_qsbr.h index 0b5585925..213f9b029 100644
> > --- a/lib/librte_rcu/rte_rcu_qsbr.h
> > +++ b/lib/librte_rcu/rte_rcu_qsbr.h
> > @@ -34,6 +34,7 @@ extern "C" {
> > #include <rte_lcore.h>
> > #include <rte_debug.h>
> > #include <rte_atomic.h>
> > +#include <rte_ring.h>
> >
> > extern int rte_rcu_log_type;
> >
> > @@ -84,6 +85,7 @@ struct rte_rcu_qsbr_cnt { #define
> > __RTE_QSBR_CNT_THR_OFFLINE 0 #define __RTE_QSBR_CNT_INIT 1
> #define
> > __RTE_QSBR_CNT_MAX ((uint64_t)~0)
> > +#define __RTE_QSBR_TOKEN_SIZE sizeof(uint64_t)
> >
> > /* RTE Quiescent State variable structure.
> > * This structure has two elements that vary in size based on the @@
> > -114,6 +116,84 @@ struct rte_rcu_qsbr {
> > */
> > } __rte_cache_aligned;
> >
> > +/**
> > + * Call back function called to free the resources.
> > + *
> > + * @param p
> > + * Pointer provided while creating the defer queue
> > + * @param e
> > + * Pointer to the resource data stored on the defer queue
> > + *
> > + * @return
> > + * None
> > + */
> > +typedef void (*rte_rcu_qsbr_free_resource_t)(void *p, void *e);
> > +
> > +#define RTE_RCU_QSBR_DQ_NAMESIZE RTE_RING_NAMESIZE
> > +
> > +/**
> > + * Various flags supported.
> > + */
> > +/**< Enqueue and reclaim operations are multi-thread safe by default.
> > + * The call back functions registered to free the resources are
> > + * assumed to be multi-thread safe.
> > + * Set this flag is multi-thread safety is not required.
> > + */
> > +#define RTE_RCU_QSBR_DQ_MT_UNSAFE 1
> > +
> > +/**
> > + * Parameters used when creating the defer queue.
> > + */
> > +struct rte_rcu_qsbr_dq_parameters {
> > + const char *name;
> > + /**< Name of the queue. */
> > + uint32_t flags;
> > + /**< Flags to control API behaviors */
> > + uint32_t size;
> > + /**< Number of entries in queue. Typically, this will be
> > + * the same as the maximum number of entries supported in the
> > + * lock free data structure.
> > + * Data structures with unbounded number of entries is not
> > + * supported currently.
> > + */
> > + uint32_t esize;
> > + /**< Size (in bytes) of each element in the defer queue.
> > + * This has to be multiple of 4B.
> > + */
> > + uint32_t trigger_reclaim_limit;
> > + /**< Trigger automatic reclamation after the defer queue
> > + * has atleast these many resources waiting. This auto
> > + * reclamation is triggered in rte_rcu_qsbr_dq_enqueue API
> > + * call.
> > + * If this is greater than 'size', auto reclamation is
> > + * not triggered.
> > + * If this is set to 0, auto reclamation is triggered
> > + * in every call to rte_rcu_qsbr_dq_enqueue API.
> > + */
> > + uint32_t max_reclaim_size;
> > + /**< When automatic reclamation is enabled, reclaim at the max
> > + * these many resources. This should contain a valid value, if
> > + * auto reclamation is on. Setting this to 'size' or greater will
> > + * reclaim all possible resources currently on the defer queue.
> > + */
> > + rte_rcu_qsbr_free_resource_t free_fn;
> > + /**< Function to call to free the resource. */
> > + void *p;
> > + /**< Pointer passed to the free function. Typically, this is the
> > + * pointer to the data structure to which the resource to free
> > + * belongs. This can be NULL.
> > + */
> > + struct rte_rcu_qsbr *v;
> > + /**< RCU QSBR variable to use for this defer queue */ };
> > +
> > +/* RTE defer queue structure.
> > + * This structure holds the defer queue. The defer queue is used to
> > + * hold the deleted entries from the data structure that are not
> > + * yet freed.
> > + */
> > +struct rte_rcu_qsbr_dq;
> > +
> > /**
> > * @warning
> > * @b EXPERIMENTAL: this API may change without prior notice @@
> > -692,6 +772,114 @@ __rte_experimental int rte_rcu_qsbr_dump(FILE *f,
> > struct rte_rcu_qsbr *v);
> >
> > +/**
> > + * @warning
> > + * @b EXPERIMENTAL: this API may change without prior notice
> > + *
> > + * Create a queue used to store the data structure elements that can
> > + * be freed later. This queue is referred to as 'defer queue'.
> > + *
> > + * @param params
> > + * Parameters to create a defer queue.
> > + * @return
> > + * On success - Valid pointer to defer queue
> > + * On error - NULL
> > + * Possible rte_errno codes are:
> > + * - EINVAL - NULL parameters are passed
> > + * - ENOMEM - Not enough memory
> > + */
> > +__rte_experimental
> > +struct rte_rcu_qsbr_dq *
> > +rte_rcu_qsbr_dq_create(const struct rte_rcu_qsbr_dq_parameters
> > +*params);
> > +
> > +/**
> > + * @warning
> > + * @b EXPERIMENTAL: this API may change without prior notice
> > + *
> > + * Enqueue one resource to the defer queue and start the grace period.
> > + * The resource will be freed later after at least one grace period
> > + * is over.
> > + *
> > + * If the defer queue is full, it will attempt to reclaim resources.
> > + * It will also reclaim resources at regular intervals to avoid
> > + * the defer queue from growing too big.
> > + *
> > + * Multi-thread safety is provided as the defer queue configuration.
> > + * When multi-thread safety is requested, it is possible that the
> > + * resources are not stored in their order of deletion. This results
> > + * in resources being held in the defer queue longer than they should.
> > + *
> > + * @param dq
> > + * Defer queue to allocate an entry from.
> > + * @param e
> > + * Pointer to resource data to copy to the defer queue. The size of
> > + * the data to copy is equal to the element size provided when the
> > + * defer queue was created.
> > + * @return
> > + * On success - 0
> > + * On error - 1 with rte_errno set to
> > + * - EINVAL - NULL parameters are passed
> > + * - ENOSPC - Defer queue is full. This condition can not happen
> > + * if the defer queue size is equal (or larger) than the
> > + * number of elements in the data structure.
> > + */
> > +__rte_experimental
> > +int
> > +rte_rcu_qsbr_dq_enqueue(struct rte_rcu_qsbr_dq *dq, void *e);
> > +
> > +/**
> > + * @warning
> > + * @b EXPERIMENTAL: this API may change without prior notice
> > + *
> > + * Free quesed resources from the defer queue.
> > + *
> > + * This API is multi-thread safe.
> > + *
> > + * @param dq
> > + * Defer queue to free an entry from.
> > + * @param n
> > + * Maximum number of resources to free.
> > + * @param freed
> > + * Number of resources that were freed.
> > + * @param pending
> > + * Number of resources pending on the defer queue. This number might
> not
> > + * be acurate if multi-thread safety is configured.
> > + * @return
> > + * On successful reclamation of at least 1 resource - 0
> > + * On error - 1 with rte_errno set to
> > + * - EINVAL - NULL parameters are passed
> > + */
> > +__rte_experimental
> > +int
> > +rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq, unsigned int n,
> > + unsigned int *freed, unsigned int *pending);
> > +
> > +/**
> > + * @warning
> > + * @b EXPERIMENTAL: this API may change without prior notice
> > + *
> > + * Delete a defer queue.
> > + *
> > + * It tries to reclaim all the resources on the defer queue.
> > + * If any of the resources have not completed the grace period
> > + * the reclamation stops and returns immediately. The rest of
> > + * the resources are not reclaimed and the defer queue is not
> > + * freed.
> > + *
> > + * @param dq
> > + * Defer queue to delete.
> > + * @return
> > + * On success - 0
> > + * On error - 1
> > + * Possible rte_errno codes are:
> > + * - EINVAL - NULL parameters are passed
> > + * - EAGAIN - Some of the resources have not completed at least 1 grace
> > + * period, try again.
> > + */
> > +__rte_experimental
> > +int
> > +rte_rcu_qsbr_dq_delete(struct rte_rcu_qsbr_dq *dq);
> > +
> > #ifdef __cplusplus
> > }
> > #endif
> > diff --git a/lib/librte_rcu/rte_rcu_version.map
> > b/lib/librte_rcu/rte_rcu_version.map
> > index f8b9ef2ab..dfac88a37 100644
> > --- a/lib/librte_rcu/rte_rcu_version.map
> > +++ b/lib/librte_rcu/rte_rcu_version.map
> > @@ -8,6 +8,10 @@ EXPERIMENTAL {
> > rte_rcu_qsbr_synchronize;
> > rte_rcu_qsbr_thread_register;
> > rte_rcu_qsbr_thread_unregister;
> > + rte_rcu_qsbr_dq_create;
> > + rte_rcu_qsbr_dq_enqueue;
> > + rte_rcu_qsbr_dq_reclaim;
> > + rte_rcu_qsbr_dq_delete;
> >
> > local: *;
> > };
> > diff --git a/lib/meson.build b/lib/meson.build index
> > 9c3cc55d5..15e91a303 100644
> > --- a/lib/meson.build
> > +++ b/lib/meson.build
> > @@ -11,7 +11,9 @@
> > libraries = [
> > 'kvargs', # eal depends on kvargs
> > 'eal', # everything depends on eal
> > - 'ring', 'mempool', 'mbuf', 'net', 'meter', 'ethdev', 'pci', # core
> > + 'ring',
> > + 'rcu', # rcu depends on ring
> > + 'mempool', 'mbuf', 'net', 'meter', 'ethdev', 'pci', # core
> > 'cmdline',
> > 'metrics', # bitrate/latency stats depends on this
> > 'hash', # efd depends on this
> > @@ -22,7 +24,7 @@ libraries = [
> > 'gro', 'gso', 'ip_frag', 'jobstats',
> > 'kni', 'latencystats', 'lpm', 'member',
> > 'power', 'pdump', 'rawdev',
> > - 'rcu', 'rib', 'reorder', 'sched', 'security', 'stack', 'vhost',
> > + 'rib', 'reorder', 'sched', 'security', 'stack', 'vhost',
> > # ipsec lib depends on net, crypto and security
> > 'ipsec',
> > #fib lib depends on rib
> > --
> > 2.17.1
> > > +
> > > + /* Enqueue the token and resource. Generating the token
> > > + * and enqueuing (token + resource) on the queue is not an
> > > + * atomic operation. This might result in tokens enqueued
> > > + * out of order on the queue. So, some tokens might wait
> > > + * longer than they are required to be reclaimed.
> > > + */
> > > + char data[dq->esize];
> > > + memcpy(data, &token, __RTE_QSBR_TOKEN_SIZE);
> > > + memcpy(data + __RTE_QSBR_TOKEN_SIZE, e,
> > > + dq->esize - __RTE_QSBR_TOKEN_SIZE);
> > > + /* Check the status as enqueue might fail since the other thread
> > > + * might have used up the freed space.
> > > + * Enqueue uses the configured flags when the DQ was created.
> > > + */
> > > + if (rte_ring_enqueue_elem(dq->r, data, dq->esize) != 0) {
> > > + rte_log(RTE_LOG_ERR, rte_rcu_log_type,
> > > + "%s(): Enqueue failed\n", __func__);
> > > + /* Note that the token generated above is not used.
> > > + * Other than wasting tokens, it should not cause any
> > > + * other issues.
> > > + */
> > > + rte_log(RTE_LOG_INFO, rte_rcu_log_type,
> > > + "%s(): Skipped enqueuing token = %"PRIu64"\n",
> > > + __func__, token);
> > > +
> > > + rte_errno = ENOSPC;
> > > + return 1;
> > > + }
> >
> >
> > Just as a thought: in theory if we'll use MP_HTS(/SP) ring we can avoid
> > wasting RCU tokens:
> >
> > if (rte_ring_enqueue_elem_bulk_start(dq->r, 1, NULL) != 0) {
> > token = rte_rcu_qsbr_start(dq->v);
> > memcpy(data, &token, __RTE_QSBR_TOKEN_SIZE);
> > rte_ring_enqueue_elem_finish(dq->r, data, dq->esize, 1); }
> >
> > Though it might slow things down if we have a lot of parallel dq_enqueue calls.
> > So I'm not sure whether it is worth it or not.
> Adding peek APIs for RTS would be better. That should take care of the parallel dq_enqueue. Not sure if I gave you the comment. My ring
> patch supported these APIs.
AFAIK, peek API is not possible for RTS mode.
Probably you are talking about Scatter-Gather API introduced in your RFC
(_reserve_; update ring entries manually; _commit_)?
Anyway, if there is not much value in my idea above, then feel free to drop it.
>
> >
> > > +
> > > + rte_log(RTE_LOG_INFO, rte_rcu_log_type,
> > > + "%s(): Enqueued token = %"PRIu64"\n", __func__, token);
> > > +
> > > + return 0;
> > > +}
> > > +
> > > +/* Reclaim resources from the defer queue. */ int
> > > +rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq, unsigned int n,
> > > + unsigned int *freed, unsigned int *pending) {
> > > + uint32_t cnt;
> > > + uint64_t token;
> > > +
> > > + if (dq == NULL || n == 0) {
> > > + rte_log(RTE_LOG_ERR, rte_rcu_log_type,
> > > + "%s(): Invalid input parameter\n", __func__);
> > > + rte_errno = EINVAL;
> > > +
> > > + return 1;
> > > + }
> > > +
> > > + cnt = 0;
> > > +
> > > + char e[dq->esize];
> > > + /* Check reader threads quiescent state and reclaim resources */
> > > + while ((cnt < n) &&
> > > + (rte_ring_dequeue_bulk_elem_start(dq->r, e,
> > > + dq->esize, 1, NULL) != 0)) {
> >
> > Another thought - is there any point in using burst_elem_start() here to retrieve more
> > than 1 elem in one go? Something like:
> I think it makes sense.
>
> > char e[32][dq->esize];
> > while (cnt < n) {
> > k = RTE_MIN(32, n - cnt);
> > k = rte_ring_dequeue_burst_elem_start(dq->r, e, dq->esize, k, NULL);
> > if (k == 0)
> > break;
> > for (i = 0; i != k; i++) {
> > memcpy(&token, e[i], sizeof(uint64_t));
> > if (rte_rcu_qsbr_check(dq->v, token, false) != 1)
> > break;
> > }
> > k = i;
> > rte_ring_dequeue_elem_finish(dq->r, k);
> > for (i = 0; i != k; i++)
> > dq->free_fn(dq->p, e[i] + __RTE_QSBR_TOKEN_SIZE);
> I think it also makes sense to change the free_fn to take 'n' number of tokens.
>
> > cnt += k;
> > if (k == 0)
> > break;
> >
> > ?
> > Also if at enqueue we guarantee strict order (via
> > enqueue_start/enqueue_finish), then we can probably do the _check_ starting from
> > the last retrieved token here?
> > In theory that might help to minimize the number of checks.
> > I.E. do:
> > for (i = k; i-- !=0; ) {
> > memcpy(&token, e[i], sizeof(uint64_t));
> > if (rte_rcu_qsbr_check(dq->v, token, false) != 1)
> There is a higher chance that later tokens are not acked. This introduces more polling of the counters.
> rte_rcu_qsbr_check has an optimization: while acking the current token, it also caches the greatest token acked, and it uses the cached
> token for subsequent calls. I think this provides a better optimization.
Ok.
@@ -8,7 +8,7 @@ LIB = librte_rcu.a
CFLAGS += -DALLOW_EXPERIMENTAL_API
CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3
-LDLIBS += -lrte_eal
+LDLIBS += -lrte_eal -lrte_ring
EXPORT_MAP := rte_rcu_version.map
@@ -10,3 +10,5 @@ headers = files('rte_rcu_qsbr.h')
if cc.get_id() == 'clang' and dpdk_conf.get('RTE_ARCH_64') == false
ext_deps += cc.find_library('atomic')
endif
+
+deps += ['ring']
new file mode 100644
@@ -0,0 +1,57 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright (c) 2019 Arm Limited
+ */
+
+#ifndef _RTE_RCU_QSBR_PVT_H_
+#define _RTE_RCU_QSBR_PVT_H_
+
+/**
+ * This file is private to the RCU library. It should not be included
+ * by the user of this library.
+ */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <rte_ring.h>
+#include <rte_ring_elem.h>
+
+#include "rte_rcu_qsbr.h"
+
+/* RTE defer queue structure.
+ * This structure holds the defer queue. The defer queue is used to
+ * hold the deleted entries from the data structure that are not
+ * yet freed.
+ */
+struct rte_rcu_qsbr_dq {
+ struct rte_rcu_qsbr *v; /**< RCU QSBR variable used by this queue.*/
+ struct rte_ring *r; /**< RCU QSBR defer queue. */
+ uint32_t size;
+ /**< Number of elements in the defer queue */
+ uint32_t esize;
+ /**< Size (in bytes) of data, including the token, stored on the
+ * defer queue.
+ */
+ uint32_t trigger_reclaim_limit;
+ /**< Trigger automatic reclamation after the defer queue
+ * has at least this many resources waiting.
+ */
+ uint32_t max_reclaim_size;
+ /**< Reclaim at the max these many resources during auto
+ * reclamation.
+ */
+ rte_rcu_qsbr_free_resource_t free_fn;
+ /**< Function to call to free the resource. */
+ void *p;
+ /**< Pointer passed to the free function. Typically, this is the
+ * pointer to the data structure to which the resource to free
+ * belongs.
+ */
+};
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _RTE_RCU_QSBR_PVT_H_ */
@@ -1,6 +1,6 @@
/* SPDX-License-Identifier: BSD-3-Clause
*
- * Copyright (c) 2018 Arm Limited
+ * Copyright (c) 2018-2019 Arm Limited
*/
#include <stdio.h>
@@ -18,8 +18,10 @@
#include <rte_per_lcore.h>
#include <rte_lcore.h>
#include <rte_errno.h>
+#include <rte_ring_elem.h>
#include "rte_rcu_qsbr.h"
+#include "rcu_qsbr_pvt.h"
/* Get the memory size of QSBR variable */
size_t
@@ -270,6 +272,245 @@ rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v)
return 0;
}
+/* Create a queue used to store the data structure elements that can
+ * be freed later. This queue is referred to as 'defer queue'.
+ */
+struct rte_rcu_qsbr_dq *
+rte_rcu_qsbr_dq_create(const struct rte_rcu_qsbr_dq_parameters *params)
+{
+ struct rte_rcu_qsbr_dq *dq;
+ uint32_t qs_fifo_size;
+ unsigned int flags;
+
+ if (params == NULL || params->free_fn == NULL ||
+ params->v == NULL || params->name == NULL ||
+ params->size == 0 || params->esize == 0 ||
+ (params->esize % 4 != 0)) {
+ rte_log(RTE_LOG_ERR, rte_rcu_log_type,
+ "%s(): Invalid input parameter\n", __func__);
+ rte_errno = EINVAL;
+
+ return NULL;
+ }
+ /* If auto reclamation is configured (trigger_reclaim_limit does not
+ * exceed size), max_reclaim_size must be non-zero.
+ */
+ if ((params->trigger_reclaim_limit <= params->size) &&
+ (params->max_reclaim_size == 0)) {
+ rte_log(RTE_LOG_ERR, rte_rcu_log_type,
+ "%s(): Invalid input parameter, size = %u, trigger_reclaim_limit = %u, max_reclaim_size = %u\n",
+ __func__, params->size, params->trigger_reclaim_limit,
+ params->max_reclaim_size);
+ rte_errno = EINVAL;
+
+ return NULL;
+ }
+
+ dq = rte_zmalloc(NULL, sizeof(struct rte_rcu_qsbr_dq),
+ RTE_CACHE_LINE_SIZE);
+ if (dq == NULL) {
+ rte_errno = ENOMEM;
+
+ return NULL;
+ }
+
+ /* Decide the flags for the ring.
+ * If MT safety is requested, use RTS for ring enqueue as most
+ * use cases involve dq-enqueue happening on the control plane.
+ * Ring dequeue is always HTS due to the possibility of revert.
+ */
+ flags = RING_F_MP_RTS_ENQ;
+ if (params->flags & RTE_RCU_QSBR_DQ_MT_UNSAFE)
+ flags = RING_F_SP_ENQ;
+ flags |= RING_F_MC_HTS_DEQ;
+ /* Round up qs_fifo_size to the next power of two that is not less
+ * than size + 1.
+ */
+ qs_fifo_size = rte_align32pow2(params->size + 1);
+ /* Add token size to ring element size */
+ dq->r = rte_ring_create_elem(params->name,
+ __RTE_QSBR_TOKEN_SIZE + params->esize,
+ qs_fifo_size, SOCKET_ID_ANY, flags);
+ if (dq->r == NULL) {
+ rte_log(RTE_LOG_ERR, rte_rcu_log_type,
+ "%s(): defer queue create failed\n", __func__);
+ rte_free(dq);
+ return NULL;
+ }
+
+ dq->v = params->v;
+ dq->size = params->size;
+ dq->esize = __RTE_QSBR_TOKEN_SIZE + params->esize;
+ dq->trigger_reclaim_limit = params->trigger_reclaim_limit;
+ dq->max_reclaim_size = params->max_reclaim_size;
+ dq->free_fn = params->free_fn;
+ dq->p = params->p;
+
+ return dq;
+}
+
+/* Enqueue one resource to the defer queue to free after the grace
+ * period is over.
+ */
+int rte_rcu_qsbr_dq_enqueue(struct rte_rcu_qsbr_dq *dq, void *e)
+{
+ uint64_t token;
+ uint32_t cur_size, free_size;
+
+ if (dq == NULL || e == NULL) {
+ rte_log(RTE_LOG_ERR, rte_rcu_log_type,
+ "%s(): Invalid input parameter\n", __func__);
+ rte_errno = EINVAL;
+
+ return 1;
+ }
+
+ /* Start the grace period */
+ token = rte_rcu_qsbr_start(dq->v);
+
+ /* Reclaim resources once the queue holds more entries than
+ * trigger_reclaim_limit. This keeps the queue from growing too large
+ * and allows time for reader threads to report their quiescent state.
+ */
+ cur_size = rte_ring_count(dq->r);
+ if (cur_size > dq->trigger_reclaim_limit) {
+ rte_log(RTE_LOG_INFO, rte_rcu_log_type,
+ "%s(): Triggering reclamation\n", __func__);
+ rte_rcu_qsbr_dq_reclaim(dq, dq->max_reclaim_size, NULL, NULL);
+ }
+
+ /* Check if there is space for atleast 1 resource */
+ free_size = rte_ring_free_count(dq->r);
+ if (!free_size) {
+ rte_log(RTE_LOG_ERR, rte_rcu_log_type,
+ "%s(): Defer queue is full\n", __func__);
+ /* Note that the token generated above is not used.
+ * Other than wasting tokens, it should not cause any
+ * other issues.
+ */
+ rte_log(RTE_LOG_INFO, rte_rcu_log_type,
+ "%s(): Skipped enqueuing token = %"PRIu64"\n",
+ __func__, token);
+
+ rte_errno = ENOSPC;
+ return 1;
+ }
+
+ /* Enqueue the token and resource. Generating the token
+ * and enqueuing (token + resource) on the queue is not an
+ * atomic operation. This might result in tokens enqueued
+ * out of order on the queue. So, some tokens might wait
+ * longer than they are required to be reclaimed.
+ */
+ char data[dq->esize];
+ memcpy(data, &token, __RTE_QSBR_TOKEN_SIZE);
+ memcpy(data + __RTE_QSBR_TOKEN_SIZE, e,
+ dq->esize - __RTE_QSBR_TOKEN_SIZE);
+ /* Check the status as enqueue might fail since the other thread
+ * might have used up the freed space.
+ * Enqueue uses the configured flags when the DQ was created.
+ */
+ if (rte_ring_enqueue_elem(dq->r, data, dq->esize) != 0) {
+ rte_log(RTE_LOG_ERR, rte_rcu_log_type,
+ "%s(): Enqueue failed\n", __func__);
+ /* Note that the token generated above is not used.
+ * Other than wasting tokens, it should not cause any
+ * other issues.
+ */
+ rte_log(RTE_LOG_INFO, rte_rcu_log_type,
+ "%s(): Skipped enqueuing token = %"PRIu64"\n",
+ __func__, token);
+
+ rte_errno = ENOSPC;
+ return 1;
+ }
+
+ rte_log(RTE_LOG_INFO, rte_rcu_log_type,
+ "%s(): Enqueued token = %"PRIu64"\n", __func__, token);
+
+ return 0;
+}
+
+/* Reclaim resources from the defer queue. */
+int
+rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq, unsigned int n,
+ unsigned int *freed, unsigned int *pending)
+{
+ uint32_t cnt;
+ uint64_t token;
+
+ if (dq == NULL || n == 0) {
+ rte_log(RTE_LOG_ERR, rte_rcu_log_type,
+ "%s(): Invalid input parameter\n", __func__);
+ rte_errno = EINVAL;
+
+ return 1;
+ }
+
+ cnt = 0;
+
+ char e[dq->esize];
+ /* Check reader threads quiescent state and reclaim resources */
+ while ((cnt < n) &&
+ (rte_ring_dequeue_bulk_elem_start(dq->r, e,
+ dq->esize, 1, NULL) != 0)) {
+ memcpy(&token, e, sizeof(uint64_t));
+
+ /* Reclaim the resource */
+ if (rte_rcu_qsbr_check(dq->v, token, false) != 1) {
+ rte_ring_dequeue_finish(dq->r, 0);
+ break;
+ }
+ rte_ring_dequeue_finish(dq->r, 1);
+
+ rte_log(RTE_LOG_INFO, rte_rcu_log_type,
+ "%s(): Reclaimed token = %"PRIu64"\n",
+ __func__, *(uint64_t *)e);
+
+ dq->free_fn(dq->p, e + __RTE_QSBR_TOKEN_SIZE);
+
+ cnt++;
+ }
+
+ rte_log(RTE_LOG_INFO, rte_rcu_log_type,
+ "%s(): Reclaimed %u resources\n", __func__, cnt);
+
+ if (freed != NULL)
+ *freed = cnt;
+ if (pending != NULL)
+ *pending = rte_ring_count(dq->r);
+
+ return 0;
+}
+
+/* Delete a defer queue. */
+int
+rte_rcu_qsbr_dq_delete(struct rte_rcu_qsbr_dq *dq)
+{
+ unsigned int pending;
+
+ if (dq == NULL) {
+ rte_log(RTE_LOG_ERR, rte_rcu_log_type,
+ "%s(): Invalid input parameter\n", __func__);
+ rte_errno = EINVAL;
+
+ return 1;
+ }
+
+ /* Reclaim all the resources */
+ rte_rcu_qsbr_dq_reclaim(dq, ~0, NULL, &pending);
+ if (pending != 0) {
+ rte_errno = EAGAIN;
+
+ return 1;
+ }
+
+ rte_ring_free(dq->r);
+ rte_free(dq);
+
+ return 0;
+}
+
int rte_rcu_log_type;
RTE_INIT(rte_rcu_register)
@@ -34,6 +34,7 @@ extern "C" {
#include <rte_lcore.h>
#include <rte_debug.h>
#include <rte_atomic.h>
+#include <rte_ring.h>
extern int rte_rcu_log_type;
@@ -84,6 +85,7 @@ struct rte_rcu_qsbr_cnt {
#define __RTE_QSBR_CNT_THR_OFFLINE 0
#define __RTE_QSBR_CNT_INIT 1
#define __RTE_QSBR_CNT_MAX ((uint64_t)~0)
+#define __RTE_QSBR_TOKEN_SIZE sizeof(uint64_t)
/* RTE Quiescent State variable structure.
* This structure has two elements that vary in size based on the
@@ -114,6 +116,84 @@ struct rte_rcu_qsbr {
*/
} __rte_cache_aligned;
+/**
+ * Call back function called to free the resources.
+ *
+ * @param p
+ * Pointer provided while creating the defer queue
+ * @param e
+ * Pointer to the resource data stored on the defer queue
+ *
+ * @return
+ * None
+ */
+typedef void (*rte_rcu_qsbr_free_resource_t)(void *p, void *e);
+
+#define RTE_RCU_QSBR_DQ_NAMESIZE RTE_RING_NAMESIZE
+
+/**
+ * Various flags supported.
+ */
+/**< Enqueue and reclaim operations are multi-thread safe by default.
+ * The call back functions registered to free the resources are
+ * assumed to be multi-thread safe.
+ * Set this flag if multi-thread safety is not required.
+ */
+#define RTE_RCU_QSBR_DQ_MT_UNSAFE 1
+
+/**
+ * Parameters used when creating the defer queue.
+ */
+struct rte_rcu_qsbr_dq_parameters {
+ const char *name;
+ /**< Name of the queue. */
+ uint32_t flags;
+ /**< Flags to control API behaviors */
+ uint32_t size;
+ /**< Number of entries in queue. Typically, this will be
+ * the same as the maximum number of entries supported in the
+ * lock free data structure.
+ * Data structures with unbounded number of entries is not
+ * supported currently.
+ */
+ uint32_t esize;
+ /**< Size (in bytes) of each element in the defer queue.
+ * This has to be multiple of 4B.
+ */
+ uint32_t trigger_reclaim_limit;
+ /**< Trigger automatic reclamation after the defer queue
+ * has at least this many resources waiting. This auto
+ * reclamation is triggered in rte_rcu_qsbr_dq_enqueue API
+ * call.
+ * If this is greater than 'size', auto reclamation is
+ * not triggered.
+ * If this is set to 0, auto reclamation is triggered
+ * in every call to rte_rcu_qsbr_dq_enqueue API.
+ */
+ uint32_t max_reclaim_size;
+ /**< When automatic reclamation is enabled, reclaim at most
+ * this many resources. This should contain a valid value, if
+ * auto reclamation is on. Setting this to 'size' or greater will
+ * reclaim all possible resources currently on the defer queue.
+ */
+ rte_rcu_qsbr_free_resource_t free_fn;
+ /**< Function to call to free the resource. */
+ void *p;
+ /**< Pointer passed to the free function. Typically, this is the
+ * pointer to the data structure to which the resource to free
+ * belongs. This can be NULL.
+ */
+ struct rte_rcu_qsbr *v;
+ /**< RCU QSBR variable to use for this defer queue */
+};
+
+/* RTE defer queue structure.
+ * This structure holds the defer queue. The defer queue is used to
+ * hold the deleted entries from the data structure that are not
+ * yet freed.
+ */
+struct rte_rcu_qsbr_dq;
+
/**
* @warning
* @b EXPERIMENTAL: this API may change without prior notice
@@ -692,6 +772,114 @@ __rte_experimental
int
rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v);
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Create a queue used to store the data structure elements that can
+ * be freed later. This queue is referred to as 'defer queue'.
+ *
+ * @param params
+ * Parameters to create a defer queue.
+ * @return
+ * On success - Valid pointer to defer queue
+ * On error - NULL
+ * Possible rte_errno codes are:
+ * - EINVAL - NULL parameters are passed
+ * - ENOMEM - Not enough memory
+ */
+__rte_experimental
+struct rte_rcu_qsbr_dq *
+rte_rcu_qsbr_dq_create(const struct rte_rcu_qsbr_dq_parameters *params);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Enqueue one resource to the defer queue and start the grace period.
+ * The resource will be freed later after at least one grace period
+ * is over.
+ *
+ * If the defer queue is full, it will attempt to reclaim resources.
+ * It will also reclaim resources at regular intervals to prevent
+ * the defer queue from growing too big.
+ *
+ * Multi-thread safety is provided as per the defer queue configuration.
+ * When multi-thread safety is requested, it is possible that the
+ * resources are not stored in their order of deletion. This results
+ * in resources being held in the defer queue longer than they should.
+ *
+ * @param dq
+ * Defer queue to allocate an entry from.
+ * @param e
+ * Pointer to resource data to copy to the defer queue. The size of
+ * the data to copy is equal to the element size provided when the
+ * defer queue was created.
+ * @return
+ * On success - 0
+ * On error - 1 with rte_errno set to
+ * - EINVAL - NULL parameters are passed
+ * - ENOSPC - Defer queue is full. This condition can not happen
+ * if the defer queue size is equal (or larger) than the
+ * number of elements in the data structure.
+ */
+__rte_experimental
+int
+rte_rcu_qsbr_dq_enqueue(struct rte_rcu_qsbr_dq *dq, void *e);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Free queued resources from the defer queue.
+ *
+ * This API is multi-thread safe.
+ *
+ * @param dq
+ * Defer queue to free an entry from.
+ * @param n
+ * Maximum number of resources to free.
+ * @param freed
+ * Number of resources that were freed.
+ * @param pending
+ * Number of resources pending on the defer queue. This number might not
+ * be accurate if multi-thread safety is configured.
+ * @return
+ * On success (even if no resource was reclaimed; see 'freed') - 0
+ * On error - 1 with rte_errno set to
+ * - EINVAL - NULL dq or n equal to 0 is passed
+ */
+__rte_experimental
+int
+rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq, unsigned int n,
+ unsigned int *freed, unsigned int *pending);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Delete a defer queue.
+ *
+ * It tries to reclaim all the resources on the defer queue.
+ * If any of the resources have not completed the grace period
+ * the reclamation stops and returns immediately. The rest of
+ * the resources are not reclaimed and the defer queue is not
+ * freed.
+ *
+ * @param dq
+ * Defer queue to delete.
+ * @return
+ * On success - 0
+ * On error - 1
+ * Possible rte_errno codes are:
+ * - EINVAL - NULL parameters are passed
+ * - EAGAIN - Some of the resources have not completed at least 1 grace
+ * period, try again.
+ */
+__rte_experimental
+int
+rte_rcu_qsbr_dq_delete(struct rte_rcu_qsbr_dq *dq);
+
#ifdef __cplusplus
}
#endif
@@ -8,6 +8,10 @@ EXPERIMENTAL {
rte_rcu_qsbr_synchronize;
rte_rcu_qsbr_thread_register;
rte_rcu_qsbr_thread_unregister;
+ rte_rcu_qsbr_dq_create;
+ rte_rcu_qsbr_dq_enqueue;
+ rte_rcu_qsbr_dq_reclaim;
+ rte_rcu_qsbr_dq_delete;
local: *;
};
@@ -11,7 +11,9 @@
libraries = [
'kvargs', # eal depends on kvargs
'eal', # everything depends on eal
- 'ring', 'mempool', 'mbuf', 'net', 'meter', 'ethdev', 'pci', # core
+ 'ring',
+ 'rcu', # rcu depends on ring
+ 'mempool', 'mbuf', 'net', 'meter', 'ethdev', 'pci', # core
'cmdline',
'metrics', # bitrate/latency stats depends on this
'hash', # efd depends on this
@@ -22,7 +24,7 @@ libraries = [
'gro', 'gso', 'ip_frag', 'jobstats',
'kni', 'latencystats', 'lpm', 'member',
'power', 'pdump', 'rawdev',
- 'rcu', 'rib', 'reorder', 'sched', 'security', 'stack', 'vhost',
+ 'rib', 'reorder', 'sched', 'security', 'stack', 'vhost',
# ipsec lib depends on net, crypto and security
'ipsec',
#fib lib depends on rib