[v2,1/2] fib: implement RCU rule reclamation
Checks
Commit Message
Currently, for DIR24-8 algorithm, the tbl8 group is freed even though the
readers might be using the tbl8 group entries. The freed tbl8 group can
be reallocated quickly. As a result, lookup may be performed incorrectly.
To address that, RCU QSBR is integrated for safe tbl8 group reclamation.
Signed-off-by: Vladimir Medvedkin <vladimir.medvedkin@intel.com>
---
lib/fib/dir24_8.c | 104 +++++++++++++++++++++++++++++++++++++++-----
lib/fib/dir24_8.h | 9 ++++
lib/fib/meson.build | 5 ++-
lib/fib/rte_fib.c | 11 +++++
lib/fib/rte_fib.h | 50 ++++++++++++++++++++-
lib/fib/version.map | 7 +++
6 files changed, 173 insertions(+), 13 deletions(-)
Comments
On Tue, 8 Oct 2024 17:55:23 +0000
Vladimir Medvedkin <vladimir.medvedkin@intel.com> wrote:
> @@ -569,7 +600,60 @@ dir24_8_free(void *p)
> {
> struct dir24_8_tbl *dp = (struct dir24_8_tbl *)p;
>
> + if (dp->dq != NULL)
> + rte_rcu_qsbr_dq_delete(dp->dq);
> +
Side note:
rte_rcu_qsbr_dq_delete should be changed to accept NULL as nop.
Like all the other free routines
On Tue, 8 Oct 2024 17:55:23 +0000
Vladimir Medvedkin <vladimir.medvedkin@intel.com> wrote:
> + if ((tbl8_idx == -ENOSPC) && dp->dq != NULL) {
Better to either drop the parenthesis here, or put it on both conditions.
> + /* If there are no tbl8 groups try to reclaim one. */
> + if (rte_rcu_qsbr_dq_reclaim(dp->dq, 1,
> + NULL, NULL, NULL) == 0)
> + tbl8_idx = tbl8_get_idx(dp);
> + }
Could add unlikely() to this expression.
/* If there are no tbl8 groups try to reclaim one. */
if (unlikely(tbl8_idx == -ENOSPC && dp->dq &&
!rte_rcu_qsbr_dq_reclaim(dp->dq, 1, NULL, NULL, NULL)))
tbl8_idx = tbl8_get_idx(dp);
> +static void
> +__rcu_qsbr_free_resource(void *p, void *data, unsigned int n)
> +{
> + struct dir24_8_tbl *dp = p;
> + uint64_t tbl8_idx = *(uint64_t *)data;
> + RTE_SET_USED(n);
> +
> + tbl8_cleanup_and_free(dp, tbl8_idx);
> +}
My preference (not a requirement) is to use __rte_unused attribute
instead of RTE_SET_USED
> + if (dp->v == NULL)
> + tbl8_cleanup_and_free(dp, tbl8_idx);
> + else if (dp->rcu_mode == RTE_FIB_QSBR_MODE_SYNC) {
> + rte_rcu_qsbr_synchronize(dp->v,
> + RTE_QSBR_THRID_INVALID);
> + tbl8_cleanup_and_free(dp, tbl8_idx);
> + } else { /* RTE_FIB_QSBR_MODE_DQ */
> + if (rte_rcu_qsbr_dq_enqueue(dp->dq,
> + (void *)&tbl8_idx))
Minor nit: cast to void * is not necessary in C (only in C++).
And can fit on one line; max line length now for DPDK is 100 characters.
Overall, looks good.
Acked-by: Stephen Hemminger <stephen@networkplumber.org>
The check for NULL is not necessary before calling rte_rcu_qsbr_dq_delete. Similar to other free routines, an error will not occur when the dq pointer is NULL.
However, it will give a debug log statement to indicate an invalid parameter and return 0 to indicate success.
-----Original Message-----
From: Stephen Hemminger <stephen@networkplumber.org>
Sent: Tuesday, October 8, 2024 1:18 PM
To: Vladimir Medvedkin <vladimir.medvedkin@intel.com>
Cc: dev@dpdk.org; rjarry@redhat.com; Ruifeng Wang <Ruifeng.Wang@arm.com>; Honnappa Nagarahalli <Honnappa.Nagarahalli@arm.com>; david.marchand@redhat.com
Subject: Re: [PATCH v2 1/2] fib: implement RCU rule reclamation
On Tue, 8 Oct 2024 17:55:23 +0000
Vladimir Medvedkin <vladimir.medvedkin@intel.com> wrote:
> @@ -569,7 +600,60 @@ dir24_8_free(void *p) {
> struct dir24_8_tbl *dp = (struct dir24_8_tbl *)p;
>
> + if (dp->dq != NULL)
> + rte_rcu_qsbr_dq_delete(dp->dq);
> +
Side note:
rte_rcu_qsbr_dq_delete should be changed to accept NULL as nop.
Like all the other free routines
IMPORTANT NOTICE: The contents of this email and any attachments are confidential and may also be privileged. If you are not the intended recipient, please notify the sender immediately and do not disclose the contents to any other person, use it for any purpose, or store or copy the information in any medium. Thank you.
Hi Stephen,
Thanks for the review, I'll address your comments in v3
On 08/10/2024 19:28, Stephen Hemminger wrote:
> On Tue, 8 Oct 2024 17:55:23 +0000
> Vladimir Medvedkin <vladimir.medvedkin@intel.com> wrote:
>> + if ((tbl8_idx == -ENOSPC) && dp->dq != NULL) {
> Better to either drop the parenthesis here, or put it on both conditions.
>
>> + /* If there are no tbl8 groups try to reclaim one. */
>> + if (rte_rcu_qsbr_dq_reclaim(dp->dq, 1,
>> + NULL, NULL, NULL) == 0)
>> + tbl8_idx = tbl8_get_idx(dp);
>> + }
> Could add unlikely() to this expression.
>
> /* If there are no tbl8 groups try to reclaim one. */
> if (unlikely(tbl8_idx == -ENOSPC && dp->dq &&
> !rte_rcu_qsbr_dq_reclaim(dp->dq, 1, NULL, NULL, NULL)))
> tbl8_idx = tbl8_get_idx(dp);
>
>
>> +static void
>> +__rcu_qsbr_free_resource(void *p, void *data, unsigned int n)
>> +{
>> + struct dir24_8_tbl *dp = p;
>> + uint64_t tbl8_idx = *(uint64_t *)data;
>> + RTE_SET_USED(n);
>> +
>> + tbl8_cleanup_and_free(dp, tbl8_idx);
>> +}
> My preference (not a requirement) is to use __rte_unused attribute
> instead of RTE_SET_USED
>
>> + if (dp->v == NULL)
>> + tbl8_cleanup_and_free(dp, tbl8_idx);
>> + else if (dp->rcu_mode == RTE_FIB_QSBR_MODE_SYNC) {
>> + rte_rcu_qsbr_synchronize(dp->v,
>> + RTE_QSBR_THRID_INVALID);
>> + tbl8_cleanup_and_free(dp, tbl8_idx);
>> + } else { /* RTE_FIB_QSBR_MODE_DQ */
>> + if (rte_rcu_qsbr_dq_enqueue(dp->dq,
>> + (void *)&tbl8_idx))
> Minor nit: cast to void * is not necessary in C (only in C++).
> And can fit on one line; max line length now for DPDK is 100 characters.
>
> Overall, looks good.
>
> Acked-by: Stephen Hemminger <stephen@networkplumber.org>
@@ -14,6 +14,7 @@
#include <rte_rib.h>
#include <rte_fib.h>
#include "dir24_8.h"
+#include "fib_log.h"
#ifdef CC_DIR24_8_AVX512_SUPPORT
@@ -176,6 +177,13 @@ tbl8_alloc(struct dir24_8_tbl *dp, uint64_t nh)
uint8_t *tbl8_ptr;
tbl8_idx = tbl8_get_idx(dp);
+ if ((tbl8_idx == -ENOSPC) && dp->dq != NULL) {
+ /* If there are no tbl8 groups try to reclaim one. */
+ if (rte_rcu_qsbr_dq_reclaim(dp->dq, 1,
+ NULL, NULL, NULL) == 0)
+ tbl8_idx = tbl8_get_idx(dp);
+ }
+
if (tbl8_idx < 0)
return tbl8_idx;
tbl8_ptr = (uint8_t *)dp->tbl8 +
@@ -189,6 +197,27 @@ tbl8_alloc(struct dir24_8_tbl *dp, uint64_t nh)
return tbl8_idx;
}
+static void
+tbl8_cleanup_and_free(struct dir24_8_tbl *dp, uint64_t tbl8_idx)
+{
+ uint8_t *ptr = (uint8_t *)dp->tbl8 +
+ (tbl8_idx * DIR24_8_TBL8_GRP_NUM_ENT << dp->nh_sz);
+
+ memset(ptr, 0, DIR24_8_TBL8_GRP_NUM_ENT << dp->nh_sz);
+ tbl8_free_idx(dp, tbl8_idx);
+ dp->cur_tbl8s--;
+}
+
+static void
+__rcu_qsbr_free_resource(void *p, void *data, unsigned int n)
+{
+ struct dir24_8_tbl *dp = p;
+ uint64_t tbl8_idx = *(uint64_t *)data;
+ RTE_SET_USED(n);
+
+ tbl8_cleanup_and_free(dp, tbl8_idx);
+}
+
static void
tbl8_recycle(struct dir24_8_tbl *dp, uint32_t ip, uint64_t tbl8_idx)
{
@@ -210,8 +239,6 @@ tbl8_recycle(struct dir24_8_tbl *dp, uint32_t ip, uint64_t tbl8_idx)
}
((uint8_t *)dp->tbl24)[ip >> 8] =
nh & ~DIR24_8_EXT_ENT;
- for (i = 0; i < DIR24_8_TBL8_GRP_NUM_ENT; i++)
- ptr8[i] = 0;
break;
case RTE_FIB_DIR24_8_2B:
ptr16 = &((uint16_t *)dp->tbl8)[tbl8_idx *
@@ -223,8 +250,6 @@ tbl8_recycle(struct dir24_8_tbl *dp, uint32_t ip, uint64_t tbl8_idx)
}
((uint16_t *)dp->tbl24)[ip >> 8] =
nh & ~DIR24_8_EXT_ENT;
- for (i = 0; i < DIR24_8_TBL8_GRP_NUM_ENT; i++)
- ptr16[i] = 0;
break;
case RTE_FIB_DIR24_8_4B:
ptr32 = &((uint32_t *)dp->tbl8)[tbl8_idx *
@@ -236,8 +261,6 @@ tbl8_recycle(struct dir24_8_tbl *dp, uint32_t ip, uint64_t tbl8_idx)
}
((uint32_t *)dp->tbl24)[ip >> 8] =
nh & ~DIR24_8_EXT_ENT;
- for (i = 0; i < DIR24_8_TBL8_GRP_NUM_ENT; i++)
- ptr32[i] = 0;
break;
case RTE_FIB_DIR24_8_8B:
ptr64 = &((uint64_t *)dp->tbl8)[tbl8_idx *
@@ -249,12 +272,20 @@ tbl8_recycle(struct dir24_8_tbl *dp, uint32_t ip, uint64_t tbl8_idx)
}
((uint64_t *)dp->tbl24)[ip >> 8] =
nh & ~DIR24_8_EXT_ENT;
- for (i = 0; i < DIR24_8_TBL8_GRP_NUM_ENT; i++)
- ptr64[i] = 0;
break;
}
- tbl8_free_idx(dp, tbl8_idx);
- dp->cur_tbl8s--;
+
+ if (dp->v == NULL)
+ tbl8_cleanup_and_free(dp, tbl8_idx);
+ else if (dp->rcu_mode == RTE_FIB_QSBR_MODE_SYNC) {
+ rte_rcu_qsbr_synchronize(dp->v,
+ RTE_QSBR_THRID_INVALID);
+ tbl8_cleanup_and_free(dp, tbl8_idx);
+ } else { /* RTE_FIB_QSBR_MODE_DQ */
+ if (rte_rcu_qsbr_dq_enqueue(dp->dq,
+ (void *)&tbl8_idx))
+ FIB_LOG(ERR, "Failed to push QSBR FIFO");
+ }
}
static int
@@ -569,7 +600,60 @@ dir24_8_free(void *p)
{
struct dir24_8_tbl *dp = (struct dir24_8_tbl *)p;
+ if (dp->dq != NULL)
+ rte_rcu_qsbr_dq_delete(dp->dq);
+
rte_free(dp->tbl8_idxes);
rte_free(dp->tbl8);
rte_free(dp);
}
+
+int
+dir24_8_rcu_qsbr_add(struct dir24_8_tbl *dp, struct rte_fib_rcu_config *cfg,
+ const char *name)
+{
+ struct rte_rcu_qsbr_dq_parameters params = {0};
+ char rcu_dq_name[RTE_RCU_QSBR_DQ_NAMESIZE];
+
+ if (dp == NULL || cfg == NULL) {
+ rte_errno = EINVAL;
+ return 1;
+ }
+
+ if (dp->v != NULL) {
+ rte_errno = EEXIST;
+ return 1;
+ }
+
+ if (cfg->mode == RTE_FIB_QSBR_MODE_SYNC) {
+ /* No other things to do. */
+ } else if (cfg->mode == RTE_FIB_QSBR_MODE_DQ) {
+ /* Init QSBR defer queue. */
+ snprintf(rcu_dq_name, sizeof(rcu_dq_name),
+ "FIB_RCU_%s", name);
+ params.name = rcu_dq_name;
+ params.size = cfg->dq_size;
+ if (params.size == 0)
+ params.size = RTE_FIB_RCU_DQ_RECLAIM_SZ;
+ params.trigger_reclaim_limit = cfg->reclaim_thd;
+ params.max_reclaim_size = cfg->reclaim_max;
+ if (params.max_reclaim_size == 0)
+ params.max_reclaim_size = RTE_FIB_RCU_DQ_RECLAIM_MAX;
+ params.esize = sizeof(uint64_t);
+ params.free_fn = __rcu_qsbr_free_resource;
+ params.p = dp;
+ params.v = cfg->v;
+ dp->dq = rte_rcu_qsbr_dq_create(¶ms);
+ if (dp->dq == NULL) {
+ FIB_LOG(ERR, "LPM defer queue creation failed");
+ return 1;
+ }
+ } else {
+ rte_errno = EINVAL;
+ return 1;
+ }
+ dp->rcu_mode = cfg->mode;
+ dp->v = cfg->v;
+
+ return 0;
+}
@@ -10,6 +10,7 @@
#include <rte_prefetch.h>
#include <rte_branch_prediction.h>
+#include <rte_rcu_qsbr.h>
/**
* @file
@@ -30,6 +31,10 @@ struct dir24_8_tbl {
uint32_t rsvd_tbl8s; /**< Number of reserved tbl8s */
uint32_t cur_tbl8s; /**< Current number of tbl8s */
enum rte_fib_dir24_8_nh_sz nh_sz; /**< Size of nexthop entry */
+ /* RCU config. */
+ enum rte_fib_qsbr_mode rcu_mode;/* Blocking, defer queue. */
+ struct rte_rcu_qsbr *v; /* RCU QSBR variable. */
+ struct rte_rcu_qsbr_dq *dq; /* RCU QSBR defer queue. */
uint64_t def_nh; /**< Default next hop */
uint64_t *tbl8; /**< tbl8 table. */
uint64_t *tbl8_idxes; /**< bitmap containing free tbl8 idxes*/
@@ -250,4 +255,8 @@ int
dir24_8_modify(struct rte_fib *fib, uint32_t ip, uint8_t depth,
uint64_t next_hop, int op);
+int
+dir24_8_rcu_qsbr_add(struct dir24_8_tbl *dp, struct rte_fib_rcu_config *cfg,
+ const char *name);
+
#endif /* _DIR24_8_H_ */
@@ -11,6 +11,7 @@ endif
sources = files('rte_fib.c', 'rte_fib6.c', 'dir24_8.c', 'trie.c')
headers = files('rte_fib.h', 'rte_fib6.h')
deps += ['rib']
+deps += ['rcu']
# compile AVX512 version if:
# we are building 64-bit binary AND binutils can generate proper code
@@ -45,7 +46,7 @@ if dpdk_conf.has('RTE_ARCH_X86_64') and binutils_ok
elif cc.has_multi_arguments('-mavx512f', '-mavx512dq')
dir24_8_avx512_tmp = static_library('dir24_8_avx512_tmp',
'dir24_8_avx512.c',
- dependencies: static_rte_eal,
+ dependencies: [static_rte_eal, static_rte_rcu],
c_args: cflags + ['-mavx512f', '-mavx512dq'])
objs += dir24_8_avx512_tmp.extract_objects('dir24_8_avx512.c')
cflags += ['-DCC_DIR24_8_AVX512_SUPPORT']
@@ -54,7 +55,7 @@ if dpdk_conf.has('RTE_ARCH_X86_64') and binutils_ok
if cc.has_argument('-mavx512bw')
trie_avx512_tmp = static_library('trie_avx512_tmp',
'trie_avx512.c',
- dependencies: static_rte_eal,
+ dependencies: [static_rte_eal, static_rte_rcu],
c_args: cflags + ['-mavx512f', \
'-mavx512dq', '-mavx512bw'])
objs += trie_avx512_tmp.extract_objects('trie_avx512.c')
@@ -338,3 +338,14 @@ rte_fib_select_lookup(struct rte_fib *fib,
return -EINVAL;
}
}
+
+int
+rte_fib_rcu_qsbr_add(struct rte_fib *fib, struct rte_fib_rcu_config *cfg)
+{
+ switch (fib->type) {
+ case RTE_FIB_DIR24_8:
+ return dir24_8_rcu_qsbr_add(fib->dp, cfg, fib->name);
+ default:
+ return -ENOTSUP;
+ }
+}
@@ -16,7 +16,7 @@
*/
#include <stdint.h>
-
+#include <rte_rcu_qsbr.h>
#ifdef __cplusplus
extern "C" {
@@ -28,6 +28,19 @@ struct rte_rib;
/** Maximum depth value possible for IPv4 FIB. */
#define RTE_FIB_MAXDEPTH 32
+/** @internal Default RCU defer queue entries to reclaim in one go. */
+#define RTE_FIB_RCU_DQ_RECLAIM_MAX 16
+/** @internal Default RCU defer queue size. */
+#define RTE_FIB_RCU_DQ_RECLAIM_SZ 128
+
+/** RCU reclamation modes */
+enum rte_fib_qsbr_mode {
+ /** Create defer queue for reclaim. */
+ RTE_FIB_QSBR_MODE_DQ = 0,
+ /** Use blocking mode reclaim. No defer queue created. */
+ RTE_FIB_QSBR_MODE_SYNC
+};
+
/** Type of FIB struct */
enum rte_fib_type {
RTE_FIB_DUMMY, /**< RIB tree based FIB */
@@ -89,6 +102,22 @@ struct rte_fib_conf {
};
};
+/** FIB RCU QSBR configuration structure. */
+struct rte_fib_rcu_config {
+ struct rte_rcu_qsbr *v; /* RCU QSBR variable. */
+ /* Mode of RCU QSBR. RTE_FIB_QSBR_MODE_xxx
+ * '0' for default: create defer queue for reclaim.
+ */
+ enum rte_fib_qsbr_mode mode;
+ uint32_t dq_size; /* RCU defer queue size.
+ * default: RTE_FIB_RCU_DQ_RECLAIM_SZ.
+ */
+ uint32_t reclaim_thd; /* Threshold to trigger auto reclaim. */
+ uint32_t reclaim_max; /* Max entries to reclaim in one go.
+ * default: RTE_FIB_RCU_DQ_RECLAIM_MAX.
+ */
+};
+
/**
* Create FIB
*
@@ -219,6 +248,25 @@ rte_fib_get_rib(struct rte_fib *fib);
int
rte_fib_select_lookup(struct rte_fib *fib, enum rte_fib_lookup_type type);
+/**
+ * Associate RCU QSBR variable with a FIB object.
+ *
+ * @param fib
+ * the fib object to add RCU QSBR
+ * @param cfg
+ * RCU QSBR configuration
+ * @return
+ * On success - 0
+ * On error - 1 with error code set in rte_errno.
+ * Possible rte_errno codes are:
+ * - EINVAL - invalid pointer
+ * - EEXIST - already added QSBR
+ * - ENOMEM - memory allocation failure
+ * - ENOTSUP - not supported by configured dataplane algorithm
+ */
+__rte_experimental
+int rte_fib_rcu_qsbr_add(struct rte_fib *fib, struct rte_fib_rcu_config *cfg);
+
#ifdef __cplusplus
}
#endif
@@ -22,3 +22,10 @@ DPDK_25 {
local: *;
};
+
+EXPERIMENTAL {
+ global:
+
+ # added in 24.11
+ rte_fib_rcu_qsbr_add;
+};