[v7,1/3] lib/lpm: integrate RCU QSBR

Message ID 20200707151554.64431-2-ruifeng.wang@arm.com (mailing list archive)
State Superseded, archived
Delegated to: David Marchand
Series: RCU integration with LPM library

Checks

Context Check Description
ci/checkpatch warning coding style issues
ci/iol-broadcom-Performance success Performance Testing PASS
ci/iol-intel-Performance success Performance Testing PASS
ci/iol-testing success Testing PASS
ci/iol-mellanox-Performance success Performance Testing PASS
ci/Intel-compilation success Compilation OK

Commit Message

Ruifeng Wang July 7, 2020, 3:15 p.m. UTC
  Currently, the tbl8 group is freed even though readers might still be
using the tbl8 group entries. The freed tbl8 group can be reallocated
quickly, which results in incorrect lookup results.

The RCU QSBR process is integrated for safe tbl8 group reclamation.
Refer to the RCU documentation to understand the various aspects of
integrating the RCU library into other libraries.

Signed-off-by: Ruifeng Wang <ruifeng.wang@arm.com>
Reviewed-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
Acked-by: Ray Kinsella <mdr@ashroe.eu>
---
 doc/guides/prog_guide/lpm_lib.rst  |  32 ++++++++
 lib/librte_lpm/Makefile            |   2 +-
 lib/librte_lpm/meson.build         |   1 +
 lib/librte_lpm/rte_lpm.c           | 120 ++++++++++++++++++++++++++---
 lib/librte_lpm/rte_lpm.h           |  59 ++++++++++++++
 lib/librte_lpm/rte_lpm_version.map |   6 ++
 6 files changed, 208 insertions(+), 12 deletions(-)
  

Comments

Vladimir Medvedkin July 8, 2020, 12:36 p.m. UTC | #1
On 07/07/2020 16:15, Ruifeng Wang wrote:
> Currently, the tbl8 group is freed even though the readers might be
> using the tbl8 group entries. The freed tbl8 group can be reallocated
> quickly. This results in incorrect lookup results.
>
> RCU QSBR process is integrated for safe tbl8 group reclaim.
> Refer to RCU documentation to understand various aspects of
> integrating RCU library into other libraries.
>
> Signed-off-by: Ruifeng Wang <ruifeng.wang@arm.com>
> Reviewed-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
> Acked-by: Ray Kinsella <mdr@ashroe.eu>
> ---
>   doc/guides/prog_guide/lpm_lib.rst  |  32 ++++++++
>   lib/librte_lpm/Makefile            |   2 +-
>   lib/librte_lpm/meson.build         |   1 +
>   lib/librte_lpm/rte_lpm.c           | 120 ++++++++++++++++++++++++++---
>   lib/librte_lpm/rte_lpm.h           |  59 ++++++++++++++
>   lib/librte_lpm/rte_lpm_version.map |   6 ++
>   6 files changed, 208 insertions(+), 12 deletions(-)
>
> diff --git a/doc/guides/prog_guide/lpm_lib.rst b/doc/guides/prog_guide/lpm_lib.rst
> index 1609a57d0..03945904b 100644
> --- a/doc/guides/prog_guide/lpm_lib.rst
> +++ b/doc/guides/prog_guide/lpm_lib.rst
> @@ -145,6 +145,38 @@ depending on whether we need to move to the next table or not.
>   Prefix expansion is one of the keys of this algorithm,
>   since it improves the speed dramatically by adding redundancy.
>   
> +Deletion
> +~~~~~~~~
> +
> +When deleting a rule, a replacement rule is searched for. The replacement rule is an existing rule that has
> +the longest prefix match with the rule to be deleted, but has a shorter prefix.
> +
> +If a replacement rule is found, the target tbl24 and tbl8 entries are updated to have the same depth and next hop
> +value as the replacement rule.
> +
> +If no replacement rule can be found, the target tbl24 and tbl8 entries are cleared.
> +
> +Prefix expansion is performed if the rule's depth is not exactly 24 bits or 32 bits.
> +
> +After deleting a rule, the group of tbl8s that belongs to the same tbl24 entry is freed in the following cases:
> +
> +*   All tbl8s in the group are empty.
> +
> +*   All tbl8s in the group have the same value, with a depth no greater than 24.
> +
> +Freeing of tbl8s behaves differently depending on whether RCU is used:
> +
> +*   If RCU is not used, tbl8s are cleared and reclaimed immediately.
> +
> +*   If RCU is used, tbl8s are reclaimed when readers are in quiescent state.
> +
> +When the LPM library is not using RCU, a tbl8 group can be freed immediately even though readers might
> +still be using its entries. This can result in incorrect lookup results.
> +
> +The RCU QSBR process is integrated for safe tbl8 group reclamation. The application has certain
> +responsibilities when using this feature. Please refer to the resource reclamation framework of the
> +:ref:`RCU library <RCU_Library>` for more details.
> +
>   Lookup
>   ~~~~~~
>   
> diff --git a/lib/librte_lpm/Makefile b/lib/librte_lpm/Makefile
> index d682785b6..6f06c5c03 100644
> --- a/lib/librte_lpm/Makefile
> +++ b/lib/librte_lpm/Makefile
> @@ -8,7 +8,7 @@ LIB = librte_lpm.a
>   
>   CFLAGS += -O3
>   CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR)
> -LDLIBS += -lrte_eal -lrte_hash
> +LDLIBS += -lrte_eal -lrte_hash -lrte_rcu
>   
>   EXPORT_MAP := rte_lpm_version.map
>   
> diff --git a/lib/librte_lpm/meson.build b/lib/librte_lpm/meson.build
> index 021ac6d8d..6cfc083c5 100644
> --- a/lib/librte_lpm/meson.build
> +++ b/lib/librte_lpm/meson.build
> @@ -7,3 +7,4 @@ headers = files('rte_lpm.h', 'rte_lpm6.h')
>   # without worrying about which architecture we actually need
>   headers += files('rte_lpm_altivec.h', 'rte_lpm_neon.h', 'rte_lpm_sse.h')
>   deps += ['hash']
> +deps += ['rcu']
> diff --git a/lib/librte_lpm/rte_lpm.c b/lib/librte_lpm/rte_lpm.c
> index 38ab512a4..d498ba761 100644
> --- a/lib/librte_lpm/rte_lpm.c
> +++ b/lib/librte_lpm/rte_lpm.c
> @@ -1,5 +1,6 @@
>   /* SPDX-License-Identifier: BSD-3-Clause
>    * Copyright(c) 2010-2014 Intel Corporation
> + * Copyright(c) 2020 Arm Limited
>    */
>   
>   #include <string.h>
> @@ -246,12 +247,82 @@ rte_lpm_free(struct rte_lpm *lpm)
>   
>   	rte_mcfg_tailq_write_unlock();
>   
> +	if (lpm->dq)
> +		rte_rcu_qsbr_dq_delete(lpm->dq);
>   	rte_free(lpm->tbl8);
>   	rte_free(lpm->rules_tbl);
>   	rte_free(lpm);
>   	rte_free(te);
>   }
>   
> +static void
> +__lpm_rcu_qsbr_free_resource(void *p, void *data, unsigned int n)
> +{
> +	struct rte_lpm_tbl_entry zero_tbl8_entry = {0};
> +	uint32_t tbl8_group_index = *(uint32_t *)data;
> +	struct rte_lpm_tbl_entry *tbl8 = ((struct rte_lpm *)p)->tbl8;
> +
> +	RTE_SET_USED(n);
> +	/* Set tbl8 group invalid */
> +	__atomic_store(&tbl8[tbl8_group_index], &zero_tbl8_entry,
> +		__ATOMIC_RELAXED);
> +}
> +
> +/* Associate QSBR variable with an LPM object.
> + */
> +int
> +rte_lpm_rcu_qsbr_add(struct rte_lpm *lpm, struct rte_lpm_rcu_config *cfg,
> +	struct rte_rcu_qsbr_dq **dq)
> +{
> +	char rcu_dq_name[RTE_RCU_QSBR_DQ_NAMESIZE];
> +	struct rte_rcu_qsbr_dq_parameters params = {0};
> +
> +	if ((lpm == NULL) || (cfg == NULL)) {
> +		rte_errno = EINVAL;
> +		return 1;
> +	}
> +
> +	if (lpm->v) {
> +		rte_errno = EEXIST;
> +		return 1;
> +	}
> +
> +	if (cfg->mode == RTE_LPM_QSBR_MODE_SYNC) {
> +		/* No other things to do. */
> +	} else if (cfg->mode == RTE_LPM_QSBR_MODE_DQ) {
> +		/* Init QSBR defer queue. */
> +		snprintf(rcu_dq_name, sizeof(rcu_dq_name),
> +				"LPM_RCU_%s", lpm->name);
> +		params.name = rcu_dq_name;
> +		params.size = cfg->dq_size;
> +		if (params.size == 0)
> +			params.size = lpm->number_tbl8s;
> +		params.trigger_reclaim_limit = cfg->reclaim_thd;
> +		params.max_reclaim_size = cfg->reclaim_max;
> +		if (params.max_reclaim_size == 0)
> +			params.max_reclaim_size = RTE_LPM_RCU_DQ_RECLAIM_MAX;
> +		params.esize = sizeof(uint32_t);	/* tbl8 group index */
> +		params.free_fn = __lpm_rcu_qsbr_free_resource;
> +		params.p = lpm;
> +		params.v = cfg->v;
> +		lpm->dq = rte_rcu_qsbr_dq_create(&params);
> +		if (lpm->dq == NULL) {
> +			RTE_LOG(ERR, LPM,
> +					"LPM QS defer queue creation failed\n");
> +			return 1;
> +		}
> +		if (dq)
> +			*dq = lpm->dq;
> +	} else {
> +		rte_errno = EINVAL;
> +		return 1;
> +	}
> +	lpm->rcu_mode = cfg->mode;
> +	lpm->v = cfg->v;
> +
> +	return 0;
> +}
> +
>   /*
>    * Adds a rule to the rule table.
>    *
> @@ -394,14 +465,15 @@ rule_find(struct rte_lpm *lpm, uint32_t ip_masked, uint8_t depth)
>    * Find, clean and allocate a tbl8.
>    */
>   static int32_t
> -tbl8_alloc(struct rte_lpm_tbl_entry *tbl8, uint32_t number_tbl8s)
> +_tbl8_alloc(struct rte_lpm *lpm)
>   {
>   	uint32_t group_idx; /* tbl8 group index. */
>   	struct rte_lpm_tbl_entry *tbl8_entry;
>   
>   	/* Scan through tbl8 to find a free (i.e. INVALID) tbl8 group. */
> -	for (group_idx = 0; group_idx < number_tbl8s; group_idx++) {
> -		tbl8_entry = &tbl8[group_idx * RTE_LPM_TBL8_GROUP_NUM_ENTRIES];
> +	for (group_idx = 0; group_idx < lpm->number_tbl8s; group_idx++) {
> +		tbl8_entry = &lpm->tbl8[group_idx *
> +					RTE_LPM_TBL8_GROUP_NUM_ENTRIES];
>   		/* If a free tbl8 group is found clean it and set as VALID. */
>   		if (!tbl8_entry->valid_group) {
>   			struct rte_lpm_tbl_entry new_tbl8_entry = {
> @@ -427,14 +499,40 @@ tbl8_alloc(struct rte_lpm_tbl_entry *tbl8, uint32_t number_tbl8s)
>   	return -ENOSPC;
>   }
>   
> +static int32_t
> +tbl8_alloc(struct rte_lpm *lpm)
> +{
> +	int32_t group_idx; /* tbl8 group index. */
> +
> +	group_idx = _tbl8_alloc(lpm);
> +	if ((group_idx == -ENOSPC) && (lpm->dq != NULL)) {
> +		/* If there are no tbl8 groups try to reclaim one. */
> +		if (rte_rcu_qsbr_dq_reclaim(lpm->dq, 1, NULL, NULL, NULL) == 0)
> +			group_idx = _tbl8_alloc(lpm);
> +	}
> +
> +	return group_idx;
> +}
> +
>   static void
> -tbl8_free(struct rte_lpm_tbl_entry *tbl8, uint32_t tbl8_group_start)
> +tbl8_free(struct rte_lpm *lpm, uint32_t tbl8_group_start)
>   {
> -	/* Set tbl8 group invalid*/
>   	struct rte_lpm_tbl_entry zero_tbl8_entry = {0};
>   
> -	__atomic_store(&tbl8[tbl8_group_start], &zero_tbl8_entry,
> -			__ATOMIC_RELAXED);
> +	if (!lpm->v) {
> +		/* Set tbl8 group invalid*/
> +		__atomic_store(&lpm->tbl8[tbl8_group_start], &zero_tbl8_entry,
> +				__ATOMIC_RELAXED);
> +	} else if (lpm->rcu_mode == RTE_LPM_QSBR_MODE_SYNC) {
> +		/* Wait for quiescent state change. */
> +		rte_rcu_qsbr_synchronize(lpm->v, RTE_QSBR_THRID_INVALID);
> +		/* Set tbl8 group invalid*/
> +		__atomic_store(&lpm->tbl8[tbl8_group_start], &zero_tbl8_entry,
> +				__ATOMIC_RELAXED);
> +	} else if (lpm->rcu_mode == RTE_LPM_QSBR_MODE_DQ) {
> +		/* Push into QSBR defer queue. */
> +		rte_rcu_qsbr_dq_enqueue(lpm->dq, (void *)&tbl8_group_start);
> +	}
>   }
>   
>   static __rte_noinline int32_t
> @@ -523,7 +621,7 @@ add_depth_big(struct rte_lpm *lpm, uint32_t ip_masked, uint8_t depth,
>   
>   	if (!lpm->tbl24[tbl24_index].valid) {
>   		/* Search for a free tbl8 group. */
> -		tbl8_group_index = tbl8_alloc(lpm->tbl8, lpm->number_tbl8s);
> +		tbl8_group_index = tbl8_alloc(lpm);
>   
>   		/* Check tbl8 allocation was successful. */
>   		if (tbl8_group_index < 0) {
> @@ -569,7 +667,7 @@ add_depth_big(struct rte_lpm *lpm, uint32_t ip_masked, uint8_t depth,
>   	} /* If valid entry but not extended calculate the index into Table8. */
>   	else if (lpm->tbl24[tbl24_index].valid_group == 0) {
>   		/* Search for free tbl8 group. */
> -		tbl8_group_index = tbl8_alloc(lpm->tbl8, lpm->number_tbl8s);
> +		tbl8_group_index = tbl8_alloc(lpm);
>   
>   		if (tbl8_group_index < 0) {
>   			return tbl8_group_index;
> @@ -977,7 +1075,7 @@ delete_depth_big(struct rte_lpm *lpm, uint32_t ip_masked,
>   		 */
>   		lpm->tbl24[tbl24_index].valid = 0;
>   		__atomic_thread_fence(__ATOMIC_RELEASE);
> -		tbl8_free(lpm->tbl8, tbl8_group_start);
> +		tbl8_free(lpm, tbl8_group_start);
>   	} else if (tbl8_recycle_index > -1) {
>   		/* Update tbl24 entry. */
>   		struct rte_lpm_tbl_entry new_tbl24_entry = {
> @@ -993,7 +1091,7 @@ delete_depth_big(struct rte_lpm *lpm, uint32_t ip_masked,
>   		__atomic_store(&lpm->tbl24[tbl24_index], &new_tbl24_entry,
>   				__ATOMIC_RELAXED);
>   		__atomic_thread_fence(__ATOMIC_RELEASE);
> -		tbl8_free(lpm->tbl8, tbl8_group_start);
> +		tbl8_free(lpm, tbl8_group_start);
>   	}
>   #undef group_idx
>   	return 0;
> diff --git a/lib/librte_lpm/rte_lpm.h b/lib/librte_lpm/rte_lpm.h
> index b9d49ac87..7889f21b3 100644
> --- a/lib/librte_lpm/rte_lpm.h
> +++ b/lib/librte_lpm/rte_lpm.h
> @@ -1,5 +1,6 @@
>   /* SPDX-License-Identifier: BSD-3-Clause
>    * Copyright(c) 2010-2014 Intel Corporation
> + * Copyright(c) 2020 Arm Limited
>    */
>   
>   #ifndef _RTE_LPM_H_
> @@ -20,6 +21,7 @@
>   #include <rte_memory.h>
>   #include <rte_common.h>
>   #include <rte_vect.h>
> +#include <rte_rcu_qsbr.h>
>   
>   #ifdef __cplusplus
>   extern "C" {
> @@ -62,6 +64,17 @@ extern "C" {
>   /** Bitmask used to indicate successful lookup */
>   #define RTE_LPM_LOOKUP_SUCCESS          0x01000000
>   
> +/** @internal Default RCU defer queue entries to reclaim in one go. */
> +#define RTE_LPM_RCU_DQ_RECLAIM_MAX	16
> +
> +/** RCU reclamation modes */
> +enum rte_lpm_qsbr_mode {
> +	/** Create defer queue for reclaim. */
> +	RTE_LPM_QSBR_MODE_DQ = 0,
> +	/** Use blocking mode reclaim. No defer queue created. */
> +	RTE_LPM_QSBR_MODE_SYNC
> +};
> +
>   #if RTE_BYTE_ORDER == RTE_LITTLE_ENDIAN
>   /** @internal Tbl24 entry structure. */
>   __extension__
> @@ -130,6 +143,28 @@ struct rte_lpm {
>   			__rte_cache_aligned; /**< LPM tbl24 table. */
>   	struct rte_lpm_tbl_entry *tbl8; /**< LPM tbl8 table. */
>   	struct rte_lpm_rule *rules_tbl; /**< LPM rules. */
> +#ifdef ALLOW_EXPERIMENTAL_API
> +	/* RCU config. */
> +	struct rte_rcu_qsbr *v;		/* RCU QSBR variable. */
> +	enum rte_lpm_qsbr_mode rcu_mode;/* Blocking, defer queue. */
> +	struct rte_rcu_qsbr_dq *dq;	/* RCU QSBR defer queue. */
> +#endif
> +};
> +
> +/** LPM RCU QSBR configuration structure. */
> +struct rte_lpm_rcu_config {
> +	struct rte_rcu_qsbr *v;	/* RCU QSBR variable. */
> +	/* Mode of RCU QSBR. RTE_LPM_QSBR_MODE_xxx
> +	 * '0' for default: create defer queue for reclaim.
> +	 */
> +	enum rte_lpm_qsbr_mode mode;
> +	uint32_t dq_size;	/* RCU defer queue size.
> +				 * default: lpm->number_tbl8s.
> +				 */
> +	uint32_t reclaim_thd;	/* Threshold to trigger auto reclaim. */
> +	uint32_t reclaim_max;	/* Max entries to reclaim in one go.
> +				 * default: RTE_LPM_RCU_DQ_RECLAIM_MAX.
> +				 */
>   };
>   
>   /**
> @@ -179,6 +214,30 @@ rte_lpm_find_existing(const char *name);
>   void
>   rte_lpm_free(struct rte_lpm *lpm);
>   
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Associate RCU QSBR variable with an LPM object.
> + *
> + * @param lpm
> + *   The LPM object to associate the RCU QSBR variable with.
> + * @param cfg
> + *   RCU QSBR configuration.
> + * @param dq
> + *   Handle of the created RCU QSBR defer queue.
> + * @return
> + *   On success - 0
> + *   On error - 1 with error code set in rte_errno.
> + *   Possible rte_errno codes are:
> + *   - EINVAL - invalid pointer
> + *   - EEXIST - already added QSBR
> + *   - ENOMEM - memory allocation failure
> + */
> +__rte_experimental
> +int rte_lpm_rcu_qsbr_add(struct rte_lpm *lpm, struct rte_lpm_rcu_config *cfg,
> +	struct rte_rcu_qsbr_dq **dq);
> +
>   /**
>    * Add a rule to the LPM table.
>    *
> diff --git a/lib/librte_lpm/rte_lpm_version.map b/lib/librte_lpm/rte_lpm_version.map
> index 500f58b80..bfccd7eac 100644
> --- a/lib/librte_lpm/rte_lpm_version.map
> +++ b/lib/librte_lpm/rte_lpm_version.map
> @@ -21,3 +21,9 @@ DPDK_20.0 {
>   
>   	local: *;
>   };
> +
> +EXPERIMENTAL {
> +	global:
> +
> +	rte_lpm_rcu_qsbr_add;
> +};

Acked-by: Vladimir Medvedkin <vladimir.medvedkin@intel.com>
  
David Marchand July 8, 2020, 2:30 p.m. UTC | #2
On Tue, Jul 7, 2020 at 5:16 PM Ruifeng Wang <ruifeng.wang@arm.com> wrote:
> diff --git a/lib/librte_lpm/rte_lpm.h b/lib/librte_lpm/rte_lpm.h
> index b9d49ac87..7889f21b3 100644
> --- a/lib/librte_lpm/rte_lpm.h
> +++ b/lib/librte_lpm/rte_lpm.h
> @@ -1,5 +1,6 @@
>  /* SPDX-License-Identifier: BSD-3-Clause
>   * Copyright(c) 2010-2014 Intel Corporation
> + * Copyright(c) 2020 Arm Limited
>   */
>
>  #ifndef _RTE_LPM_H_
> @@ -20,6 +21,7 @@
>  #include <rte_memory.h>
>  #include <rte_common.h>
>  #include <rte_vect.h>
> +#include <rte_rcu_qsbr.h>
>
>  #ifdef __cplusplus
>  extern "C" {
> @@ -62,6 +64,17 @@ extern "C" {
>  /** Bitmask used to indicate successful lookup */
>  #define RTE_LPM_LOOKUP_SUCCESS          0x01000000
>
> +/** @internal Default RCU defer queue entries to reclaim in one go. */
> +#define RTE_LPM_RCU_DQ_RECLAIM_MAX     16
> +
> +/** RCU reclamation modes */
> +enum rte_lpm_qsbr_mode {
> +       /** Create defer queue for reclaim. */
> +       RTE_LPM_QSBR_MODE_DQ = 0,
> +       /** Use blocking mode reclaim. No defer queue created. */
> +       RTE_LPM_QSBR_MODE_SYNC
> +};
> +
>  #if RTE_BYTE_ORDER == RTE_LITTLE_ENDIAN
>  /** @internal Tbl24 entry structure. */
>  __extension__
> @@ -130,6 +143,28 @@ struct rte_lpm {
>                         __rte_cache_aligned; /**< LPM tbl24 table. */
>         struct rte_lpm_tbl_entry *tbl8; /**< LPM tbl8 table. */
>         struct rte_lpm_rule *rules_tbl; /**< LPM rules. */
> +#ifdef ALLOW_EXPERIMENTAL_API
> +       /* RCU config. */
> +       struct rte_rcu_qsbr *v;         /* RCU QSBR variable. */
> +       enum rte_lpm_qsbr_mode rcu_mode;/* Blocking, defer queue. */
> +       struct rte_rcu_qsbr_dq *dq;     /* RCU QSBR defer queue. */
> +#endif
> +};

I can see failures in travis reports for v7 and v6.
I reproduced them in my env.

1 function with some indirect sub-type change:

  [C]'function int rte_lpm_add(rte_lpm*, uint32_t, uint8_t, uint32_t)' at rte_lpm.c:764:1 has some indirect sub-type changes:
    parameter 1 of type 'rte_lpm*' has sub-type changes:
      in pointed to type 'struct rte_lpm' at rte_lpm.h:134:1:
        type size hasn't changed
        3 data member insertions:
          'rte_rcu_qsbr* rte_lpm::v', at offset 536873600 (in bits) at rte_lpm.h:148:1
          'rte_lpm_qsbr_mode rte_lpm::rcu_mode', at offset 536873664 (in bits) at rte_lpm.h:149:1
          'rte_rcu_qsbr_dq* rte_lpm::dq', at offset 536873728 (in bits) at rte_lpm.h:150:1


Going back to my proposal of hiding what does not need to be seen.

Disclaimer, *this is quick & dirty* but it builds and passes ABI check:

$ git diff
diff --git a/lib/librte_lpm/rte_lpm.c b/lib/librte_lpm/rte_lpm.c
index d498ba761..7109aef6a 100644
--- a/lib/librte_lpm/rte_lpm.c
+++ b/lib/librte_lpm/rte_lpm.c
@@ -115,6 +115,15 @@ rte_lpm_find_existing(const char *name)
        return l;
 }

+struct internal_lpm {
+       /* Public object */
+       struct rte_lpm lpm;
+       /* RCU config. */
+       struct rte_rcu_qsbr *v;         /* RCU QSBR variable. */
+       enum rte_lpm_qsbr_mode rcu_mode;/* Blocking, defer queue. */
+       struct rte_rcu_qsbr_dq *dq;     /* RCU QSBR defer queue. */
+};
+
 /*
  * Allocates memory for LPM object
  */
@@ -123,6 +132,7 @@ rte_lpm_create(const char *name, int socket_id,
                const struct rte_lpm_config *config)
 {
        char mem_name[RTE_LPM_NAMESIZE];
+       struct internal_lpm *internal = NULL;
        struct rte_lpm *lpm = NULL;
        struct rte_tailq_entry *te;
        uint32_t mem_size, rules_size, tbl8s_size;
@@ -141,12 +151,6 @@ rte_lpm_create(const char *name, int socket_id,

        snprintf(mem_name, sizeof(mem_name), "LPM_%s", name);

-       /* Determine the amount of memory to allocate. */
-       mem_size = sizeof(*lpm);
-       rules_size = sizeof(struct rte_lpm_rule) * config->max_rules;
-       tbl8s_size = (sizeof(struct rte_lpm_tbl_entry) *
-                       RTE_LPM_TBL8_GROUP_NUM_ENTRIES * config->number_tbl8s);
-
        rte_mcfg_tailq_write_lock();

        /* guarantee there's no existing */
@@ -170,16 +174,23 @@ rte_lpm_create(const char *name, int socket_id,
                goto exit;
        }

+       /* Determine the amount of memory to allocate. */
+       mem_size = sizeof(*internal);
+       rules_size = sizeof(struct rte_lpm_rule) * config->max_rules;
+       tbl8s_size = (sizeof(struct rte_lpm_tbl_entry) *
+                       RTE_LPM_TBL8_GROUP_NUM_ENTRIES * config->number_tbl8s);
+
        /* Allocate memory to store the LPM data structures. */
-       lpm = rte_zmalloc_socket(mem_name, mem_size,
+       internal = rte_zmalloc_socket(mem_name, mem_size,
                        RTE_CACHE_LINE_SIZE, socket_id);
-       if (lpm == NULL) {
+       if (internal == NULL) {
                RTE_LOG(ERR, LPM, "LPM memory allocation failed\n");
                rte_free(te);
                rte_errno = ENOMEM;
                goto exit;
        }

+       lpm = &internal->lpm;
        lpm->rules_tbl = rte_zmalloc_socket(NULL,
                        (size_t)rules_size, RTE_CACHE_LINE_SIZE, socket_id);

@@ -226,6 +237,7 @@ rte_lpm_create(const char *name, int socket_id,
 void
 rte_lpm_free(struct rte_lpm *lpm)
 {
+       struct internal_lpm *internal;
        struct rte_lpm_list *lpm_list;
        struct rte_tailq_entry *te;

@@ -247,8 +259,9 @@ rte_lpm_free(struct rte_lpm *lpm)

        rte_mcfg_tailq_write_unlock();

-       if (lpm->dq)
-               rte_rcu_qsbr_dq_delete(lpm->dq);
+       internal = container_of(lpm, struct internal_lpm, lpm);
+       if (internal->dq != NULL)
+               rte_rcu_qsbr_dq_delete(internal->dq);
        rte_free(lpm->tbl8);
        rte_free(lpm->rules_tbl);
        rte_free(lpm);
@@ -276,13 +289,15 @@ rte_lpm_rcu_qsbr_add(struct rte_lpm *lpm, struct rte_lpm_rcu_config *cfg,
 {
 {
        char rcu_dq_name[RTE_RCU_QSBR_DQ_NAMESIZE];
        struct rte_rcu_qsbr_dq_parameters params = {0};
+       struct internal_lpm *internal;

-       if ((lpm == NULL) || (cfg == NULL)) {
+       if (lpm == NULL || cfg == NULL) {
                rte_errno = EINVAL;
                return 1;
        }

-       if (lpm->v) {
+       internal = container_of(lpm, struct internal_lpm, lpm);
+       if (internal->v != NULL) {
                rte_errno = EEXIST;
                return 1;
        }
@@ -305,20 +320,19 @@ rte_lpm_rcu_qsbr_add(struct rte_lpm *lpm, struct rte_lpm_rcu_config *cfg,
                params.free_fn = __lpm_rcu_qsbr_free_resource;
                params.p = lpm;
                params.v = cfg->v;
-               lpm->dq = rte_rcu_qsbr_dq_create(&params);
-               if (lpm->dq == NULL) {
-                       RTE_LOG(ERR, LPM,
-                                       "LPM QS defer queue creation failed\n");
+               internal->dq = rte_rcu_qsbr_dq_create(&params);
+               if (internal->dq == NULL) {
+                       RTE_LOG(ERR, LPM, "LPM QS defer queue creation failed\n");
                        return 1;
                }
                if (dq)
-                       *dq = lpm->dq;
+                       *dq = internal->dq;
        } else {
                rte_errno = EINVAL;
                return 1;
        }
-       lpm->rcu_mode = cfg->mode;
-       lpm->v = cfg->v;
+       internal->rcu_mode = cfg->mode;
+       internal->v = cfg->v;

        return 0;
 }
@@ -502,12 +516,13 @@ _tbl8_alloc(struct rte_lpm *lpm)
 static int32_t
 tbl8_alloc(struct rte_lpm *lpm)
 {
+       struct internal_lpm *internal = container_of(lpm, struct internal_lpm, lpm);
        int32_t group_idx; /* tbl8 group index. */

        group_idx = _tbl8_alloc(lpm);
-       if ((group_idx == -ENOSPC) && (lpm->dq != NULL)) {
+       if (group_idx == -ENOSPC && internal->dq != NULL) {
                /* If there are no tbl8 groups try to reclaim one. */
-               if (rte_rcu_qsbr_dq_reclaim(lpm->dq, 1, NULL, NULL, NULL) == 0)
+               if (rte_rcu_qsbr_dq_reclaim(internal->dq, 1, NULL, NULL, NULL) == 0)
                        group_idx = _tbl8_alloc(lpm);
        }

@@ -518,20 +533,21 @@ static void
 tbl8_free(struct rte_lpm *lpm, uint32_t tbl8_group_start)
 {
        struct rte_lpm_tbl_entry zero_tbl8_entry = {0};
+       struct internal_lpm *internal = container_of(lpm, struct internal_lpm, lpm);

-       if (!lpm->v) {
+       if (internal->v == NULL) {
                /* Set tbl8 group invalid*/
                __atomic_store(&lpm->tbl8[tbl8_group_start], &zero_tbl8_entry,
                                __ATOMIC_RELAXED);
-       } else if (lpm->rcu_mode == RTE_LPM_QSBR_MODE_SYNC) {
+       } else if (internal->rcu_mode == RTE_LPM_QSBR_MODE_SYNC) {
                /* Wait for quiescent state change. */
-               rte_rcu_qsbr_synchronize(lpm->v, RTE_QSBR_THRID_INVALID);
+               rte_rcu_qsbr_synchronize(internal->v, RTE_QSBR_THRID_INVALID);
                /* Set tbl8 group invalid*/
                __atomic_store(&lpm->tbl8[tbl8_group_start], &zero_tbl8_entry,
                                __ATOMIC_RELAXED);
-       } else if (lpm->rcu_mode == RTE_LPM_QSBR_MODE_DQ) {
+       } else if (internal->rcu_mode == RTE_LPM_QSBR_MODE_DQ) {
                /* Push into QSBR defer queue. */
-               rte_rcu_qsbr_dq_enqueue(lpm->dq, (void *)&tbl8_group_start);
+               rte_rcu_qsbr_dq_enqueue(internal->dq, (void *)&tbl8_group_start);
        }
 }

diff --git a/lib/librte_lpm/rte_lpm.h b/lib/librte_lpm/rte_lpm.h
index 7889f21b3..a9568fcdd 100644
--- a/lib/librte_lpm/rte_lpm.h
+++ b/lib/librte_lpm/rte_lpm.h
@@ -143,12 +143,6 @@ struct rte_lpm {
                        __rte_cache_aligned; /**< LPM tbl24 table. */
        struct rte_lpm_tbl_entry *tbl8; /**< LPM tbl8 table. */
        struct rte_lpm_rule *rules_tbl; /**< LPM rules. */
-#ifdef ALLOW_EXPERIMENTAL_API
-       /* RCU config. */
-       struct rte_rcu_qsbr *v;         /* RCU QSBR variable. */
-       enum rte_lpm_qsbr_mode rcu_mode;/* Blocking, defer queue. */
-       struct rte_rcu_qsbr_dq *dq;     /* RCU QSBR defer queue. */
-#endif
 };

 /** LPM RCU QSBR configuration structure. */
  
Ruifeng Wang July 8, 2020, 3:34 p.m. UTC | #3
> -----Original Message-----
> From: David Marchand <david.marchand@redhat.com>
> Sent: Wednesday, July 8, 2020 10:30 PM
> To: Ruifeng Wang <Ruifeng.Wang@arm.com>
> Cc: Bruce Richardson <bruce.richardson@intel.com>; Vladimir Medvedkin
> <vladimir.medvedkin@intel.com>; John McNamara
> <john.mcnamara@intel.com>; Marko Kovacevic
> <marko.kovacevic@intel.com>; Ray Kinsella <mdr@ashroe.eu>; Neil Horman
> <nhorman@tuxdriver.com>; dev <dev@dpdk.org>; Ananyev, Konstantin
> <konstantin.ananyev@intel.com>; Honnappa Nagarahalli
> <Honnappa.Nagarahalli@arm.com>; nd <nd@arm.com>
> Subject: Re: [dpdk-dev] [PATCH v7 1/3] lib/lpm: integrate RCU QSBR
> 
> On Tue, Jul 7, 2020 at 5:16 PM Ruifeng Wang <ruifeng.wang@arm.com>
> wrote:
> > diff --git a/lib/librte_lpm/rte_lpm.h b/lib/librte_lpm/rte_lpm.h index
> > b9d49ac87..7889f21b3 100644
> > --- a/lib/librte_lpm/rte_lpm.h
> > +++ b/lib/librte_lpm/rte_lpm.h
> > @@ -1,5 +1,6 @@
> >  /* SPDX-License-Identifier: BSD-3-Clause
> >   * Copyright(c) 2010-2014 Intel Corporation
> > + * Copyright(c) 2020 Arm Limited
> >   */
> >
> >  #ifndef _RTE_LPM_H_
> > @@ -20,6 +21,7 @@
> >  #include <rte_memory.h>
> >  #include <rte_common.h>
> >  #include <rte_vect.h>
> > +#include <rte_rcu_qsbr.h>
> >
> >  #ifdef __cplusplus
> >  extern "C" {
> > @@ -62,6 +64,17 @@ extern "C" {
> >  /** Bitmask used to indicate successful lookup */
> >  #define RTE_LPM_LOOKUP_SUCCESS          0x01000000
> >
> > +/** @internal Default RCU defer queue entries to reclaim in one go. */
> > +#define RTE_LPM_RCU_DQ_RECLAIM_MAX     16
> > +
> > +/** RCU reclamation modes */
> > +enum rte_lpm_qsbr_mode {
> > +       /** Create defer queue for reclaim. */
> > +       RTE_LPM_QSBR_MODE_DQ = 0,
> > +       /** Use blocking mode reclaim. No defer queue created. */
> > +       RTE_LPM_QSBR_MODE_SYNC
> > +};
> > +
> >  #if RTE_BYTE_ORDER == RTE_LITTLE_ENDIAN
> >  /** @internal Tbl24 entry structure. */  __extension__ @@ -130,6
> > +143,28 @@ struct rte_lpm {
> >                         __rte_cache_aligned; /**< LPM tbl24 table. */
> >         struct rte_lpm_tbl_entry *tbl8; /**< LPM tbl8 table. */
> >         struct rte_lpm_rule *rules_tbl; /**< LPM rules. */
> > +#ifdef ALLOW_EXPERIMENTAL_API
> > +       /* RCU config. */
> > +       struct rte_rcu_qsbr *v;         /* RCU QSBR variable. */
> > +       enum rte_lpm_qsbr_mode rcu_mode;/* Blocking, defer queue. */
> > +       struct rte_rcu_qsbr_dq *dq;     /* RCU QSBR defer queue. */
> > +#endif
> > +};
> 
> I can see failures in travis reports for v7 and v6.
> I reproduced them in my env.
> 
> 1 function with some indirect sub-type change:
> 
>   [C]'function int rte_lpm_add(rte_lpm*, uint32_t, uint8_t, uint32_t)' at rte_lpm.c:764:1 has some indirect sub-type changes:
>     parameter 1 of type 'rte_lpm*' has sub-type changes:
>       in pointed to type 'struct rte_lpm' at rte_lpm.h:134:1:
>         type size hasn't changed
>         3 data member insertions:
>           'rte_rcu_qsbr* rte_lpm::v', at offset 536873600 (in bits) at rte_lpm.h:148:1
>           'rte_lpm_qsbr_mode rte_lpm::rcu_mode', at offset 536873664 (in bits) at rte_lpm.h:149:1
>           'rte_rcu_qsbr_dq* rte_lpm::dq', at offset 536873728 (in bits) at rte_lpm.h:150:1
> 
Sorry, I thought that if ALLOW_EXPERIMENTAL_API was used, the ABI would be kept intact when the user did not enable experimental APIs. ABI stability and ALLOW_EXPERIMENTAL_API are indeed two different things.

> 
> Going back to my proposal of hiding what does not need to be seen.
> 
> Disclaimer, *this is quick & dirty* but it builds and passes ABI check:
> 
> $ git diff
> diff --git a/lib/librte_lpm/rte_lpm.c b/lib/librte_lpm/rte_lpm.c index
> d498ba761..7109aef6a 100644
> --- a/lib/librte_lpm/rte_lpm.c
> +++ b/lib/librte_lpm/rte_lpm.c
I understand your proposal in v5 now. A new data structure encloses rte_lpm together with the new members needed for RCU. In this way, the rte_lpm ABI is kept intact, and the other rte_lpm members that do not need to be exposed can be moved out in the 20.11 release.
I will fix the ABI issue in the next version.

> @@ -115,6 +115,15 @@ rte_lpm_find_existing(const char *name)
>         return l;
>  }
> 
> +struct internal_lpm {
> +       /* Public object */
> +       struct rte_lpm lpm;
> +       /* RCU config. */
> +       struct rte_rcu_qsbr *v;         /* RCU QSBR variable. */
> +       enum rte_lpm_qsbr_mode rcu_mode;/* Blocking, defer queue. */
> +       struct rte_rcu_qsbr_dq *dq;     /* RCU QSBR defer queue. */
> +};
> +
>  /*
>   * Allocates memory for LPM object
>   */
> @@ -123,6 +132,7 @@ rte_lpm_create(const char *name, int socket_id,
>                 const struct rte_lpm_config *config)
>  {
>         char mem_name[RTE_LPM_NAMESIZE];
> +       struct internal_lpm *internal = NULL;
>         struct rte_lpm *lpm = NULL;
>         struct rte_tailq_entry *te;
>         uint32_t mem_size, rules_size, tbl8s_size;
> @@ -141,12 +151,6 @@ rte_lpm_create(const char *name, int socket_id,
> 
>         snprintf(mem_name, sizeof(mem_name), "LPM_%s", name);
> 
> -       /* Determine the amount of memory to allocate. */
> -       mem_size = sizeof(*lpm);
> -       rules_size = sizeof(struct rte_lpm_rule) * config->max_rules;
> -       tbl8s_size = (sizeof(struct rte_lpm_tbl_entry) *
> -                       RTE_LPM_TBL8_GROUP_NUM_ENTRIES * config->number_tbl8s);
> -
>         rte_mcfg_tailq_write_lock();
> 
>         /* guarantee there's no existing */
> @@ -170,16 +174,23 @@ rte_lpm_create(const char *name, int socket_id,
>                 goto exit;
>         }
> 
> +       /* Determine the amount of memory to allocate. */
> +       mem_size = sizeof(*internal);
> +       rules_size = sizeof(struct rte_lpm_rule) * config->max_rules;
> +                       RTE_LPM_TBL8_GROUP_NUM_ENTRIES * config->number_tbl8s);
> + config->number_tbl8s);
> +
>         /* Allocate memory to store the LPM data structures. */
> -       lpm = rte_zmalloc_socket(mem_name, mem_size,
> +       internal = rte_zmalloc_socket(mem_name, mem_size,
>                         RTE_CACHE_LINE_SIZE, socket_id);
> -       if (lpm == NULL) {
> +       if (internal == NULL) {
>                 RTE_LOG(ERR, LPM, "LPM memory allocation failed\n");
>                 rte_free(te);
>                 rte_errno = ENOMEM;
>                 goto exit;
>         }
> 
> +       lpm = &internal->lpm;
>         lpm->rules_tbl = rte_zmalloc_socket(NULL,
>                         (size_t)rules_size, RTE_CACHE_LINE_SIZE, socket_id);
> 
> @@ -226,6 +237,7 @@ rte_lpm_create(const char *name, int socket_id,
>  void
>  rte_lpm_free(struct rte_lpm *lpm)
>  {
> +       struct internal_lpm *internal;
>         struct rte_lpm_list *lpm_list;
>         struct rte_tailq_entry *te;
> 
> @@ -247,8 +259,9 @@ rte_lpm_free(struct rte_lpm *lpm)
> 
>         rte_mcfg_tailq_write_unlock();
> 
> -       if (lpm->dq)
> -               rte_rcu_qsbr_dq_delete(lpm->dq);
> +       internal = container_of(lpm, struct internal_lpm, lpm);
> +       if (internal->dq != NULL)
> +               rte_rcu_qsbr_dq_delete(internal->dq);
>         rte_free(lpm->tbl8);
>         rte_free(lpm->rules_tbl);
>         rte_free(lpm);
> @@ -276,13 +289,15 @@ rte_lpm_rcu_qsbr_add(struct rte_lpm *lpm, struct rte_lpm_rcu_config *cfg,
>  {
>         char rcu_dq_name[RTE_RCU_QSBR_DQ_NAMESIZE];
>         struct rte_rcu_qsbr_dq_parameters params = {0};
> +       struct internal_lpm *internal;
> 
> -       if ((lpm == NULL) || (cfg == NULL)) {
> +       if (lpm == NULL || cfg == NULL) {
>                 rte_errno = EINVAL;
>                 return 1;
>         }
> 
> -       if (lpm->v) {
> +       internal = container_of(lpm, struct internal_lpm, lpm);
> +       if (internal->v != NULL) {
>                 rte_errno = EEXIST;
>                 return 1;
>         }
> @@ -305,20 +320,19 @@ rte_lpm_rcu_qsbr_add(struct rte_lpm *lpm, struct rte_lpm_rcu_config *cfg,
>                 params.free_fn = __lpm_rcu_qsbr_free_resource;
>                 params.p = lpm;
>                 params.v = cfg->v;
> -               lpm->dq = rte_rcu_qsbr_dq_create(&params);
> -               if (lpm->dq == NULL) {
> -                       RTE_LOG(ERR, LPM,
> -                                       "LPM QS defer queue creation failed\n");
> +               internal->dq = rte_rcu_qsbr_dq_create(&params);
> +               if (internal->dq == NULL) {
> +                       RTE_LOG(ERR, LPM, "LPM QS defer queue creation failed\n");
>                         return 1;
>                 }
>                 if (dq)
> -                       *dq = lpm->dq;
> +                       *dq = internal->dq;
>         } else {
>                 rte_errno = EINVAL;
>                 return 1;
>         }
> -       lpm->rcu_mode = cfg->mode;
> -       lpm->v = cfg->v;
> +       internal->rcu_mode = cfg->mode;
> +       internal->v = cfg->v;
> 
>         return 0;
>  }
> @@ -502,12 +516,13 @@ _tbl8_alloc(struct rte_lpm *lpm)
>  static int32_t
>  tbl8_alloc(struct rte_lpm *lpm)
>  {
> +       struct internal_lpm *internal = container_of(lpm, struct internal_lpm, lpm);
>         int32_t group_idx; /* tbl8 group index. */
> 
>         group_idx = _tbl8_alloc(lpm);
> -       if ((group_idx == -ENOSPC) && (lpm->dq != NULL)) {
> +       if (group_idx == -ENOSPC && internal->dq != NULL) {
>                 /* If there are no tbl8 groups try to reclaim one. */
> -               if (rte_rcu_qsbr_dq_reclaim(lpm->dq, 1, NULL, NULL, NULL) == 0)
> +               if (rte_rcu_qsbr_dq_reclaim(internal->dq, 1, NULL,
> NULL, NULL) == 0)
>                         group_idx = _tbl8_alloc(lpm);
>         }
> 
> @@ -518,20 +533,21 @@ static void
>  tbl8_free(struct rte_lpm *lpm, uint32_t tbl8_group_start)
>  {
>         struct rte_lpm_tbl_entry zero_tbl8_entry = {0};
> +       struct internal_lpm *internal = container_of(lpm, struct internal_lpm, lpm);
> 
> -       if (!lpm->v) {
> +       if (internal->v == NULL) {
>                 /* Set tbl8 group invalid*/
>                 __atomic_store(&lpm->tbl8[tbl8_group_start], &zero_tbl8_entry,
>                                 __ATOMIC_RELAXED);
> -       } else if (lpm->rcu_mode == RTE_LPM_QSBR_MODE_SYNC) {
> +       } else if (internal->rcu_mode == RTE_LPM_QSBR_MODE_SYNC) {
>                 /* Wait for quiescent state change. */
> -               rte_rcu_qsbr_synchronize(lpm->v, RTE_QSBR_THRID_INVALID);
> +               rte_rcu_qsbr_synchronize(internal->v,
> + RTE_QSBR_THRID_INVALID);
>                 /* Set tbl8 group invalid*/
>                 __atomic_store(&lpm->tbl8[tbl8_group_start], &zero_tbl8_entry,
>                                 __ATOMIC_RELAXED);
> -       } else if (lpm->rcu_mode == RTE_LPM_QSBR_MODE_DQ) {
> +       } else if (internal->rcu_mode == RTE_LPM_QSBR_MODE_DQ) {
>                 /* Push into QSBR defer queue. */
> -               rte_rcu_qsbr_dq_enqueue(lpm->dq, (void *)&tbl8_group_start);
> +               rte_rcu_qsbr_dq_enqueue(internal->dq, (void
> *)&tbl8_group_start);
>         }
>  }
> 
> diff --git a/lib/librte_lpm/rte_lpm.h b/lib/librte_lpm/rte_lpm.h
> index 7889f21b3..a9568fcdd 100644
> --- a/lib/librte_lpm/rte_lpm.h
> +++ b/lib/librte_lpm/rte_lpm.h
> @@ -143,12 +143,6 @@ struct rte_lpm {
>                         __rte_cache_aligned; /**< LPM tbl24 table. */
>         struct rte_lpm_tbl_entry *tbl8; /**< LPM tbl8 table. */
>         struct rte_lpm_rule *rules_tbl; /**< LPM rules. */
> -#ifdef ALLOW_EXPERIMENTAL_API
> -       /* RCU config. */
> -       struct rte_rcu_qsbr *v;         /* RCU QSBR variable. */
> -       enum rte_lpm_qsbr_mode rcu_mode;/* Blocking, defer queue. */
> -       struct rte_rcu_qsbr_dq *dq;     /* RCU QSBR defer queue. */
> -#endif
>  };
> 
>  /** LPM RCU QSBR configuration structure. */
> 
> 
> 
> 
> --
> David Marchand
  
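The layout change discussed in the quoted review — moving the RCU state out of the public `struct rte_lpm` into a private wrapper and recovering it with `container_of()` — can be sketched in plain C. The struct fields here are simplified stand-ins for illustration, not the real DPDK definitions:

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Simplified stand-in for the public LPM structure. */
struct rte_lpm {
	uint32_t number_tbl8s;
};

/* Private wrapper: RCU state lives outside the public ABI. */
struct internal_lpm {
	void *v;		/* RCU QSBR variable (opaque here) */
	struct rte_lpm lpm;	/* public part handed out to users */
	void *dq;		/* RCU defer queue (opaque here) */
};

/* container_of: recover the enclosing struct from a member pointer. */
#define container_of(ptr, type, member) \
	((type *)((char *)(ptr) - offsetof(type, member)))

/* Library-internal helper: map a public handle back to private state. */
static inline struct internal_lpm *
lpm_to_internal(struct rte_lpm *lpm)
{
	return container_of(lpm, struct internal_lpm, lpm);
}
```

Because the application only ever sees `struct rte_lpm *`, the RCU fields can change in later releases without breaking the ABI.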

Patch

diff --git a/doc/guides/prog_guide/lpm_lib.rst b/doc/guides/prog_guide/lpm_lib.rst
index 1609a57d0..03945904b 100644
--- a/doc/guides/prog_guide/lpm_lib.rst
+++ b/doc/guides/prog_guide/lpm_lib.rst
@@ -145,6 +145,38 @@  depending on whether we need to move to the next table or not.
 Prefix expansion is one of the keys of this algorithm,
 since it improves the speed dramatically by adding redundancy.
 
+Deletion
+~~~~~~~~
+
+When deleting a rule, a replacement rule is searched for. The replacement rule is an existing rule that
+has the longest prefix match with the rule to be deleted, but a shorter prefix.
+
+If a replacement rule is found, the target tbl24 and tbl8 entries are updated to have the same depth and
+next hop value as the replacement rule.
+
+If no replacement rule can be found, the target tbl24 and tbl8 entries are cleared.
+
+Prefix expansion is performed if the rule's depth is not exactly 24 bits or 32 bits.
+
+After deleting a rule, the group of tbl8s that belongs to the same tbl24 entry is freed in the following cases:
+
+*   All tbl8s in the group are empty.
+
+*   All tbl8s in the group have the same value and a depth no greater than 24.
+
+Freeing of tbl8s has different behaviors:
+
+*   If RCU is not used, tbl8s are cleared and reclaimed immediately.
+
+*   If RCU is used, tbl8s are reclaimed when readers are in quiescent state.
+
+When the LPM is not using RCU, a tbl8 group can be freed immediately even though readers might still be
+using the tbl8 group entries. This can result in incorrect lookups.
+
+The RCU QSBR process is integrated for safe tbl8 group reclamation. The application has certain
+responsibilities when using this feature. Please refer to the resource reclamation framework of the
+:ref:`RCU library <RCU_Library>` for more details.
+
 Lookup
 ~~~~~~
 
diff --git a/lib/librte_lpm/Makefile b/lib/librte_lpm/Makefile
index d682785b6..6f06c5c03 100644
--- a/lib/librte_lpm/Makefile
+++ b/lib/librte_lpm/Makefile
@@ -8,7 +8,7 @@  LIB = librte_lpm.a
 
 CFLAGS += -O3
 CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR)
-LDLIBS += -lrte_eal -lrte_hash
+LDLIBS += -lrte_eal -lrte_hash -lrte_rcu
 
 EXPORT_MAP := rte_lpm_version.map
 
diff --git a/lib/librte_lpm/meson.build b/lib/librte_lpm/meson.build
index 021ac6d8d..6cfc083c5 100644
--- a/lib/librte_lpm/meson.build
+++ b/lib/librte_lpm/meson.build
@@ -7,3 +7,4 @@  headers = files('rte_lpm.h', 'rte_lpm6.h')
 # without worrying about which architecture we actually need
 headers += files('rte_lpm_altivec.h', 'rte_lpm_neon.h', 'rte_lpm_sse.h')
 deps += ['hash']
+deps += ['rcu']
diff --git a/lib/librte_lpm/rte_lpm.c b/lib/librte_lpm/rte_lpm.c
index 38ab512a4..d498ba761 100644
--- a/lib/librte_lpm/rte_lpm.c
+++ b/lib/librte_lpm/rte_lpm.c
@@ -1,5 +1,6 @@ 
 /* SPDX-License-Identifier: BSD-3-Clause
  * Copyright(c) 2010-2014 Intel Corporation
+ * Copyright(c) 2020 Arm Limited
  */
 
 #include <string.h>
@@ -246,12 +247,82 @@  rte_lpm_free(struct rte_lpm *lpm)
 
 	rte_mcfg_tailq_write_unlock();
 
+	if (lpm->dq)
+		rte_rcu_qsbr_dq_delete(lpm->dq);
 	rte_free(lpm->tbl8);
 	rte_free(lpm->rules_tbl);
 	rte_free(lpm);
 	rte_free(te);
 }
 
+static void
+__lpm_rcu_qsbr_free_resource(void *p, void *data, unsigned int n)
+{
+	struct rte_lpm_tbl_entry zero_tbl8_entry = {0};
+	uint32_t tbl8_group_index = *(uint32_t *)data;
+	struct rte_lpm_tbl_entry *tbl8 = ((struct rte_lpm *)p)->tbl8;
+
+	RTE_SET_USED(n);
+	/* Set tbl8 group invalid */
+	__atomic_store(&tbl8[tbl8_group_index], &zero_tbl8_entry,
+		__ATOMIC_RELAXED);
+}
+
+/* Associate QSBR variable with an LPM object.
+ */
+int
+rte_lpm_rcu_qsbr_add(struct rte_lpm *lpm, struct rte_lpm_rcu_config *cfg,
+	struct rte_rcu_qsbr_dq **dq)
+{
+	char rcu_dq_name[RTE_RCU_QSBR_DQ_NAMESIZE];
+	struct rte_rcu_qsbr_dq_parameters params = {0};
+
+	if ((lpm == NULL) || (cfg == NULL)) {
+		rte_errno = EINVAL;
+		return 1;
+	}
+
+	if (lpm->v) {
+		rte_errno = EEXIST;
+		return 1;
+	}
+
+	if (cfg->mode == RTE_LPM_QSBR_MODE_SYNC) {
+		/* No other things to do. */
+	} else if (cfg->mode == RTE_LPM_QSBR_MODE_DQ) {
+		/* Init QSBR defer queue. */
+		snprintf(rcu_dq_name, sizeof(rcu_dq_name),
+				"LPM_RCU_%s", lpm->name);
+		params.name = rcu_dq_name;
+		params.size = cfg->dq_size;
+		if (params.size == 0)
+			params.size = lpm->number_tbl8s;
+		params.trigger_reclaim_limit = cfg->reclaim_thd;
+		params.max_reclaim_size = cfg->reclaim_max;
+		if (params.max_reclaim_size == 0)
+			params.max_reclaim_size = RTE_LPM_RCU_DQ_RECLAIM_MAX;
+		params.esize = sizeof(uint32_t);	/* tbl8 group index */
+		params.free_fn = __lpm_rcu_qsbr_free_resource;
+		params.p = lpm;
+		params.v = cfg->v;
+		lpm->dq = rte_rcu_qsbr_dq_create(&params);
+		if (lpm->dq == NULL) {
+			RTE_LOG(ERR, LPM,
+					"LPM QS defer queue creation failed\n");
+			return 1;
+		}
+		if (dq)
+			*dq = lpm->dq;
+	} else {
+		rte_errno = EINVAL;
+		return 1;
+	}
+	lpm->rcu_mode = cfg->mode;
+	lpm->v = cfg->v;
+
+	return 0;
+}
+
 /*
  * Adds a rule to the rule table.
  *
@@ -394,14 +465,15 @@  rule_find(struct rte_lpm *lpm, uint32_t ip_masked, uint8_t depth)
  * Find, clean and allocate a tbl8.
  */
 static int32_t
-tbl8_alloc(struct rte_lpm_tbl_entry *tbl8, uint32_t number_tbl8s)
+_tbl8_alloc(struct rte_lpm *lpm)
 {
 	uint32_t group_idx; /* tbl8 group index. */
 	struct rte_lpm_tbl_entry *tbl8_entry;
 
 	/* Scan through tbl8 to find a free (i.e. INVALID) tbl8 group. */
-	for (group_idx = 0; group_idx < number_tbl8s; group_idx++) {
-		tbl8_entry = &tbl8[group_idx * RTE_LPM_TBL8_GROUP_NUM_ENTRIES];
+	for (group_idx = 0; group_idx < lpm->number_tbl8s; group_idx++) {
+		tbl8_entry = &lpm->tbl8[group_idx *
+					RTE_LPM_TBL8_GROUP_NUM_ENTRIES];
 		/* If a free tbl8 group is found clean it and set as VALID. */
 		if (!tbl8_entry->valid_group) {
 			struct rte_lpm_tbl_entry new_tbl8_entry = {
@@ -427,14 +499,40 @@  tbl8_alloc(struct rte_lpm_tbl_entry *tbl8, uint32_t number_tbl8s)
 	return -ENOSPC;
 }
 
+static int32_t
+tbl8_alloc(struct rte_lpm *lpm)
+{
+	int32_t group_idx; /* tbl8 group index. */
+
+	group_idx = _tbl8_alloc(lpm);
+	if ((group_idx == -ENOSPC) && (lpm->dq != NULL)) {
+		/* If there are no tbl8 groups try to reclaim one. */
+		if (rte_rcu_qsbr_dq_reclaim(lpm->dq, 1, NULL, NULL, NULL) == 0)
+			group_idx = _tbl8_alloc(lpm);
+	}
+
+	return group_idx;
+}
+
 static void
-tbl8_free(struct rte_lpm_tbl_entry *tbl8, uint32_t tbl8_group_start)
+tbl8_free(struct rte_lpm *lpm, uint32_t tbl8_group_start)
 {
-	/* Set tbl8 group invalid*/
 	struct rte_lpm_tbl_entry zero_tbl8_entry = {0};
 
-	__atomic_store(&tbl8[tbl8_group_start], &zero_tbl8_entry,
-			__ATOMIC_RELAXED);
+	if (!lpm->v) {
+		/* Set tbl8 group invalid*/
+		__atomic_store(&lpm->tbl8[tbl8_group_start], &zero_tbl8_entry,
+				__ATOMIC_RELAXED);
+	} else if (lpm->rcu_mode == RTE_LPM_QSBR_MODE_SYNC) {
+		/* Wait for quiescent state change. */
+		rte_rcu_qsbr_synchronize(lpm->v, RTE_QSBR_THRID_INVALID);
+		/* Set tbl8 group invalid*/
+		__atomic_store(&lpm->tbl8[tbl8_group_start], &zero_tbl8_entry,
+				__ATOMIC_RELAXED);
+	} else if (lpm->rcu_mode == RTE_LPM_QSBR_MODE_DQ) {
+		/* Push into QSBR defer queue. */
+		rte_rcu_qsbr_dq_enqueue(lpm->dq, (void *)&tbl8_group_start);
+	}
 }
 
 static __rte_noinline int32_t
@@ -523,7 +621,7 @@  add_depth_big(struct rte_lpm *lpm, uint32_t ip_masked, uint8_t depth,
 
 	if (!lpm->tbl24[tbl24_index].valid) {
 		/* Search for a free tbl8 group. */
-		tbl8_group_index = tbl8_alloc(lpm->tbl8, lpm->number_tbl8s);
+		tbl8_group_index = tbl8_alloc(lpm);
 
 		/* Check tbl8 allocation was successful. */
 		if (tbl8_group_index < 0) {
@@ -569,7 +667,7 @@  add_depth_big(struct rte_lpm *lpm, uint32_t ip_masked, uint8_t depth,
 	} /* If valid entry but not extended calculate the index into Table8. */
 	else if (lpm->tbl24[tbl24_index].valid_group == 0) {
 		/* Search for free tbl8 group. */
-		tbl8_group_index = tbl8_alloc(lpm->tbl8, lpm->number_tbl8s);
+		tbl8_group_index = tbl8_alloc(lpm);
 
 		if (tbl8_group_index < 0) {
 			return tbl8_group_index;
@@ -977,7 +1075,7 @@  delete_depth_big(struct rte_lpm *lpm, uint32_t ip_masked,
 		 */
 		lpm->tbl24[tbl24_index].valid = 0;
 		__atomic_thread_fence(__ATOMIC_RELEASE);
-		tbl8_free(lpm->tbl8, tbl8_group_start);
+		tbl8_free(lpm, tbl8_group_start);
 	} else if (tbl8_recycle_index > -1) {
 		/* Update tbl24 entry. */
 		struct rte_lpm_tbl_entry new_tbl24_entry = {
@@ -993,7 +1091,7 @@  delete_depth_big(struct rte_lpm *lpm, uint32_t ip_masked,
 		__atomic_store(&lpm->tbl24[tbl24_index], &new_tbl24_entry,
 				__ATOMIC_RELAXED);
 		__atomic_thread_fence(__ATOMIC_RELEASE);
-		tbl8_free(lpm->tbl8, tbl8_group_start);
+		tbl8_free(lpm, tbl8_group_start);
 	}
 #undef group_idx
 	return 0;
diff --git a/lib/librte_lpm/rte_lpm.h b/lib/librte_lpm/rte_lpm.h
index b9d49ac87..7889f21b3 100644
--- a/lib/librte_lpm/rte_lpm.h
+++ b/lib/librte_lpm/rte_lpm.h
@@ -1,5 +1,6 @@ 
 /* SPDX-License-Identifier: BSD-3-Clause
  * Copyright(c) 2010-2014 Intel Corporation
+ * Copyright(c) 2020 Arm Limited
  */
 
 #ifndef _RTE_LPM_H_
@@ -20,6 +21,7 @@ 
 #include <rte_memory.h>
 #include <rte_common.h>
 #include <rte_vect.h>
+#include <rte_rcu_qsbr.h>
 
 #ifdef __cplusplus
 extern "C" {
@@ -62,6 +64,17 @@  extern "C" {
 /** Bitmask used to indicate successful lookup */
 #define RTE_LPM_LOOKUP_SUCCESS          0x01000000
 
+/** @internal Default RCU defer queue entries to reclaim in one go. */
+#define RTE_LPM_RCU_DQ_RECLAIM_MAX	16
+
+/** RCU reclamation modes */
+enum rte_lpm_qsbr_mode {
+	/** Create defer queue for reclaim. */
+	RTE_LPM_QSBR_MODE_DQ = 0,
+	/** Use blocking mode reclaim. No defer queue created. */
+	RTE_LPM_QSBR_MODE_SYNC
+};
+
 #if RTE_BYTE_ORDER == RTE_LITTLE_ENDIAN
 /** @internal Tbl24 entry structure. */
 __extension__
@@ -130,6 +143,28 @@  struct rte_lpm {
 			__rte_cache_aligned; /**< LPM tbl24 table. */
 	struct rte_lpm_tbl_entry *tbl8; /**< LPM tbl8 table. */
 	struct rte_lpm_rule *rules_tbl; /**< LPM rules. */
+#ifdef ALLOW_EXPERIMENTAL_API
+	/* RCU config. */
+	struct rte_rcu_qsbr *v;		/* RCU QSBR variable. */
+	enum rte_lpm_qsbr_mode rcu_mode;/* Blocking, defer queue. */
+	struct rte_rcu_qsbr_dq *dq;	/* RCU QSBR defer queue. */
+#endif
+};
+
+/** LPM RCU QSBR configuration structure. */
+struct rte_lpm_rcu_config {
+	struct rte_rcu_qsbr *v;	/* RCU QSBR variable. */
+	/* Mode of RCU QSBR. RTE_LPM_QSBR_MODE_xxx
+	 * '0' for default: create defer queue for reclaim.
+	 */
+	enum rte_lpm_qsbr_mode mode;
+	uint32_t dq_size;	/* RCU defer queue size.
+				 * default: lpm->number_tbl8s.
+				 */
+	uint32_t reclaim_thd;	/* Threshold to trigger auto reclaim. */
+	uint32_t reclaim_max;	/* Max entries to reclaim in one go.
+				 * default: RTE_LPM_RCU_DQ_RECLAIM_MAX.
+				 */
 };
 
 /**
@@ -179,6 +214,30 @@  rte_lpm_find_existing(const char *name);
 void
 rte_lpm_free(struct rte_lpm *lpm);
 
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Associate RCU QSBR variable with an LPM object.
+ *
+ * @param lpm
+ *   the LPM object to associate the RCU QSBR variable with
+ * @param cfg
+ *   RCU QSBR configuration
+ * @param dq
+ *   handle of the created RCU QSBR defer queue
+ * @return
+ *   On success - 0
+ *   On error - 1 with error code set in rte_errno.
+ *   Possible rte_errno codes are:
+ *   - EINVAL - invalid pointer
+ *   - EEXIST - already added QSBR
+ *   - ENOMEM - memory allocation failure
+ */
+__rte_experimental
+int rte_lpm_rcu_qsbr_add(struct rte_lpm *lpm, struct rte_lpm_rcu_config *cfg,
+	struct rte_rcu_qsbr_dq **dq);
+
 /**
  * Add a rule to the LPM table.
  *
diff --git a/lib/librte_lpm/rte_lpm_version.map b/lib/librte_lpm/rte_lpm_version.map
index 500f58b80..bfccd7eac 100644
--- a/lib/librte_lpm/rte_lpm_version.map
+++ b/lib/librte_lpm/rte_lpm_version.map
@@ -21,3 +21,9 @@  DPDK_20.0 {
 
 	local: *;
 };
+
+EXPERIMENTAL {
+	global:
+
+	rte_lpm_rcu_qsbr_add;
+};
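The allocation fallback that `tbl8_alloc()` adds in this patch — on `-ENOSPC`, reclaim one deferred group and retry — can be sketched with a stub allocator (the stub names and the free-group counter are hypothetical; the real reclaim call is `rte_rcu_qsbr_dq_reclaim()`):

```c
#include <assert.h>
#include <errno.h>

/* Stub pool: alloc fails until reclaim returns a group to the pool. */
static int free_groups;

static int alloc_group(void)
{
	if (free_groups > 0) {
		free_groups--;
		return 0;	/* group index, simplified to 0 */
	}
	return -ENOSPC;
}

static int reclaim_one(void)	/* rte_rcu_qsbr_dq_reclaim stand-in */
{
	free_groups++;
	return 0;
}

/* Sketch of tbl8_alloc(): on -ENOSPC, try one reclaim and retry. */
static int tbl8_alloc_sketch(int have_dq)
{
	int idx = alloc_group();

	if (idx == -ENOSPC && have_dq) {
		/* No free tbl8 groups: try to reclaim one from the dq. */
		if (reclaim_one() == 0)
			idx = alloc_group();
	}
	return idx;
}
```

This keeps `rte_lpm_add()` working even when every free tbl8 group is still parked on the defer queue waiting for readers to quiesce.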