[v3,1/3] lib/lpm: integrate RCU QSBR
diff mbox series

Message ID 20191001182857.43867-2-honnappa.nagarahalli@arm.com
State Deferred
Delegated to: David Marchand
Headers show
Series
  • RCU integration with LPM library
Related show

Checks

Context Check Description
ci/checkpatch success coding style OK
ci/Performance-Testing fail build patch failure
ci/Intel-compilation fail Compilation issues

Commit Message

Honnappa Nagarahalli Oct. 1, 2019, 6:28 p.m. UTC
From: Ruifeng Wang <ruifeng.wang@arm.com>

Currently, the tbl8 group is freed even though the readers might be
using the tbl8 group entries. The freed tbl8 group can be reallocated
quickly. This results in incorrect lookup results.

RCU QSBR process is integrated for safe tbl8 group reclaim.
Refer to RCU documentation to understand various aspects of
integrating RCU library into other libraries.

Signed-off-by: Ruifeng Wang <ruifeng.wang@arm.com>
Reviewed-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
---
 lib/librte_lpm/Makefile            |   3 +-
 lib/librte_lpm/meson.build         |   2 +
 lib/librte_lpm/rte_lpm.c           | 102 +++++++++++++++++++++++++----
 lib/librte_lpm/rte_lpm.h           |  21 ++++++
 lib/librte_lpm/rte_lpm_version.map |   6 ++
 5 files changed, 122 insertions(+), 12 deletions(-)

Comments

Medvedkin, Vladimir Oct. 4, 2019, 4:05 p.m. UTC | #1
Hi Honnappa,

On 01/10/2019 19:28, Honnappa Nagarahalli wrote:
> From: Ruifeng Wang <ruifeng.wang@arm.com>
>
> Currently, the tbl8 group is freed even though the readers might be
> using the tbl8 group entries. The freed tbl8 group can be reallocated
> quickly. This results in incorrect lookup results.
>
> RCU QSBR process is integrated for safe tbl8 group reclaim.
> Refer to RCU documentation to understand various aspects of
> integrating RCU library into other libraries.
>
> Signed-off-by: Ruifeng Wang <ruifeng.wang@arm.com>
> Reviewed-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
> ---
>   lib/librte_lpm/Makefile            |   3 +-
>   lib/librte_lpm/meson.build         |   2 +
>   lib/librte_lpm/rte_lpm.c           | 102 +++++++++++++++++++++++++----
>   lib/librte_lpm/rte_lpm.h           |  21 ++++++
>   lib/librte_lpm/rte_lpm_version.map |   6 ++
>   5 files changed, 122 insertions(+), 12 deletions(-)
>
> diff --git a/lib/librte_lpm/Makefile b/lib/librte_lpm/Makefile
> index a7946a1c5..ca9e16312 100644
> --- a/lib/librte_lpm/Makefile
> +++ b/lib/librte_lpm/Makefile
> @@ -6,9 +6,10 @@ include $(RTE_SDK)/mk/rte.vars.mk
>   # library name
>   LIB = librte_lpm.a
>   
> +CFLAGS += -DALLOW_EXPERIMENTAL_API
>   CFLAGS += -O3
>   CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR)
> -LDLIBS += -lrte_eal -lrte_hash
> +LDLIBS += -lrte_eal -lrte_hash -lrte_rcu
>   
>   EXPORT_MAP := rte_lpm_version.map
>   
> diff --git a/lib/librte_lpm/meson.build b/lib/librte_lpm/meson.build
> index a5176d8ae..19a35107f 100644
> --- a/lib/librte_lpm/meson.build
> +++ b/lib/librte_lpm/meson.build
> @@ -2,9 +2,11 @@
>   # Copyright(c) 2017 Intel Corporation
>   
>   version = 2
> +allow_experimental_apis = true
>   sources = files('rte_lpm.c', 'rte_lpm6.c')
>   headers = files('rte_lpm.h', 'rte_lpm6.h')
>   # since header files have different names, we can install all vector headers
>   # without worrying about which architecture we actually need
>   headers += files('rte_lpm_altivec.h', 'rte_lpm_neon.h', 'rte_lpm_sse.h')
>   deps += ['hash']
> +deps += ['rcu']
> diff --git a/lib/librte_lpm/rte_lpm.c b/lib/librte_lpm/rte_lpm.c
> index 3a929a1b1..ca58d4b35 100644
> --- a/lib/librte_lpm/rte_lpm.c
> +++ b/lib/librte_lpm/rte_lpm.c
> @@ -1,5 +1,6 @@
>   /* SPDX-License-Identifier: BSD-3-Clause
>    * Copyright(c) 2010-2014 Intel Corporation
> + * Copyright(c) 2019 Arm Limited
>    */
>   
>   #include <string.h>
> @@ -381,6 +382,8 @@ rte_lpm_free_v1604(struct rte_lpm *lpm)
>   
>   	rte_mcfg_tailq_write_unlock();
>   
> +	if (lpm->dq)
> +		rte_rcu_qsbr_dq_delete(lpm->dq);
>   	rte_free(lpm->tbl8);
>   	rte_free(lpm->rules_tbl);
>   	rte_free(lpm);
> @@ -390,6 +393,59 @@ BIND_DEFAULT_SYMBOL(rte_lpm_free, _v1604, 16.04);
>   MAP_STATIC_SYMBOL(void rte_lpm_free(struct rte_lpm *lpm),
>   		rte_lpm_free_v1604);
As a general comment, are you going to add rcu support to the legacy _v20 ?
>   
> +struct __rte_lpm_rcu_dq_entry {
> +	uint32_t tbl8_group_index;
> +	uint32_t pad;
> +};

Is this struct necessary? I mean in tbl8_free_v1604() you can pass 
tbl8_group_index as a pointer without "e.pad = 0;".

And what about 32bit environment?

> +
> +static void
> +__lpm_rcu_qsbr_free_resource(void *p, void *data)
> +{
> +	struct rte_lpm_tbl_entry zero_tbl8_entry = {0};
> +	struct __rte_lpm_rcu_dq_entry *e =
> +			(struct __rte_lpm_rcu_dq_entry *)data;
> +	struct rte_lpm_tbl_entry *tbl8 = (struct rte_lpm_tbl_entry *)p;
> +
> +	/* Set tbl8 group invalid */
> +	__atomic_store(&tbl8[e->tbl8_group_index], &zero_tbl8_entry,
> +		__ATOMIC_RELAXED);
> +}
> +
> +/* Associate QSBR variable with an LPM object.
> + */
> +int
> +rte_lpm_rcu_qsbr_add(struct rte_lpm *lpm, struct rte_rcu_qsbr *v)
> +{
> +	char rcu_dq_name[RTE_RCU_QSBR_DQ_NAMESIZE];
> +	struct rte_rcu_qsbr_dq_parameters params;
> +
> +	if ((lpm == NULL) || (v == NULL)) {
> +		rte_errno = EINVAL;
> +		return 1;
> +	}
> +
> +	if (lpm->dq) {
> +		rte_errno = EEXIST;
> +		return 1;
> +	}
> +
> +	/* Init QSBR defer queue. */
> +	snprintf(rcu_dq_name, sizeof(rcu_dq_name), "LPM_RCU_%s", lpm->name);

Consider moving this logic into rte_rcu_qsbr_dq_create(). I think there 
you could prefix the name with just RCU_ . So it would be possible to 
move include <rte_ring.h> into the rte_rcu_qsbr.c from rte_rcu_qsbr.h 
and get rid of RTE_RCU_QSBR_DQ_NAMESIZE macro in rte_rcu_qsbr.h file.

> +	params.name = rcu_dq_name;
> +	params.size = lpm->number_tbl8s;
> +	params.esize = sizeof(struct __rte_lpm_rcu_dq_entry);
> +	params.f = __lpm_rcu_qsbr_free_resource;
> +	params.p = lpm->tbl8;
> +	params.v = v;
> +	lpm->dq = rte_rcu_qsbr_dq_create(&params);
> +	if (lpm->dq == NULL) {
> +		RTE_LOG(ERR, LPM, "LPM QS defer queue creation failed\n");
> +		return 1;
> +	}
> +
> +	return 0;
> +}
> +
>   /*
>    * Adds a rule to the rule table.
>    *
> @@ -679,14 +735,15 @@ tbl8_alloc_v20(struct rte_lpm_tbl_entry_v20 *tbl8)
>   }
>   
>   static int32_t
> -tbl8_alloc_v1604(struct rte_lpm_tbl_entry *tbl8, uint32_t number_tbl8s)
> +__tbl8_alloc_v1604(struct rte_lpm *lpm)
>   {
>   	uint32_t group_idx; /* tbl8 group index. */
>   	struct rte_lpm_tbl_entry *tbl8_entry;
>   
>   	/* Scan through tbl8 to find a free (i.e. INVALID) tbl8 group. */
> -	for (group_idx = 0; group_idx < number_tbl8s; group_idx++) {
> -		tbl8_entry = &tbl8[group_idx * RTE_LPM_TBL8_GROUP_NUM_ENTRIES];
> +	for (group_idx = 0; group_idx < lpm->number_tbl8s; group_idx++) {
> +		tbl8_entry = &lpm->tbl8[group_idx *
> +					RTE_LPM_TBL8_GROUP_NUM_ENTRIES];
>   		/* If a free tbl8 group is found clean it and set as VALID. */
>   		if (!tbl8_entry->valid_group) {
>   			struct rte_lpm_tbl_entry new_tbl8_entry = {
> @@ -712,6 +769,21 @@ tbl8_alloc_v1604(struct rte_lpm_tbl_entry *tbl8, uint32_t number_tbl8s)
>   	return -ENOSPC;
>   }
>   
> +static int32_t
> +tbl8_alloc_v1604(struct rte_lpm *lpm)
> +{
> +	int32_t group_idx; /* tbl8 group index. */
> +
> +	group_idx = __tbl8_alloc_v1604(lpm);
> +	if ((group_idx < 0) && (lpm->dq != NULL)) {
> +		/* If there are no tbl8 groups try to reclaim some. */
> +		if (rte_rcu_qsbr_dq_reclaim(lpm->dq) == 0)
> +			group_idx = __tbl8_alloc_v1604(lpm);
> +	}
> +
> +	return group_idx;
> +}
> +
>   static void
>   tbl8_free_v20(struct rte_lpm_tbl_entry_v20 *tbl8, uint32_t tbl8_group_start)
>   {
> @@ -728,13 +800,21 @@ tbl8_free_v20(struct rte_lpm_tbl_entry_v20 *tbl8, uint32_t tbl8_group_start)
>   }
>   
>   static void
> -tbl8_free_v1604(struct rte_lpm_tbl_entry *tbl8, uint32_t tbl8_group_start)
> +tbl8_free_v1604(struct rte_lpm *lpm, uint32_t tbl8_group_start)
>   {
> -	/* Set tbl8 group invalid*/
>   	struct rte_lpm_tbl_entry zero_tbl8_entry = {0};
> +	struct __rte_lpm_rcu_dq_entry e;
>   
> -	__atomic_store(&tbl8[tbl8_group_start], &zero_tbl8_entry,
> -			__ATOMIC_RELAXED);
> +	if (lpm->dq != NULL) {
> +		e.tbl8_group_index = tbl8_group_start;
> +		e.pad = 0;
> +		/* Push into QSBR defer queue. */
> +		rte_rcu_qsbr_dq_enqueue(lpm->dq, (void *)&e);
> +	} else {
> +		/* Set tbl8 group invalid*/
> +		__atomic_store(&lpm->tbl8[tbl8_group_start], &zero_tbl8_entry,
> +				__ATOMIC_RELAXED);
> +	}
>   }
>   
>   static __rte_noinline int32_t
> @@ -1037,7 +1117,7 @@ add_depth_big_v1604(struct rte_lpm *lpm, uint32_t ip_masked, uint8_t depth,
>   
>   	if (!lpm->tbl24[tbl24_index].valid) {
>   		/* Search for a free tbl8 group. */
> -		tbl8_group_index = tbl8_alloc_v1604(lpm->tbl8, lpm->number_tbl8s);
> +		tbl8_group_index = tbl8_alloc_v1604(lpm);
>   
>   		/* Check tbl8 allocation was successful. */
>   		if (tbl8_group_index < 0) {
> @@ -1083,7 +1163,7 @@ add_depth_big_v1604(struct rte_lpm *lpm, uint32_t ip_masked, uint8_t depth,
>   	} /* If valid entry but not extended calculate the index into Table8. */
>   	else if (lpm->tbl24[tbl24_index].valid_group == 0) {
>   		/* Search for free tbl8 group. */
> -		tbl8_group_index = tbl8_alloc_v1604(lpm->tbl8, lpm->number_tbl8s);
> +		tbl8_group_index = tbl8_alloc_v1604(lpm);
>   
>   		if (tbl8_group_index < 0) {
>   			return tbl8_group_index;
> @@ -1818,7 +1898,7 @@ delete_depth_big_v1604(struct rte_lpm *lpm, uint32_t ip_masked,
>   		 */
>   		lpm->tbl24[tbl24_index].valid = 0;
>   		__atomic_thread_fence(__ATOMIC_RELEASE);
> -		tbl8_free_v1604(lpm->tbl8, tbl8_group_start);
> +		tbl8_free_v1604(lpm, tbl8_group_start);
>   	} else if (tbl8_recycle_index > -1) {
>   		/* Update tbl24 entry. */
>   		struct rte_lpm_tbl_entry new_tbl24_entry = {
> @@ -1834,7 +1914,7 @@ delete_depth_big_v1604(struct rte_lpm *lpm, uint32_t ip_masked,
>   		__atomic_store(&lpm->tbl24[tbl24_index], &new_tbl24_entry,
>   				__ATOMIC_RELAXED);
>   		__atomic_thread_fence(__ATOMIC_RELEASE);
> -		tbl8_free_v1604(lpm->tbl8, tbl8_group_start);
> +		tbl8_free_v1604(lpm, tbl8_group_start);
>   	}
>   #undef group_idx
>   	return 0;
> diff --git a/lib/librte_lpm/rte_lpm.h b/lib/librte_lpm/rte_lpm.h
> index 906ec4483..49c12a68d 100644
> --- a/lib/librte_lpm/rte_lpm.h
> +++ b/lib/librte_lpm/rte_lpm.h
> @@ -1,5 +1,6 @@
>   /* SPDX-License-Identifier: BSD-3-Clause
>    * Copyright(c) 2010-2014 Intel Corporation
> + * Copyright(c) 2019 Arm Limited
>    */
>   
>   #ifndef _RTE_LPM_H_
> @@ -21,6 +22,7 @@
>   #include <rte_common.h>
>   #include <rte_vect.h>
>   #include <rte_compat.h>
> +#include <rte_rcu_qsbr.h>
>   
>   #ifdef __cplusplus
>   extern "C" {
> @@ -186,6 +188,7 @@ struct rte_lpm {
>   			__rte_cache_aligned; /**< LPM tbl24 table. */
>   	struct rte_lpm_tbl_entry *tbl8; /**< LPM tbl8 table. */
>   	struct rte_lpm_rule *rules_tbl; /**< LPM rules. */
> +	struct rte_rcu_qsbr_dq *dq;	/**< RCU QSBR defer queue.*/
>   };
>   
>   /**
> @@ -248,6 +251,24 @@ rte_lpm_free_v20(struct rte_lpm_v20 *lpm);
>   void
>   rte_lpm_free_v1604(struct rte_lpm *lpm);
>   
> +/**
> + * Associate RCU QSBR variable with an LPM object.
> + *
> + * @param lpm
> + *   the lpm object to add RCU QSBR
> + * @param v
> + *   RCU QSBR variable
> + * @return
> + *   On success - 0
> + *   On error - 1 with error code set in rte_errno.
> + *   Possible rte_errno codes are:
> + *   - EINVAL - invalid pointer
> + *   - EEXIST - already added QSBR
> + *   - ENOMEM - memory allocation failure
> + */
> +__rte_experimental
> +int rte_lpm_rcu_qsbr_add(struct rte_lpm *lpm, struct rte_rcu_qsbr *v);
> +
>   /**
>    * Add a rule to the LPM table.
>    *
> diff --git a/lib/librte_lpm/rte_lpm_version.map b/lib/librte_lpm/rte_lpm_version.map
> index 90beac853..b353aabd2 100644
> --- a/lib/librte_lpm/rte_lpm_version.map
> +++ b/lib/librte_lpm/rte_lpm_version.map
> @@ -44,3 +44,9 @@ DPDK_17.05 {
>   	rte_lpm6_lookup_bulk_func;
>   
>   } DPDK_16.04;
> +
> +EXPERIMENTAL {
> +	global:
> +
> +	rte_lpm_rcu_qsbr_add;
> +};
Ananyev, Konstantin Oct. 7, 2019, 9:21 a.m. UTC | #2
Hi guys,

> 
> From: Ruifeng Wang <ruifeng.wang@arm.com>
> 
> Currently, the tbl8 group is freed even though the readers might be
> using the tbl8 group entries. The freed tbl8 group can be reallocated
> quickly. This results in incorrect lookup results.
> 
> RCU QSBR process is integrated for safe tbl8 group reclaim.
> Refer to RCU documentation to understand various aspects of
> integrating RCU library into other libraries.
> 
> Signed-off-by: Ruifeng Wang <ruifeng.wang@arm.com>
> Reviewed-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
> ---
>  lib/librte_lpm/Makefile            |   3 +-
>  lib/librte_lpm/meson.build         |   2 +
>  lib/librte_lpm/rte_lpm.c           | 102 +++++++++++++++++++++++++----
>  lib/librte_lpm/rte_lpm.h           |  21 ++++++
>  lib/librte_lpm/rte_lpm_version.map |   6 ++
>  5 files changed, 122 insertions(+), 12 deletions(-)
> 
> diff --git a/lib/librte_lpm/Makefile b/lib/librte_lpm/Makefile
> index a7946a1c5..ca9e16312 100644
> --- a/lib/librte_lpm/Makefile
> +++ b/lib/librte_lpm/Makefile
> @@ -6,9 +6,10 @@ include $(RTE_SDK)/mk/rte.vars.mk
>  # library name
>  LIB = librte_lpm.a
> 
> +CFLAGS += -DALLOW_EXPERIMENTAL_API
>  CFLAGS += -O3
>  CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR)
> -LDLIBS += -lrte_eal -lrte_hash
> +LDLIBS += -lrte_eal -lrte_hash -lrte_rcu
> 
>  EXPORT_MAP := rte_lpm_version.map
> 
> diff --git a/lib/librte_lpm/meson.build b/lib/librte_lpm/meson.build
> index a5176d8ae..19a35107f 100644
> --- a/lib/librte_lpm/meson.build
> +++ b/lib/librte_lpm/meson.build
> @@ -2,9 +2,11 @@
>  # Copyright(c) 2017 Intel Corporation
> 
>  version = 2
> +allow_experimental_apis = true
>  sources = files('rte_lpm.c', 'rte_lpm6.c')
>  headers = files('rte_lpm.h', 'rte_lpm6.h')
>  # since header files have different names, we can install all vector headers
>  # without worrying about which architecture we actually need
>  headers += files('rte_lpm_altivec.h', 'rte_lpm_neon.h', 'rte_lpm_sse.h')
>  deps += ['hash']
> +deps += ['rcu']
> diff --git a/lib/librte_lpm/rte_lpm.c b/lib/librte_lpm/rte_lpm.c
> index 3a929a1b1..ca58d4b35 100644
> --- a/lib/librte_lpm/rte_lpm.c
> +++ b/lib/librte_lpm/rte_lpm.c
> @@ -1,5 +1,6 @@
>  /* SPDX-License-Identifier: BSD-3-Clause
>   * Copyright(c) 2010-2014 Intel Corporation
> + * Copyright(c) 2019 Arm Limited
>   */
> 
>  #include <string.h>
> @@ -381,6 +382,8 @@ rte_lpm_free_v1604(struct rte_lpm *lpm)
> 
>  	rte_mcfg_tailq_write_unlock();
> 
> +	if (lpm->dq)
> +		rte_rcu_qsbr_dq_delete(lpm->dq);
>  	rte_free(lpm->tbl8);
>  	rte_free(lpm->rules_tbl);
>  	rte_free(lpm);
> @@ -390,6 +393,59 @@ BIND_DEFAULT_SYMBOL(rte_lpm_free, _v1604, 16.04);
>  MAP_STATIC_SYMBOL(void rte_lpm_free(struct rte_lpm *lpm),
>  		rte_lpm_free_v1604);
> 
> +struct __rte_lpm_rcu_dq_entry {
> +	uint32_t tbl8_group_index;
> +	uint32_t pad;
> +};
> +
> +static void
> +__lpm_rcu_qsbr_free_resource(void *p, void *data)
> +{
> +	struct rte_lpm_tbl_entry zero_tbl8_entry = {0};
> +	struct __rte_lpm_rcu_dq_entry *e =
> +			(struct __rte_lpm_rcu_dq_entry *)data;
> +	struct rte_lpm_tbl_entry *tbl8 = (struct rte_lpm_tbl_entry *)p;
> +
> +	/* Set tbl8 group invalid */
> +	__atomic_store(&tbl8[e->tbl8_group_index], &zero_tbl8_entry,
> +		__ATOMIC_RELAXED);
> +}
> +
> +/* Associate QSBR variable with an LPM object.
> + */
> +int
> +rte_lpm_rcu_qsbr_add(struct rte_lpm *lpm, struct rte_rcu_qsbr *v)
> +{
> +	char rcu_dq_name[RTE_RCU_QSBR_DQ_NAMESIZE];
> +	struct rte_rcu_qsbr_dq_parameters params;
> +
> +	if ((lpm == NULL) || (v == NULL)) {
> +		rte_errno = EINVAL;
> +		return 1;
> +	}
> +
> +	if (lpm->dq) {
> +		rte_errno = EEXIST;
> +		return 1;
> +	}
> +
> +	/* Init QSBR defer queue. */
> +	snprintf(rcu_dq_name, sizeof(rcu_dq_name), "LPM_RCU_%s", lpm->name);
> +	params.name = rcu_dq_name;
> +	params.size = lpm->number_tbl8s;
> +	params.esize = sizeof(struct __rte_lpm_rcu_dq_entry);
> +	params.f = __lpm_rcu_qsbr_free_resource;
> +	params.p = lpm->tbl8;
> +	params.v = v;
> +	lpm->dq = rte_rcu_qsbr_dq_create(&params);
> +	if (lpm->dq == NULL) {
> +		RTE_LOG(ERR, LPM, "LPM QS defer queue creation failed\n");
> +		return 1;
> +	}

Few thoughts about that function:
It names rcu_qsbr_add() but in fact it allocates defer queue for give rcu var.
So first thought - is it always necessary?
For some use-cases I suppose user might be ok to wait for quiescent state change
inside tbl8_free()?     
Another thing you do allocate defer queue, but it is internal, so user can't call
reclaim() manually, which looks strange.
Why not to return defer_queue pointer to the user, so he can call reclaim() himself
at appropriate time?
Third thing - you always allocate defer queue with size equal to number of tbl8.
Though I understand it could be up to 16M tbl8 groups inside the LPM.
Do we really need defer queue that long?
Especially  considering that current rcu_defer_queue will start reclamation when 1/8
of defer_quueue becomes full and wouldn't reclaim more then 1/16 of it.
Probably better to let user to decide himself how long defer_queue he needs for that LPM?

Konstantin


> +
> +	return 0;
> +}
> +
>  /*
>   * Adds a rule to the rule table.
>   *
> @@ -679,14 +735,15 @@ tbl8_alloc_v20(struct rte_lpm_tbl_entry_v20 *tbl8)
>  }
> 
>  static int32_t
> -tbl8_alloc_v1604(struct rte_lpm_tbl_entry *tbl8, uint32_t number_tbl8s)
> +__tbl8_alloc_v1604(struct rte_lpm *lpm)
>  {
>  	uint32_t group_idx; /* tbl8 group index. */
>  	struct rte_lpm_tbl_entry *tbl8_entry;
> 
>  	/* Scan through tbl8 to find a free (i.e. INVALID) tbl8 group. */
> -	for (group_idx = 0; group_idx < number_tbl8s; group_idx++) {
> -		tbl8_entry = &tbl8[group_idx * RTE_LPM_TBL8_GROUP_NUM_ENTRIES];
> +	for (group_idx = 0; group_idx < lpm->number_tbl8s; group_idx++) {
> +		tbl8_entry = &lpm->tbl8[group_idx *
> +					RTE_LPM_TBL8_GROUP_NUM_ENTRIES];
>  		/* If a free tbl8 group is found clean it and set as VALID. */
>  		if (!tbl8_entry->valid_group) {
>  			struct rte_lpm_tbl_entry new_tbl8_entry = {
> @@ -712,6 +769,21 @@ tbl8_alloc_v1604(struct rte_lpm_tbl_entry *tbl8, uint32_t number_tbl8s)
>  	return -ENOSPC;
>  }
> 
> +static int32_t
> +tbl8_alloc_v1604(struct rte_lpm *lpm)
> +{
> +	int32_t group_idx; /* tbl8 group index. */
> +
> +	group_idx = __tbl8_alloc_v1604(lpm);
> +	if ((group_idx < 0) && (lpm->dq != NULL)) {
> +		/* If there are no tbl8 groups try to reclaim some. */
> +		if (rte_rcu_qsbr_dq_reclaim(lpm->dq) == 0)
> +			group_idx = __tbl8_alloc_v1604(lpm);
> +	}
> +
> +	return group_idx;
> +}
> +
>  static void
>  tbl8_free_v20(struct rte_lpm_tbl_entry_v20 *tbl8, uint32_t tbl8_group_start)
>  {
> @@ -728,13 +800,21 @@ tbl8_free_v20(struct rte_lpm_tbl_entry_v20 *tbl8, uint32_t tbl8_group_start)
>  }
> 
>  static void
> -tbl8_free_v1604(struct rte_lpm_tbl_entry *tbl8, uint32_t tbl8_group_start)
> +tbl8_free_v1604(struct rte_lpm *lpm, uint32_t tbl8_group_start)
>  {
> -	/* Set tbl8 group invalid*/
>  	struct rte_lpm_tbl_entry zero_tbl8_entry = {0};
> +	struct __rte_lpm_rcu_dq_entry e;
> 
> -	__atomic_store(&tbl8[tbl8_group_start], &zero_tbl8_entry,
> -			__ATOMIC_RELAXED);
> +	if (lpm->dq != NULL) {
> +		e.tbl8_group_index = tbl8_group_start;
> +		e.pad = 0;
> +		/* Push into QSBR defer queue. */
> +		rte_rcu_qsbr_dq_enqueue(lpm->dq, (void *)&e);
> +	} else {
> +		/* Set tbl8 group invalid*/
> +		__atomic_store(&lpm->tbl8[tbl8_group_start], &zero_tbl8_entry,
> +				__ATOMIC_RELAXED);
> +	}
>  }
> 
>  static __rte_noinline int32_t
> @@ -1037,7 +1117,7 @@ add_depth_big_v1604(struct rte_lpm *lpm, uint32_t ip_masked, uint8_t depth,
> 
>  	if (!lpm->tbl24[tbl24_index].valid) {
>  		/* Search for a free tbl8 group. */
> -		tbl8_group_index = tbl8_alloc_v1604(lpm->tbl8, lpm->number_tbl8s);
> +		tbl8_group_index = tbl8_alloc_v1604(lpm);
> 
>  		/* Check tbl8 allocation was successful. */
>  		if (tbl8_group_index < 0) {
> @@ -1083,7 +1163,7 @@ add_depth_big_v1604(struct rte_lpm *lpm, uint32_t ip_masked, uint8_t depth,
>  	} /* If valid entry but not extended calculate the index into Table8. */
>  	else if (lpm->tbl24[tbl24_index].valid_group == 0) {
>  		/* Search for free tbl8 group. */
> -		tbl8_group_index = tbl8_alloc_v1604(lpm->tbl8, lpm->number_tbl8s);
> +		tbl8_group_index = tbl8_alloc_v1604(lpm);
> 
>  		if (tbl8_group_index < 0) {
>  			return tbl8_group_index;
> @@ -1818,7 +1898,7 @@ delete_depth_big_v1604(struct rte_lpm *lpm, uint32_t ip_masked,
>  		 */
>  		lpm->tbl24[tbl24_index].valid = 0;
>  		__atomic_thread_fence(__ATOMIC_RELEASE);
> -		tbl8_free_v1604(lpm->tbl8, tbl8_group_start);
> +		tbl8_free_v1604(lpm, tbl8_group_start);
>  	} else if (tbl8_recycle_index > -1) {
>  		/* Update tbl24 entry. */
>  		struct rte_lpm_tbl_entry new_tbl24_entry = {
> @@ -1834,7 +1914,7 @@ delete_depth_big_v1604(struct rte_lpm *lpm, uint32_t ip_masked,
>  		__atomic_store(&lpm->tbl24[tbl24_index], &new_tbl24_entry,
>  				__ATOMIC_RELAXED);
>  		__atomic_thread_fence(__ATOMIC_RELEASE);
> -		tbl8_free_v1604(lpm->tbl8, tbl8_group_start);
> +		tbl8_free_v1604(lpm, tbl8_group_start);
>  	}
>  #undef group_idx
>  	return 0;
> diff --git a/lib/librte_lpm/rte_lpm.h b/lib/librte_lpm/rte_lpm.h
> index 906ec4483..49c12a68d 100644
> --- a/lib/librte_lpm/rte_lpm.h
> +++ b/lib/librte_lpm/rte_lpm.h
> @@ -1,5 +1,6 @@
>  /* SPDX-License-Identifier: BSD-3-Clause
>   * Copyright(c) 2010-2014 Intel Corporation
> + * Copyright(c) 2019 Arm Limited
>   */
> 
>  #ifndef _RTE_LPM_H_
> @@ -21,6 +22,7 @@
>  #include <rte_common.h>
>  #include <rte_vect.h>
>  #include <rte_compat.h>
> +#include <rte_rcu_qsbr.h>
> 
>  #ifdef __cplusplus
>  extern "C" {
> @@ -186,6 +188,7 @@ struct rte_lpm {
>  			__rte_cache_aligned; /**< LPM tbl24 table. */
>  	struct rte_lpm_tbl_entry *tbl8; /**< LPM tbl8 table. */
>  	struct rte_lpm_rule *rules_tbl; /**< LPM rules. */
> +	struct rte_rcu_qsbr_dq *dq;	/**< RCU QSBR defer queue.*/
>  };
> 
>  /**
> @@ -248,6 +251,24 @@ rte_lpm_free_v20(struct rte_lpm_v20 *lpm);
>  void
>  rte_lpm_free_v1604(struct rte_lpm *lpm);
> 
> +/**
> + * Associate RCU QSBR variable with an LPM object.
> + *
> + * @param lpm
> + *   the lpm object to add RCU QSBR
> + * @param v
> + *   RCU QSBR variable
> + * @return
> + *   On success - 0
> + *   On error - 1 with error code set in rte_errno.
> + *   Possible rte_errno codes are:
> + *   - EINVAL - invalid pointer
> + *   - EEXIST - already added QSBR
> + *   - ENOMEM - memory allocation failure
> + */
> +__rte_experimental
> +int rte_lpm_rcu_qsbr_add(struct rte_lpm *lpm, struct rte_rcu_qsbr *v);
> +
>  /**
>   * Add a rule to the LPM table.
>   *
> diff --git a/lib/librte_lpm/rte_lpm_version.map b/lib/librte_lpm/rte_lpm_version.map
> index 90beac853..b353aabd2 100644
> --- a/lib/librte_lpm/rte_lpm_version.map
> +++ b/lib/librte_lpm/rte_lpm_version.map
> @@ -44,3 +44,9 @@ DPDK_17.05 {
>  	rte_lpm6_lookup_bulk_func;
> 
>  } DPDK_16.04;
> +
> +EXPERIMENTAL {
> +	global:
> +
> +	rte_lpm_rcu_qsbr_add;
> +};
> --
> 2.17.1
Honnappa Nagarahalli Oct. 9, 2019, 3:48 a.m. UTC | #3
<snip>

> 
> Hi Honnappa,
> 
> On 01/10/2019 19:28, Honnappa Nagarahalli wrote:
> > From: Ruifeng Wang <ruifeng.wang@arm.com>
> >
> > Currently, the tbl8 group is freed even though the readers might be
> > using the tbl8 group entries. The freed tbl8 group can be reallocated
> > quickly. This results in incorrect lookup results.
> >
> > RCU QSBR process is integrated for safe tbl8 group reclaim.
> > Refer to RCU documentation to understand various aspects of
> > integrating RCU library into other libraries.
> >
> > Signed-off-by: Ruifeng Wang <ruifeng.wang@arm.com>
> > Reviewed-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
> > ---
> >   lib/librte_lpm/Makefile            |   3 +-
> >   lib/librte_lpm/meson.build         |   2 +
> >   lib/librte_lpm/rte_lpm.c           | 102 +++++++++++++++++++++++++----
> >   lib/librte_lpm/rte_lpm.h           |  21 ++++++
> >   lib/librte_lpm/rte_lpm_version.map |   6 ++
> >   5 files changed, 122 insertions(+), 12 deletions(-)
> >
> > diff --git a/lib/librte_lpm/Makefile b/lib/librte_lpm/Makefile index
> > a7946a1c5..ca9e16312 100644
> > --- a/lib/librte_lpm/Makefile
> > +++ b/lib/librte_lpm/Makefile
> > @@ -6,9 +6,10 @@ include $(RTE_SDK)/mk/rte.vars.mk
> >   # library name
> >   LIB = librte_lpm.a
> >
> > +CFLAGS += -DALLOW_EXPERIMENTAL_API
> >   CFLAGS += -O3
> >   CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -LDLIBS += -lrte_eal
> > -lrte_hash
> > +LDLIBS += -lrte_eal -lrte_hash -lrte_rcu
> >
> >   EXPORT_MAP := rte_lpm_version.map
> >
> > diff --git a/lib/librte_lpm/meson.build b/lib/librte_lpm/meson.build
> > index a5176d8ae..19a35107f 100644
> > --- a/lib/librte_lpm/meson.build
> > +++ b/lib/librte_lpm/meson.build
> > @@ -2,9 +2,11 @@
> >   # Copyright(c) 2017 Intel Corporation
> >
> >   version = 2
> > +allow_experimental_apis = true
> >   sources = files('rte_lpm.c', 'rte_lpm6.c')
> >   headers = files('rte_lpm.h', 'rte_lpm6.h')
> >   # since header files have different names, we can install all vector headers
> >   # without worrying about which architecture we actually need
> >   headers += files('rte_lpm_altivec.h', 'rte_lpm_neon.h', 'rte_lpm_sse.h')
> >   deps += ['hash']
> > +deps += ['rcu']
> > diff --git a/lib/librte_lpm/rte_lpm.c b/lib/librte_lpm/rte_lpm.c index
> > 3a929a1b1..ca58d4b35 100644
> > --- a/lib/librte_lpm/rte_lpm.c
> > +++ b/lib/librte_lpm/rte_lpm.c
> > @@ -1,5 +1,6 @@
> >   /* SPDX-License-Identifier: BSD-3-Clause
> >    * Copyright(c) 2010-2014 Intel Corporation
> > + * Copyright(c) 2019 Arm Limited
> >    */
> >
> >   #include <string.h>
> > @@ -381,6 +382,8 @@ rte_lpm_free_v1604(struct rte_lpm *lpm)
> >
> >   	rte_mcfg_tailq_write_unlock();
> >
> > +	if (lpm->dq)
> > +		rte_rcu_qsbr_dq_delete(lpm->dq);
> >   	rte_free(lpm->tbl8);
> >   	rte_free(lpm->rules_tbl);
> >   	rte_free(lpm);
> > @@ -390,6 +393,59 @@ BIND_DEFAULT_SYMBOL(rte_lpm_free, _v1604,
> 16.04);
> >   MAP_STATIC_SYMBOL(void rte_lpm_free(struct rte_lpm *lpm),
> >   		rte_lpm_free_v1604);
> As a general comment, are you going to add rcu support to the legacy _v20 ?
I do not see a requirement from my side. What's your suggestion?

> >
> > +struct __rte_lpm_rcu_dq_entry {
> > +	uint32_t tbl8_group_index;
> > +	uint32_t pad;
> > +};
> 
> Is this struct necessary? I mean in tbl8_free_v1604() you can pass
> tbl8_group_index as a pointer without "e.pad = 0;".
Agree, that is another way. This structure will go away once the ring library supports storing 32b elements.

> 
> And what about 32bit environment?
Waiting for rte_ring to support 32b elements (the patch is being discussed).

> 
> > +
> > +static void
> > +__lpm_rcu_qsbr_free_resource(void *p, void *data) {
> > +	struct rte_lpm_tbl_entry zero_tbl8_entry = {0};
> > +	struct __rte_lpm_rcu_dq_entry *e =
> > +			(struct __rte_lpm_rcu_dq_entry *)data;
> > +	struct rte_lpm_tbl_entry *tbl8 = (struct rte_lpm_tbl_entry *)p;
> > +
> > +	/* Set tbl8 group invalid */
> > +	__atomic_store(&tbl8[e->tbl8_group_index], &zero_tbl8_entry,
> > +		__ATOMIC_RELAXED);
> > +}
> > +
> > +/* Associate QSBR variable with an LPM object.
> > + */
> > +int
> > +rte_lpm_rcu_qsbr_add(struct rte_lpm *lpm, struct rte_rcu_qsbr *v) {
> > +	char rcu_dq_name[RTE_RCU_QSBR_DQ_NAMESIZE];
> > +	struct rte_rcu_qsbr_dq_parameters params;
> > +
> > +	if ((lpm == NULL) || (v == NULL)) {
> > +		rte_errno = EINVAL;
> > +		return 1;
> > +	}
> > +
> > +	if (lpm->dq) {
> > +		rte_errno = EEXIST;
> > +		return 1;
> > +	}
> > +
> > +	/* Init QSBR defer queue. */
> > +	snprintf(rcu_dq_name, sizeof(rcu_dq_name), "LPM_RCU_%s", lpm-
> >name);
> 
> Consider moving this logic into rte_rcu_qsbr_dq_create(). I think there you
> could prefix the name with just RCU_ . So it would be possible to move
> include <rte_ring.h> into the rte_rcu_qsbr.c from rte_rcu_qsbr.h and get rid
> of RTE_RCU_QSBR_DQ_NAMESIZE macro in rte_rcu_qsbr.h file.
Macro is required to provide a length for the name, similar to what rte_ring does. What would be the length of the 'name' if RTE_RCU_QSBR_DQ_NAMESIZE is removed?
If the 'RCU_' has to be prefixed in 'rte_rcu_qsbr_dq_create', then RTE_RCU_QSBR_DQ_NAMESIZE needs to be readjusted in the header file. I am trying to keep it simple by constructing the string in a single function.

> 
> > +	params.name = rcu_dq_name;
> > +	params.size = lpm->number_tbl8s;
> > +	params.esize = sizeof(struct __rte_lpm_rcu_dq_entry);
> > +	params.f = __lpm_rcu_qsbr_free_resource;
> > +	params.p = lpm->tbl8;
> > +	params.v = v;
> > +	lpm->dq = rte_rcu_qsbr_dq_create(&params);
> > +	if (lpm->dq == NULL) {
> > +		RTE_LOG(ERR, LPM, "LPM QS defer queue creation failed\n");
> > +		return 1;
> > +	}
> > +
> > +	return 0;
> > +}
> > +
> >   /*
> >    * Adds a rule to the rule table.
> >    *
> > @@ -679,14 +735,15 @@ tbl8_alloc_v20(struct rte_lpm_tbl_entry_v20
> *tbl8)
> >   }
> >
> >   static int32_t
> > -tbl8_alloc_v1604(struct rte_lpm_tbl_entry *tbl8, uint32_t
> > number_tbl8s)
> > +__tbl8_alloc_v1604(struct rte_lpm *lpm)
> >   {
> >   	uint32_t group_idx; /* tbl8 group index. */
> >   	struct rte_lpm_tbl_entry *tbl8_entry;
> >
> >   	/* Scan through tbl8 to find a free (i.e. INVALID) tbl8 group. */
> > -	for (group_idx = 0; group_idx < number_tbl8s; group_idx++) {
> > -		tbl8_entry = &tbl8[group_idx *
> RTE_LPM_TBL8_GROUP_NUM_ENTRIES];
> > +	for (group_idx = 0; group_idx < lpm->number_tbl8s; group_idx++) {
> > +		tbl8_entry = &lpm->tbl8[group_idx *
> > +
> 	RTE_LPM_TBL8_GROUP_NUM_ENTRIES];
> >   		/* If a free tbl8 group is found clean it and set as VALID. */
> >   		if (!tbl8_entry->valid_group) {
> >   			struct rte_lpm_tbl_entry new_tbl8_entry = { @@ -
> 712,6 +769,21 @@
> > tbl8_alloc_v1604(struct rte_lpm_tbl_entry *tbl8, uint32_t number_tbl8s)
> >   	return -ENOSPC;
> >   }
> >
> > +static int32_t
> > +tbl8_alloc_v1604(struct rte_lpm *lpm) {
> > +	int32_t group_idx; /* tbl8 group index. */
> > +
> > +	group_idx = __tbl8_alloc_v1604(lpm);
> > +	if ((group_idx < 0) && (lpm->dq != NULL)) {
> > +		/* If there are no tbl8 groups try to reclaim some. */
> > +		if (rte_rcu_qsbr_dq_reclaim(lpm->dq) == 0)
> > +			group_idx = __tbl8_alloc_v1604(lpm);
> > +	}
> > +
> > +	return group_idx;
> > +}
> > +
> >   static void
> >   tbl8_free_v20(struct rte_lpm_tbl_entry_v20 *tbl8, uint32_t
> tbl8_group_start)
> >   {
> > @@ -728,13 +800,21 @@ tbl8_free_v20(struct rte_lpm_tbl_entry_v20
> *tbl8, uint32_t tbl8_group_start)
> >   }
> >
> >   static void
> > -tbl8_free_v1604(struct rte_lpm_tbl_entry *tbl8, uint32_t
> > tbl8_group_start)
> > +tbl8_free_v1604(struct rte_lpm *lpm, uint32_t tbl8_group_start)
> >   {
> > -	/* Set tbl8 group invalid*/
> >   	struct rte_lpm_tbl_entry zero_tbl8_entry = {0};
> > +	struct __rte_lpm_rcu_dq_entry e;
> >
> > -	__atomic_store(&tbl8[tbl8_group_start], &zero_tbl8_entry,
> > -			__ATOMIC_RELAXED);
> > +	if (lpm->dq != NULL) {
> > +		e.tbl8_group_index = tbl8_group_start;
> > +		e.pad = 0;
> > +		/* Push into QSBR defer queue. */
> > +		rte_rcu_qsbr_dq_enqueue(lpm->dq, (void *)&e);
> > +	} else {
> > +		/* Set tbl8 group invalid*/
> > +		__atomic_store(&lpm->tbl8[tbl8_group_start],
> &zero_tbl8_entry,
> > +				__ATOMIC_RELAXED);
> > +	}
> >   }
> >
> >   static __rte_noinline int32_t
> > @@ -1037,7 +1117,7 @@ add_depth_big_v1604(struct rte_lpm *lpm,
> > uint32_t ip_masked, uint8_t depth,
> >
> >   	if (!lpm->tbl24[tbl24_index].valid) {
> >   		/* Search for a free tbl8 group. */
> > -		tbl8_group_index = tbl8_alloc_v1604(lpm->tbl8, lpm-
> >number_tbl8s);
> > +		tbl8_group_index = tbl8_alloc_v1604(lpm);
> >
> >   		/* Check tbl8 allocation was successful. */
> >   		if (tbl8_group_index < 0) {
> > @@ -1083,7 +1163,7 @@ add_depth_big_v1604(struct rte_lpm *lpm,
> uint32_t ip_masked, uint8_t depth,
> >   	} /* If valid entry but not extended calculate the index into Table8. */
> >   	else if (lpm->tbl24[tbl24_index].valid_group == 0) {
> >   		/* Search for free tbl8 group. */
> > -		tbl8_group_index = tbl8_alloc_v1604(lpm->tbl8, lpm-
> >number_tbl8s);
> > +		tbl8_group_index = tbl8_alloc_v1604(lpm);
> >
> >   		if (tbl8_group_index < 0) {
> >   			return tbl8_group_index;
> > @@ -1818,7 +1898,7 @@ delete_depth_big_v1604(struct rte_lpm *lpm,
> uint32_t ip_masked,
> >   		 */
> >   		lpm->tbl24[tbl24_index].valid = 0;
> >   		__atomic_thread_fence(__ATOMIC_RELEASE);
> > -		tbl8_free_v1604(lpm->tbl8, tbl8_group_start);
> > +		tbl8_free_v1604(lpm, tbl8_group_start);
> >   	} else if (tbl8_recycle_index > -1) {
> >   		/* Update tbl24 entry. */
> >   		struct rte_lpm_tbl_entry new_tbl24_entry = { @@ -1834,7
> +1914,7 @@
> > delete_depth_big_v1604(struct rte_lpm *lpm, uint32_t ip_masked,
> >   		__atomic_store(&lpm->tbl24[tbl24_index],
> &new_tbl24_entry,
> >   				__ATOMIC_RELAXED);
> >   		__atomic_thread_fence(__ATOMIC_RELEASE);
> > -		tbl8_free_v1604(lpm->tbl8, tbl8_group_start);
> > +		tbl8_free_v1604(lpm, tbl8_group_start);
> >   	}
> >   #undef group_idx
> >   	return 0;
> > diff --git a/lib/librte_lpm/rte_lpm.h b/lib/librte_lpm/rte_lpm.h index
> > 906ec4483..49c12a68d 100644
> > --- a/lib/librte_lpm/rte_lpm.h
> > +++ b/lib/librte_lpm/rte_lpm.h
> > @@ -1,5 +1,6 @@
> >   /* SPDX-License-Identifier: BSD-3-Clause
> >    * Copyright(c) 2010-2014 Intel Corporation
> > + * Copyright(c) 2019 Arm Limited
> >    */
> >
> >   #ifndef _RTE_LPM_H_
> > @@ -21,6 +22,7 @@
> >   #include <rte_common.h>
> >   #include <rte_vect.h>
> >   #include <rte_compat.h>
> > +#include <rte_rcu_qsbr.h>
> >
> >   #ifdef __cplusplus
> >   extern "C" {
> > @@ -186,6 +188,7 @@ struct rte_lpm {
> >   			__rte_cache_aligned; /**< LPM tbl24 table. */
> >   	struct rte_lpm_tbl_entry *tbl8; /**< LPM tbl8 table. */
> >   	struct rte_lpm_rule *rules_tbl; /**< LPM rules. */
> > +	struct rte_rcu_qsbr_dq *dq;	/**< RCU QSBR defer queue.*/
> >   };
> >
> >   /**
> > @@ -248,6 +251,24 @@ rte_lpm_free_v20(struct rte_lpm_v20 *lpm);
> >   void
> >   rte_lpm_free_v1604(struct rte_lpm *lpm);
> >
> > +/**
> > + * Associate RCU QSBR variable with an LPM object.
> > + *
> > + * @param lpm
> > + *   the lpm object to add RCU QSBR
> > + * @param v
> > + *   RCU QSBR variable
> > + * @return
> > + *   On success - 0
> > + *   On error - 1 with error code set in rte_errno.
> > + *   Possible rte_errno codes are:
> > + *   - EINVAL - invalid pointer
> > + *   - EEXIST - already added QSBR
> > + *   - ENOMEM - memory allocation failure
> > + */
> > +__rte_experimental
> > +int rte_lpm_rcu_qsbr_add(struct rte_lpm *lpm, struct rte_rcu_qsbr
> > +*v);
> > +
> >   /**
> >    * Add a rule to the LPM table.
> >    *
> > diff --git a/lib/librte_lpm/rte_lpm_version.map
> > b/lib/librte_lpm/rte_lpm_version.map
> > index 90beac853..b353aabd2 100644
> > --- a/lib/librte_lpm/rte_lpm_version.map
> > +++ b/lib/librte_lpm/rte_lpm_version.map
> > @@ -44,3 +44,9 @@ DPDK_17.05 {
> >   	rte_lpm6_lookup_bulk_func;
> >
> >   } DPDK_16.04;
> > +
> > +EXPERIMENTAL {
> > +	global:
> > +
> > +	rte_lpm_rcu_qsbr_add;
> > +};
> 
> --
> Regards,
> Vladimir
Honnappa Nagarahalli Oct. 13, 2019, 4:36 a.m. UTC | #4
<snip>

> Hi guys,
I have tried to consolidate design related questions here. If I have missed anything, please add.

> 
> >
> > From: Ruifeng Wang <ruifeng.wang@arm.com>
> >
> > Currently, the tbl8 group is freed even though the readers might be
> > using the tbl8 group entries. The freed tbl8 group can be reallocated
> > quickly. This results in incorrect lookup results.
> >
> > RCU QSBR process is integrated for safe tbl8 group reclaim.
> > Refer to RCU documentation to understand various aspects of
> > integrating RCU library into other libraries.
> >
> > Signed-off-by: Ruifeng Wang <ruifeng.wang@arm.com>
> > Reviewed-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
> > ---
> >  lib/librte_lpm/Makefile            |   3 +-
> >  lib/librte_lpm/meson.build         |   2 +
> >  lib/librte_lpm/rte_lpm.c           | 102 +++++++++++++++++++++++++----
> >  lib/librte_lpm/rte_lpm.h           |  21 ++++++
> >  lib/librte_lpm/rte_lpm_version.map |   6 ++
> >  5 files changed, 122 insertions(+), 12 deletions(-)
> >
> > diff --git a/lib/librte_lpm/Makefile b/lib/librte_lpm/Makefile index
> > a7946a1c5..ca9e16312 100644
> > --- a/lib/librte_lpm/Makefile
> > +++ b/lib/librte_lpm/Makefile
> > @@ -6,9 +6,10 @@ include $(RTE_SDK)/mk/rte.vars.mk  # library name
> > LIB = librte_lpm.a
> >
> > +CFLAGS += -DALLOW_EXPERIMENTAL_API
> >  CFLAGS += -O3
> >  CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -LDLIBS += -lrte_eal -lrte_hash
> > +LDLIBS += -lrte_eal -lrte_hash -lrte_rcu
> >
> >  EXPORT_MAP := rte_lpm_version.map
> >
> > diff --git a/lib/librte_lpm/meson.build b/lib/librte_lpm/meson.build
> > index a5176d8ae..19a35107f 100644
> > --- a/lib/librte_lpm/meson.build
> > +++ b/lib/librte_lpm/meson.build
> > @@ -2,9 +2,11 @@
> >  # Copyright(c) 2017 Intel Corporation
> >
> >  version = 2
> > +allow_experimental_apis = true
> >  sources = files('rte_lpm.c', 'rte_lpm6.c')  headers =
> > files('rte_lpm.h', 'rte_lpm6.h')  # since header files have different
> > names, we can install all vector headers  # without worrying about
> > which architecture we actually need  headers +=
> > files('rte_lpm_altivec.h', 'rte_lpm_neon.h', 'rte_lpm_sse.h')  deps +=
> > ['hash']
> > +deps += ['rcu']
> > diff --git a/lib/librte_lpm/rte_lpm.c b/lib/librte_lpm/rte_lpm.c index
> > 3a929a1b1..ca58d4b35 100644
> > --- a/lib/librte_lpm/rte_lpm.c
> > +++ b/lib/librte_lpm/rte_lpm.c
> > @@ -1,5 +1,6 @@
> >  /* SPDX-License-Identifier: BSD-3-Clause
> >   * Copyright(c) 2010-2014 Intel Corporation
> > + * Copyright(c) 2019 Arm Limited
> >   */
> >
> >  #include <string.h>
> > @@ -381,6 +382,8 @@ rte_lpm_free_v1604(struct rte_lpm *lpm)
> >
> >  	rte_mcfg_tailq_write_unlock();
> >
> > +	if (lpm->dq)
> > +		rte_rcu_qsbr_dq_delete(lpm->dq);
> >  	rte_free(lpm->tbl8);
> >  	rte_free(lpm->rules_tbl);
> >  	rte_free(lpm);
> > @@ -390,6 +393,59 @@ BIND_DEFAULT_SYMBOL(rte_lpm_free, _v1604,
> 16.04);
> > MAP_STATIC_SYMBOL(void rte_lpm_free(struct rte_lpm *lpm),
> >  		rte_lpm_free_v1604);
> >
> > +struct __rte_lpm_rcu_dq_entry {
> > +	uint32_t tbl8_group_index;
> > +	uint32_t pad;
> > +};
> > +
> > +static void
> > +__lpm_rcu_qsbr_free_resource(void *p, void *data) {
> > +	struct rte_lpm_tbl_entry zero_tbl8_entry = {0};
> > +	struct __rte_lpm_rcu_dq_entry *e =
> > +			(struct __rte_lpm_rcu_dq_entry *)data;
> > +	struct rte_lpm_tbl_entry *tbl8 = (struct rte_lpm_tbl_entry *)p;
> > +
> > +	/* Set tbl8 group invalid */
> > +	__atomic_store(&tbl8[e->tbl8_group_index], &zero_tbl8_entry,
> > +		__ATOMIC_RELAXED);
> > +}
> > +
> > +/* Associate QSBR variable with an LPM object.
> > + */
> > +int
> > +rte_lpm_rcu_qsbr_add(struct rte_lpm *lpm, struct rte_rcu_qsbr *v) {
> > +	char rcu_dq_name[RTE_RCU_QSBR_DQ_NAMESIZE];
> > +	struct rte_rcu_qsbr_dq_parameters params;
> > +
> > +	if ((lpm == NULL) || (v == NULL)) {
> > +		rte_errno = EINVAL;
> > +		return 1;
> > +	}
> > +
> > +	if (lpm->dq) {
> > +		rte_errno = EEXIST;
> > +		return 1;
> > +	}
> > +
> > +	/* Init QSBR defer queue. */
> > +	snprintf(rcu_dq_name, sizeof(rcu_dq_name), "LPM_RCU_%s", lpm-
> >name);
> > +	params.name = rcu_dq_name;
> > +	params.size = lpm->number_tbl8s;
> > +	params.esize = sizeof(struct __rte_lpm_rcu_dq_entry);
> > +	params.f = __lpm_rcu_qsbr_free_resource;
> > +	params.p = lpm->tbl8;
> > +	params.v = v;
> > +	lpm->dq = rte_rcu_qsbr_dq_create(&params);
> > +	if (lpm->dq == NULL) {
> > +		RTE_LOG(ERR, LPM, "LPM QS defer queue creation failed\n");
> > +		return 1;
> > +	}
> 
> Few thoughts about that function:
Few things to keep in mind, the goal of the design is to make it easy for the applications to adopt lock-free algorithms. The reclamation process in the writer is a major portion of code one has to write for using lock-free algorithms. The current design is such that the writer does not have to change any code or write additional code other than calling 'rte_lpm_rcu_qsbr_add'.

> It names rcu_qsbr_add() but in fact it allocates defer queue for give rcu var.
> So first thought - is it always necessary?
This is part of the design. If the application does not want to use this integrated logic then, it does not have to call this API. It can use the RCU defer APIs to implement its own logic. But, if I ask the question, does this integrated logic address most of the use cases of the LPM library, I think the answer is yes.

> For some use-cases I suppose user might be ok to wait for quiescent state
> change
> inside tbl8_free()?
Yes, that is a possibility (for ex: no frequent route changes). But, I think that is very trivial for the application to implement. Though, the LPM library has to separate the 'delete' and 'free' operations. Similar operations are provided in rte_hash library. IMO, we should follow consistent approach.

> Another thing you do allocate defer queue, but it is internal, so user can't call
> reclaim() manually, which looks strange.
> Why not to return defer_queue pointer to the user, so he can call reclaim()
> himself at appropriate time?
The intention of the design is to take the complexity away from the user of LPM library. IMO, the current design will address most uses cases of LPM library. If we expose the 2 parameters (when to trigger reclamation and how much to reclaim) in the 'rte_lpm_rcu_qsbr_add' API, it should provide enough flexibility to the application.

> Third thing - you always allocate defer queue with size equal to number of
> tbl8.
> Though I understand it could be up to 16M tbl8 groups inside the LPM.
> Do we really need defer queue that long?
No, we do not need it to be this long. It is this long today to avoid returning no-space on the defer queue error.

> Especially  considering that current rcu_defer_queue will start reclamation
> when 1/8 of defer_quueue becomes full and wouldn't reclaim more then
> 1/16 of it.
> Probably better to let user to decide himself how long defer_queue he needs
> for that LPM?
It makes sense to expose it to the user if the writer-writer concurrency is lock-free (no memory allocation allowed to expand the defer queue size when the queue is full). However, LPM is not lock-free on the writer side. If we think the writer could be lock-free in the future, it has to be exposed to the user. 

> 
> Konstantin
Pulling questions/comments from other threads:
Can we leave reclamation to some other house-keeping thread to do (sort of garbage collector). Or such mode is not supported/planned?

[Honnappa] If the reclamation cost is small, the current method provides advantages over having a separate thread to do reclamation. I did not plan to provide such an option. But may be it makes sense to keep the options open (especially from ABI perspective). May be we should add a flags field which will allow us to implement different methods in the future?

> 
> 
> > +
> > +	return 0;
> > +}
> > +
> >  /*
> >   * Adds a rule to the rule table.
> >   *
> > @@ -679,14 +735,15 @@ tbl8_alloc_v20(struct rte_lpm_tbl_entry_v20
> > *tbl8)  }
> >
> >  static int32_t
> > -tbl8_alloc_v1604(struct rte_lpm_tbl_entry *tbl8, uint32_t
> > number_tbl8s)
> > +__tbl8_alloc_v1604(struct rte_lpm *lpm)
> >  {
> >  	uint32_t group_idx; /* tbl8 group index. */
> >  	struct rte_lpm_tbl_entry *tbl8_entry;
> >
> >  	/* Scan through tbl8 to find a free (i.e. INVALID) tbl8 group. */
> > -	for (group_idx = 0; group_idx < number_tbl8s; group_idx++) {
> > -		tbl8_entry = &tbl8[group_idx *
> RTE_LPM_TBL8_GROUP_NUM_ENTRIES];
> > +	for (group_idx = 0; group_idx < lpm->number_tbl8s; group_idx++) {
> > +		tbl8_entry = &lpm->tbl8[group_idx *
> > +
> 	RTE_LPM_TBL8_GROUP_NUM_ENTRIES];
> >  		/* If a free tbl8 group is found clean it and set as VALID. */
> >  		if (!tbl8_entry->valid_group) {
> >  			struct rte_lpm_tbl_entry new_tbl8_entry = { @@ -
> 712,6 +769,21 @@
> > tbl8_alloc_v1604(struct rte_lpm_tbl_entry *tbl8, uint32_t number_tbl8s)
> >  	return -ENOSPC;
> >  }
> >
> > +static int32_t
> > +tbl8_alloc_v1604(struct rte_lpm *lpm) {
> > +	int32_t group_idx; /* tbl8 group index. */
> > +
> > +	group_idx = __tbl8_alloc_v1604(lpm);
> > +	if ((group_idx < 0) && (lpm->dq != NULL)) {
> > +		/* If there are no tbl8 groups try to reclaim some. */
> > +		if (rte_rcu_qsbr_dq_reclaim(lpm->dq) == 0)
> > +			group_idx = __tbl8_alloc_v1604(lpm);
> > +	}
> > +
> > +	return group_idx;
> > +}
> > +
> >  static void
> >  tbl8_free_v20(struct rte_lpm_tbl_entry_v20 *tbl8, uint32_t
> > tbl8_group_start)  { @@ -728,13 +800,21 @@ tbl8_free_v20(struct
> > rte_lpm_tbl_entry_v20 *tbl8, uint32_t tbl8_group_start)  }
> >
> >  static void
> > -tbl8_free_v1604(struct rte_lpm_tbl_entry *tbl8, uint32_t
> > tbl8_group_start)
> > +tbl8_free_v1604(struct rte_lpm *lpm, uint32_t tbl8_group_start)
> >  {
> > -	/* Set tbl8 group invalid*/
> >  	struct rte_lpm_tbl_entry zero_tbl8_entry = {0};
> > +	struct __rte_lpm_rcu_dq_entry e;
> >
> > -	__atomic_store(&tbl8[tbl8_group_start], &zero_tbl8_entry,
> > -			__ATOMIC_RELAXED);
> > +	if (lpm->dq != NULL) {
> > +		e.tbl8_group_index = tbl8_group_start;
> > +		e.pad = 0;
> > +		/* Push into QSBR defer queue. */
> > +		rte_rcu_qsbr_dq_enqueue(lpm->dq, (void *)&e);
> > +	} else {
> > +		/* Set tbl8 group invalid*/
> > +		__atomic_store(&lpm->tbl8[tbl8_group_start],
> &zero_tbl8_entry,
> > +				__ATOMIC_RELAXED);
> > +	}
> >  }
> >
> >  static __rte_noinline int32_t
> > @@ -1037,7 +1117,7 @@ add_depth_big_v1604(struct rte_lpm *lpm,
> > uint32_t ip_masked, uint8_t depth,
> >
> >  	if (!lpm->tbl24[tbl24_index].valid) {
> >  		/* Search for a free tbl8 group. */
> > -		tbl8_group_index = tbl8_alloc_v1604(lpm->tbl8, lpm-
> >number_tbl8s);
> > +		tbl8_group_index = tbl8_alloc_v1604(lpm);
> >
> >  		/* Check tbl8 allocation was successful. */
> >  		if (tbl8_group_index < 0) {
> > @@ -1083,7 +1163,7 @@ add_depth_big_v1604(struct rte_lpm *lpm,
> uint32_t ip_masked, uint8_t depth,
> >  	} /* If valid entry but not extended calculate the index into Table8. */
> >  	else if (lpm->tbl24[tbl24_index].valid_group == 0) {
> >  		/* Search for free tbl8 group. */
> > -		tbl8_group_index = tbl8_alloc_v1604(lpm->tbl8, lpm-
> >number_tbl8s);
> > +		tbl8_group_index = tbl8_alloc_v1604(lpm);
> >
> >  		if (tbl8_group_index < 0) {
> >  			return tbl8_group_index;
> > @@ -1818,7 +1898,7 @@ delete_depth_big_v1604(struct rte_lpm *lpm,
> uint32_t ip_masked,
> >  		 */
> >  		lpm->tbl24[tbl24_index].valid = 0;
> >  		__atomic_thread_fence(__ATOMIC_RELEASE);
> > -		tbl8_free_v1604(lpm->tbl8, tbl8_group_start);
> > +		tbl8_free_v1604(lpm, tbl8_group_start);
> >  	} else if (tbl8_recycle_index > -1) {
> >  		/* Update tbl24 entry. */
> >  		struct rte_lpm_tbl_entry new_tbl24_entry = { @@ -1834,7
> +1914,7 @@
> > delete_depth_big_v1604(struct rte_lpm *lpm, uint32_t ip_masked,
> >  		__atomic_store(&lpm->tbl24[tbl24_index],
> &new_tbl24_entry,
> >  				__ATOMIC_RELAXED);
> >  		__atomic_thread_fence(__ATOMIC_RELEASE);
> > -		tbl8_free_v1604(lpm->tbl8, tbl8_group_start);
> > +		tbl8_free_v1604(lpm, tbl8_group_start);
> >  	}
> >  #undef group_idx
> >  	return 0;
> > diff --git a/lib/librte_lpm/rte_lpm.h b/lib/librte_lpm/rte_lpm.h index
> > 906ec4483..49c12a68d 100644
> > --- a/lib/librte_lpm/rte_lpm.h
> > +++ b/lib/librte_lpm/rte_lpm.h
> > @@ -1,5 +1,6 @@
> >  /* SPDX-License-Identifier: BSD-3-Clause
> >   * Copyright(c) 2010-2014 Intel Corporation
> > + * Copyright(c) 2019 Arm Limited
> >   */
> >
> >  #ifndef _RTE_LPM_H_
> > @@ -21,6 +22,7 @@
> >  #include <rte_common.h>
> >  #include <rte_vect.h>
> >  #include <rte_compat.h>
> > +#include <rte_rcu_qsbr.h>
> >
> >  #ifdef __cplusplus
> >  extern "C" {
> > @@ -186,6 +188,7 @@ struct rte_lpm {
> >  			__rte_cache_aligned; /**< LPM tbl24 table. */
> >  	struct rte_lpm_tbl_entry *tbl8; /**< LPM tbl8 table. */
> >  	struct rte_lpm_rule *rules_tbl; /**< LPM rules. */
> > +	struct rte_rcu_qsbr_dq *dq;	/**< RCU QSBR defer queue.*/
> >  };
> >
> >  /**
> > @@ -248,6 +251,24 @@ rte_lpm_free_v20(struct rte_lpm_v20 *lpm);
> void
> > rte_lpm_free_v1604(struct rte_lpm *lpm);
> >
> > +/**
> > + * Associate RCU QSBR variable with an LPM object.
> > + *
> > + * @param lpm
> > + *   the lpm object to add RCU QSBR
> > + * @param v
> > + *   RCU QSBR variable
> > + * @return
> > + *   On success - 0
> > + *   On error - 1 with error code set in rte_errno.
> > + *   Possible rte_errno codes are:
> > + *   - EINVAL - invalid pointer
> > + *   - EEXIST - already added QSBR
> > + *   - ENOMEM - memory allocation failure
> > + */
> > +__rte_experimental
> > +int rte_lpm_rcu_qsbr_add(struct rte_lpm *lpm, struct rte_rcu_qsbr
> > +*v);
> > +
> >  /**
> >   * Add a rule to the LPM table.
> >   *
> > diff --git a/lib/librte_lpm/rte_lpm_version.map
> > b/lib/librte_lpm/rte_lpm_version.map
> > index 90beac853..b353aabd2 100644
> > --- a/lib/librte_lpm/rte_lpm_version.map
> > +++ b/lib/librte_lpm/rte_lpm_version.map
> > @@ -44,3 +44,9 @@ DPDK_17.05 {
> >  	rte_lpm6_lookup_bulk_func;
> >
> >  } DPDK_16.04;
> > +
> > +EXPERIMENTAL {
> > +	global:
> > +
> > +	rte_lpm_rcu_qsbr_add;
> > +};
> > --
> > 2.17.1
Ananyev, Konstantin Oct. 15, 2019, 11:15 a.m. UTC | #5
> <snip>
> 
> > Hi guys,
> I have tried to consolidate design related questions here. If I have missed anything, please add.
> 
> >
> > >
> > > From: Ruifeng Wang <ruifeng.wang@arm.com>
> > >
> > > Currently, the tbl8 group is freed even though the readers might be
> > > using the tbl8 group entries. The freed tbl8 group can be reallocated
> > > quickly. This results in incorrect lookup results.
> > >
> > > RCU QSBR process is integrated for safe tbl8 group reclaim.
> > > Refer to RCU documentation to understand various aspects of
> > > integrating RCU library into other libraries.
> > >
> > > Signed-off-by: Ruifeng Wang <ruifeng.wang@arm.com>
> > > Reviewed-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
> > > ---
> > >  lib/librte_lpm/Makefile            |   3 +-
> > >  lib/librte_lpm/meson.build         |   2 +
> > >  lib/librte_lpm/rte_lpm.c           | 102 +++++++++++++++++++++++++----
> > >  lib/librte_lpm/rte_lpm.h           |  21 ++++++
> > >  lib/librte_lpm/rte_lpm_version.map |   6 ++
> > >  5 files changed, 122 insertions(+), 12 deletions(-)
> > >
> > > diff --git a/lib/librte_lpm/Makefile b/lib/librte_lpm/Makefile index
> > > a7946a1c5..ca9e16312 100644
> > > --- a/lib/librte_lpm/Makefile
> > > +++ b/lib/librte_lpm/Makefile
> > > @@ -6,9 +6,10 @@ include $(RTE_SDK)/mk/rte.vars.mk  # library name
> > > LIB = librte_lpm.a
> > >
> > > +CFLAGS += -DALLOW_EXPERIMENTAL_API
> > >  CFLAGS += -O3
> > >  CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -LDLIBS += -lrte_eal -lrte_hash
> > > +LDLIBS += -lrte_eal -lrte_hash -lrte_rcu
> > >
> > >  EXPORT_MAP := rte_lpm_version.map
> > >
> > > diff --git a/lib/librte_lpm/meson.build b/lib/librte_lpm/meson.build
> > > index a5176d8ae..19a35107f 100644
> > > --- a/lib/librte_lpm/meson.build
> > > +++ b/lib/librte_lpm/meson.build
> > > @@ -2,9 +2,11 @@
> > >  # Copyright(c) 2017 Intel Corporation
> > >
> > >  version = 2
> > > +allow_experimental_apis = true
> > >  sources = files('rte_lpm.c', 'rte_lpm6.c')  headers =
> > > files('rte_lpm.h', 'rte_lpm6.h')  # since header files have different
> > > names, we can install all vector headers  # without worrying about
> > > which architecture we actually need  headers +=
> > > files('rte_lpm_altivec.h', 'rte_lpm_neon.h', 'rte_lpm_sse.h')  deps +=
> > > ['hash']
> > > +deps += ['rcu']
> > > diff --git a/lib/librte_lpm/rte_lpm.c b/lib/librte_lpm/rte_lpm.c index
> > > 3a929a1b1..ca58d4b35 100644
> > > --- a/lib/librte_lpm/rte_lpm.c
> > > +++ b/lib/librte_lpm/rte_lpm.c
> > > @@ -1,5 +1,6 @@
> > >  /* SPDX-License-Identifier: BSD-3-Clause
> > >   * Copyright(c) 2010-2014 Intel Corporation
> > > + * Copyright(c) 2019 Arm Limited
> > >   */
> > >
> > >  #include <string.h>
> > > @@ -381,6 +382,8 @@ rte_lpm_free_v1604(struct rte_lpm *lpm)
> > >
> > >  	rte_mcfg_tailq_write_unlock();
> > >
> > > +	if (lpm->dq)
> > > +		rte_rcu_qsbr_dq_delete(lpm->dq);
> > >  	rte_free(lpm->tbl8);
> > >  	rte_free(lpm->rules_tbl);
> > >  	rte_free(lpm);
> > > @@ -390,6 +393,59 @@ BIND_DEFAULT_SYMBOL(rte_lpm_free, _v1604,
> > 16.04);
> > > MAP_STATIC_SYMBOL(void rte_lpm_free(struct rte_lpm *lpm),
> > >  		rte_lpm_free_v1604);
> > >
> > > +struct __rte_lpm_rcu_dq_entry {
> > > +	uint32_t tbl8_group_index;
> > > +	uint32_t pad;
> > > +};
> > > +
> > > +static void
> > > +__lpm_rcu_qsbr_free_resource(void *p, void *data) {
> > > +	struct rte_lpm_tbl_entry zero_tbl8_entry = {0};
> > > +	struct __rte_lpm_rcu_dq_entry *e =
> > > +			(struct __rte_lpm_rcu_dq_entry *)data;
> > > +	struct rte_lpm_tbl_entry *tbl8 = (struct rte_lpm_tbl_entry *)p;
> > > +
> > > +	/* Set tbl8 group invalid */
> > > +	__atomic_store(&tbl8[e->tbl8_group_index], &zero_tbl8_entry,
> > > +		__ATOMIC_RELAXED);
> > > +}
> > > +
> > > +/* Associate QSBR variable with an LPM object.
> > > + */
> > > +int
> > > +rte_lpm_rcu_qsbr_add(struct rte_lpm *lpm, struct rte_rcu_qsbr *v) {
> > > +	char rcu_dq_name[RTE_RCU_QSBR_DQ_NAMESIZE];
> > > +	struct rte_rcu_qsbr_dq_parameters params;
> > > +
> > > +	if ((lpm == NULL) || (v == NULL)) {
> > > +		rte_errno = EINVAL;
> > > +		return 1;
> > > +	}
> > > +
> > > +	if (lpm->dq) {
> > > +		rte_errno = EEXIST;
> > > +		return 1;
> > > +	}
> > > +
> > > +	/* Init QSBR defer queue. */
> > > +	snprintf(rcu_dq_name, sizeof(rcu_dq_name), "LPM_RCU_%s", lpm-
> > >name);
> > > +	params.name = rcu_dq_name;
> > > +	params.size = lpm->number_tbl8s;
> > > +	params.esize = sizeof(struct __rte_lpm_rcu_dq_entry);
> > > +	params.f = __lpm_rcu_qsbr_free_resource;
> > > +	params.p = lpm->tbl8;
> > > +	params.v = v;
> > > +	lpm->dq = rte_rcu_qsbr_dq_create(&params);
> > > +	if (lpm->dq == NULL) {
> > > +		RTE_LOG(ERR, LPM, "LPM QS defer queue creation failed\n");
> > > +		return 1;
> > > +	}
> >
> > Few thoughts about that function:
> Few things to keep in mind, the goal of the design is to make it easy for the applications to adopt lock-free algorithms. The reclamation
> process in the writer is a major portion of code one has to write for using lock-free algorithms. The current design is such that the writer
> does not have to change any code or write additional code other than calling 'rte_lpm_rcu_qsbr_add'.
> 
> > It names rcu_qsbr_add() but in fact it allocates defer queue for give rcu var.
> > So first thought - is it always necessary?
> This is part of the design. If the application does not want to use this integrated logic then, it does not have to call this API. It can use the
> RCU defer APIs to implement its own logic. But, if I ask the question, does this integrated logic address most of the use cases of the LPM
> library, I think the answer is yes.
> 
> > For some use-cases I suppose user might be ok to wait for quiescent state
> > change
> > inside tbl8_free()?
> Yes, that is a possibility (for ex: no frequent route changes). But, I think that is very trivial for the application to implement. Though, the LPM
> library has to separate the 'delete' and 'free' operations. 

Exactly.
That's why it is not trivial with current LPM library.
In fact to do that himself right now, user would have to implement and support his own version of LPM code.

Honestly, I don't understand why you consider it as a drawback.
From my perspective only few things need to be changed:

1. Add 2 parameters to 'rte_lpm_rcu_qsbr_add():
    number of elems in defer_queue
    reclaim() threshold value.
If the user doesn't want to provide any values, that's fine we can use default ones here
(as you do it right now).
2. Make rte_lpm_rcu_qsbr_add() to return pointer to the defer_queue.
Again if user doesn't want to call reclaim() himself, he can just ignore return value.

These 2 changes will provide us with necessary flexibility that would help to cover more use-cases:
- user can decide how big should be the defer queue
- user can decide when/how he wants to do reclaim()

Konstantin

>Similar operations are provided in rte_hash library. IMO, we should follow
> consistent approach.
> 
> > Another thing you do allocate defer queue, but it is internal, so user can't call
> > reclaim() manually, which looks strange.
> > Why not to return defer_queue pointer to the user, so he can call reclaim()
> > himself at appropriate time?
> The intention of the design is to take the complexity away from the user of LPM library. IMO, the current design will address most uses
> cases of LPM library. If we expose the 2 parameters (when to trigger reclamation and how much to reclaim) in the 'rte_lpm_rcu_qsbr_add'
> API, it should provide enough flexibility to the application.
> 
> > Third thing - you always allocate defer queue with size equal to number of
> > tbl8.
> > Though I understand it could be up to 16M tbl8 groups inside the LPM.
> > Do we really need defer queue that long?
> No, we do not need it to be this long. It is this long today to avoid returning no-space on the defer queue error.
> 
> > Especially  considering that current rcu_defer_queue will start reclamation
> > when 1/8 of defer_quueue becomes full and wouldn't reclaim more then
> > 1/16 of it.
> > Probably better to let user to decide himself how long defer_queue he needs
> > for that LPM?
> It makes sense to expose it to the user if the writer-writer concurrency is lock-free (no memory allocation allowed to expand the defer
> queue size when the queue is full). However, LPM is not lock-free on the writer side. If we think the writer could be lock-free in the future, it
> has to be exposed to the user.
> 
> >
> > Konstantin
> Pulling questions/comments from other threads:
> Can we leave reclamation to some other house-keeping thread to do (sort of garbage collector). Or such mode is not supported/planned?
> 
> [Honnappa] If the reclamation cost is small, the current method provides advantages over having a separate thread to do reclamation. I did
> not plan to provide such an option. But may be it makes sense to keep the options open (especially from ABI perspective). May be we
> should add a flags field which will allow us to implement different methods in the future?
> 
> >
> >
> > > +
> > > +	return 0;
> > > +}
> > > +
> > >  /*
> > >   * Adds a rule to the rule table.
> > >   *
> > > @@ -679,14 +735,15 @@ tbl8_alloc_v20(struct rte_lpm_tbl_entry_v20
> > > *tbl8)  }
> > >
> > >  static int32_t
> > > -tbl8_alloc_v1604(struct rte_lpm_tbl_entry *tbl8, uint32_t
> > > number_tbl8s)
> > > +__tbl8_alloc_v1604(struct rte_lpm *lpm)
> > >  {
> > >  	uint32_t group_idx; /* tbl8 group index. */
> > >  	struct rte_lpm_tbl_entry *tbl8_entry;
> > >
> > >  	/* Scan through tbl8 to find a free (i.e. INVALID) tbl8 group. */
> > > -	for (group_idx = 0; group_idx < number_tbl8s; group_idx++) {
> > > -		tbl8_entry = &tbl8[group_idx *
> > RTE_LPM_TBL8_GROUP_NUM_ENTRIES];
> > > +	for (group_idx = 0; group_idx < lpm->number_tbl8s; group_idx++) {
> > > +		tbl8_entry = &lpm->tbl8[group_idx *
> > > +
> > 	RTE_LPM_TBL8_GROUP_NUM_ENTRIES];
> > >  		/* If a free tbl8 group is found clean it and set as VALID. */
> > >  		if (!tbl8_entry->valid_group) {
> > >  			struct rte_lpm_tbl_entry new_tbl8_entry = { @@ -
> > 712,6 +769,21 @@
> > > tbl8_alloc_v1604(struct rte_lpm_tbl_entry *tbl8, uint32_t number_tbl8s)
> > >  	return -ENOSPC;
> > >  }
> > >
> > > +static int32_t
> > > +tbl8_alloc_v1604(struct rte_lpm *lpm) {
> > > +	int32_t group_idx; /* tbl8 group index. */
> > > +
> > > +	group_idx = __tbl8_alloc_v1604(lpm);
> > > +	if ((group_idx < 0) && (lpm->dq != NULL)) {
> > > +		/* If there are no tbl8 groups try to reclaim some. */
> > > +		if (rte_rcu_qsbr_dq_reclaim(lpm->dq) == 0)
> > > +			group_idx = __tbl8_alloc_v1604(lpm);
> > > +	}
> > > +
> > > +	return group_idx;
> > > +}
> > > +
> > >  static void
> > >  tbl8_free_v20(struct rte_lpm_tbl_entry_v20 *tbl8, uint32_t
> > > tbl8_group_start)  { @@ -728,13 +800,21 @@ tbl8_free_v20(struct
> > > rte_lpm_tbl_entry_v20 *tbl8, uint32_t tbl8_group_start)  }
> > >
> > >  static void
> > > -tbl8_free_v1604(struct rte_lpm_tbl_entry *tbl8, uint32_t
> > > tbl8_group_start)
> > > +tbl8_free_v1604(struct rte_lpm *lpm, uint32_t tbl8_group_start)
> > >  {
> > > -	/* Set tbl8 group invalid*/
> > >  	struct rte_lpm_tbl_entry zero_tbl8_entry = {0};
> > > +	struct __rte_lpm_rcu_dq_entry e;
> > >
> > > -	__atomic_store(&tbl8[tbl8_group_start], &zero_tbl8_entry,
> > > -			__ATOMIC_RELAXED);
> > > +	if (lpm->dq != NULL) {
> > > +		e.tbl8_group_index = tbl8_group_start;
> > > +		e.pad = 0;
> > > +		/* Push into QSBR defer queue. */
> > > +		rte_rcu_qsbr_dq_enqueue(lpm->dq, (void *)&e);
> > > +	} else {
> > > +		/* Set tbl8 group invalid*/
> > > +		__atomic_store(&lpm->tbl8[tbl8_group_start],
> > &zero_tbl8_entry,
> > > +				__ATOMIC_RELAXED);
> > > +	}
> > >  }
> > >
> > >  static __rte_noinline int32_t
> > > @@ -1037,7 +1117,7 @@ add_depth_big_v1604(struct rte_lpm *lpm,
> > > uint32_t ip_masked, uint8_t depth,
> > >
> > >  	if (!lpm->tbl24[tbl24_index].valid) {
> > >  		/* Search for a free tbl8 group. */
> > > -		tbl8_group_index = tbl8_alloc_v1604(lpm->tbl8, lpm-
> > >number_tbl8s);
> > > +		tbl8_group_index = tbl8_alloc_v1604(lpm);
> > >
> > >  		/* Check tbl8 allocation was successful. */
> > >  		if (tbl8_group_index < 0) {
> > > @@ -1083,7 +1163,7 @@ add_depth_big_v1604(struct rte_lpm *lpm,
> > uint32_t ip_masked, uint8_t depth,
> > >  	} /* If valid entry but not extended calculate the index into Table8. */
> > >  	else if (lpm->tbl24[tbl24_index].valid_group == 0) {
> > >  		/* Search for free tbl8 group. */
> > > -		tbl8_group_index = tbl8_alloc_v1604(lpm->tbl8, lpm-
> > >number_tbl8s);
> > > +		tbl8_group_index = tbl8_alloc_v1604(lpm);
> > >
> > >  		if (tbl8_group_index < 0) {
> > >  			return tbl8_group_index;
> > > @@ -1818,7 +1898,7 @@ delete_depth_big_v1604(struct rte_lpm *lpm,
> > uint32_t ip_masked,
> > >  		 */
> > >  		lpm->tbl24[tbl24_index].valid = 0;
> > >  		__atomic_thread_fence(__ATOMIC_RELEASE);
> > > -		tbl8_free_v1604(lpm->tbl8, tbl8_group_start);
> > > +		tbl8_free_v1604(lpm, tbl8_group_start);
> > >  	} else if (tbl8_recycle_index > -1) {
> > >  		/* Update tbl24 entry. */
> > >  		struct rte_lpm_tbl_entry new_tbl24_entry = { @@ -1834,7
> > +1914,7 @@
> > > delete_depth_big_v1604(struct rte_lpm *lpm, uint32_t ip_masked,
> > >  		__atomic_store(&lpm->tbl24[tbl24_index],
> > &new_tbl24_entry,
> > >  				__ATOMIC_RELAXED);
> > >  		__atomic_thread_fence(__ATOMIC_RELEASE);
> > > -		tbl8_free_v1604(lpm->tbl8, tbl8_group_start);
> > > +		tbl8_free_v1604(lpm, tbl8_group_start);
> > >  	}
> > >  #undef group_idx
> > >  	return 0;
> > > diff --git a/lib/librte_lpm/rte_lpm.h b/lib/librte_lpm/rte_lpm.h index
> > > 906ec4483..49c12a68d 100644
> > > --- a/lib/librte_lpm/rte_lpm.h
> > > +++ b/lib/librte_lpm/rte_lpm.h
> > > @@ -1,5 +1,6 @@
> > >  /* SPDX-License-Identifier: BSD-3-Clause
> > >   * Copyright(c) 2010-2014 Intel Corporation
> > > + * Copyright(c) 2019 Arm Limited
> > >   */
> > >
> > >  #ifndef _RTE_LPM_H_
> > > @@ -21,6 +22,7 @@
> > >  #include <rte_common.h>
> > >  #include <rte_vect.h>
> > >  #include <rte_compat.h>
> > > +#include <rte_rcu_qsbr.h>
> > >
> > >  #ifdef __cplusplus
> > >  extern "C" {
> > > @@ -186,6 +188,7 @@ struct rte_lpm {
> > >  			__rte_cache_aligned; /**< LPM tbl24 table. */
> > >  	struct rte_lpm_tbl_entry *tbl8; /**< LPM tbl8 table. */
> > >  	struct rte_lpm_rule *rules_tbl; /**< LPM rules. */
> > > +	struct rte_rcu_qsbr_dq *dq;	/**< RCU QSBR defer queue.*/
> > >  };
> > >
> > >  /**
> > > @@ -248,6 +251,24 @@ rte_lpm_free_v20(struct rte_lpm_v20 *lpm);
> > void
> > > rte_lpm_free_v1604(struct rte_lpm *lpm);
> > >
> > > +/**
> > > + * Associate RCU QSBR variable with an LPM object.
> > > + *
> > > + * @param lpm
> > > + *   the lpm object to add RCU QSBR
> > > + * @param v
> > > + *   RCU QSBR variable
> > > + * @return
> > > + *   On success - 0
> > > + *   On error - 1 with error code set in rte_errno.
> > > + *   Possible rte_errno codes are:
> > > + *   - EINVAL - invalid pointer
> > > + *   - EEXIST - already added QSBR
> > > + *   - ENOMEM - memory allocation failure
> > > + */
> > > +__rte_experimental
> > > +int rte_lpm_rcu_qsbr_add(struct rte_lpm *lpm, struct rte_rcu_qsbr
> > > +*v);
> > > +
> > >  /**
> > >   * Add a rule to the LPM table.
> > >   *
> > > diff --git a/lib/librte_lpm/rte_lpm_version.map
> > > b/lib/librte_lpm/rte_lpm_version.map
> > > index 90beac853..b353aabd2 100644
> > > --- a/lib/librte_lpm/rte_lpm_version.map
> > > +++ b/lib/librte_lpm/rte_lpm_version.map
> > > @@ -44,3 +44,9 @@ DPDK_17.05 {
> > >  	rte_lpm6_lookup_bulk_func;
> > >
> > >  } DPDK_16.04;
> > > +
> > > +EXPERIMENTAL {
> > > +	global:
> > > +
> > > +	rte_lpm_rcu_qsbr_add;
> > > +};
> > > --
> > > 2.17.1
Honnappa Nagarahalli Oct. 18, 2019, 3:32 a.m. UTC | #6
<snip>

> >
> > > Hi guys,
> > I have tried to consolidate design related questions here. If I have missed
> anything, please add.
> >
> > >
> > > >
> > > > From: Ruifeng Wang <ruifeng.wang@arm.com>
> > > >
> > > > Currently, the tbl8 group is freed even though the readers might
> > > > be using the tbl8 group entries. The freed tbl8 group can be
> > > > reallocated quickly. This results in incorrect lookup results.
> > > >
> > > > RCU QSBR process is integrated for safe tbl8 group reclaim.
> > > > Refer to RCU documentation to understand various aspects of
> > > > integrating RCU library into other libraries.
> > > >
> > > > Signed-off-by: Ruifeng Wang <ruifeng.wang@arm.com>
> > > > Reviewed-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
> > > > ---
> > > >  lib/librte_lpm/Makefile            |   3 +-
> > > >  lib/librte_lpm/meson.build         |   2 +
> > > >  lib/librte_lpm/rte_lpm.c           | 102 +++++++++++++++++++++++++----
> > > >  lib/librte_lpm/rte_lpm.h           |  21 ++++++
> > > >  lib/librte_lpm/rte_lpm_version.map |   6 ++
> > > >  5 files changed, 122 insertions(+), 12 deletions(-)
> > > >
> > > > diff --git a/lib/librte_lpm/Makefile b/lib/librte_lpm/Makefile
> > > > index
> > > > a7946a1c5..ca9e16312 100644
> > > > --- a/lib/librte_lpm/Makefile
> > > > +++ b/lib/librte_lpm/Makefile
> > > > @@ -6,9 +6,10 @@ include $(RTE_SDK)/mk/rte.vars.mk  # library name
> > > > LIB = librte_lpm.a
> > > >
> > > > +CFLAGS += -DALLOW_EXPERIMENTAL_API
> > > >  CFLAGS += -O3
> > > >  CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -LDLIBS += -lrte_eal
> > > > -lrte_hash
> > > > +LDLIBS += -lrte_eal -lrte_hash -lrte_rcu
> > > >
> > > >  EXPORT_MAP := rte_lpm_version.map
> > > >
> > > > diff --git a/lib/librte_lpm/meson.build
> > > > b/lib/librte_lpm/meson.build index a5176d8ae..19a35107f 100644
> > > > --- a/lib/librte_lpm/meson.build
> > > > +++ b/lib/librte_lpm/meson.build
> > > > @@ -2,9 +2,11 @@
> > > >  # Copyright(c) 2017 Intel Corporation
> > > >
> > > >  version = 2
> > > > +allow_experimental_apis = true
> > > >  sources = files('rte_lpm.c', 'rte_lpm6.c')  headers =
> > > > files('rte_lpm.h', 'rte_lpm6.h')  # since header files have
> > > > different names, we can install all vector headers  # without
> > > > worrying about which architecture we actually need  headers +=
> > > > files('rte_lpm_altivec.h', 'rte_lpm_neon.h', 'rte_lpm_sse.h')
> > > > deps += ['hash']
> > > > +deps += ['rcu']
> > > > diff --git a/lib/librte_lpm/rte_lpm.c b/lib/librte_lpm/rte_lpm.c
> > > > index
> > > > 3a929a1b1..ca58d4b35 100644
> > > > --- a/lib/librte_lpm/rte_lpm.c
> > > > +++ b/lib/librte_lpm/rte_lpm.c
> > > > @@ -1,5 +1,6 @@
> > > >  /* SPDX-License-Identifier: BSD-3-Clause
> > > >   * Copyright(c) 2010-2014 Intel Corporation
> > > > + * Copyright(c) 2019 Arm Limited
> > > >   */
> > > >
> > > >  #include <string.h>
> > > > @@ -381,6 +382,8 @@ rte_lpm_free_v1604(struct rte_lpm *lpm)
> > > >
> > > >  	rte_mcfg_tailq_write_unlock();
> > > >
> > > > +	if (lpm->dq)
> > > > +		rte_rcu_qsbr_dq_delete(lpm->dq);
> > > >  	rte_free(lpm->tbl8);
> > > >  	rte_free(lpm->rules_tbl);
> > > >  	rte_free(lpm);
> > > > @@ -390,6 +393,59 @@ BIND_DEFAULT_SYMBOL(rte_lpm_free, _v1604,
> > > 16.04);
> > > > MAP_STATIC_SYMBOL(void rte_lpm_free(struct rte_lpm *lpm),
> > > >  		rte_lpm_free_v1604);
> > > >
> > > > +struct __rte_lpm_rcu_dq_entry {
> > > > +	uint32_t tbl8_group_index;
> > > > +	uint32_t pad;
> > > > +};
> > > > +
> > > > +static void
> > > > +__lpm_rcu_qsbr_free_resource(void *p, void *data) {
> > > > +	struct rte_lpm_tbl_entry zero_tbl8_entry = {0};
> > > > +	struct __rte_lpm_rcu_dq_entry *e =
> > > > +			(struct __rte_lpm_rcu_dq_entry *)data;
> > > > +	struct rte_lpm_tbl_entry *tbl8 = (struct rte_lpm_tbl_entry *)p;
> > > > +
> > > > +	/* Set tbl8 group invalid */
> > > > +	__atomic_store(&tbl8[e->tbl8_group_index], &zero_tbl8_entry,
> > > > +		__ATOMIC_RELAXED);
> > > > +}
> > > > +
> > > > +/* Associate QSBR variable with an LPM object.
> > > > + */
> > > > +int
> > > > +rte_lpm_rcu_qsbr_add(struct rte_lpm *lpm, struct rte_rcu_qsbr *v) {
> > > > +	char rcu_dq_name[RTE_RCU_QSBR_DQ_NAMESIZE];
> > > > +	struct rte_rcu_qsbr_dq_parameters params;
> > > > +
> > > > +	if ((lpm == NULL) || (v == NULL)) {
> > > > +		rte_errno = EINVAL;
> > > > +		return 1;
> > > > +	}
> > > > +
> > > > +	if (lpm->dq) {
> > > > +		rte_errno = EEXIST;
> > > > +		return 1;
> > > > +	}
> > > > +
> > > > +	/* Init QSBR defer queue. */
> > > > +	snprintf(rcu_dq_name, sizeof(rcu_dq_name), "LPM_RCU_%s", lpm-
> > > >name);
> > > > +	params.name = rcu_dq_name;
> > > > +	params.size = lpm->number_tbl8s;
> > > > +	params.esize = sizeof(struct __rte_lpm_rcu_dq_entry);
> > > > +	params.f = __lpm_rcu_qsbr_free_resource;
> > > > +	params.p = lpm->tbl8;
> > > > +	params.v = v;
> > > > +	lpm->dq = rte_rcu_qsbr_dq_create(&params);
> > > > +	if (lpm->dq == NULL) {
> > > > +		RTE_LOG(ERR, LPM, "LPM QS defer queue creation failed\n");
> > > > +		return 1;
> > > > +	}
> > >
> > > Few thoughts about that function:
> > Few things to keep in mind, the goal of the design is to make it easy
> > for the applications to adopt lock-free algorithms. The reclamation
> > process in the writer is a major portion of code one has to write for using
> lock-free algorithms. The current design is such that the writer does not have
> to change any code or write additional code other than calling
> 'rte_lpm_rcu_qsbr_add'.
> >
> > > It names rcu_qsbr_add() but in fact it allocates defer queue for give rcu var.
> > > So first thought - is it always necessary?
> > This is part of the design. If the application does not want to use
> > this integrated logic then, it does not have to call this API. It can
> > use the RCU defer APIs to implement its own logic. But, if I ask the question,
> does this integrated logic address most of the use cases of the LPM library, I
> think the answer is yes.
> >
> > > For some use-cases I suppose user might be ok to wait for quiescent
> > > state change inside tbl8_free()?
> > Yes, that is a possibility (for ex: no frequent route changes). But, I
> > think that is very trivial for the application to implement. Though, the LPM
> library has to separate the 'delete' and 'free' operations.
> 
> Exactly.
> That's why it is not trivial with current LPM library.
> In fact to do that himself right now, user would have to implement and support
> his own version of LPM code.
😊, well we definitely don't want them to write their own library (if DPDK LPM is enough)
IMO, we need to be consistent with other libraries in terms of APIs. That's another topic.
I do not see any problem to implement this or provide facility to implement this in the future in the APIs now. We can add 'flags' field which will allow for other methods of reclamation.

> 
> Honestly, I don't understand why you consider it as a drawback.
> From my perspective only few things need to be changed:
> 
> 1. Add 2 parameters to 'rte_lpm_rcu_qsbr_add():
>     number of elems in defer_queue
>     reclaim() threshold value.
> If the user doesn't want to provide any values, that's fine we can use default
> ones here (as you do it right now).
I think we have agreed on this, I see the value in doing this.

> 2. Make rte_lpm_rcu_qsbr_add() to return pointer to the defer_queue.
> Again if user doesn't want to call reclaim() himself, he can just ignore return
> value.
Given the goal of reducing the burden on the user, this is not in that direction. But if you see a use case for it, I don't have any issues. Vladimir asked for it as well in the other thread.

> 
> These 2 changes will provide us with necessary flexibility that would help to
> cover more use-cases:
> - user can decide how big should be the defer queue
> - user can decide when/how he wants to do reclaim()
> 
> Konstantin
> 
> >Similar operations are provided in rte_hash library. IMO, we should
> >follow  consistent approach.
> >
> > > Another thing you do allocate defer queue, but it is internal, so
> > > user can't call
> > > reclaim() manually, which looks strange.
> > > Why not to return defer_queue pointer to the user, so he can call
> > > reclaim() himself at appropriate time?
> > The intention of the design is to take the complexity away from the
> > user of LPM library. IMO, the current design will address most uses cases of
> LPM library. If we expose the 2 parameters (when to trigger reclamation and
> how much to reclaim) in the 'rte_lpm_rcu_qsbr_add'
> > API, it should provide enough flexibility to the application.
> >
> > > Third thing - you always allocate defer queue with size equal to
> > > number of tbl8.
> > > Though I understand it could be up to 16M tbl8 groups inside the LPM.
> > > Do we really need defer queue that long?
> > No, we do not need it to be this long. It is this long today to avoid returning
> no-space on the defer queue error.
> >
> > > Especially  considering that current rcu_defer_queue will start
> > > reclamation when 1/8 of defer_quueue becomes full and wouldn't
> > > reclaim more then
> > > 1/16 of it.
> > > Probably better to let user to decide himself how long defer_queue
> > > he needs for that LPM?
> > It makes sense to expose it to the user if the writer-writer
> > concurrency is lock-free (no memory allocation allowed to expand the
> > defer queue size when the queue is full). However, LPM is not lock-free on
> the writer side. If we think the writer could be lock-free in the future, it has to
> be exposed to the user.
> >
> > >
> > > Konstantin
> > Pulling questions/comments from other threads:
> > Can we leave reclamation to some other house-keeping thread to do (sort of
> garbage collector). Or such mode is not supported/planned?
> >
> > [Honnappa] If the reclamation cost is small, the current method
> > provides advantages over having a separate thread to do reclamation. I
> > did not plan to provide such an option. But may be it makes sense to keep the
> options open (especially from ABI perspective). May be we should add a flags
> field which will allow us to implement different methods in the future?
> >
> > >
> > >
> > > > +
> > > > +	return 0;
> > > > +}
> > > > +
> > > >  /*
> > > >   * Adds a rule to the rule table.
> > > >   *
> > > > @@ -679,14 +735,15 @@ tbl8_alloc_v20(struct rte_lpm_tbl_entry_v20
> > > > *tbl8)  }
> > > >
> > > >  static int32_t
> > > > -tbl8_alloc_v1604(struct rte_lpm_tbl_entry *tbl8, uint32_t
> > > > number_tbl8s)
> > > > +__tbl8_alloc_v1604(struct rte_lpm *lpm)
> > > >  {
> > > >  	uint32_t group_idx; /* tbl8 group index. */
> > > >  	struct rte_lpm_tbl_entry *tbl8_entry;
> > > >
> > > >  	/* Scan through tbl8 to find a free (i.e. INVALID) tbl8 group. */
> > > > -	for (group_idx = 0; group_idx < number_tbl8s; group_idx++) {
> > > > -		tbl8_entry = &tbl8[group_idx *
> > > RTE_LPM_TBL8_GROUP_NUM_ENTRIES];
> > > > +	for (group_idx = 0; group_idx < lpm->number_tbl8s; group_idx++) {
> > > > +		tbl8_entry = &lpm->tbl8[group_idx *
> > > > +
> > > 	RTE_LPM_TBL8_GROUP_NUM_ENTRIES];
> > > >  		/* If a free tbl8 group is found clean it and set as VALID. */
> > > >  		if (!tbl8_entry->valid_group) {
> > > >  			struct rte_lpm_tbl_entry new_tbl8_entry = { @@ -
> > > 712,6 +769,21 @@
> > > > tbl8_alloc_v1604(struct rte_lpm_tbl_entry *tbl8, uint32_t number_tbl8s)
> > > >  	return -ENOSPC;
> > > >  }
> > > >
> > > > +static int32_t
> > > > +tbl8_alloc_v1604(struct rte_lpm *lpm) {
> > > > +	int32_t group_idx; /* tbl8 group index. */
> > > > +
> > > > +	group_idx = __tbl8_alloc_v1604(lpm);
> > > > +	if ((group_idx < 0) && (lpm->dq != NULL)) {
> > > > +		/* If there are no tbl8 groups try to reclaim some. */
> > > > +		if (rte_rcu_qsbr_dq_reclaim(lpm->dq) == 0)
> > > > +			group_idx = __tbl8_alloc_v1604(lpm);
> > > > +	}
> > > > +
> > > > +	return group_idx;
> > > > +}
> > > > +
> > > >  static void
> > > >  tbl8_free_v20(struct rte_lpm_tbl_entry_v20 *tbl8, uint32_t
> > > > tbl8_group_start)  { @@ -728,13 +800,21 @@ tbl8_free_v20(struct
> > > > rte_lpm_tbl_entry_v20 *tbl8, uint32_t tbl8_group_start)  }
> > > >
> > > >  static void
> > > > -tbl8_free_v1604(struct rte_lpm_tbl_entry *tbl8, uint32_t
> > > > tbl8_group_start)
> > > > +tbl8_free_v1604(struct rte_lpm *lpm, uint32_t tbl8_group_start)
> > > >  {
> > > > -	/* Set tbl8 group invalid*/
> > > >  	struct rte_lpm_tbl_entry zero_tbl8_entry = {0};
> > > > +	struct __rte_lpm_rcu_dq_entry e;
> > > >
> > > > -	__atomic_store(&tbl8[tbl8_group_start], &zero_tbl8_entry,
> > > > -			__ATOMIC_RELAXED);
> > > > +	if (lpm->dq != NULL) {
> > > > +		e.tbl8_group_index = tbl8_group_start;
> > > > +		e.pad = 0;
> > > > +		/* Push into QSBR defer queue. */
> > > > +		rte_rcu_qsbr_dq_enqueue(lpm->dq, (void *)&e);
> > > > +	} else {
> > > > +		/* Set tbl8 group invalid*/
> > > > +		__atomic_store(&lpm->tbl8[tbl8_group_start],
> > > &zero_tbl8_entry,
> > > > +				__ATOMIC_RELAXED);
> > > > +	}
> > > >  }
> > > >
> > > >  static __rte_noinline int32_t
> > > > @@ -1037,7 +1117,7 @@ add_depth_big_v1604(struct rte_lpm *lpm,
> > > > uint32_t ip_masked, uint8_t depth,
> > > >
> > > >  	if (!lpm->tbl24[tbl24_index].valid) {
> > > >  		/* Search for a free tbl8 group. */
> > > > -		tbl8_group_index = tbl8_alloc_v1604(lpm->tbl8, lpm-
> > > >number_tbl8s);
> > > > +		tbl8_group_index = tbl8_alloc_v1604(lpm);
> > > >
> > > >  		/* Check tbl8 allocation was successful. */
> > > >  		if (tbl8_group_index < 0) {
> > > > @@ -1083,7 +1163,7 @@ add_depth_big_v1604(struct rte_lpm *lpm,
> > > uint32_t ip_masked, uint8_t depth,
> > > >  	} /* If valid entry but not extended calculate the index into Table8. */
> > > >  	else if (lpm->tbl24[tbl24_index].valid_group == 0) {
> > > >  		/* Search for free tbl8 group. */
> > > > -		tbl8_group_index = tbl8_alloc_v1604(lpm->tbl8, lpm-
> > > >number_tbl8s);
> > > > +		tbl8_group_index = tbl8_alloc_v1604(lpm);
> > > >
> > > >  		if (tbl8_group_index < 0) {
> > > >  			return tbl8_group_index;
> > > > @@ -1818,7 +1898,7 @@ delete_depth_big_v1604(struct rte_lpm *lpm,
> > > uint32_t ip_masked,
> > > >  		 */
> > > >  		lpm->tbl24[tbl24_index].valid = 0;
> > > >  		__atomic_thread_fence(__ATOMIC_RELEASE);
> > > > -		tbl8_free_v1604(lpm->tbl8, tbl8_group_start);
> > > > +		tbl8_free_v1604(lpm, tbl8_group_start);
> > > >  	} else if (tbl8_recycle_index > -1) {
> > > >  		/* Update tbl24 entry. */
> > > >  		struct rte_lpm_tbl_entry new_tbl24_entry = { @@ -1834,7
> > > +1914,7 @@
> > > > delete_depth_big_v1604(struct rte_lpm *lpm, uint32_t ip_masked,
> > > >  		__atomic_store(&lpm->tbl24[tbl24_index],
> > > &new_tbl24_entry,
> > > >  				__ATOMIC_RELAXED);
> > > >  		__atomic_thread_fence(__ATOMIC_RELEASE);
> > > > -		tbl8_free_v1604(lpm->tbl8, tbl8_group_start);
> > > > +		tbl8_free_v1604(lpm, tbl8_group_start);
> > > >  	}
> > > >  #undef group_idx
> > > >  	return 0;
> > > > diff --git a/lib/librte_lpm/rte_lpm.h b/lib/librte_lpm/rte_lpm.h
> > > > index 906ec4483..49c12a68d 100644
> > > > --- a/lib/librte_lpm/rte_lpm.h
> > > > +++ b/lib/librte_lpm/rte_lpm.h
> > > > @@ -1,5 +1,6 @@
> > > >  /* SPDX-License-Identifier: BSD-3-Clause
> > > >   * Copyright(c) 2010-2014 Intel Corporation
> > > > + * Copyright(c) 2019 Arm Limited
> > > >   */
> > > >
> > > >  #ifndef _RTE_LPM_H_
> > > > @@ -21,6 +22,7 @@
> > > >  #include <rte_common.h>
> > > >  #include <rte_vect.h>
> > > >  #include <rte_compat.h>
> > > > +#include <rte_rcu_qsbr.h>
> > > >
> > > >  #ifdef __cplusplus
> > > >  extern "C" {
> > > > @@ -186,6 +188,7 @@ struct rte_lpm {
> > > >  			__rte_cache_aligned; /**< LPM tbl24 table. */
> > > >  	struct rte_lpm_tbl_entry *tbl8; /**< LPM tbl8 table. */
> > > >  	struct rte_lpm_rule *rules_tbl; /**< LPM rules. */
> > > > +	struct rte_rcu_qsbr_dq *dq;	/**< RCU QSBR defer queue.*/
> > > >  };
> > > >
> > > >  /**
> > > > @@ -248,6 +251,24 @@ rte_lpm_free_v20(struct rte_lpm_v20 *lpm);
> > > void
> > > > rte_lpm_free_v1604(struct rte_lpm *lpm);
> > > >
> > > > +/**
> > > > + * Associate RCU QSBR variable with an LPM object.
> > > > + *
> > > > + * @param lpm
> > > > + *   the lpm object to add RCU QSBR
> > > > + * @param v
> > > > + *   RCU QSBR variable
> > > > + * @return
> > > > + *   On success - 0
> > > > + *   On error - 1 with error code set in rte_errno.
> > > > + *   Possible rte_errno codes are:
> > > > + *   - EINVAL - invalid pointer
> > > > + *   - EEXIST - already added QSBR
> > > > + *   - ENOMEM - memory allocation failure
> > > > + */
> > > > +__rte_experimental
> > > > +int rte_lpm_rcu_qsbr_add(struct rte_lpm *lpm, struct rte_rcu_qsbr
> > > > +*v);
> > > > +
> > > >  /**
> > > >   * Add a rule to the LPM table.
> > > >   *
> > > > diff --git a/lib/librte_lpm/rte_lpm_version.map
> > > > b/lib/librte_lpm/rte_lpm_version.map
> > > > index 90beac853..b353aabd2 100644
> > > > --- a/lib/librte_lpm/rte_lpm_version.map
> > > > +++ b/lib/librte_lpm/rte_lpm_version.map
> > > > @@ -44,3 +44,9 @@ DPDK_17.05 {
> > > >  	rte_lpm6_lookup_bulk_func;
> > > >
> > > >  } DPDK_16.04;
> > > > +
> > > > +EXPERIMENTAL {
> > > > +	global:
> > > > +
> > > > +	rte_lpm_rcu_qsbr_add;
> > > > +};
> > > > --
> > > > 2.17.1

Patch
diff mbox series

diff --git a/lib/librte_lpm/Makefile b/lib/librte_lpm/Makefile
index a7946a1c5..ca9e16312 100644
--- a/lib/librte_lpm/Makefile
+++ b/lib/librte_lpm/Makefile
@@ -6,9 +6,10 @@  include $(RTE_SDK)/mk/rte.vars.mk
 # library name
 LIB = librte_lpm.a
 
+CFLAGS += -DALLOW_EXPERIMENTAL_API
 CFLAGS += -O3
 CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR)
-LDLIBS += -lrte_eal -lrte_hash
+LDLIBS += -lrte_eal -lrte_hash -lrte_rcu
 
 EXPORT_MAP := rte_lpm_version.map
 
diff --git a/lib/librte_lpm/meson.build b/lib/librte_lpm/meson.build
index a5176d8ae..19a35107f 100644
--- a/lib/librte_lpm/meson.build
+++ b/lib/librte_lpm/meson.build
@@ -2,9 +2,11 @@ 
 # Copyright(c) 2017 Intel Corporation
 
 version = 2
+allow_experimental_apis = true
 sources = files('rte_lpm.c', 'rte_lpm6.c')
 headers = files('rte_lpm.h', 'rte_lpm6.h')
 # since header files have different names, we can install all vector headers
 # without worrying about which architecture we actually need
 headers += files('rte_lpm_altivec.h', 'rte_lpm_neon.h', 'rte_lpm_sse.h')
 deps += ['hash']
+deps += ['rcu']
diff --git a/lib/librte_lpm/rte_lpm.c b/lib/librte_lpm/rte_lpm.c
index 3a929a1b1..ca58d4b35 100644
--- a/lib/librte_lpm/rte_lpm.c
+++ b/lib/librte_lpm/rte_lpm.c
@@ -1,5 +1,6 @@ 
 /* SPDX-License-Identifier: BSD-3-Clause
  * Copyright(c) 2010-2014 Intel Corporation
+ * Copyright(c) 2019 Arm Limited
  */
 
 #include <string.h>
@@ -381,6 +382,8 @@  rte_lpm_free_v1604(struct rte_lpm *lpm)
 
 	rte_mcfg_tailq_write_unlock();
 
+	if (lpm->dq)
+		rte_rcu_qsbr_dq_delete(lpm->dq);
 	rte_free(lpm->tbl8);
 	rte_free(lpm->rules_tbl);
 	rte_free(lpm);
@@ -390,6 +393,59 @@  BIND_DEFAULT_SYMBOL(rte_lpm_free, _v1604, 16.04);
 MAP_STATIC_SYMBOL(void rte_lpm_free(struct rte_lpm *lpm),
 		rte_lpm_free_v1604);
 
+struct __rte_lpm_rcu_dq_entry {
+	uint32_t tbl8_group_index;
+	uint32_t pad;
+};
+
+static void
+__lpm_rcu_qsbr_free_resource(void *p, void *data)
+{
+	struct rte_lpm_tbl_entry zero_tbl8_entry = {0};
+	struct __rte_lpm_rcu_dq_entry *e =
+			(struct __rte_lpm_rcu_dq_entry *)data;
+	struct rte_lpm_tbl_entry *tbl8 = (struct rte_lpm_tbl_entry *)p;
+
+	/* Set tbl8 group invalid */
+	__atomic_store(&tbl8[e->tbl8_group_index], &zero_tbl8_entry,
+		__ATOMIC_RELAXED);
+}
+
+/* Associate QSBR variable with an LPM object.
+ */
+int
+rte_lpm_rcu_qsbr_add(struct rte_lpm *lpm, struct rte_rcu_qsbr *v)
+{
+	char rcu_dq_name[RTE_RCU_QSBR_DQ_NAMESIZE];
+	struct rte_rcu_qsbr_dq_parameters params;
+
+	if ((lpm == NULL) || (v == NULL)) {
+		rte_errno = EINVAL;
+		return 1;
+	}
+
+	if (lpm->dq) {
+		rte_errno = EEXIST;
+		return 1;
+	}
+
+	/* Init QSBR defer queue. */
+	snprintf(rcu_dq_name, sizeof(rcu_dq_name), "LPM_RCU_%s", lpm->name);
+	params.name = rcu_dq_name;
+	params.size = lpm->number_tbl8s;
+	params.esize = sizeof(struct __rte_lpm_rcu_dq_entry);
+	params.f = __lpm_rcu_qsbr_free_resource;
+	params.p = lpm->tbl8;
+	params.v = v;
+	lpm->dq = rte_rcu_qsbr_dq_create(&params);
+	if (lpm->dq == NULL) {
+		RTE_LOG(ERR, LPM, "LPM QS defer queue creation failed\n");
+		return 1;
+	}
+
+	return 0;
+}
+
 /*
  * Adds a rule to the rule table.
  *
@@ -679,14 +735,15 @@  tbl8_alloc_v20(struct rte_lpm_tbl_entry_v20 *tbl8)
 }
 
 static int32_t
-tbl8_alloc_v1604(struct rte_lpm_tbl_entry *tbl8, uint32_t number_tbl8s)
+__tbl8_alloc_v1604(struct rte_lpm *lpm)
 {
 	uint32_t group_idx; /* tbl8 group index. */
 	struct rte_lpm_tbl_entry *tbl8_entry;
 
 	/* Scan through tbl8 to find a free (i.e. INVALID) tbl8 group. */
-	for (group_idx = 0; group_idx < number_tbl8s; group_idx++) {
-		tbl8_entry = &tbl8[group_idx * RTE_LPM_TBL8_GROUP_NUM_ENTRIES];
+	for (group_idx = 0; group_idx < lpm->number_tbl8s; group_idx++) {
+		tbl8_entry = &lpm->tbl8[group_idx *
+					RTE_LPM_TBL8_GROUP_NUM_ENTRIES];
 		/* If a free tbl8 group is found clean it and set as VALID. */
 		if (!tbl8_entry->valid_group) {
 			struct rte_lpm_tbl_entry new_tbl8_entry = {
@@ -712,6 +769,21 @@  tbl8_alloc_v1604(struct rte_lpm_tbl_entry *tbl8, uint32_t number_tbl8s)
 	return -ENOSPC;
 }
 
+static int32_t
+tbl8_alloc_v1604(struct rte_lpm *lpm)
+{
+	int32_t group_idx; /* tbl8 group index. */
+
+	group_idx = __tbl8_alloc_v1604(lpm);
+	if ((group_idx < 0) && (lpm->dq != NULL)) {
+		/* If there are no tbl8 groups try to reclaim some. */
+		if (rte_rcu_qsbr_dq_reclaim(lpm->dq) == 0)
+			group_idx = __tbl8_alloc_v1604(lpm);
+	}
+
+	return group_idx;
+}
+
 static void
 tbl8_free_v20(struct rte_lpm_tbl_entry_v20 *tbl8, uint32_t tbl8_group_start)
 {
@@ -728,13 +800,21 @@  tbl8_free_v20(struct rte_lpm_tbl_entry_v20 *tbl8, uint32_t tbl8_group_start)
 }
 
 static void
-tbl8_free_v1604(struct rte_lpm_tbl_entry *tbl8, uint32_t tbl8_group_start)
+tbl8_free_v1604(struct rte_lpm *lpm, uint32_t tbl8_group_start)
 {
-	/* Set tbl8 group invalid*/
 	struct rte_lpm_tbl_entry zero_tbl8_entry = {0};
+	struct __rte_lpm_rcu_dq_entry e;
 
-	__atomic_store(&tbl8[tbl8_group_start], &zero_tbl8_entry,
-			__ATOMIC_RELAXED);
+	if (lpm->dq != NULL) {
+		e.tbl8_group_index = tbl8_group_start;
+		e.pad = 0;
+		/* Push into QSBR defer queue. */
+		rte_rcu_qsbr_dq_enqueue(lpm->dq, (void *)&e);
+	} else {
+		/* Set tbl8 group invalid*/
+		__atomic_store(&lpm->tbl8[tbl8_group_start], &zero_tbl8_entry,
+				__ATOMIC_RELAXED);
+	}
 }
 
 static __rte_noinline int32_t
@@ -1037,7 +1117,7 @@  add_depth_big_v1604(struct rte_lpm *lpm, uint32_t ip_masked, uint8_t depth,
 
 	if (!lpm->tbl24[tbl24_index].valid) {
 		/* Search for a free tbl8 group. */
-		tbl8_group_index = tbl8_alloc_v1604(lpm->tbl8, lpm->number_tbl8s);
+		tbl8_group_index = tbl8_alloc_v1604(lpm);
 
 		/* Check tbl8 allocation was successful. */
 		if (tbl8_group_index < 0) {
@@ -1083,7 +1163,7 @@  add_depth_big_v1604(struct rte_lpm *lpm, uint32_t ip_masked, uint8_t depth,
 	} /* If valid entry but not extended calculate the index into Table8. */
 	else if (lpm->tbl24[tbl24_index].valid_group == 0) {
 		/* Search for free tbl8 group. */
-		tbl8_group_index = tbl8_alloc_v1604(lpm->tbl8, lpm->number_tbl8s);
+		tbl8_group_index = tbl8_alloc_v1604(lpm);
 
 		if (tbl8_group_index < 0) {
 			return tbl8_group_index;
@@ -1818,7 +1898,7 @@  delete_depth_big_v1604(struct rte_lpm *lpm, uint32_t ip_masked,
 		 */
 		lpm->tbl24[tbl24_index].valid = 0;
 		__atomic_thread_fence(__ATOMIC_RELEASE);
-		tbl8_free_v1604(lpm->tbl8, tbl8_group_start);
+		tbl8_free_v1604(lpm, tbl8_group_start);
 	} else if (tbl8_recycle_index > -1) {
 		/* Update tbl24 entry. */
 		struct rte_lpm_tbl_entry new_tbl24_entry = {
@@ -1834,7 +1914,7 @@  delete_depth_big_v1604(struct rte_lpm *lpm, uint32_t ip_masked,
 		__atomic_store(&lpm->tbl24[tbl24_index], &new_tbl24_entry,
 				__ATOMIC_RELAXED);
 		__atomic_thread_fence(__ATOMIC_RELEASE);
-		tbl8_free_v1604(lpm->tbl8, tbl8_group_start);
+		tbl8_free_v1604(lpm, tbl8_group_start);
 	}
 #undef group_idx
 	return 0;
diff --git a/lib/librte_lpm/rte_lpm.h b/lib/librte_lpm/rte_lpm.h
index 906ec4483..49c12a68d 100644
--- a/lib/librte_lpm/rte_lpm.h
+++ b/lib/librte_lpm/rte_lpm.h
@@ -1,5 +1,6 @@ 
 /* SPDX-License-Identifier: BSD-3-Clause
  * Copyright(c) 2010-2014 Intel Corporation
+ * Copyright(c) 2019 Arm Limited
  */
 
 #ifndef _RTE_LPM_H_
@@ -21,6 +22,7 @@ 
 #include <rte_common.h>
 #include <rte_vect.h>
 #include <rte_compat.h>
+#include <rte_rcu_qsbr.h>
 
 #ifdef __cplusplus
 extern "C" {
@@ -186,6 +188,7 @@  struct rte_lpm {
 			__rte_cache_aligned; /**< LPM tbl24 table. */
 	struct rte_lpm_tbl_entry *tbl8; /**< LPM tbl8 table. */
 	struct rte_lpm_rule *rules_tbl; /**< LPM rules. */
+	struct rte_rcu_qsbr_dq *dq;	/**< RCU QSBR defer queue.*/
 };
 
 /**
@@ -248,6 +251,24 @@  rte_lpm_free_v20(struct rte_lpm_v20 *lpm);
 void
 rte_lpm_free_v1604(struct rte_lpm *lpm);
 
+/**
+ * Associate RCU QSBR variable with an LPM object.
+ *
+ * @param lpm
+ *   the lpm object to add RCU QSBR
+ * @param v
+ *   RCU QSBR variable
+ * @return
+ *   On success - 0
+ *   On error - 1 with error code set in rte_errno.
+ *   Possible rte_errno codes are:
+ *   - EINVAL - invalid pointer
+ *   - EEXIST - already added QSBR
+ *   - ENOMEM - memory allocation failure
+ */
+__rte_experimental
+int rte_lpm_rcu_qsbr_add(struct rte_lpm *lpm, struct rte_rcu_qsbr *v);
+
 /**
  * Add a rule to the LPM table.
  *
diff --git a/lib/librte_lpm/rte_lpm_version.map b/lib/librte_lpm/rte_lpm_version.map
index 90beac853..b353aabd2 100644
--- a/lib/librte_lpm/rte_lpm_version.map
+++ b/lib/librte_lpm/rte_lpm_version.map
@@ -44,3 +44,9 @@  DPDK_17.05 {
 	rte_lpm6_lookup_bulk_func;
 
 } DPDK_16.04;
+
+EXPERIMENTAL {
+	global:
+
+	rte_lpm_rcu_qsbr_add;
+};