[1/3] rcu: add RCU library supporting QSBR mechanism

Message ID 20190319045228.46879-2-honnappa.nagarahalli@arm.com
State Superseded, archived
Delegated to: Thomas Monjalon
Headers show
Series
  • lib/rcu: add RCU library supporting QSBR mechanism
Related show

Checks

Context Check Description
ci/checkpatch success coding style OK
ci/Intel-compilation success Compilation OK
ci/intel-Performance-Testing success Performance Testing PASS
ci/mellanox-Performance-Testing success Performance Testing PASS

Commit Message

Honnappa Nagarahalli March 19, 2019, 4:52 a.m.
Add RCU library supporting quiescent state based memory reclamation method.
This library helps identify the quiescent state of the reader threads so
that the writers can free the memory associated with the lock less data
structures.

Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
Reviewed-by: Steve Capper <steve.capper@arm.com>
Reviewed-by: Gavin Hu <gavin.hu@arm.com>
Reviewed-by: Ola Liljedahl <ola.liljedahl@arm.com>
---
 MAINTAINERS                        |   5 +
 config/common_base                 |   6 +
 lib/Makefile                       |   2 +
 lib/librte_rcu/Makefile            |  23 ++
 lib/librte_rcu/meson.build         |   5 +
 lib/librte_rcu/rte_rcu_qsbr.c      |  99 ++++++
 lib/librte_rcu/rte_rcu_qsbr.h      | 511 +++++++++++++++++++++++++++++
 lib/librte_rcu/rte_rcu_version.map |   9 +
 lib/meson.build                    |   2 +-
 mk/rte.app.mk                      |   1 +
 10 files changed, 662 insertions(+), 1 deletion(-)
 create mode 100644 lib/librte_rcu/Makefile
 create mode 100644 lib/librte_rcu/meson.build
 create mode 100644 lib/librte_rcu/rte_rcu_qsbr.c
 create mode 100644 lib/librte_rcu/rte_rcu_qsbr.h
 create mode 100644 lib/librte_rcu/rte_rcu_version.map

Comments

Ananyev, Konstantin March 22, 2019, 4:42 p.m. | #1
Hi Honnappa,

> diff --git a/lib/librte_rcu/rte_rcu_qsbr.c b/lib/librte_rcu/rte_rcu_qsbr.c
> new file mode 100644
> index 000000000..0fc4515ea
> --- /dev/null
> +++ b/lib/librte_rcu/rte_rcu_qsbr.c
> @@ -0,0 +1,99 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + *
> + * Copyright (c) 2018 Arm Limited
> + */
> +
> +#include <stdio.h>
> +#include <string.h>
> +#include <stdint.h>
> +#include <errno.h>
> +
> +#include <rte_common.h>
> +#include <rte_log.h>
> +#include <rte_memory.h>
> +#include <rte_malloc.h>
> +#include <rte_eal.h>
> +#include <rte_eal_memconfig.h>
> +#include <rte_atomic.h>
> +#include <rte_per_lcore.h>
> +#include <rte_lcore.h>
> +#include <rte_errno.h>
> +
> +#include "rte_rcu_qsbr.h"
> +
> +/* Get the memory size of QSBR variable */
> +size_t __rte_experimental
> +rte_rcu_qsbr_get_memsize(uint32_t max_threads)
> +{
> +	size_t sz;
> +
> +	RTE_ASSERT(max_threads == 0);

Here and in all similar places:
assert() aborts when its condition evaluates to false.
So it should be max_threads != 0.
Also, it is a public and non-datapath function.
Calling assert() for an invalid input parameter seems way too extreme.
Why not just return an error to the caller?

> +
> +	sz = sizeof(struct rte_rcu_qsbr);
> +
> +	/* Add the size of quiescent state counter array */
> +	sz += sizeof(struct rte_rcu_qsbr_cnt) * max_threads;
> +
> +	return RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
> +}
> +
> +/* Initialize a quiescent state variable */
> +void __rte_experimental
> +rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads)
> +{
> +	RTE_ASSERT(v == NULL);
> +
> +	memset(v, 0, rte_rcu_qsbr_get_memsize(max_threads));
> +	v->m_threads = max_threads;
> +	v->num_elems = RTE_ALIGN_MUL_CEIL(max_threads,
> +			RTE_QSBR_THRID_ARRAY_ELM_SIZE) /
> +			RTE_QSBR_THRID_ARRAY_ELM_SIZE;
> +	v->token = RTE_QSBR_CNT_INIT;
> +}
> +
> +/* Dump the details of a single quiescent state variable to a file. */
> +void __rte_experimental
> +rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v)
> +{
> +	uint64_t bmap;
> +	uint32_t i, t;
> +
> +	RTE_ASSERT(v == NULL || f == NULL);
> +
> +	fprintf(f, "\nQuiescent State Variable @%p\n", v);
> +
> +	fprintf(f, "  QS variable memory size = %lu\n",
> +				rte_rcu_qsbr_get_memsize(v->m_threads));
> +	fprintf(f, "  Given # max threads = %u\n", v->m_threads);
> +
> +	fprintf(f, "  Registered thread ID mask = 0x");
> +	for (i = 0; i < v->num_elems; i++)
> +		fprintf(f, "%lx", __atomic_load_n(&v->reg_thread_id[i],
> +					__ATOMIC_ACQUIRE));
> +	fprintf(f, "\n");
> +
> +	fprintf(f, "  Token = %lu\n",
> +			__atomic_load_n(&v->token, __ATOMIC_ACQUIRE));
> +
> +	fprintf(f, "Quiescent State Counts for readers:\n");
> +	for (i = 0; i < v->num_elems; i++) {
> +		bmap = __atomic_load_n(&v->reg_thread_id[i], __ATOMIC_ACQUIRE);
> +		while (bmap) {
> +			t = __builtin_ctzl(bmap);
> +			fprintf(f, "thread ID = %d, count = %lu\n", t,
> +				__atomic_load_n(
> +					&RTE_QSBR_CNT_ARRAY_ELM(v, i)->cnt,
> +					__ATOMIC_RELAXED));
> +			bmap &= ~(1UL << t);
> +		}
> +	}
> +}
> +
> +int rcu_log_type;
> +
> +RTE_INIT(rte_rcu_register)
> +{
> +	rcu_log_type = rte_log_register("lib.rcu");
> +	if (rcu_log_type >= 0)
> +		rte_log_set_level(rcu_log_type, RTE_LOG_ERR);
> +}
> diff --git a/lib/librte_rcu/rte_rcu_qsbr.h b/lib/librte_rcu/rte_rcu_qsbr.h
> new file mode 100644
> index 000000000..83943f751
> --- /dev/null
> +++ b/lib/librte_rcu/rte_rcu_qsbr.h
> @@ -0,0 +1,511 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + * Copyright (c) 2018 Arm Limited
> + */
> +
> +#ifndef _RTE_RCU_QSBR_H_
> +#define _RTE_RCU_QSBR_H_
> +
> +/**
> + * @file
> + * RTE Quiescent State Based Reclamation (QSBR)
> + *
> + * Quiescent State (QS) is any point in the thread execution
> + * where the thread does not hold a reference to a data structure
> + * in shared memory. While using lock-less data structures, the writer
> + * can safely free memory once all the reader threads have entered
> + * quiescent state.
> + *
> + * This library provides the ability for the readers to report quiescent
> + * state and for the writers to identify when all the readers have
> + * entered quiescent state.
> + */
> +
> +#ifdef __cplusplus
> +extern "C" {
> +#endif
> +
> +#include <stdio.h>
> +#include <stdint.h>
> +#include <errno.h>
> +#include <rte_common.h>
> +#include <rte_memory.h>
> +#include <rte_lcore.h>
> +#include <rte_debug.h>
> +
> +extern int rcu_log_type;
> +
> +#if RTE_LOG_DP_LEVEL >= RTE_LOG_DEBUG
> +#define RCU_DP_LOG(level, fmt, args...) \
> +	rte_log(RTE_LOG_ ## level, rcu_log_type, \
> +		"%s(): " fmt "\n", __func__, ## args)
> +#else
> +#define RCU_DP_LOG(level, fmt, args...)
> +#endif

Why do you need that?
Can't you use RTE_LOG_DP() instead?

> +
> +/* Registered thread IDs are stored as a bitmap of 64b element array.
> + * Given thread id needs to be converted to index into the array and
> + * the id within the array element.
> + */
> +#define RTE_RCU_MAX_THREADS 1024
> +#define RTE_QSBR_THRID_ARRAY_ELM_SIZE (sizeof(uint64_t) * 8)
> +#define RTE_QSBR_THRID_ARRAY_ELEMS \
> +	(RTE_ALIGN_MUL_CEIL(RTE_RCU_MAX_THREADS, \
> +	 RTE_QSBR_THRID_ARRAY_ELM_SIZE) / RTE_QSBR_THRID_ARRAY_ELM_SIZE)
> +#define RTE_QSBR_THRID_INDEX_SHIFT 6
> +#define RTE_QSBR_THRID_MASK 0x3f
> +#define RTE_QSBR_THRID_INVALID 0xffffffff
> +
> +/* Worker thread counter */
> +struct rte_rcu_qsbr_cnt {
> +	uint64_t cnt;
> +	/**< Quiescent state counter. Value 0 indicates the thread is offline */
> +} __rte_cache_aligned;
> +
> +#define RTE_QSBR_CNT_ARRAY_ELM(v, i) (((struct rte_rcu_qsbr_cnt *)(v + 1)) + i)

You can probably add
struct rte_rcu_qsbr_cnt cnt[0];
at the end of struct rte_rcu_qsbr, then wouldn't need macro above.

> +#define RTE_QSBR_CNT_THR_OFFLINE 0
> +#define RTE_QSBR_CNT_INIT 1
> +
> +/**
> + * RTE thread Quiescent State structure.
> + * Quiescent state counter array (array of 'struct rte_rcu_qsbr_cnt'),
> + * whose size is dependent on the maximum number of reader threads
> + * (m_threads) using this variable is stored immediately following
> + * this structure.
> + */
> +struct rte_rcu_qsbr {
> +	uint64_t token __rte_cache_aligned;
> +	/**< Counter to allow for multiple simultaneous QS queries */
> +
> +	uint32_t num_elems __rte_cache_aligned;
> +	/**< Number of elements in the thread ID array */
> +	uint32_t m_threads;
> +	/**< Maximum number of threads this RCU variable will use */
> +
> +	uint64_t reg_thread_id[RTE_QSBR_THRID_ARRAY_ELEMS] __rte_cache_aligned;
> +	/**< Registered thread IDs are stored in a bitmap array */


As I understand, you ended up with a fixed-size array to avoid having 2 variable-size arrays in this struct?
Would it be a big penalty for register/unregister() to either store a pointer to the bitmap, or calculate it based on the num_elems value?
As another thought - do we really need the bitmap at all?
Might it be possible to store the register value for each thread inside its rte_rcu_qsbr_cnt:
struct rte_rcu_qsbr_cnt {uint64_t cnt; uint32_t register;} __rte_cache_aligned;
?
That would cause check() to walk through all elems in the rte_rcu_qsbr_cnt array,
but on the other hand would help to avoid cache conflicts for register/unregister.

> +} __rte_cache_aligned;
> +
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Return the size of the memory occupied by a Quiescent State variable.
> + *
> + * @param max_threads
> + *   Maximum number of threads reporting quiescent state on this variable.
> + * @return
> + *   Size of memory in bytes required for this QS variable.
> + */
> +size_t __rte_experimental
> +rte_rcu_qsbr_get_memsize(uint32_t max_threads);
> +
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Initialize a Quiescent State (QS) variable.
> + *
> + * @param v
> + *   QS variable
> + * @param max_threads
> + *   Maximum number of threads reporting QS on this variable.
> + *
> + */
> +void __rte_experimental
> +rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads);
> +
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Register a reader thread to report its quiescent state
> + * on a QS variable.
> + *
> + * This is implemented as a lock-free function. It is multi-thread
> + * safe.
> + * Any reader thread that wants to report its quiescent state must
> + * call this API. This can be called during initialization or as part
> + * of the packet processing loop.
> + *
> + * Note that rte_rcu_qsbr_thread_online must be called before the
> + * thread updates its QS using rte_rcu_qsbr_update.
> + *
> + * @param v
> + *   QS variable
> + * @param thread_id
> + *   Reader thread with this thread ID will report its quiescent state on
> + *   the QS variable.
> + */
> +static __rte_always_inline void __rte_experimental
> +rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int thread_id)
> +{
> +	unsigned int i, id;
> +
> +	RTE_ASSERT(v == NULL || thread_id >= v->max_threads);
> +
> +	id = thread_id & RTE_QSBR_THRID_MASK;
> +	i = thread_id >> RTE_QSBR_THRID_INDEX_SHIFT;
> +
> +	/* Release the new register thread ID to other threads
> +	 * calling rte_rcu_qsbr_check.
> +	 */
> +	__atomic_fetch_or(&v->reg_thread_id[i], 1UL << id, __ATOMIC_RELEASE);
> +}
> +
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Remove a reader thread, from the list of threads reporting their
> + * quiescent state on a QS variable.
> + *
> + * This is implemented as a lock-free function. It is multi-thread safe.
> + * This API can be called from the reader threads during shutdown.
> + * Ongoing QS queries will stop waiting for the status from this
> + * unregistered reader thread.
> + *
> + * @param v
> + *   QS variable
> + * @param thread_id
> + *   Reader thread with this thread ID will stop reporting its quiescent
> + *   state on the QS variable.
> + */
> +static __rte_always_inline void __rte_experimental
> +rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int thread_id)
> +{
> +	unsigned int i, id;
> +
> +	RTE_ASSERT(v == NULL || thread_id >= v->max_threads);
> +
> +	id = thread_id & RTE_QSBR_THRID_MASK;
> +	i = thread_id >> RTE_QSBR_THRID_INDEX_SHIFT;
> +
> +	/* Make sure the removal of the thread from the list of
> +	 * reporting threads is visible before the thread
> +	 * does anything else.
> +	 */
> +	__atomic_fetch_and(&v->reg_thread_id[i],
> +				~(1UL << id), __ATOMIC_RELEASE);
> +}
> +
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Add a registered reader thread, to the list of threads reporting their
> + * quiescent state on a QS variable.
> + *
> + * This is implemented as a lock-free function. It is multi-thread
> + * safe.
> + *
> + * Any registered reader thread that wants to report its quiescent state must
> + * call this API before calling rte_rcu_qsbr_update. This can be called
> + * during initialization or as part of the packet processing loop.
> + *
> + * The reader thread must call rte_rcu_thread_offline API, before
> + * calling any functions that block, to ensure that rte_rcu_qsbr_check
> + * API does not wait indefinitely for the reader thread to update its QS.
> + *
> + * The reader thread must call rte_rcu_thread_online API, after the blocking
> + * function call returns, to ensure that rte_rcu_qsbr_check API
> + * waits for the reader thread to update its QS.
> + *
> + * @param v
> + *   QS variable
> + * @param thread_id
> + *   Reader thread with this thread ID will report its quiescent state on
> + *   the QS variable.
> + */
> +static __rte_always_inline void __rte_experimental
> +rte_rcu_qsbr_thread_online(struct rte_rcu_qsbr *v, unsigned int thread_id)
> +{
> +	uint64_t t;
> +
> +	RTE_ASSERT(v == NULL || thread_id >= v->max_threads);
> +
> +	/* Copy the current value of token.
> +	 * The fence at the end of the function will ensure that
> +	 * the following will not move down after the load of any shared
> +	 * data structure.
> +	 */
> +	t = __atomic_load_n(&v->token, __ATOMIC_RELAXED);
> +
> +	/* __atomic_store_n(cnt, __ATOMIC_RELAXED) is used to ensure
> +	 * 'cnt' (64b) is accessed atomically.
> +	 */
> +	__atomic_store_n(&RTE_QSBR_CNT_ARRAY_ELM(v, thread_id)->cnt,
> +		t, __ATOMIC_RELAXED);
> +
> +	/* The subsequent load of the data structure should not
> +	 * move above the store. Hence a store-load barrier
> +	 * is required.
> +	 * If the load of the data structure moves above the store,
> +	 * writer might not see that the reader is online, even though
> +	 * the reader is referencing the shared data structure.
> +	 */
> +	__atomic_thread_fence(__ATOMIC_SEQ_CST);

If it has to generate a proper memory barrier here anyway,
could it use rte_smp_mb() here?
At least for IA it would generate a more lightweight one.
Konstantin

> +}
> +
Honnappa Nagarahalli March 26, 2019, 4:35 a.m. | #2
> Hi Honnappa,
> 
> > diff --git a/lib/librte_rcu/rte_rcu_qsbr.c
> > b/lib/librte_rcu/rte_rcu_qsbr.c new file mode 100644 index
> > 000000000..0fc4515ea
> > --- /dev/null
> > +++ b/lib/librte_rcu/rte_rcu_qsbr.c
> > @@ -0,0 +1,99 @@
> > +/* SPDX-License-Identifier: BSD-3-Clause
> > + *
> > + * Copyright (c) 2018 Arm Limited
> > + */
> > +
> > +#include <stdio.h>
> > +#include <string.h>
> > +#include <stdint.h>
> > +#include <errno.h>
> > +
> > +#include <rte_common.h>
> > +#include <rte_log.h>
> > +#include <rte_memory.h>
> > +#include <rte_malloc.h>
> > +#include <rte_eal.h>
> > +#include <rte_eal_memconfig.h>
> > +#include <rte_atomic.h>
> > +#include <rte_per_lcore.h>
> > +#include <rte_lcore.h>
> > +#include <rte_errno.h>
> > +
> > +#include "rte_rcu_qsbr.h"
> > +
> > +/* Get the memory size of QSBR variable */ size_t __rte_experimental
> > +rte_rcu_qsbr_get_memsize(uint32_t max_threads) {
> > +	size_t sz;
> > +
> > +	RTE_ASSERT(max_threads == 0);
> 
> Here and in all similar places:
> assert() will abort when its condition will be evaluated to false.
> So it should be max_threads != 0.
Thanks for this comment. Enabling RTE_ENABLE_ASSERT resulted in more problems. I will fix in the next version.

> Also it a public and non-datapath function.
> Calling assert() for invalid input parameter - seems way too extreme.
> Why not just return error to the caller?
Ok, I will change it.

> 
> > +
> > +	sz = sizeof(struct rte_rcu_qsbr);
> > +
> > +	/* Add the size of quiescent state counter array */
> > +	sz += sizeof(struct rte_rcu_qsbr_cnt) * max_threads;
> > +
> > +	return RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE); }
> > +
> > +/* Initialize a quiescent state variable */ void __rte_experimental
> > +rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads) {
> > +	RTE_ASSERT(v == NULL);
> > +
> > +	memset(v, 0, rte_rcu_qsbr_get_memsize(max_threads));
> > +	v->m_threads = max_threads;
> > +	v->num_elems = RTE_ALIGN_MUL_CEIL(max_threads,
> > +			RTE_QSBR_THRID_ARRAY_ELM_SIZE) /
> > +			RTE_QSBR_THRID_ARRAY_ELM_SIZE;
> > +	v->token = RTE_QSBR_CNT_INIT;
> > +}
> > +
> > +/* Dump the details of a single quiescent state variable to a file.
> > +*/ void __rte_experimental rte_rcu_qsbr_dump(FILE *f, struct
> > +rte_rcu_qsbr *v) {
> > +	uint64_t bmap;
> > +	uint32_t i, t;
> > +
> > +	RTE_ASSERT(v == NULL || f == NULL);
> > +
> > +	fprintf(f, "\nQuiescent State Variable @%p\n", v);
> > +
> > +	fprintf(f, "  QS variable memory size = %lu\n",
> > +				rte_rcu_qsbr_get_memsize(v->m_threads));
> > +	fprintf(f, "  Given # max threads = %u\n", v->m_threads);
> > +
> > +	fprintf(f, "  Registered thread ID mask = 0x");
> > +	for (i = 0; i < v->num_elems; i++)
> > +		fprintf(f, "%lx", __atomic_load_n(&v->reg_thread_id[i],
> > +					__ATOMIC_ACQUIRE));
> > +	fprintf(f, "\n");
> > +
> > +	fprintf(f, "  Token = %lu\n",
> > +			__atomic_load_n(&v->token, __ATOMIC_ACQUIRE));
> > +
> > +	fprintf(f, "Quiescent State Counts for readers:\n");
> > +	for (i = 0; i < v->num_elems; i++) {
> > +		bmap = __atomic_load_n(&v->reg_thread_id[i],
> __ATOMIC_ACQUIRE);
> > +		while (bmap) {
> > +			t = __builtin_ctzl(bmap);
> > +			fprintf(f, "thread ID = %d, count = %lu\n", t,
> > +				__atomic_load_n(
> > +					&RTE_QSBR_CNT_ARRAY_ELM(v, i)-
> >cnt,
> > +					__ATOMIC_RELAXED));
> > +			bmap &= ~(1UL << t);
> > +		}
> > +	}
> > +}
> > +
> > +int rcu_log_type;
> > +
> > +RTE_INIT(rte_rcu_register)
> > +{
> > +	rcu_log_type = rte_log_register("lib.rcu");
> > +	if (rcu_log_type >= 0)
> > +		rte_log_set_level(rcu_log_type, RTE_LOG_ERR); }
> > diff --git a/lib/librte_rcu/rte_rcu_qsbr.h
> > b/lib/librte_rcu/rte_rcu_qsbr.h new file mode 100644 index
> > 000000000..83943f751
> > --- /dev/null
> > +++ b/lib/librte_rcu/rte_rcu_qsbr.h
> > @@ -0,0 +1,511 @@
> > +/* SPDX-License-Identifier: BSD-3-Clause
> > + * Copyright (c) 2018 Arm Limited
> > + */
> > +
> > +#ifndef _RTE_RCU_QSBR_H_
> > +#define _RTE_RCU_QSBR_H_
> > +
> > +/**
> > + * @file
> > + * RTE Quiescent State Based Reclamation (QSBR)
> > + *
> > + * Quiescent State (QS) is any point in the thread execution
> > + * where the thread does not hold a reference to a data structure
> > + * in shared memory. While using lock-less data structures, the
> > +writer
> > + * can safely free memory once all the reader threads have entered
> > + * quiescent state.
> > + *
> > + * This library provides the ability for the readers to report
> > +quiescent
> > + * state and for the writers to identify when all the readers have
> > + * entered quiescent state.
> > + */
> > +
> > +#ifdef __cplusplus
> > +extern "C" {
> > +#endif
> > +
> > +#include <stdio.h>
> > +#include <stdint.h>
> > +#include <errno.h>
> > +#include <rte_common.h>
> > +#include <rte_memory.h>
> > +#include <rte_lcore.h>
> > +#include <rte_debug.h>
> > +
> > +extern int rcu_log_type;
> > +
> > +#if RTE_LOG_DP_LEVEL >= RTE_LOG_DEBUG #define RCU_DP_LOG(level,
> fmt,
> > +args...) \
> > +	rte_log(RTE_LOG_ ## level, rcu_log_type, \
> > +		"%s(): " fmt "\n", __func__, ## args) #else #define
> > +RCU_DP_LOG(level, fmt, args...) #endif
> 
> Why do you need that?
> Can't you use RTE_LOG_DP() instead?
RTE_LOG_DP is for static log types such as RTE_LOGTYPE_EAL, RTE_LOGTYPE_MBUF etc. Use of static log type in RCU was rejected earlier. Hence, I am using the dynamic log types.

> 
> > +
> > +/* Registered thread IDs are stored as a bitmap of 64b element array.
> > + * Given thread id needs to be converted to index into the array and
> > + * the id within the array element.
> > + */
> > +#define RTE_RCU_MAX_THREADS 1024
> > +#define RTE_QSBR_THRID_ARRAY_ELM_SIZE (sizeof(uint64_t) * 8) #define
> > +RTE_QSBR_THRID_ARRAY_ELEMS \
> > +	(RTE_ALIGN_MUL_CEIL(RTE_RCU_MAX_THREADS, \
> > +	 RTE_QSBR_THRID_ARRAY_ELM_SIZE) /
> RTE_QSBR_THRID_ARRAY_ELM_SIZE)
> > +#define RTE_QSBR_THRID_INDEX_SHIFT 6 #define RTE_QSBR_THRID_MASK
> 0x3f
> > +#define RTE_QSBR_THRID_INVALID 0xffffffff
> > +
> > +/* Worker thread counter */
> > +struct rte_rcu_qsbr_cnt {
> > +	uint64_t cnt;
> > +	/**< Quiescent state counter. Value 0 indicates the thread is
> > +offline */ } __rte_cache_aligned;
> > +
> > +#define RTE_QSBR_CNT_ARRAY_ELM(v, i) (((struct rte_rcu_qsbr_cnt *)(v
> > ++ 1)) + i)
> 
> You can probably add
> struct rte_rcu_qsbr_cnt cnt[0];
> at the end of struct rte_rcu_qsbr, then wouldn't need macro above.
ok

> 
> > +#define RTE_QSBR_CNT_THR_OFFLINE 0
> > +#define RTE_QSBR_CNT_INIT 1
> > +
> > +/**
> > + * RTE thread Quiescent State structure.
> > + * Quiescent state counter array (array of 'struct
> > +rte_rcu_qsbr_cnt'),
> > + * whose size is dependent on the maximum number of reader threads
> > + * (m_threads) using this variable is stored immediately following
> > + * this structure.
> > + */
> > +struct rte_rcu_qsbr {
> > +	uint64_t token __rte_cache_aligned;
> > +	/**< Counter to allow for multiple simultaneous QS queries */
> > +
> > +	uint32_t num_elems __rte_cache_aligned;
> > +	/**< Number of elements in the thread ID array */
> > +	uint32_t m_threads;
> > +	/**< Maximum number of threads this RCU variable will use */
> > +
> > +	uint64_t reg_thread_id[RTE_QSBR_THRID_ARRAY_ELEMS]
> __rte_cache_aligned;
> > +	/**< Registered thread IDs are stored in a bitmap array */
> 
> 
> As I understand you ended up with fixed size array to avoid 2 variable size
> arrays in this struct?
Yes

> Is that big penalty for register/unregister() to either store a pointer to bitmap,
> or calculate it based on num_elems value?
In the last RFC I sent out [1], I tested the impact of having a non-fixed size array. There 'was' a performance degradation in most of the performance tests. The issue was with calculating the address of the per-thread QSBR counters (not with the address calculation of the bitmap). With the current patch, I do not see the performance difference (the difference between the RFC and this patch is the memory orderings, which are masking any perf gain from having a fixed array). However, I have kept the fixed size array, as the generated code does not have additional calculations to get the address of the qsbr counter array elements.

[1] http://mails.dpdk.org/archives/dev/2019-February/125029.html

> As another thought - do we really need bitmap at all?
The bitmap helps avoid accessing all the elements in the rte_rcu_qsbr_cnt array (as you have mentioned below). This provides the ability to scale the number of threads dynamically. For example, an application can create a qsbr variable with 48 max threads, but currently only 2 threads are active (due to traffic conditions).

> Might it is possible to sotre register value for each thread inside it's
> rte_rcu_qsbr_cnt:
> struct rte_rcu_qsbr_cnt {uint64_t cnt; uint32_t register;}
> __rte_cache_aligned; ?
> That would cause check() to walk through all elems in rte_rcu_qsbr_cnt array,
> but from other side would help to avoid cache conflicts for register/unregister.
With the addition of rte_rcu_qsbr_thread_online/offline APIs, the register/unregister APIs are not in critical path anymore. Hence, the cache conflicts are fine. The online/offline APIs work on thread specific cache lines and these are in the critical path.

> 
> > +} __rte_cache_aligned;
> > +
> > +/**
> > + * @warning
> > + * @b EXPERIMENTAL: this API may change without prior notice
> > + *
> > + * Return the size of the memory occupied by a Quiescent State variable.
> > + *
> > + * @param max_threads
> > + *   Maximum number of threads reporting quiescent state on this variable.
> > + * @return
> > + *   Size of memory in bytes required for this QS variable.
> > + */
> > +size_t __rte_experimental
> > +rte_rcu_qsbr_get_memsize(uint32_t max_threads);
> > +
> > +/**
> > + * @warning
> > + * @b EXPERIMENTAL: this API may change without prior notice
> > + *
> > + * Initialize a Quiescent State (QS) variable.
> > + *
> > + * @param v
> > + *   QS variable
> > + * @param max_threads
> > + *   Maximum number of threads reporting QS on this variable.
> > + *
> > + */
> > +void __rte_experimental
> > +rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads);
> > +
> > +/**
> > + * @warning
> > + * @b EXPERIMENTAL: this API may change without prior notice
> > + *
> > + * Register a reader thread to report its quiescent state
> > + * on a QS variable.
> > + *
> > + * This is implemented as a lock-free function. It is multi-thread
> > + * safe.
> > + * Any reader thread that wants to report its quiescent state must
> > + * call this API. This can be called during initialization or as part
> > + * of the packet processing loop.
> > + *
> > + * Note that rte_rcu_qsbr_thread_online must be called before the
> > + * thread updates its QS using rte_rcu_qsbr_update.
> > + *
> > + * @param v
> > + *   QS variable
> > + * @param thread_id
> > + *   Reader thread with this thread ID will report its quiescent state on
> > + *   the QS variable.
> > + */
> > +static __rte_always_inline void __rte_experimental
> > +rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int
> > +thread_id) {
> > +	unsigned int i, id;
> > +
> > +	RTE_ASSERT(v == NULL || thread_id >= v->max_threads);
> > +
> > +	id = thread_id & RTE_QSBR_THRID_MASK;
> > +	i = thread_id >> RTE_QSBR_THRID_INDEX_SHIFT;
> > +
> > +	/* Release the new register thread ID to other threads
> > +	 * calling rte_rcu_qsbr_check.
> > +	 */
> > +	__atomic_fetch_or(&v->reg_thread_id[i], 1UL << id,
> > +__ATOMIC_RELEASE); }
> > +
> > +/**
> > + * @warning
> > + * @b EXPERIMENTAL: this API may change without prior notice
> > + *
> > + * Remove a reader thread, from the list of threads reporting their
> > + * quiescent state on a QS variable.
> > + *
> > + * This is implemented as a lock-free function. It is multi-thread safe.
> > + * This API can be called from the reader threads during shutdown.
> > + * Ongoing QS queries will stop waiting for the status from this
> > + * unregistered reader thread.
> > + *
> > + * @param v
> > + *   QS variable
> > + * @param thread_id
> > + *   Reader thread with this thread ID will stop reporting its quiescent
> > + *   state on the QS variable.
> > + */
> > +static __rte_always_inline void __rte_experimental
> > +rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int
> > +thread_id) {
> > +	unsigned int i, id;
> > +
> > +	RTE_ASSERT(v == NULL || thread_id >= v->max_threads);
> > +
> > +	id = thread_id & RTE_QSBR_THRID_MASK;
> > +	i = thread_id >> RTE_QSBR_THRID_INDEX_SHIFT;
> > +
> > +	/* Make sure the removal of the thread from the list of
> > +	 * reporting threads is visible before the thread
> > +	 * does anything else.
> > +	 */
> > +	__atomic_fetch_and(&v->reg_thread_id[i],
> > +				~(1UL << id), __ATOMIC_RELEASE);
> > +}
> > +
> > +/**
> > + * @warning
> > + * @b EXPERIMENTAL: this API may change without prior notice
> > + *
> > + * Add a registered reader thread, to the list of threads reporting
> > +their
> > + * quiescent state on a QS variable.
> > + *
> > + * This is implemented as a lock-free function. It is multi-thread
> > + * safe.
> > + *
> > + * Any registered reader thread that wants to report its quiescent
> > +state must
> > + * call this API before calling rte_rcu_qsbr_update. This can be
> > +called
> > + * during initialization or as part of the packet processing loop.
> > + *
> > + * The reader thread must call rte_rcu_thread_offline API, before
> > + * calling any functions that block, to ensure that
> > +rte_rcu_qsbr_check
> > + * API does not wait indefinitely for the reader thread to update its QS.
> > + *
> > + * The reader thread must call rte_rcu_thread_online API, after the
> > +blocking
> > + * function call returns, to ensure that rte_rcu_qsbr_check API
> > + * waits for the reader thread to update its QS.
> > + *
> > + * @param v
> > + *   QS variable
> > + * @param thread_id
> > + *   Reader thread with this thread ID will report its quiescent state on
> > + *   the QS variable.
> > + */
> > +static __rte_always_inline void __rte_experimental
> > +rte_rcu_qsbr_thread_online(struct rte_rcu_qsbr *v, unsigned int
> > +thread_id) {
> > +	uint64_t t;
> > +
> > +	RTE_ASSERT(v == NULL || thread_id >= v->max_threads);
> > +
> > +	/* Copy the current value of token.
> > +	 * The fence at the end of the function will ensure that
> > +	 * the following will not move down after the load of any shared
> > +	 * data structure.
> > +	 */
> > +	t = __atomic_load_n(&v->token, __ATOMIC_RELAXED);
> > +
> > +	/* __atomic_store_n(cnt, __ATOMIC_RELAXED) is used to ensure
> > +	 * 'cnt' (64b) is accessed atomically.
> > +	 */
> > +	__atomic_store_n(&RTE_QSBR_CNT_ARRAY_ELM(v, thread_id)->cnt,
> > +		t, __ATOMIC_RELAXED);
> > +
> > +	/* The subsequent load of the data structure should not
> > +	 * move above the store. Hence a store-load barrier
> > +	 * is required.
> > +	 * If the load of the data structure moves above the store,
> > +	 * writer might not see that the reader is online, even though
> > +	 * the reader is referencing the shared data structure.
> > +	 */
> > +	__atomic_thread_fence(__ATOMIC_SEQ_CST);
> 
> If it has to generate a proper memory-barrier here anyway, could it use
> rte_smp_mb() here?
> At least for IA it would generate more lightweight one.
I have used the C++11 memory model functions. I prefer to not mix it with barriers. Does ICC generate lightweight code for the above fence?
Is it ok to add rte_smp_mb for x86 alone?

> Konstantin
> 
> > +}
> > +
Ananyev, Konstantin March 28, 2019, 11:15 a.m. | #3
> >
> > > +#define RTE_QSBR_CNT_THR_OFFLINE 0
> > > +#define RTE_QSBR_CNT_INIT 1
> > > +
> > > +/**
> > > + * RTE thread Quiescent State structure.
> > > + * Quiescent state counter array (array of 'struct
> > > +rte_rcu_qsbr_cnt'),
> > > + * whose size is dependent on the maximum number of reader threads
> > > + * (m_threads) using this variable is stored immediately following
> > > + * this structure.
> > > + */
> > > +struct rte_rcu_qsbr {
> > > +	uint64_t token __rte_cache_aligned;
> > > +	/**< Counter to allow for multiple simultaneous QS queries */
> > > +
> > > +	uint32_t num_elems __rte_cache_aligned;
> > > +	/**< Number of elements in the thread ID array */
> > > +	uint32_t m_threads;
> > > +	/**< Maximum number of threads this RCU variable will use */
> > > +
> > > +	uint64_t reg_thread_id[RTE_QSBR_THRID_ARRAY_ELEMS]
> > __rte_cache_aligned;
> > > +	/**< Registered thread IDs are stored in a bitmap array */
> >
> >
> > As I understand you ended up with fixed size array to avoid 2 variable size
> > arrays in this struct?
> Yes
> 
> > Is that big penalty for register/unregister() to either store a pointer to bitmap,
> > or calculate it based on num_elems value?
> In the last RFC I sent out [1], I tested the impact of having non-fixed size array. There 'was' a performance degradation in most of the
> performance tests. The issue was with calculating the address of per thread QSBR counters (not with the address calculation of the bitmap).
> With the current patch, I do not see the performance difference (the difference between the RFC and this patch are the memory orderings,
> they are masking any perf gain from having a fixed array). However, I have kept the fixed size array as the generated code does not have
> additional calculations to get the address of qsbr counter array elements.
> 
> [1] http://mails.dpdk.org/archives/dev/2019-February/125029.html

Ok I see, but can we then arrange them in a different way:
qsbr_cnt[] will start at the end of struct rte_rcu_qsbr
(same as you have it right now).
While the bitmap will be placed after qsbr_cnt[].
As I understand, register/unregister is not considered to be on the critical path,
so some perf-degradation here doesn't matter.
Also check() would need an extra address calculation for the bitmap,
but considering that we have to go through the whole bitmap (and in the worst case qsbr_cnt[])
anyway, that is probably not a big deal?

> 
> > As another thought - do we really need bitmap at all?
> The bit map is helping avoid accessing all the elements in rte_rcu_qsbr_cnt array (as you have mentioned below). This provides the ability to
> scale the number of threads dynamically. For ex: an application can create a qsbr variable with 48 max threads, but currently only 2 threads
> are active (due to traffic conditions).

I understand that the bitmap is supposed to speed up check() for
situations when most threads are unregistered.
My thought was that the check() speedup for such a situation might not be that critical.

> 
> > Might it is possible to sotre register value for each thread inside it's
> > rte_rcu_qsbr_cnt:
> > struct rte_rcu_qsbr_cnt {uint64_t cnt; uint32_t register;}
> > __rte_cache_aligned; ?
> > That would cause check() to walk through all elems in rte_rcu_qsbr_cnt array,
> > but from other side would help to avoid cache conflicts for register/unregister.
> With the addition of rte_rcu_qsbr_thread_online/offline APIs, the register/unregister APIs are not in critical path anymore. Hence, the
> cache conflicts are fine. The online/offline APIs work on thread specific cache lines and these are in the critical path.
> 
> >
> > > +} __rte_cache_aligned;
> > > +
Honnappa Nagarahalli March 29, 2019, 5:54 a.m. | #4
> 
> > >
> > > > +#define RTE_QSBR_CNT_THR_OFFLINE 0 #define RTE_QSBR_CNT_INIT
> 1
> > > > +
> > > > +/**
> > > > + * RTE thread Quiescent State structure.
> > > > + * Quiescent state counter array (array of 'struct
> > > > +rte_rcu_qsbr_cnt'),
> > > > + * whose size is dependent on the maximum number of reader
> > > > +threads
> > > > + * (m_threads) using this variable is stored immediately
> > > > +following
> > > > + * this structure.
> > > > + */
> > > > +struct rte_rcu_qsbr {
> > > > +	uint64_t token __rte_cache_aligned;
> > > > +	/**< Counter to allow for multiple simultaneous QS queries */
> > > > +
> > > > +	uint32_t num_elems __rte_cache_aligned;
> > > > +	/**< Number of elements in the thread ID array */
> > > > +	uint32_t m_threads;
> > > > +	/**< Maximum number of threads this RCU variable will use */
> > > > +
> > > > +	uint64_t reg_thread_id[RTE_QSBR_THRID_ARRAY_ELEMS]
> > > __rte_cache_aligned;
> > > > +	/**< Registered thread IDs are stored in a bitmap array */
> > >
> > >
> > > As I understand you ended up with fixed size array to avoid 2
> > > variable size arrays in this struct?
> > Yes
> >
> > > Is that big penalty for register/unregister() to either store a
> > > pointer to bitmap, or calculate it based on num_elems value?
> > In the last RFC I sent out [1], I tested the impact of having
> > non-fixed size array. There 'was' a performance degradation in most of the
> performance tests. The issue was with calculating the address of per thread
> QSBR counters (not with the address calculation of the bitmap).
> > With the current patch, I do not see the performance difference (the
> > difference between the RFC and this patch are the memory orderings,
> > they are masking any perf gain from having a fixed array). However, I have
> kept the fixed size array as the generated code does not have additional
> calculations to get the address of qsbr counter array elements.
> >
> > [1] http://mails.dpdk.org/archives/dev/2019-February/125029.html
> 
> Ok I see, but can we then arrange them ina  different way:
> qsbr_cnt[] will start at the end of struct rte_rcu_qsbr (same as you have it
> right now).
> While bitmap will be placed after qsbr_cnt[].
Yes, that is an option. Though, it would mean we have to calculate the address, similar to macro 'RTE_QSBR_CNT_ARRAY_ELM'

> As I understand register/unregister is not consider on critical path, so some
> perf-degradation here doesn't matter.
Yes

> Also check() would need extra address calculation for bitmap, but considering
> that we have to go through all bitmap (and in worst case qsbr_cnt[])
> anyway, that probably not a big deal?
I think the address calculation can be made simpler than what I had tried before. I can give it a shot.

> 
> >
> > > As another thought - do we really need bitmap at all?
> > The bit map is helping avoid accessing all the elements in
> > rte_rcu_qsbr_cnt array (as you have mentioned below). This provides
> > the ability to scale the number of threads dynamically. For ex: an
> application can create a qsbr variable with 48 max threads, but currently only
> 2 threads are active (due to traffic conditions).
> 
> I understand that bitmap supposed to speedup check() for situations when
> most threads are unregistered.
> My thought was that might be check() speedup for such situation is not that
> critical.
IMO, there is a need to address both the cases, considering the future direction of DPDK. It is possible to introduce a counter for the current number of threads registered. If that is same as maximum number of threads, then scanning the registered thread ID array can be skipped.

> 
> >
> > > Might it is possible to sotre register value for each thread inside
> > > it's
> > > rte_rcu_qsbr_cnt:
> > > struct rte_rcu_qsbr_cnt {uint64_t cnt; uint32_t register;}
> > > __rte_cache_aligned; ?
> > > That would cause check() to walk through all elems in
> > > rte_rcu_qsbr_cnt array, but from other side would help to avoid cache
> conflicts for register/unregister.
> > With the addition of rte_rcu_qsbr_thread_online/offline APIs, the
> > register/unregister APIs are not in critical path anymore. Hence, the cache
> conflicts are fine. The online/offline APIs work on thread specific cache lines
> and these are in the critical path.
> >
> > >
> > > > +} __rte_cache_aligned;
> > > > +

Patch

diff --git a/MAINTAINERS b/MAINTAINERS
index 452b8eb82..5827c1bbe 100644
--- a/MAINTAINERS
+++ b/MAINTAINERS
@@ -1230,6 +1230,11 @@  F: examples/bpf/
 F: app/test/test_bpf.c
 F: doc/guides/prog_guide/bpf_lib.rst
 
+RCU - EXPERIMENTAL
+M: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
+F: lib/librte_rcu/
+F: doc/guides/prog_guide/rcu_lib.rst
+
 
 Test Applications
 -----------------
diff --git a/config/common_base b/config/common_base
index 0b09a9348..d3557ff3c 100644
--- a/config/common_base
+++ b/config/common_base
@@ -805,6 +805,12 @@  CONFIG_RTE_LIBRTE_LATENCY_STATS=y
 #
 CONFIG_RTE_LIBRTE_TELEMETRY=n
 
+#
+# Compile librte_rcu
+#
+CONFIG_RTE_LIBRTE_RCU=y
+CONFIG_RTE_LIBRTE_RCU_DEBUG=n
+
 #
 # Compile librte_lpm
 #
diff --git a/lib/Makefile b/lib/Makefile
index a358f1c19..b24a9363f 100644
--- a/lib/Makefile
+++ b/lib/Makefile
@@ -109,6 +109,8 @@  DIRS-$(CONFIG_RTE_LIBRTE_IPSEC) += librte_ipsec
 DEPDIRS-librte_ipsec := librte_eal librte_mbuf librte_cryptodev librte_security
 DIRS-$(CONFIG_RTE_LIBRTE_TELEMETRY) += librte_telemetry
 DEPDIRS-librte_telemetry := librte_eal librte_metrics librte_ethdev
+DIRS-$(CONFIG_RTE_LIBRTE_RCU) += librte_rcu
+DEPDIRS-librte_rcu := librte_eal
 
 ifeq ($(CONFIG_RTE_EXEC_ENV_LINUX),y)
 DIRS-$(CONFIG_RTE_LIBRTE_KNI) += librte_kni
diff --git a/lib/librte_rcu/Makefile b/lib/librte_rcu/Makefile
new file mode 100644
index 000000000..6aa677bd1
--- /dev/null
+++ b/lib/librte_rcu/Makefile
@@ -0,0 +1,23 @@ 
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2018 Arm Limited
+
+include $(RTE_SDK)/mk/rte.vars.mk
+
+# library name
+LIB = librte_rcu.a
+
+CFLAGS += -DALLOW_EXPERIMENTAL_API
+CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3
+LDLIBS += -lrte_eal
+
+EXPORT_MAP := rte_rcu_version.map
+
+LIBABIVER := 1
+
+# all source are stored in SRCS-y
+SRCS-$(CONFIG_RTE_LIBRTE_RCU) := rte_rcu_qsbr.c
+
+# install includes
+SYMLINK-$(CONFIG_RTE_LIBRTE_RCU)-include := rte_rcu_qsbr.h
+
+include $(RTE_SDK)/mk/rte.lib.mk
diff --git a/lib/librte_rcu/meson.build b/lib/librte_rcu/meson.build
new file mode 100644
index 000000000..c009ae4b7
--- /dev/null
+++ b/lib/librte_rcu/meson.build
@@ -0,0 +1,5 @@ 
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2018 Arm Limited
+
+sources = files('rte_rcu_qsbr.c')
+headers = files('rte_rcu_qsbr.h')
diff --git a/lib/librte_rcu/rte_rcu_qsbr.c b/lib/librte_rcu/rte_rcu_qsbr.c
new file mode 100644
index 000000000..0fc4515ea
--- /dev/null
+++ b/lib/librte_rcu/rte_rcu_qsbr.c
@@ -0,0 +1,99 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ *
+ * Copyright (c) 2018 Arm Limited
+ */
+
+#include <stdio.h>
+#include <string.h>
+#include <stdint.h>
+#include <errno.h>
+
+#include <rte_common.h>
+#include <rte_log.h>
+#include <rte_memory.h>
+#include <rte_malloc.h>
+#include <rte_eal.h>
+#include <rte_eal_memconfig.h>
+#include <rte_atomic.h>
+#include <rte_per_lcore.h>
+#include <rte_lcore.h>
+#include <rte_errno.h>
+
+#include "rte_rcu_qsbr.h"
+
+/* Get the memory size of a QSBR variable sized for max_threads readers */
+size_t __rte_experimental
+rte_rcu_qsbr_get_memsize(uint32_t max_threads)
+{
+	size_t sz;
+
+	RTE_ASSERT(max_threads != 0);
+
+	/* Fixed-size header (token, sizes and registered thread bitmap) */
+	sz = sizeof(struct rte_rcu_qsbr);
+	/* Add the size of quiescent state counter array */
+	sz += sizeof(struct rte_rcu_qsbr_cnt) * max_threads;
+
+	return RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
+}
+
+/* Initialize a quiescent state variable for up to max_threads readers */
+void __rte_experimental
+rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads)
+{
+	RTE_ASSERT(v != NULL);
+	/* Zero header and counters; a zero counter means thread offline */
+	memset(v, 0, rte_rcu_qsbr_get_memsize(max_threads));
+	v->m_threads = max_threads;
+	v->num_elems = RTE_ALIGN_MUL_CEIL(max_threads,
+			RTE_QSBR_THRID_ARRAY_ELM_SIZE) /
+			RTE_QSBR_THRID_ARRAY_ELM_SIZE;
+	v->token = RTE_QSBR_CNT_INIT;
+}
+
+/* Dump the details of a single quiescent state variable to a file. */
+void __rte_experimental
+rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v)
+{
+	uint64_t bmap;
+	uint32_t i, t, id;
+
+	RTE_ASSERT(v != NULL && f != NULL);
+
+	fprintf(f, "\nQuiescent State Variable @%p\n", (void *)v);
+	fprintf(f, "  QS variable memory size = %zu\n",
+				rte_rcu_qsbr_get_memsize(v->m_threads));
+	fprintf(f, "  Given # max threads = %u\n", v->m_threads);
+
+	fprintf(f, "  Registered thread ID mask = 0x");
+	for (i = 0; i < v->num_elems; i++)
+		fprintf(f, "%lx", __atomic_load_n(&v->reg_thread_id[i],
+					__ATOMIC_ACQUIRE));
+	fprintf(f, "\n");
+
+	fprintf(f, "  Token = %lu\n",
+			__atomic_load_n(&v->token, __ATOMIC_ACQUIRE));
+
+	fprintf(f, "Quiescent State Counts for readers:\n");
+	for (i = 0; i < v->num_elems; i++) {
+		bmap = __atomic_load_n(&v->reg_thread_id[i], __ATOMIC_ACQUIRE);
+		id = i << RTE_QSBR_THRID_INDEX_SHIFT; /* base ID */
+		while (bmap) {
+			t = __builtin_ctzll(bmap);
+			fprintf(f, "thread ID = %u, count = %lu\n", id + t,
+				__atomic_load_n(
+					&RTE_QSBR_CNT_ARRAY_ELM(v, id + t)->cnt,
+					__ATOMIC_RELAXED));
+			bmap &= ~(1ULL << t);
+		}
+	}
+}
+
+int rcu_log_type; /* ID returned by rte_log_register("lib.rcu") */
+
+RTE_INIT(rte_rcu_register) /* registers the library's log type */
+{
+	rcu_log_type = rte_log_register("lib.rcu");
+	if (rcu_log_type >= 0) /* a negative ID is treated as failure */
+		rte_log_set_level(rcu_log_type, RTE_LOG_ERR);
+}
diff --git a/lib/librte_rcu/rte_rcu_qsbr.h b/lib/librte_rcu/rte_rcu_qsbr.h
new file mode 100644
index 000000000..83943f751
--- /dev/null
+++ b/lib/librte_rcu/rte_rcu_qsbr.h
@@ -0,0 +1,511 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright (c) 2018 Arm Limited
+ */
+
+#ifndef _RTE_RCU_QSBR_H_
+#define _RTE_RCU_QSBR_H_
+
+/**
+ * @file
+ * RTE Quiescent State Based Reclamation (QSBR)
+ *
+ * Quiescent State (QS) is any point in the thread execution
+ * where the thread does not hold a reference to a data structure
+ * in shared memory. While using lock-less data structures, the writer
+ * can safely free memory once all the reader threads have entered
+ * quiescent state.
+ *
+ * This library provides the ability for the readers to report quiescent
+ * state and for the writers to identify when all the readers have
+ * entered quiescent state.
+ */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <stdio.h>
+#include <stdint.h>
+#include <errno.h>
+#include <rte_common.h>
+#include <rte_memory.h>
+#include <rte_lcore.h>
+#include <rte_debug.h>
+
+extern int rcu_log_type;
+
+#if RTE_LOG_DP_LEVEL >= RTE_LOG_DEBUG
+#define RCU_DP_LOG(level, fmt, args...) \
+	rte_log(RTE_LOG_ ## level, rcu_log_type, \
+		"%s(): " fmt "\n", __func__, ## args)
+#else
+#define RCU_DP_LOG(level, fmt, args...)
+#endif
+
+/* Registered thread IDs are stored as a bitmap of 64b element array.
+ * Given thread id needs to be converted to index into the array and
+ * the id within the array element.
+ */
+#define RTE_RCU_MAX_THREADS 1024
+#define RTE_QSBR_THRID_ARRAY_ELM_SIZE (sizeof(uint64_t) * 8)
+#define RTE_QSBR_THRID_ARRAY_ELEMS \
+	(RTE_ALIGN_MUL_CEIL(RTE_RCU_MAX_THREADS, \
+	 RTE_QSBR_THRID_ARRAY_ELM_SIZE) / RTE_QSBR_THRID_ARRAY_ELM_SIZE)
+#define RTE_QSBR_THRID_INDEX_SHIFT 6
+#define RTE_QSBR_THRID_MASK 0x3f
+#define RTE_QSBR_THRID_INVALID 0xffffffff
+
+/* Worker thread counter */
+struct rte_rcu_qsbr_cnt {
+	uint64_t cnt;
+	/**< Quiescent state counter. Value 0 indicates the thread is offline */
+} __rte_cache_aligned;
+
+#define RTE_QSBR_CNT_ARRAY_ELM(v, i) (((struct rte_rcu_qsbr_cnt *)(v + 1)) + i)
+#define RTE_QSBR_CNT_THR_OFFLINE 0
+#define RTE_QSBR_CNT_INIT 1
+
+/**
+ * RTE thread Quiescent State structure.
+ * Quiescent state counter array (array of 'struct rte_rcu_qsbr_cnt'),
+ * whose size is dependent on the maximum number of reader threads
+ * (m_threads) using this variable is stored immediately following
+ * this structure.
+ */
+struct rte_rcu_qsbr {
+	uint64_t token __rte_cache_aligned;
+	/**< Query counter: starts at RTE_QSBR_CNT_INIT, bumped per QS query */
+
+	uint32_t num_elems __rte_cache_aligned;
+	/**< Number of reg_thread_id elements needed to cover m_threads */
+	uint32_t m_threads;
+	/**< Maximum number of threads this RCU variable will use */
+
+	uint64_t reg_thread_id[RTE_QSBR_THRID_ARRAY_ELEMS] __rte_cache_aligned;
+	/**< Bitmap of registered thread IDs, one bit per thread ID */
+} __rte_cache_aligned;
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Return the size of the memory occupied by a Quiescent State variable.
+ *
+ * @param max_threads
+ *   Maximum number of threads reporting quiescent state on this variable.
+ * @return
+ *   Size of memory in bytes required for this QS variable.
+ */
+size_t __rte_experimental
+rte_rcu_qsbr_get_memsize(uint32_t max_threads);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Initialize a Quiescent State (QS) variable.
+ *
+ * @param v
+ *   QS variable
+ * @param max_threads
+ *   Maximum number of threads reporting QS on this variable.
+ *
+ */
+void __rte_experimental
+rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Register a reader thread to report its quiescent state
+ * on a QS variable.
+ *
+ * This is implemented as a lock-free function. It is multi-thread
+ * safe.
+ * Any reader thread that wants to report its quiescent state must
+ * call this API. This can be called during initialization or as part
+ * of the packet processing loop.
+ *
+ * Note that rte_rcu_qsbr_thread_online must be called before the
+ * thread updates its QS using rte_rcu_qsbr_update.
+ *
+ * @param v
+ *   QS variable
+ * @param thread_id
+ *   Reader thread with this thread ID will report its quiescent state on
+ *   the QS variable.
+ */
+static __rte_always_inline void __rte_experimental
+rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int thread_id)
+{
+	unsigned int i, id;
+
+	RTE_ASSERT(v != NULL && thread_id < v->m_threads);
+	/* Split the ID into bitmap element index (i) and bit position (id) */
+	id = thread_id & RTE_QSBR_THRID_MASK;
+	i = thread_id >> RTE_QSBR_THRID_INDEX_SHIFT;
+
+	/* Release the new register thread ID to other threads
+	 * calling rte_rcu_qsbr_check.
+	 */
+	__atomic_fetch_or(&v->reg_thread_id[i], 1ULL << id, __ATOMIC_RELEASE);
+}
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Remove a reader thread, from the list of threads reporting their
+ * quiescent state on a QS variable.
+ *
+ * This is implemented as a lock-free function. It is multi-thread safe.
+ * This API can be called from the reader threads during shutdown.
+ * Ongoing QS queries will stop waiting for the status from this
+ * unregistered reader thread.
+ *
+ * @param v
+ *   QS variable
+ * @param thread_id
+ *   Reader thread with this thread ID will stop reporting its quiescent
+ *   state on the QS variable.
+ */
+static __rte_always_inline void __rte_experimental
+rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int thread_id)
+{
+	unsigned int i, id;
+
+	RTE_ASSERT(v != NULL && thread_id < v->m_threads);
+	/* Split the ID into bitmap element index (i) and bit position (id) */
+	id = thread_id & RTE_QSBR_THRID_MASK;
+	i = thread_id >> RTE_QSBR_THRID_INDEX_SHIFT;
+
+	/* Make sure the removal of the thread from the list of
+	 * reporting threads is visible before the thread
+	 * does anything else.
+	 */
+	__atomic_fetch_and(&v->reg_thread_id[i],
+				~(1ULL << id), __ATOMIC_RELEASE);
+}
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Add a registered reader thread, to the list of threads reporting their
+ * quiescent state on a QS variable.
+ *
+ * This is implemented as a lock-free function. It is multi-thread
+ * safe.
+ *
+ * Any registered reader thread that wants to report its quiescent state must
+ * call this API before calling rte_rcu_qsbr_update. This can be called
+ * during initialization or as part of the packet processing loop.
+ *
+ * The reader thread must call rte_rcu_qsbr_thread_offline API, before
+ * calling any functions that block, to ensure that rte_rcu_qsbr_check
+ * API does not wait indefinitely for the reader thread to update its QS.
+ *
+ * The reader thread must call rte_rcu_qsbr_thread_online API, after the
+ * blocking function call returns, to ensure that rte_rcu_qsbr_check API
+ * waits for the reader thread to update its QS.
+ *
+ * @param v
+ *   QS variable
+ * @param thread_id
+ *   Reader thread with this thread ID will report its quiescent state on
+ *   the QS variable.
+ */
+static __rte_always_inline void __rte_experimental
+rte_rcu_qsbr_thread_online(struct rte_rcu_qsbr *v, unsigned int thread_id)
+{
+	uint64_t t;
+
+	RTE_ASSERT(v != NULL && thread_id < v->m_threads);
+
+	/* Copy the current value of token.
+	 * The fence at the end of the function will ensure that
+	 * the following will not move down after the load of any shared
+	 * data structure.
+	 */
+	t = __atomic_load_n(&v->token, __ATOMIC_RELAXED);
+
+	/* __atomic_store_n(cnt, __ATOMIC_RELAXED) is used to ensure
+	 * 'cnt' (64b) is accessed atomically.
+	 */
+	__atomic_store_n(&RTE_QSBR_CNT_ARRAY_ELM(v, thread_id)->cnt,
+		t, __ATOMIC_RELAXED);
+
+	/* The subsequent load of the data structure should not
+	 * move above the store. Hence a store-load barrier
+	 * is required.
+	 * If the load of the data structure moves above the store,
+	 * writer might not see that the reader is online, even though
+	 * the reader is referencing the shared data structure.
+	 */
+	__atomic_thread_fence(__ATOMIC_SEQ_CST);
+}
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Remove a registered reader thread from the list of threads reporting their
+ * quiescent state on a QS variable.
+ *
+ * This is implemented as a lock-free function. It is multi-thread
+ * safe.
+ *
+ * This can be called during initialization or as part of the packet
+ * processing loop.
+ *
+ * The reader thread must call rte_rcu_qsbr_thread_offline API, before
+ * calling any functions that block, to ensure that rte_rcu_qsbr_check
+ * API does not wait indefinitely for the reader thread to update its QS.
+ *
+ * @param v
+ *   QS variable
+ * @param thread_id
+ *   rte_rcu_qsbr_check API will not wait for the reader thread with
+ *   this thread ID to report its quiescent state on the QS variable.
+ */
+static __rte_always_inline void __rte_experimental
+rte_rcu_qsbr_thread_offline(struct rte_rcu_qsbr *v, unsigned int thread_id)
+{
+	RTE_ASSERT(v != NULL && thread_id < v->m_threads);
+
+	/* The reader can go offline only after the load of the
+	 * data structure is completed. i.e. any load of the
+	 * data structure can not move after this store.
+	 */
+
+	__atomic_store_n(&RTE_QSBR_CNT_ARRAY_ELM(v, thread_id)->cnt,
+		RTE_QSBR_CNT_THR_OFFLINE, __ATOMIC_RELEASE);
+}
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Ask the reader threads to report the quiescent state
+ * status.
+ *
+ * This is implemented as a lock-free function. It is multi-thread
+ * safe and can be called from worker threads.
+ *
+ * @param v
+ *   QS variable
+ * @return
+ *   - This is the token for this call of the API. This should be
+ *     passed to rte_rcu_qsbr_check API.
+ */
+static __rte_always_inline uint64_t __rte_experimental
+rte_rcu_qsbr_start(struct rte_rcu_qsbr *v)
+{
+	uint64_t t;
+
+	RTE_ASSERT(v != NULL);
+
+	/* Release the changes to the shared data structure.
+	 * This store release will ensure that changes to any data
+	 * structure are visible to the workers before the token
+	 * update is visible.
+	 */
+	t = __atomic_add_fetch(&v->token, 1, __ATOMIC_RELEASE);
+
+	return t;
+}
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Update quiescent state for a reader thread.
+ *
+ * This is implemented as a lock-free function. It is multi-thread safe.
+ * All the reader threads registered to report their quiescent state
+ * on the QS variable must call this API.
+ *
+ * @param v
+ *   QS variable
+ * @param thread_id
+ *   Update the quiescent state for the reader with this thread ID.
+ */
+static __rte_always_inline void __rte_experimental
+rte_rcu_qsbr_update(struct rte_rcu_qsbr *v, unsigned int thread_id)
+{
+	uint64_t t;
+
+	RTE_ASSERT(v != NULL && thread_id < v->m_threads);
+
+	/* Acquire the changes to the shared data structure released
+	 * by rte_rcu_qsbr_start.
+	 * Later loads of the shared data structure should not move
+	 * above this load. Hence, use load-acquire.
+	 */
+	t = __atomic_load_n(&v->token, __ATOMIC_ACQUIRE);
+
+	/* Inform the writer that updates are visible to this reader.
+	 * Prior loads of the shared data structure should not move
+	 * beyond this store. Hence use store-release.
+	 */
+	__atomic_store_n(&RTE_QSBR_CNT_ARRAY_ELM(v, thread_id)->cnt,
+			 t, __ATOMIC_RELEASE);
+
+	RCU_DP_LOG(DEBUG, "%s: update: token = %lu, Thread ID = %d",
+		__func__, t, thread_id);
+}
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Checks if all the reader threads have entered the quiescent state
+ * referenced by token.
+ *
+ * This is implemented as a lock-free function. It is multi-thread
+ * safe and can be called from the worker threads as well.
+ *
+ * If this API is called with 'wait' set to true, the following
+ * factors must be considered:
+ *
+ * 1) If the calling thread is also reporting the status on the
+ * same RCU variable, it must update the QS status, before calling
+ * this API.
+ *
+ * 2) In addition, while calling from multiple threads, more than
+ * one of those threads cannot be reporting the QS status on the
+ * same RCU variable.
+ *
+ * @param v
+ *   QS variable
+ * @param t
+ *   Token returned by rte_rcu_qsbr_start API
+ * @param wait
+ *   If true, block till all the registered reader threads have entered
+ *   the quiescent state referenced by token 't'
+ * @return
+ *   - 0 if some reader thread has NOT yet entered the quiescent state
+ *     referenced by token 't' (only returned when 'wait' is false).
+ *   - 1 if all reader threads have entered the quiescent state
+ *     referenced by token 't'. Offline threads are not waited for.
+ */
+static __rte_always_inline int __rte_experimental
+rte_rcu_qsbr_check(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
+{
+	uint32_t i, j, id;
+	uint64_t bmap;
+	uint64_t cnt;
+
+	RTE_ASSERT(v != NULL);
+
+	i = 0;
+	do {
+		/* Load the current registered thread bit map before
+		 * loading the reader thread quiescent state counters.
+		 */
+		bmap = __atomic_load_n(&v->reg_thread_id[i], __ATOMIC_ACQUIRE);
+		id = i << RTE_QSBR_THRID_INDEX_SHIFT;
+
+		while (bmap) {
+			j = __builtin_ctzll(bmap);
+			RCU_DP_LOG(DEBUG,
+				"%s: check: token = %lu, wait = %d, Bit Map = 0x%lx, Thread ID = %d",
+				__func__, t, wait, bmap, id+j);
+			cnt = __atomic_load_n(
+					&RTE_QSBR_CNT_ARRAY_ELM(v, id + j)->cnt,
+					__ATOMIC_ACQUIRE);
+			RCU_DP_LOG(DEBUG,
+				"%s: status: token = %lu, wait = %d, Thread QS cnt = %lu, Thread ID = %d",
+				__func__, t, wait,
+				cnt, id + j);
+			/* Counter is not checked for wrap-around condition
+			 * as it is a 64b counter.
+			 */
+			if (unlikely(cnt != RTE_QSBR_CNT_THR_OFFLINE &&
+					cnt < t)) {
+				/* This thread is not in QS */
+				if (!wait)
+					return 0;
+
+				rte_pause();
+				/* This thread might have unregistered.
+				 * Re-read the bitmap.
+				 */
+				bmap = __atomic_load_n(
+						&v->reg_thread_id[i],
+						__ATOMIC_ACQUIRE);
+
+				continue;
+			}
+
+			bmap &= ~(1ULL << j);
+		}
+
+		i++;
+	} while (i < v->num_elems);
+
+	return 1;
+}
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Wait till the reader threads have entered quiescent state.
+ *
+ * This is implemented as a lock-free function. It is multi-thread safe.
+ * This API can be thought of as a wrapper around rte_rcu_qsbr_start and
+ * rte_rcu_qsbr_check APIs.
+ *
+ * If this API is called from multiple threads, more than one of
+ * those threads cannot be reporting the QS status on the same
+ * RCU variable.
+ *
+ * @param v
+ *   QS variable
+ * @param thread_id
+ *   Thread ID of the caller if it is registered to report QS on
+ *   this QS variable (i.e. the calling thread is also part of the
+ *   readside critical section). If not, pass RTE_QSBR_THRID_INVALID.
+ */
+static __rte_always_inline void __rte_experimental
+rte_rcu_qsbr_synchronize(struct rte_rcu_qsbr *v, unsigned int thread_id)
+{
+	uint64_t t;
+
+	RTE_ASSERT(v != NULL);
+
+	t = rte_rcu_qsbr_start(v);
+
+	/* If the current thread has readside critical section,
+	 * update its QS status.
+	 */
+	if (thread_id != RTE_QSBR_THRID_INVALID)
+		rte_rcu_qsbr_update(v, thread_id);
+
+	/* Wait for other readers to enter QS */
+	rte_rcu_qsbr_check(v, t, true);
+}
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Dump the details of a single QS variables to a file.
+ *
+ * It is NOT multi-thread safe.
+ *
+ * @param f
+ *   A pointer to a file for output
+ * @param v
+ *   QS variable
+ */
+void __rte_experimental
+rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _RTE_RCU_QSBR_H_ */
diff --git a/lib/librte_rcu/rte_rcu_version.map b/lib/librte_rcu/rte_rcu_version.map
new file mode 100644
index 000000000..019560adf
--- /dev/null
+++ b/lib/librte_rcu/rte_rcu_version.map
@@ -0,0 +1,9 @@ 
+EXPERIMENTAL {
+	global:
+
+	rte_rcu_qsbr_get_memsize;
+	rte_rcu_qsbr_init;
+	rte_rcu_qsbr_dump;
+
+	local: *;
+};
diff --git a/lib/meson.build b/lib/meson.build
index 99957ba7d..3feb44b75 100644
--- a/lib/meson.build
+++ b/lib/meson.build
@@ -22,7 +22,7 @@  libraries = [
 	'gro', 'gso', 'ip_frag', 'jobstats',
 	'kni', 'latencystats', 'lpm', 'member',
 	'power', 'pdump', 'rawdev',
-	'reorder', 'sched', 'security', 'vhost',
+	'reorder', 'sched', 'security', 'vhost', 'rcu',
 	#ipsec lib depends on crypto and security
 	'ipsec',
 	# add pkt framework libs which use other libs from above
diff --git a/mk/rte.app.mk b/mk/rte.app.mk
index 262132fc6..2de0b5fc6 100644
--- a/mk/rte.app.mk
+++ b/mk/rte.app.mk
@@ -96,6 +96,7 @@  _LDLIBS-$(CONFIG_RTE_LIBRTE_EAL)            += -lrte_eal
 _LDLIBS-$(CONFIG_RTE_LIBRTE_CMDLINE)        += -lrte_cmdline
 _LDLIBS-$(CONFIG_RTE_LIBRTE_REORDER)        += -lrte_reorder
 _LDLIBS-$(CONFIG_RTE_LIBRTE_SCHED)          += -lrte_sched
+_LDLIBS-$(CONFIG_RTE_LIBRTE_RCU)            += -lrte_rcu
 
 ifeq ($(CONFIG_RTE_EXEC_ENV_LINUX),y)
 _LDLIBS-$(CONFIG_RTE_LIBRTE_KNI)            += -lrte_kni