[v2,1/2] fib: implement RCU rule reclamation

Message ID 20241008175524.450829-1-vladimir.medvedkin@intel.com (mailing list archive)
State Superseded, archived
Delegated to: David Marchand
Headers
Series [v2,1/2] fib: implement RCU rule reclamation |

Checks

Context Check Description
ci/checkpatch warning coding style issues

Commit Message

Medvedkin, Vladimir Oct. 8, 2024, 5:55 p.m. UTC
Currently, for DIR24-8 algorithm, the tbl8 group is freed even though the
readers might be using the tbl8 group entries. The freed tbl8 group can
be reallocated quickly. As a result, lookup may be performed incorrectly.

To address that, RCU QSBR is integrated for safe tbl8 group reclamation.

Signed-off-by: Vladimir Medvedkin <vladimir.medvedkin@intel.com>
---
 lib/fib/dir24_8.c   | 104 +++++++++++++++++++++++++++++++++++++++-----
 lib/fib/dir24_8.h   |   9 ++++
 lib/fib/meson.build |   5 ++-
 lib/fib/rte_fib.c   |  11 +++++
 lib/fib/rte_fib.h   |  50 ++++++++++++++++++++-
 lib/fib/version.map |   7 +++
 6 files changed, 173 insertions(+), 13 deletions(-)
  

Comments

Stephen Hemminger Oct. 8, 2024, 6:18 p.m. UTC | #1
On Tue,  8 Oct 2024 17:55:23 +0000
Vladimir Medvedkin <vladimir.medvedkin@intel.com> wrote:

> @@ -569,7 +600,60 @@ dir24_8_free(void *p)
>  {
>  	struct dir24_8_tbl *dp = (struct dir24_8_tbl *)p;
>  
> +	if (dp->dq != NULL)
> +		rte_rcu_qsbr_dq_delete(dp->dq);
> +

Side note:
rte_rcu_qsbr_dq_delete should be changed to accept NULL as nop.
Like all the other free routines
  
Stephen Hemminger Oct. 8, 2024, 6:28 p.m. UTC | #2
On Tue,  8 Oct 2024 17:55:23 +0000
Vladimir Medvedkin <vladimir.medvedkin@intel.com> wrote:
> +	if ((tbl8_idx == -ENOSPC) && dp->dq != NULL) {

Better to either drop the parenthesis here, or put it on both conditions.

> +		/* If there are no tbl8 groups try to reclaim one. */
> +		if (rte_rcu_qsbr_dq_reclaim(dp->dq, 1,
> +				NULL, NULL, NULL) == 0)
> +			tbl8_idx = tbl8_get_idx(dp);
> +	}

Could add unlikely() to this expression.

	/* If there are no tbl8 groups try to reclaim one. */
	if (unlikely(tbl8_idx == -ENOSPC && dp->dq &&
		     !rte_rcu_qsbr_dq_reclaim(dp->dq, 1, NULL, NULL, NULL)))
	    tbl8_idx = tbl8_get_idx(dp);


> +static void
> +__rcu_qsbr_free_resource(void *p, void *data, unsigned int n)
> +{
> +	struct dir24_8_tbl *dp = p;
> +	uint64_t tbl8_idx = *(uint64_t *)data;
> +	RTE_SET_USED(n);
> +
> +	tbl8_cleanup_and_free(dp, tbl8_idx);
> +}

My preference (not a requirement) is to use __rte_unused attribute
instead of RTE_SET_USED

> +	if (dp->v == NULL)
> +		tbl8_cleanup_and_free(dp, tbl8_idx);
> +	else if (dp->rcu_mode == RTE_FIB_QSBR_MODE_SYNC) {
> +		rte_rcu_qsbr_synchronize(dp->v,
> +			RTE_QSBR_THRID_INVALID);
> +		tbl8_cleanup_and_free(dp, tbl8_idx);
> +	} else { /* RTE_FIB_QSBR_MODE_DQ */
> +		if (rte_rcu_qsbr_dq_enqueue(dp->dq,
> +				(void *)&tbl8_idx))

Minor nit: cast to void * is not necessary in C (only in C++).
And can fit on one line; max line length now for DPDK is 100 characters.

Overall, looks good.

Acked-by: Stephen Hemminger <stephen@networkplumber.org>
  
Doug Foster Oct. 9, 2024, 7:12 p.m. UTC | #3
The check for NULL is not necessary before calling rte_rcu_qsbr_dq_delete. Similar to other free routines, an error will not occur when the dq pointer is NULL.
However, it will give a debug log statement to indicate an invalid parameter and return 0 to indicate success.

-----Original Message-----
From: Stephen Hemminger <stephen@networkplumber.org>
Sent: Tuesday, October 8, 2024 1:18 PM
To: Vladimir Medvedkin <vladimir.medvedkin@intel.com>
Cc: dev@dpdk.org; rjarry@redhat.com; Ruifeng Wang <Ruifeng.Wang@arm.com>; Honnappa Nagarahalli <Honnappa.Nagarahalli@arm.com>; david.marchand@redhat.com
Subject: Re: [PATCH v2 1/2] fib: implement RCU rule reclamation

On Tue,  8 Oct 2024 17:55:23 +0000
Vladimir Medvedkin <vladimir.medvedkin@intel.com> wrote:

> @@ -569,7 +600,60 @@ dir24_8_free(void *p)  {
>       struct dir24_8_tbl *dp = (struct dir24_8_tbl *)p;
>
> +     if (dp->dq != NULL)
> +             rte_rcu_qsbr_dq_delete(dp->dq);
> +

Side note:
rte_rcu_qsbr_dq_delete should be changed to accept NULL as nop.
Like all the other free routines
IMPORTANT NOTICE: The contents of this email and any attachments are confidential and may also be privileged. If you are not the intended recipient, please notify the sender immediately and do not disclose the contents to any other person, use it for any purpose, or store or copy the information in any medium. Thank you.
  
Medvedkin, Vladimir Oct. 10, 2024, 11:21 a.m. UTC | #4
Hi Stephen,

Thanks for the review, I'll address your comments in v3


On 08/10/2024 19:28, Stephen Hemminger wrote:
> On Tue,  8 Oct 2024 17:55:23 +0000
> Vladimir Medvedkin <vladimir.medvedkin@intel.com> wrote:
>> +	if ((tbl8_idx == -ENOSPC) && dp->dq != NULL) {
> Better to either drop the parenthesis here, or put it on both conditions.
>
>> +		/* If there are no tbl8 groups try to reclaim one. */
>> +		if (rte_rcu_qsbr_dq_reclaim(dp->dq, 1,
>> +				NULL, NULL, NULL) == 0)
>> +			tbl8_idx = tbl8_get_idx(dp);
>> +	}
> Could add unlikely() to this expression.
>
> 	/* If there are no tbl8 groups try to reclaim one. */
> 	if (unlikely(tbl8_idx == -ENOSPC && dp->dq &&
> 		     !rte_rcu_qsbr_dq_reclaim(dp->dq, 1, NULL, NULL, NULL)))
> 	    tbl8_idx = tbl8_get_idx(dp);
>
>
>> +static void
>> +__rcu_qsbr_free_resource(void *p, void *data, unsigned int n)
>> +{
>> +	struct dir24_8_tbl *dp = p;
>> +	uint64_t tbl8_idx = *(uint64_t *)data;
>> +	RTE_SET_USED(n);
>> +
>> +	tbl8_cleanup_and_free(dp, tbl8_idx);
>> +}
> My preference (not a requirement) is to use __rte_unused attribute
> instead of RTE_SET_USED
>
>> +	if (dp->v == NULL)
>> +		tbl8_cleanup_and_free(dp, tbl8_idx);
>> +	else if (dp->rcu_mode == RTE_FIB_QSBR_MODE_SYNC) {
>> +		rte_rcu_qsbr_synchronize(dp->v,
>> +			RTE_QSBR_THRID_INVALID);
>> +		tbl8_cleanup_and_free(dp, tbl8_idx);
>> +	} else { /* RTE_FIB_QSBR_MODE_DQ */
>> +		if (rte_rcu_qsbr_dq_enqueue(dp->dq,
>> +				(void *)&tbl8_idx))
> Minor nit: cast to void * is not necessary in C (only in C++).
> And can fit on one line; max line length now for DPDK is 100 characters.
>
> Overall, looks good.
>
> Acked-by: Stephen Hemminger <stephen@networkplumber.org>
  

Patch

diff --git a/lib/fib/dir24_8.c b/lib/fib/dir24_8.c
index c739e92304..32a71c157d 100644
--- a/lib/fib/dir24_8.c
+++ b/lib/fib/dir24_8.c
@@ -14,6 +14,7 @@ 
 #include <rte_rib.h>
 #include <rte_fib.h>
 #include "dir24_8.h"
+#include "fib_log.h"
 
 #ifdef CC_DIR24_8_AVX512_SUPPORT
 
@@ -176,6 +177,13 @@  tbl8_alloc(struct dir24_8_tbl *dp, uint64_t nh)
 	uint8_t	*tbl8_ptr;
 
 	tbl8_idx = tbl8_get_idx(dp);
+	if ((tbl8_idx == -ENOSPC) && dp->dq != NULL) {
+		/* If there are no tbl8 groups try to reclaim one. */
+		if (rte_rcu_qsbr_dq_reclaim(dp->dq, 1,
+				NULL, NULL, NULL) == 0)
+			tbl8_idx = tbl8_get_idx(dp);
+	}
+
 	if (tbl8_idx < 0)
 		return tbl8_idx;
 	tbl8_ptr = (uint8_t *)dp->tbl8 +
@@ -189,6 +197,27 @@  tbl8_alloc(struct dir24_8_tbl *dp, uint64_t nh)
 	return tbl8_idx;
 }
 
+static void
+tbl8_cleanup_and_free(struct dir24_8_tbl *dp, uint64_t tbl8_idx)
+{
+	uint8_t *ptr = (uint8_t *)dp->tbl8 +
+		(tbl8_idx * DIR24_8_TBL8_GRP_NUM_ENT << dp->nh_sz);
+
+	memset(ptr, 0, DIR24_8_TBL8_GRP_NUM_ENT << dp->nh_sz);
+	tbl8_free_idx(dp, tbl8_idx);
+	dp->cur_tbl8s--;
+}
+
+static void
+__rcu_qsbr_free_resource(void *p, void *data, unsigned int n)
+{
+	struct dir24_8_tbl *dp = p;
+	uint64_t tbl8_idx = *(uint64_t *)data;
+	RTE_SET_USED(n);
+
+	tbl8_cleanup_and_free(dp, tbl8_idx);
+}
+
 static void
 tbl8_recycle(struct dir24_8_tbl *dp, uint32_t ip, uint64_t tbl8_idx)
 {
@@ -210,8 +239,6 @@  tbl8_recycle(struct dir24_8_tbl *dp, uint32_t ip, uint64_t tbl8_idx)
 		}
 		((uint8_t *)dp->tbl24)[ip >> 8] =
 			nh & ~DIR24_8_EXT_ENT;
-		for (i = 0; i < DIR24_8_TBL8_GRP_NUM_ENT; i++)
-			ptr8[i] = 0;
 		break;
 	case RTE_FIB_DIR24_8_2B:
 		ptr16 = &((uint16_t *)dp->tbl8)[tbl8_idx *
@@ -223,8 +250,6 @@  tbl8_recycle(struct dir24_8_tbl *dp, uint32_t ip, uint64_t tbl8_idx)
 		}
 		((uint16_t *)dp->tbl24)[ip >> 8] =
 			nh & ~DIR24_8_EXT_ENT;
-		for (i = 0; i < DIR24_8_TBL8_GRP_NUM_ENT; i++)
-			ptr16[i] = 0;
 		break;
 	case RTE_FIB_DIR24_8_4B:
 		ptr32 = &((uint32_t *)dp->tbl8)[tbl8_idx *
@@ -236,8 +261,6 @@  tbl8_recycle(struct dir24_8_tbl *dp, uint32_t ip, uint64_t tbl8_idx)
 		}
 		((uint32_t *)dp->tbl24)[ip >> 8] =
 			nh & ~DIR24_8_EXT_ENT;
-		for (i = 0; i < DIR24_8_TBL8_GRP_NUM_ENT; i++)
-			ptr32[i] = 0;
 		break;
 	case RTE_FIB_DIR24_8_8B:
 		ptr64 = &((uint64_t *)dp->tbl8)[tbl8_idx *
@@ -249,12 +272,20 @@  tbl8_recycle(struct dir24_8_tbl *dp, uint32_t ip, uint64_t tbl8_idx)
 		}
 		((uint64_t *)dp->tbl24)[ip >> 8] =
 			nh & ~DIR24_8_EXT_ENT;
-		for (i = 0; i < DIR24_8_TBL8_GRP_NUM_ENT; i++)
-			ptr64[i] = 0;
 		break;
 	}
-	tbl8_free_idx(dp, tbl8_idx);
-	dp->cur_tbl8s--;
+
+	if (dp->v == NULL)
+		tbl8_cleanup_and_free(dp, tbl8_idx);
+	else if (dp->rcu_mode == RTE_FIB_QSBR_MODE_SYNC) {
+		rte_rcu_qsbr_synchronize(dp->v,
+			RTE_QSBR_THRID_INVALID);
+		tbl8_cleanup_and_free(dp, tbl8_idx);
+	} else { /* RTE_FIB_QSBR_MODE_DQ */
+		if (rte_rcu_qsbr_dq_enqueue(dp->dq,
+				(void *)&tbl8_idx))
+			FIB_LOG(ERR, "Failed to push QSBR FIFO");
+	}
 }
 
 static int
@@ -569,7 +600,60 @@  dir24_8_free(void *p)
 {
 	struct dir24_8_tbl *dp = (struct dir24_8_tbl *)p;
 
+	if (dp->dq != NULL)
+		rte_rcu_qsbr_dq_delete(dp->dq);
+
 	rte_free(dp->tbl8_idxes);
 	rte_free(dp->tbl8);
 	rte_free(dp);
 }
+
+int
+dir24_8_rcu_qsbr_add(struct dir24_8_tbl *dp, struct rte_fib_rcu_config *cfg,
+	const char *name)
+{
+	struct rte_rcu_qsbr_dq_parameters params = {0};
+	char rcu_dq_name[RTE_RCU_QSBR_DQ_NAMESIZE];
+
+	if (dp == NULL || cfg == NULL) {
+		rte_errno = EINVAL;
+		return 1;
+	}
+
+	if (dp->v != NULL) {
+		rte_errno = EEXIST;
+		return 1;
+	}
+
+	if (cfg->mode == RTE_FIB_QSBR_MODE_SYNC) {
+		/* No other things to do. */
+	} else if (cfg->mode == RTE_FIB_QSBR_MODE_DQ) {
+		/* Init QSBR defer queue. */
+		snprintf(rcu_dq_name, sizeof(rcu_dq_name),
+				"FIB_RCU_%s", name);
+		params.name = rcu_dq_name;
+		params.size = cfg->dq_size;
+		if (params.size == 0)
+			params.size = RTE_FIB_RCU_DQ_RECLAIM_SZ;
+		params.trigger_reclaim_limit = cfg->reclaim_thd;
+		params.max_reclaim_size = cfg->reclaim_max;
+		if (params.max_reclaim_size == 0)
+			params.max_reclaim_size = RTE_FIB_RCU_DQ_RECLAIM_MAX;
+		params.esize = sizeof(uint64_t);
+		params.free_fn = __rcu_qsbr_free_resource;
+		params.p = dp;
+		params.v = cfg->v;
+		dp->dq = rte_rcu_qsbr_dq_create(&params);
+		if (dp->dq == NULL) {
+			FIB_LOG(ERR, "LPM defer queue creation failed");
+			return 1;
+		}
+	} else {
+		rte_errno = EINVAL;
+		return 1;
+	}
+	dp->rcu_mode = cfg->mode;
+	dp->v = cfg->v;
+
+	return 0;
+}
diff --git a/lib/fib/dir24_8.h b/lib/fib/dir24_8.h
index 7125049f15..08fd818ce4 100644
--- a/lib/fib/dir24_8.h
+++ b/lib/fib/dir24_8.h
@@ -10,6 +10,7 @@ 
 
 #include <rte_prefetch.h>
 #include <rte_branch_prediction.h>
+#include <rte_rcu_qsbr.h>
 
 /**
  * @file
@@ -30,6 +31,10 @@  struct dir24_8_tbl {
 	uint32_t	rsvd_tbl8s;	/**< Number of reserved tbl8s */
 	uint32_t	cur_tbl8s;	/**< Current number of tbl8s */
 	enum rte_fib_dir24_8_nh_sz	nh_sz;	/**< Size of nexthop entry */
+	/* RCU config. */
+	enum rte_fib_qsbr_mode rcu_mode;/* Blocking, defer queue. */
+	struct rte_rcu_qsbr *v;		/* RCU QSBR variable. */
+	struct rte_rcu_qsbr_dq *dq;	/* RCU QSBR defer queue. */
 	uint64_t	def_nh;		/**< Default next hop */
 	uint64_t	*tbl8;		/**< tbl8 table. */
 	uint64_t	*tbl8_idxes;	/**< bitmap containing free tbl8 idxes*/
@@ -250,4 +255,8 @@  int
 dir24_8_modify(struct rte_fib *fib, uint32_t ip, uint8_t depth,
 	uint64_t next_hop, int op);
 
+int
+dir24_8_rcu_qsbr_add(struct dir24_8_tbl *dp, struct rte_fib_rcu_config *cfg,
+	const char *name);
+
 #endif /* _DIR24_8_H_ */
diff --git a/lib/fib/meson.build b/lib/fib/meson.build
index 6795f41a0a..9b7477c756 100644
--- a/lib/fib/meson.build
+++ b/lib/fib/meson.build
@@ -11,6 +11,7 @@  endif
 sources = files('rte_fib.c', 'rte_fib6.c', 'dir24_8.c', 'trie.c')
 headers = files('rte_fib.h', 'rte_fib6.h')
 deps += ['rib']
+deps += ['rcu']
 
 # compile AVX512 version if:
 # we are building 64-bit binary AND binutils can generate proper code
@@ -45,7 +46,7 @@  if dpdk_conf.has('RTE_ARCH_X86_64') and binutils_ok
     elif cc.has_multi_arguments('-mavx512f', '-mavx512dq')
         dir24_8_avx512_tmp = static_library('dir24_8_avx512_tmp',
                 'dir24_8_avx512.c',
-                dependencies: static_rte_eal,
+                dependencies: [static_rte_eal, static_rte_rcu],
                 c_args: cflags + ['-mavx512f', '-mavx512dq'])
         objs += dir24_8_avx512_tmp.extract_objects('dir24_8_avx512.c')
         cflags += ['-DCC_DIR24_8_AVX512_SUPPORT']
@@ -54,7 +55,7 @@  if dpdk_conf.has('RTE_ARCH_X86_64') and binutils_ok
         if cc.has_argument('-mavx512bw')
             trie_avx512_tmp = static_library('trie_avx512_tmp',
                 'trie_avx512.c',
-                dependencies: static_rte_eal,
+                dependencies: [static_rte_eal, static_rte_rcu],
                 c_args: cflags + ['-mavx512f', \
                     '-mavx512dq', '-mavx512bw'])
             objs += trie_avx512_tmp.extract_objects('trie_avx512.c')
diff --git a/lib/fib/rte_fib.c b/lib/fib/rte_fib.c
index 4f9fba5a4f..730f50c1ba 100644
--- a/lib/fib/rte_fib.c
+++ b/lib/fib/rte_fib.c
@@ -338,3 +338,14 @@  rte_fib_select_lookup(struct rte_fib *fib,
 		return -EINVAL;
 	}
 }
+
+int
+rte_fib_rcu_qsbr_add(struct rte_fib *fib, struct rte_fib_rcu_config *cfg)
+{
+	switch (fib->type) {
+	case RTE_FIB_DIR24_8:
+		return dir24_8_rcu_qsbr_add(fib->dp, cfg, fib->name);
+	default:
+		return -ENOTSUP;
+	}
+}
diff --git a/lib/fib/rte_fib.h b/lib/fib/rte_fib.h
index d7a5aafe53..346eb7f149 100644
--- a/lib/fib/rte_fib.h
+++ b/lib/fib/rte_fib.h
@@ -16,7 +16,7 @@ 
  */
 
 #include <stdint.h>
-
+#include <rte_rcu_qsbr.h>
 
 #ifdef __cplusplus
 extern "C" {
@@ -28,6 +28,19 @@  struct rte_rib;
 /** Maximum depth value possible for IPv4 FIB. */
 #define RTE_FIB_MAXDEPTH	32
 
+/** @internal Default RCU defer queue entries to reclaim in one go. */
+#define RTE_FIB_RCU_DQ_RECLAIM_MAX	16
+/** @internal Default RCU defer queue size. */
+#define RTE_FIB_RCU_DQ_RECLAIM_SZ	128
+
+/** RCU reclamation modes */
+enum rte_fib_qsbr_mode {
+	/** Create defer queue for reclaim. */
+	RTE_FIB_QSBR_MODE_DQ = 0,
+	/** Use blocking mode reclaim. No defer queue created. */
+	RTE_FIB_QSBR_MODE_SYNC
+};
+
 /** Type of FIB struct */
 enum rte_fib_type {
 	RTE_FIB_DUMMY,		/**< RIB tree based FIB */
@@ -89,6 +102,22 @@  struct rte_fib_conf {
 	};
 };
 
+/** FIB RCU QSBR configuration structure. */
+struct rte_fib_rcu_config {
+	struct rte_rcu_qsbr *v;	/* RCU QSBR variable. */
+	/* Mode of RCU QSBR. RTE_FIB_QSBR_MODE_xxx
+	 * '0' for default: create defer queue for reclaim.
+	 */
+	enum rte_fib_qsbr_mode mode;
+	uint32_t dq_size;	/* RCU defer queue size.
+				 * default: RTE_FIB_RCU_DQ_RECLAIM_SZ.
+				 */
+	uint32_t reclaim_thd;	/* Threshold to trigger auto reclaim. */
+	uint32_t reclaim_max;	/* Max entries to reclaim in one go.
+				 * default: RTE_FIB_RCU_DQ_RECLAIM_MAX.
+				 */
+};
+
 /**
  * Create FIB
  *
@@ -219,6 +248,25 @@  rte_fib_get_rib(struct rte_fib *fib);
 int
 rte_fib_select_lookup(struct rte_fib *fib, enum rte_fib_lookup_type type);
 
+/**
+ * Associate RCU QSBR variable with a FIB object.
+ *
+ * @param fib
+ *   the fib object to add RCU QSBR
+ * @param cfg
+ *   RCU QSBR configuration
+ * @return
+ *   On success - 0
+ *   On error - 1 with error code set in rte_errno.
+ *   Possible rte_errno codes are:
+ *   - EINVAL - invalid pointer
+ *   - EEXIST - already added QSBR
+ *   - ENOMEM - memory allocation failure
+ *   - ENOTSUP - not supported by configured dataplane algorithm
+ */
+__rte_experimental
+int rte_fib_rcu_qsbr_add(struct rte_fib *fib, struct rte_fib_rcu_config *cfg);
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/lib/fib/version.map b/lib/fib/version.map
index c6d2769611..df8f113df3 100644
--- a/lib/fib/version.map
+++ b/lib/fib/version.map
@@ -22,3 +22,10 @@  DPDK_25 {
 
 	local: *;
 };
+
+EXPERIMENTAL {
+	global:
+
+	# added in 24.11
+	rte_fib_rcu_qsbr_add;
+};