[v1,1/2] lib/ring: add enqueue-dequeue callback
Checks
Commit Message
Add a callback event handler for enqueue and dequeue operations on a ring.
The pre-enqueue and post-dequeue stages of the ring operation are selected
to invoke the user callback handler.
Signed-off-by: Vipin Varghese <vipin.varghese@intel.com>
---
config/common_base | 1 +
lib/librte_ring/Makefile | 1 +
lib/librte_ring/meson.build | 2 +
lib/librte_ring/rte_ring.c | 187 +++++++++++++++++++++++++++
lib/librte_ring/rte_ring.h | 117 ++++++++++++++++-
lib/librte_ring/rte_ring_version.map | 9 ++
6 files changed, 316 insertions(+), 1 deletion(-)
Comments
On Fri, 7 Jun 2019 00:03:54 +0530
Vipin Varghese <vipin.varghese@intel.com> wrote:
> Add callback event handler for enqueue dequeue operation on ring.
> The pre-enqueue and post-dequeue operation on ring is selected to
> invoke user callback handler.
>
> Signed-off-by: Vipin Varghese <vipin.varghese@intel.com>
> ---
NAK
What is the use case for this? Rings are one of the most used elements
of DPDK and anything like this will have performance impact.
And as DPDK goes to more of distribution model, all features have to be
enabled.
Also, this looks like a problem:
+struct rte_ring_callback {
+ TAILQ_ENTRY(rte_ring_callback) next; /* Callbacks list */
+ rte_ring_cb_fn cb_fn; /* Callback address */
+ void *cb_arg; /* Parameter for callback */
+ uint32_t active; /* Callback is executing */
The active element is likely updated in one thread and queried in another
it needs to be volatile, or better yet an atomic to ensure memory ordering.
>
> Add callback event handler for enqueue dequeue operation on ring.
> The pre-enqueue and post-dequeue operation on ring is selected to invoke
> user callback handler.
Can you provide a use case for this to better understand the need?
>
> Signed-off-by: Vipin Varghese <vipin.varghese@intel.com>
> ---
> config/common_base | 1 +
> lib/librte_ring/Makefile | 1 +
> lib/librte_ring/meson.build | 2 +
> lib/librte_ring/rte_ring.c | 187 +++++++++++++++++++++++++++
> lib/librte_ring/rte_ring.h | 117 ++++++++++++++++-
> lib/librte_ring/rte_ring_version.map | 9 ++
> 6 files changed, 316 insertions(+), 1 deletion(-)
>
> diff --git a/config/common_base b/config/common_base index
> ec29455d2..022734f19 100644
> --- a/config/common_base
> +++ b/config/common_base
> @@ -500,6 +500,7 @@ CONFIG_RTE_LIBRTE_PMD_PCAP=n
> CONFIG_RTE_LIBRTE_PMD_RING=y
> CONFIG_RTE_PMD_RING_MAX_RX_RINGS=16
> CONFIG_RTE_PMD_RING_MAX_TX_RINGS=16
> +CONFIG_RTE_RING_ENQDEQ_CALLBACKS=n
>
> #
> # Compile SOFTNIC PMD
> diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile index
> 21a36770d..4f086e687 100644
> --- a/lib/librte_ring/Makefile
> +++ b/lib/librte_ring/Makefile
> @@ -6,6 +6,7 @@ include $(RTE_SDK)/mk/rte.vars.mk # library name LIB =
> librte_ring.a
>
> +CFLAGS += -DALLOW_EXPERIMENTAL_API
> CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3 LDLIBS += -lrte_eal
>
> diff --git a/lib/librte_ring/meson.build b/lib/librte_ring/meson.build index
> ab8b0b469..b92dcf027 100644
> --- a/lib/librte_ring/meson.build
> +++ b/lib/librte_ring/meson.build
> @@ -2,6 +2,8 @@
> # Copyright(c) 2017 Intel Corporation
>
> version = 2
> +allow_experimental_apis = true
> +
> sources = files('rte_ring.c')
> headers = files('rte_ring.h',
> 'rte_ring_c11_mem.h',
> diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c index
> b89ecf999..ee740c401 100644
> --- a/lib/librte_ring/rte_ring.c
> +++ b/lib/librte_ring/rte_ring.c
> @@ -43,6 +43,11 @@ EAL_REGISTER_TAILQ(rte_ring_tailq)
> /* true if x is a power of 2 */
> #define POWEROF2(x) ((((x)-1) & (x)) == 0)
>
> +/* spinlock for pre-enqueue callback */ rte_spinlock_t
> +rte_ring_preenq_cb_lock = RTE_SPINLOCK_INITIALIZER;
> +/* spinlock for post-dequeue callback */ rte_spinlock_t
> +rte_ring_pstdeq_cb_lock = RTE_SPINLOCK_INITIALIZER;
> +
> /* return the size of memory occupied by a ring */ ssize_t
> rte_ring_get_memsize(unsigned count) @@ -103,6 +108,9 @@
> rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
> r->prod.head = r->cons.head = 0;
> r->prod.tail = r->cons.tail = 0;
>
> + TAILQ_INIT(&(r->enq_cbs));
> + TAILQ_INIT(&(r->deq_cbs));
> +
> return 0;
> }
>
> @@ -220,6 +228,185 @@ rte_ring_free(struct rte_ring *r)
> rte_free(te);
> }
>
> +int __rte_experimental
> +rte_ring_preenq_callback_register(struct rte_ring *r,
> + rte_ring_cb_fn cb_fn, void *cb_arg)
> +{
> +#ifndef RTE_RING_ENQDEQ_CALLBACKS
> + rte_errno = ENOTSUP;
> + return -ENOTSUP;
> +#endif
> +
> + struct rte_ring_callback *user_cb;
> + rte_spinlock_t *lock = &rte_ring_preenq_cb_lock;
> +
> + if (!cb_fn)
> + return -EINVAL;
> +
> + if (!rte_ring_get_capacity(r)) {
> + RTE_LOG(ERR, RING, "Invalid ring=%p", r);
> + return -EINVAL;
> + }
> +
> + rte_spinlock_lock(lock);
> +
> + TAILQ_FOREACH(user_cb, &(r->enq_cbs), next) {
> + if ((void *) user_cb->cb_fn == (void *) cb_fn &&
> + user_cb->cb_arg == cb_arg)
> + break;
> + }
> +
> + /* create a new callback. */
> + if (user_cb == NULL) {
> + user_cb = rte_zmalloc("RING_USER_CALLBACK",
> + sizeof(struct rte_ring_callback), 0);
> + if (user_cb != NULL) {
> + user_cb->cb_fn = cb_fn;
> + user_cb->cb_arg = cb_arg;
> + TAILQ_INSERT_TAIL(&(r->enq_cbs), user_cb, next);
> + }
> + }
> +
> + rte_spinlock_unlock(lock);
> +
> + return (user_cb == NULL) ? -ENOMEM : 0; }
> +
> +int __rte_experimental
> +rte_ring_pstdeq_callback_register(struct rte_ring *r,
> + rte_ring_cb_fn cb_fn, void *cb_arg)
> +{
> +#ifndef RTE_RING_ENQDEQ_CALLBACKS
> + rte_errno = ENOTSUP;
> + return -ENOTSUP;
> +#endif
> +
> + struct rte_ring_callback *user_cb;
> + rte_spinlock_t *lock = &rte_ring_pstdeq_cb_lock;
> +
> + if (!cb_fn)
> + return -EINVAL;
> +
> + if (!rte_ring_get_capacity(r)) {
> + RTE_LOG(ERR, RING, "Invalid ring=%p", r);
> + return -EINVAL;
> + }
> +
> + rte_spinlock_lock(lock);
> +
> + TAILQ_FOREACH(user_cb, &(r->deq_cbs), next) {
> + if ((void *) user_cb->cb_fn == (void *) cb_fn &&
> + user_cb->cb_arg == cb_arg)
> + break;
> + }
> +
> + /* create a new callback. */
> + if (user_cb == NULL) {
> + user_cb = rte_zmalloc("RING_USER_CALLBACK",
> + sizeof(struct rte_ring_callback), 0);
> + if (user_cb != NULL) {
> + user_cb->cb_fn = cb_fn;
> + user_cb->cb_arg = cb_arg;
> + TAILQ_INSERT_TAIL(&(r->deq_cbs), user_cb, next);
> + }
> + }
> +
> + rte_spinlock_unlock(lock);
> +
> + return (user_cb == NULL) ? -ENOMEM : 0; }
> +
> +int __rte_experimental
> +rte_ring_preenq_callback_unregister(struct rte_ring *r,
> + rte_ring_cb_fn cb_fn, void *cb_arg)
> +{
> +#ifndef RTE_RING_ENQDEQ_CALLBACKS
> + rte_errno = ENOTSUP;
> + return -ENOTSUP;
> +#endif
> +
> + int ret = 0;
> + struct rte_ring_callback *cb, *next;
> + rte_spinlock_t *lock = &rte_ring_preenq_cb_lock;
> +
> + if (!cb_fn)
> + return -EINVAL;
> +
> + if (!rte_ring_get_capacity(r)) {
> + RTE_LOG(ERR, RING, "Invalid ring=%p", r);
> + return -EINVAL;
> + }
> +
> + rte_spinlock_lock(lock);
> +
> + ret = -EINVAL;
> + for (cb = TAILQ_FIRST(&r->enq_cbs); cb != NULL; cb = next) {
> + next = TAILQ_NEXT(cb, next);
> +
> + if (cb->cb_fn != cb_fn || cb->cb_arg != cb_arg)
> + continue;
> +
> + if (cb->active == 0) {
> + TAILQ_REMOVE(&(r->enq_cbs), cb, next);
> + rte_free(cb);
> + ret = 0;
> + } else {
> + ret = -EAGAIN;
> + }
> + }
> +
> + rte_spinlock_unlock(lock);
> +
> + return ret;
> +}
> +
> +int __rte_experimental
> +rte_ring_pstdeq_callback_unregister(struct rte_ring *r,
> + rte_ring_cb_fn cb_fn, void *cb_arg)
> +{
> +#ifndef RTE_RING_ENQDEQ_CALLBACKS
> + rte_errno = ENOTSUP;
> + return -ENOTSUP;
> +#endif
> +
> + int ret = 0;
> + struct rte_ring_callback *cb, *next;
> + rte_spinlock_t *lock = &rte_ring_pstdeq_cb_lock;
> +
> + if (!cb_fn)
> + return -EINVAL;
> +
> + if (!rte_ring_get_capacity(r)) {
> + RTE_LOG(ERR, RING, "Invalid ring=%p", r);
> + return -EINVAL;
> + }
> +
> + rte_spinlock_lock(lock);
> +
> + ret = -EINVAL;
> + for (cb = TAILQ_FIRST(&r->deq_cbs); cb != NULL; cb = next) {
> + next = TAILQ_NEXT(cb, next);
> +
> + if (cb->cb_fn != cb_fn || cb->cb_arg != cb_arg)
> + continue;
> +
> + if (cb->active == 0) {
> + TAILQ_REMOVE(&(r->deq_cbs), cb, next);
> + rte_free(cb);
> + ret = 0;
> + } else {
> + ret = -EAGAIN;
> + }
> + }
> +
> + rte_spinlock_unlock(lock);
> +
> + return ret;
> +
> + return 0;
> +}
> +
> +
> /* dump the status of the ring on the console */ void rte_ring_dump(FILE *f,
> const struct rte_ring *r) diff --git a/lib/librte_ring/rte_ring.h
> b/lib/librte_ring/rte_ring.h index e265e9479..fb0f3efb5 100644
> --- a/lib/librte_ring/rte_ring.h
> +++ b/lib/librte_ring/rte_ring.h
> @@ -63,6 +63,11 @@ enum rte_ring_queue_behavior {
>
> struct rte_memzone; /* forward declaration, so as not to require memzone.h
> */
>
> +struct rte_ring_callback;
> +
> +TAILQ_HEAD(rte_ring_enq_cb_list, rte_ring_callback);
> +TAILQ_HEAD(rte_ring_deq_cb_list, rte_ring_callback);
> +
> /* structure to hold a pair of head/tail values and other metadata */ struct
> rte_ring_headtail {
> volatile uint32_t head; /**< Prod/consumer head. */ @@ -103,6
> +108,20 @@ struct rte_ring {
> /** Ring consumer status. */
> struct rte_ring_headtail cons __rte_cache_aligned;
> char pad2 __rte_cache_aligned; /**< empty cache line */
> +
> + struct rte_ring_enq_cb_list enq_cbs;
> + struct rte_ring_deq_cb_list deq_cbs;
> +};
This breaks ABI compatibility
> +
> +typedef unsigned int (*rte_ring_cb_fn)(struct rte_ring *r,
> + void *obj_table, unsigned int n,
> + void *cb_arg);
> +
> +struct rte_ring_callback {
> + TAILQ_ENTRY(rte_ring_callback) next; /* Callbacks list */
> + rte_ring_cb_fn cb_fn; /* Callback address */
> + void *cb_arg; /* Parameter for callback */
> + uint32_t active; /* Callback is executing */
> };
>
> #define RING_F_SP_ENQ 0x0001 /**< The default enqueue is "single-
> producer". */ @@ -850,9 +869,23 @@ rte_ring_sp_enqueue_burst(struct
> rte_ring *r, void * const *obj_table,
> * - n: Actual number of objects enqueued.
> */
> static __rte_always_inline unsigned
> -rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
> +rte_ring_enqueue_burst(struct rte_ring *r, void **obj_table,
> unsigned int n, unsigned int *free_space) {
> +#ifdef RTE_RING_ENQDEQ_CALLBACKS
> + struct rte_ring_callback *cb = NULL;
> +
> + TAILQ_FOREACH(cb, &(r->enq_cbs), next) {
Need to take the TAILQ lock before this. For ex: what happens if a un-register is called concurrently?
Also, traversing a linked list for every enqueue call would be too costly. May be, understanding the use case will help.
> + if (cb->cb_fn == NULL)
> + continue;
> +
> + cb->active = 1;
> + n = cb->cb_fn(r, obj_table, n, cb->cb_arg);
> + cb->active = 0;
> + }
> +
> +#endif
> +
> return __rte_ring_do_enqueue(r, obj_table, n,
> RTE_RING_QUEUE_VARIABLE,
> r->prod.single, free_space);
> }
<snip>
On Thu, Jun 06, 2019 at 12:49:27PM -0700, Stephen Hemminger wrote:
> On Fri, 7 Jun 2019 00:03:54 +0530
> Vipin Varghese <vipin.varghese@intel.com> wrote:
>
> > Add callback event handler for enqueue dequeue operation on ring.
> > The pre-enqueue and post-dequeue operation on ring is selected to
> > invoke user callback handler.
> >
> > Signed-off-by: Vipin Varghese <vipin.varghese@intel.com>
> > ---
>
> NAK
> What is the use case for this? Rings are one of the most used elements
> of DPDK and anything like this will have performance impact.
>
> And as DPDK goes to more of distribution model, all features have to be
> enabled.
>
To add callback handlers to a ring, I'd suggest wrapping the ring as an
ethdev using the ring pmd, and then using the ethdev callbacks.
/Bruce
> -----Original Message-----
> From: dev [mailto:dev-bounces@dpdk.org] On Behalf Of Stephen Hemminger
> Sent: Thursday, June 6, 2019 8:49 PM
> To: Varghese, Vipin <vipin.varghese@intel.com>
> Cc: olivier.matz@6wind.com; Pattan, Reshma <reshma.pattan@intel.com>; Wiles, Keith <keith.wiles@intel.com>; dev@dpdk.org;
> Padubidri, Sanjay A <sanjay.padubidri@intel.com>
> Subject: Re: [dpdk-dev] [PATCH v1 1/2] lib/ring: add enqueue-dequeue callabck
>
> On Fri, 7 Jun 2019 00:03:54 +0530
> Vipin Varghese <vipin.varghese@intel.com> wrote:
>
> > Add callback event handler for enqueue dequeue operation on ring.
> > The pre-enqueue and post-dequeue operation on ring is selected to
> > invoke user callback handler.
> >
> > Signed-off-by: Vipin Varghese <vipin.varghese@intel.com>
> > ---
>
> NAK
> What is the use case for this? Rings are one of the most used elements
> of DPDK and anything like this will have performance impact.
>
> And as DPDK goes to more of distribution model, all features have to be
> enabled.
+1 here.
Approach in general looks quite expensive, most users wouldn't need it.
For those who want such functionality, it is not big deal to create a wrapper
on top of current rte_ring (drivers/net/ring could be one example).
Konstantin
>
>
> Also, this looks like a problem:
> +struct rte_ring_callback {
> + TAILQ_ENTRY(rte_ring_callback) next; /* Callbacks list */
> + rte_ring_cb_fn cb_fn; /* Callback address */
> + void *cb_arg; /* Parameter for callback */
> + uint32_t active; /* Callback is executing */
>
> The active element is likely updated in one thread and queried in another
> it needs to be volatile, or better yet an atomic to ensure memory ordering.
Hi Stephen,
snipped
> NAK
> What is the use case for this?
Use cases:
- allow user to investigate the contents pre-enqueue.
- allow user to investigate the contents post-dequeue.
- modify pre-enqueue and post-dequeue stage content.
- investigate PMD meta data.
Rings are one of the most used elements of DPDK
> and anything like this will have performance impact.
Based on the current testing with 10G, the impact with callback handlers enabled with no or one has less than 0.1% with SP-SC. I will test for more cases like SP-MC, MP-SC, and MP-MC.
>
> And as DPDK goes to more of distribution model, all features have to be enabled.
>
>
> Also, this looks like a problem:
> +struct rte_ring_callback {
> + TAILQ_ENTRY(rte_ring_callback) next; /* Callbacks list */
> + rte_ring_cb_fn cb_fn; /* Callback address */
> + void *cb_arg; /* Parameter for callback */
> + uint32_t active; /* Callback is executing */
>
> The active element is likely updated in one thread and queried in another it needs
> to be volatile, or better yet an atomic to ensure memory ordering.
Yes, thanks you for pointing this out, Let me correct the same.
Hi Honnappa,
snipped
> >
> > Add callback event handler for enqueue dequeue operation on ring.
> > The pre-enqueue and post-dequeue operation on ring is selected to
> > invoke user callback handler.
> Can you provide a use case for this to better understand the need?
Use cases:
- allow user to investigate the contents pre-enqueue.
- allow user to investigate the contents post-dequeue.
- modify pre-enqueue and post-dequeue stage content.
- investigate PMD meta data for debug in field nodes.
snipped
> > + struct rte_ring_enq_cb_list enq_cbs;
> > + struct rte_ring_deq_cb_list deq_cbs; };
> This breaks ABI compatibility
Can you help me understand this more clearly?
snipped
> > -rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
> > +rte_ring_enqueue_burst(struct rte_ring *r, void **obj_table,
> > unsigned int n, unsigned int *free_space) {
> > +#ifdef RTE_RING_ENQDEQ_CALLBACKS
> > + struct rte_ring_callback *cb = NULL;
> > +
> > + TAILQ_FOREACH(cb, &(r->enq_cbs), next) {
> Need to take the TAILQ lock before this. For ex: what happens if a un-register is
> called concurrently?
Let me check this, using rx|tx callback as reference.
> Also, traversing a linked list for every enqueue call would be too costly. May be,
> understanding the use case will help.
Internal testing with SP-SC is 0.1% (with extra cost of user function). But I will try to explore other alternatives too.
Snipped
> -----Original Message-----
> From: Richardson, Bruce
> Sent: Friday, June 7, 2019 3:01 PM
> To: Stephen Hemminger <stephen@networkplumber.org>
> Cc: Varghese, Vipin <vipin.varghese@intel.com>; olivier.matz@6wind.com;
> Pattan, Reshma <reshma.pattan@intel.com>; Wiles, Keith
> <keith.wiles@intel.com>; dev@dpdk.org; Padubidri, Sanjay A
> <sanjay.padubidri@intel.com>
> Subject: Re: [dpdk-dev] [PATCH v1 1/2] lib/ring: add enqueue-dequeue callabck
>
> On Thu, Jun 06, 2019 at 12:49:27PM -0700, Stephen Hemminger wrote:
> > On Fri, 7 Jun 2019 00:03:54 +0530
> > Vipin Varghese <vipin.varghese@intel.com> wrote:
> >
> > > Add callback event handler for enqueue dequeue operation on ring.
> > > The pre-enqueue and post-dequeue operation on ring is selected to
> > > invoke user callback handler.
> > >
> > > Signed-off-by: Vipin Varghese <vipin.varghese@intel.com>
> > > ---
> >
> > NAK
> > What is the use case for this? Rings are one of the most used elements
> > of DPDK and anything like this will have performance impact.
> >
> > And as DPDK goes to more of distribution model, all features have to
> > be enabled.
> >
> To add callback handlers to a ring, I'd suggest wrapping the ring as an ethdev
> using the ring pmd, and then using the ethdev callbacks.
Thanks Bruce, I will check the same and get back.
>
> /Bruce
> -----Original Message-----
> From: Ananyev, Konstantin
> Sent: Friday, June 7, 2019 4:15 PM
> To: Stephen Hemminger <stephen@networkplumber.org>; Varghese, Vipin
> <vipin.varghese@intel.com>
> Cc: olivier.matz@6wind.com; Pattan, Reshma <reshma.pattan@intel.com>;
> Wiles, Keith <keith.wiles@intel.com>; dev@dpdk.org; Padubidri, Sanjay A
> <sanjay.padubidri@intel.com>
> Subject: RE: [dpdk-dev] [PATCH v1 1/2] lib/ring: add enqueue-dequeue callabck
>
>
>
> > -----Original Message-----
> > From: dev [mailto:dev-bounces@dpdk.org] On Behalf Of Stephen Hemminger
> > Sent: Thursday, June 6, 2019 8:49 PM
> > To: Varghese, Vipin <vipin.varghese@intel.com>
> > Cc: olivier.matz@6wind.com; Pattan, Reshma <reshma.pattan@intel.com>;
> > Wiles, Keith <keith.wiles@intel.com>; dev@dpdk.org; Padubidri, Sanjay
> > A <sanjay.padubidri@intel.com>
> > Subject: Re: [dpdk-dev] [PATCH v1 1/2] lib/ring: add enqueue-dequeue
> > callabck
> >
> > On Fri, 7 Jun 2019 00:03:54 +0530
> > Vipin Varghese <vipin.varghese@intel.com> wrote:
> >
> > > Add callback event handler for enqueue dequeue operation on ring.
> > > The pre-enqueue and post-dequeue operation on ring is selected to
> > > invoke user callback handler.
> > >
> > > Signed-off-by: Vipin Varghese <vipin.varghese@intel.com>
> > > ---
> >
> > NAK
> > What is the use case for this? Rings are one of the most used elements
> > of DPDK and anything like this will have performance impact.
> >
> > And as DPDK goes to more of distribution model, all features have to
> > be enabled.
>
> +1 here.
I am not sure if I have the understanding correct; if all features are to be enabled, why is the feature set to 'n' in `config/common_base`?
> Approach in general looks quite expensive, most users wouldn't need it.
> For those who want such functionality, it is not big deal to create a wrapper on
> top of current rte_ring (drivers/net/ring could be one example).
Thanks, will explore and get back.
> Konstantin
>
> >
> >
> > Also, this looks like a problem:
> > +struct rte_ring_callback {
> > + TAILQ_ENTRY(rte_ring_callback) next; /* Callbacks list */
> > + rte_ring_cb_fn cb_fn; /* Callback address */
> > + void *cb_arg; /* Parameter for callback */
> > + uint32_t active; /* Callback is executing */
> >
> > The active element is likely updated in one thread and queried in
> > another it needs to be volatile, or better yet an atomic to ensure memory
> ordering.
>
>
>
> Hi Honnappa,
>
> snipped
> > >
> > > Add callback event handler for enqueue dequeue operation on ring.
> > > The pre-enqueue and post-dequeue operation on ring is selected to
> > > invoke user callback handler.
> > Can you provide a use case for this to better understand the need?
> Use cases:
> - allow user to investigate the contents pre-enqueue.
> - allow user to investigate the contents post-dequeue.
> - modify pre-enqueue and post-dequeue stage content.
> - investigate PMD meta data for debug in field nodes.
>
> snipped
> > > + struct rte_ring_enq_cb_list enq_cbs;
> > > + struct rte_ring_deq_cb_list deq_cbs; };
> > This breaks ABI compatibility
> Can you help me understand this more clearly?
'struct rte_ring' is exposed to the application. It is possible that the application has declared a variable of type 'struct rte_ring'. Any addition to this structure will increase the size of 'struct rte_ring'. Hence, it won't be compatible.
>
> snipped
> > > -rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
> > > +rte_ring_enqueue_burst(struct rte_ring *r, void **obj_table,
> > > unsigned int n, unsigned int *free_space) {
> > > +#ifdef RTE_RING_ENQDEQ_CALLBACKS
> > > + struct rte_ring_callback *cb = NULL;
> > > +
> > > + TAILQ_FOREACH(cb, &(r->enq_cbs), next) {
> > Need to take the TAILQ lock before this. For ex: what happens if a
> > un-register is called concurrently?
> Let me check this, using rx|tx callback as reference.
>
> > Also, traversing a linked list for every enqueue call would be too
> > costly. May be, understanding the use case will help.
> Internal testing with SP-SC is 0.1% (with extra cost of user function). But I will
> try to explore other alternatives too.
>
> Snipped
Thanks Honnappa, will work on the changes suggested.
> -----Original Message-----
> From: Honnappa Nagarahalli <Honnappa.Nagarahalli@arm.com>
> Sent: Tuesday, June 11, 2019 9:48 AM
> To: Varghese, Vipin <vipin.varghese@intel.com>; olivier.matz@6wind.com;
> Pattan, Reshma <reshma.pattan@intel.com>; Wiles, Keith
> <keith.wiles@intel.com>; dev@dpdk.org
> Cc: Padubidri, Sanjay A <sanjay.padubidri@intel.com>; Honnappa Nagarahalli
> <Honnappa.Nagarahalli@arm.com>; nd <nd@arm.com>; nd <nd@arm.com>
> Subject: RE: [dpdk-dev] [PATCH v1 1/2] lib/ring: add enqueue-dequeue callabck
>
> >
> > Hi Honnappa,
> >
> > snipped
> > > >
> > > > Add callback event handler for enqueue dequeue operation on ring.
> > > > The pre-enqueue and post-dequeue operation on ring is selected to
> > > > invoke user callback handler.
> > > Can you provide a use case for this to better understand the need?
> > Use cases:
> > - allow user to investigate the contents pre-enqueue.
> > - allow user to investigate the contents post-dequeue.
> > - modify pre-enqueue and post-dequeue stage content.
> > - investigate PMD meta data for debug in field nodes.
> >
> > snipped
> > > > + struct rte_ring_enq_cb_list enq_cbs;
> > > > + struct rte_ring_deq_cb_list deq_cbs; };
> > > This breaks ABI compatibility
> > Can you help me understand this more clearly?
> 'struct rte_ring' is exposed to the application. It is possible that the application has
> declared a variable of type 'struct rte_ring'. Any addition to this structure will
> increase the size of 'struct rte_ring'. Hence, it won't be compatible.
>
> >
> > snipped
> > > > -rte_ring_enqueue_burst(struct rte_ring *r, void * const
> > > > *obj_table,
> > > > +rte_ring_enqueue_burst(struct rte_ring *r, void **obj_table,
> > > > unsigned int n, unsigned int *free_space) {
> > > > +#ifdef RTE_RING_ENQDEQ_CALLBACKS
> > > > + struct rte_ring_callback *cb = NULL;
> > > > +
> > > > + TAILQ_FOREACH(cb, &(r->enq_cbs), next) {
> > > Need to take the TAILQ lock before this. For ex: what happens if a
> > > un-register is called concurrently?
> > Let me check this, using rx|tx callback as reference.
> >
> > > Also, traversing a linked list for every enqueue call would be too
> > > costly. May be, understanding the use case will help.
> > Internal testing with SP-SC is 0.1% (with extra cost of user
> > function). But I will try to explore other alternatives too.
> >
> > Snipped
@@ -500,6 +500,7 @@ CONFIG_RTE_LIBRTE_PMD_PCAP=n
CONFIG_RTE_LIBRTE_PMD_RING=y
CONFIG_RTE_PMD_RING_MAX_RX_RINGS=16
CONFIG_RTE_PMD_RING_MAX_TX_RINGS=16
+CONFIG_RTE_RING_ENQDEQ_CALLBACKS=n
#
# Compile SOFTNIC PMD
@@ -6,6 +6,7 @@ include $(RTE_SDK)/mk/rte.vars.mk
# library name
LIB = librte_ring.a
+CFLAGS += -DALLOW_EXPERIMENTAL_API
CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3
LDLIBS += -lrte_eal
@@ -2,6 +2,8 @@
# Copyright(c) 2017 Intel Corporation
version = 2
+allow_experimental_apis = true
+
sources = files('rte_ring.c')
headers = files('rte_ring.h',
'rte_ring_c11_mem.h',
@@ -43,6 +43,11 @@ EAL_REGISTER_TAILQ(rte_ring_tailq)
/* true if x is a power of 2 */
#define POWEROF2(x) ((((x)-1) & (x)) == 0)
+/* spinlock for pre-enqueue callback */
+rte_spinlock_t rte_ring_preenq_cb_lock = RTE_SPINLOCK_INITIALIZER;
+/* spinlock for post-dequeue callback */
+rte_spinlock_t rte_ring_pstdeq_cb_lock = RTE_SPINLOCK_INITIALIZER;
+
/* return the size of memory occupied by a ring */
ssize_t
rte_ring_get_memsize(unsigned count)
@@ -103,6 +108,9 @@ rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
r->prod.head = r->cons.head = 0;
r->prod.tail = r->cons.tail = 0;
+ TAILQ_INIT(&(r->enq_cbs));
+ TAILQ_INIT(&(r->deq_cbs));
+
return 0;
}
@@ -220,6 +228,185 @@ rte_ring_free(struct rte_ring *r)
rte_free(te);
}
+int __rte_experimental
+rte_ring_preenq_callback_register(struct rte_ring *r,
+ rte_ring_cb_fn cb_fn, void *cb_arg)
+{
+#ifndef RTE_RING_ENQDEQ_CALLBACKS
+ rte_errno = ENOTSUP;
+ return -ENOTSUP;
+#endif
+
+ struct rte_ring_callback *user_cb;
+ rte_spinlock_t *lock = &rte_ring_preenq_cb_lock;
+
+ if (!cb_fn)
+ return -EINVAL;
+
+ if (!rte_ring_get_capacity(r)) {
+ RTE_LOG(ERR, RING, "Invalid ring=%p", r);
+ return -EINVAL;
+ }
+
+ rte_spinlock_lock(lock);
+
+ TAILQ_FOREACH(user_cb, &(r->enq_cbs), next) {
+ if ((void *) user_cb->cb_fn == (void *) cb_fn &&
+ user_cb->cb_arg == cb_arg)
+ break;
+ }
+
+ /* create a new callback. */
+ if (user_cb == NULL) {
+ user_cb = rte_zmalloc("RING_USER_CALLBACK",
+ sizeof(struct rte_ring_callback), 0);
+ if (user_cb != NULL) {
+ user_cb->cb_fn = cb_fn;
+ user_cb->cb_arg = cb_arg;
+ TAILQ_INSERT_TAIL(&(r->enq_cbs), user_cb, next);
+ }
+ }
+
+ rte_spinlock_unlock(lock);
+
+ return (user_cb == NULL) ? -ENOMEM : 0;
+}
+
+int __rte_experimental
+rte_ring_pstdeq_callback_register(struct rte_ring *r,
+ rte_ring_cb_fn cb_fn, void *cb_arg)
+{
+#ifndef RTE_RING_ENQDEQ_CALLBACKS
+ rte_errno = ENOTSUP;
+ return -ENOTSUP;
+#endif
+
+ struct rte_ring_callback *user_cb;
+ rte_spinlock_t *lock = &rte_ring_pstdeq_cb_lock;
+
+ if (!cb_fn)
+ return -EINVAL;
+
+ if (!rte_ring_get_capacity(r)) {
+ RTE_LOG(ERR, RING, "Invalid ring=%p", r);
+ return -EINVAL;
+ }
+
+ rte_spinlock_lock(lock);
+
+ TAILQ_FOREACH(user_cb, &(r->deq_cbs), next) {
+ if ((void *) user_cb->cb_fn == (void *) cb_fn &&
+ user_cb->cb_arg == cb_arg)
+ break;
+ }
+
+ /* create a new callback. */
+ if (user_cb == NULL) {
+ user_cb = rte_zmalloc("RING_USER_CALLBACK",
+ sizeof(struct rte_ring_callback), 0);
+ if (user_cb != NULL) {
+ user_cb->cb_fn = cb_fn;
+ user_cb->cb_arg = cb_arg;
+ TAILQ_INSERT_TAIL(&(r->deq_cbs), user_cb, next);
+ }
+ }
+
+ rte_spinlock_unlock(lock);
+
+ return (user_cb == NULL) ? -ENOMEM : 0;
+}
+
+int __rte_experimental
+rte_ring_preenq_callback_unregister(struct rte_ring *r,
+ rte_ring_cb_fn cb_fn, void *cb_arg)
+{
+#ifndef RTE_RING_ENQDEQ_CALLBACKS
+ rte_errno = ENOTSUP;
+ return -ENOTSUP;
+#endif
+
+ int ret = 0;
+ struct rte_ring_callback *cb, *next;
+ rte_spinlock_t *lock = &rte_ring_preenq_cb_lock;
+
+ if (!cb_fn)
+ return -EINVAL;
+
+ if (!rte_ring_get_capacity(r)) {
+ RTE_LOG(ERR, RING, "Invalid ring=%p", r);
+ return -EINVAL;
+ }
+
+ rte_spinlock_lock(lock);
+
+ ret = -EINVAL;
+ for (cb = TAILQ_FIRST(&r->enq_cbs); cb != NULL; cb = next) {
+ next = TAILQ_NEXT(cb, next);
+
+ if (cb->cb_fn != cb_fn || cb->cb_arg != cb_arg)
+ continue;
+
+ if (cb->active == 0) {
+ TAILQ_REMOVE(&(r->enq_cbs), cb, next);
+ rte_free(cb);
+ ret = 0;
+ } else {
+ ret = -EAGAIN;
+ }
+ }
+
+ rte_spinlock_unlock(lock);
+
+ return ret;
+}
+
+int __rte_experimental
+rte_ring_pstdeq_callback_unregister(struct rte_ring *r,
+ rte_ring_cb_fn cb_fn, void *cb_arg)
+{
+#ifndef RTE_RING_ENQDEQ_CALLBACKS
+ rte_errno = ENOTSUP;
+ return -ENOTSUP;
+#endif
+
+ int ret = 0;
+ struct rte_ring_callback *cb, *next;
+ rte_spinlock_t *lock = &rte_ring_pstdeq_cb_lock;
+
+ if (!cb_fn)
+ return -EINVAL;
+
+ if (!rte_ring_get_capacity(r)) {
+ RTE_LOG(ERR, RING, "Invalid ring=%p", r);
+ return -EINVAL;
+ }
+
+ rte_spinlock_lock(lock);
+
+ ret = -EINVAL;
+ for (cb = TAILQ_FIRST(&r->deq_cbs); cb != NULL; cb = next) {
+ next = TAILQ_NEXT(cb, next);
+
+ if (cb->cb_fn != cb_fn || cb->cb_arg != cb_arg)
+ continue;
+
+ if (cb->active == 0) {
+ TAILQ_REMOVE(&(r->deq_cbs), cb, next);
+ rte_free(cb);
+ ret = 0;
+ } else {
+ ret = -EAGAIN;
+ }
+ }
+
+ rte_spinlock_unlock(lock);
+
+ return ret;
+
+ return 0;
+}
+
+
/* dump the status of the ring on the console */
void
rte_ring_dump(FILE *f, const struct rte_ring *r)
@@ -63,6 +63,11 @@ enum rte_ring_queue_behavior {
struct rte_memzone; /* forward declaration, so as not to require memzone.h */
+struct rte_ring_callback;
+
+TAILQ_HEAD(rte_ring_enq_cb_list, rte_ring_callback);
+TAILQ_HEAD(rte_ring_deq_cb_list, rte_ring_callback);
+
/* structure to hold a pair of head/tail values and other metadata */
struct rte_ring_headtail {
volatile uint32_t head; /**< Prod/consumer head. */
@@ -103,6 +108,20 @@ struct rte_ring {
/** Ring consumer status. */
struct rte_ring_headtail cons __rte_cache_aligned;
char pad2 __rte_cache_aligned; /**< empty cache line */
+
+ struct rte_ring_enq_cb_list enq_cbs;
+ struct rte_ring_deq_cb_list deq_cbs;
+};
+
+typedef unsigned int (*rte_ring_cb_fn)(struct rte_ring *r,
+ void *obj_table, unsigned int n,
+ void *cb_arg);
+
+struct rte_ring_callback {
+ TAILQ_ENTRY(rte_ring_callback) next; /* Callbacks list */
+ rte_ring_cb_fn cb_fn; /* Callback address */
+ void *cb_arg; /* Parameter for callback */
+ uint32_t active; /* Callback is executing */
};
#define RING_F_SP_ENQ 0x0001 /**< The default enqueue is "single-producer". */
@@ -850,9 +869,23 @@ rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
* - n: Actual number of objects enqueued.
*/
static __rte_always_inline unsigned
-rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
+rte_ring_enqueue_burst(struct rte_ring *r, void **obj_table,
unsigned int n, unsigned int *free_space)
{
+#ifdef RTE_RING_ENQDEQ_CALLBACKS
+ struct rte_ring_callback *cb = NULL;
+
+ TAILQ_FOREACH(cb, &(r->enq_cbs), next) {
+ if (cb->cb_fn == NULL)
+ continue;
+
+ cb->active = 1;
+ n = cb->cb_fn(r, obj_table, n, cb->cb_arg);
+ cb->active = 0;
+ }
+
+#endif
+
return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE,
r->prod.single, free_space);
}
@@ -881,6 +914,20 @@ static __rte_always_inline unsigned
rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table,
unsigned int n, unsigned int *available)
{
+#ifdef RTE_RING_ENQDEQ_CALLBACKS
+ struct rte_ring_callback *cb = NULL;
+
+ TAILQ_FOREACH(cb, &(r->deq_cbs), next) {
+ if (cb->cb_fn == NULL)
+ continue;
+
+ cb->active = 1;
+ n = cb->cb_fn(r, obj_table, n, cb->cb_arg);
+ cb->active = 0;
+ }
+
+#endif
+
return __rte_ring_do_dequeue(r, obj_table, n,
RTE_RING_QUEUE_VARIABLE, __IS_MC, available);
}
@@ -938,6 +985,74 @@ rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table,
r->cons.single, available);
}
+/**
+ * Register user callback function with argument for pre-enqueue
+ * to the ring.
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param cb_fn
+ * User callback function to be invoked.
+ * @param cb_arg
+ * User callback argument to be shared with the callback function.
+ * @return
+ * 0 on success, or a negative value on error.
+ */
+int __rte_experimental
+rte_ring_preenq_callback_register(struct rte_ring *r,
+ rte_ring_cb_fn cb_fn, void *cb_arg);
+
+/**
+ * Register user callback function with argument for post-dequeue
+ * to the ring.
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param cb_fn
+ * User callback function to be invoked.
+ * @param cb_arg
+ * User callback argument to be shared with the callback function.
+ * @return
+ * 0 on success, or a negative value on error.
+ */
+int __rte_experimental
+rte_ring_pstdeq_callback_register(struct rte_ring *r,
+ rte_ring_cb_fn cb_fn, void *cb_arg);
+
+/**
+ * Unregister user callback function with argument for pre-enqueue
+ * to the ring.
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param cb_fn
+ * User callback function to be invoked.
+ * @param cb_arg
+ * User callback argument to be shared with the callback function.
+ * @return
+ * 0 on success, or a negative value on error.
+ */
+int __rte_experimental
+rte_ring_preenq_callback_unregister(struct rte_ring *r,
+ rte_ring_cb_fn cb_fn, void *cb_arg);
+
+/**
+ * Unregister user callback function with argument for post-dequeue
+ * to the ring.
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param cb_fn
+ * User callback function to be invoked.
+ * @param cb_arg
+ * User callback argument to be shared with the callback function.
+ * @return
+ * 0 on success, or a negative value on error.
+ */
+int __rte_experimental
+rte_ring_pstdeq_callback_unregister(struct rte_ring *r,
+ rte_ring_cb_fn cb_fn, void *cb_arg);
+
#ifdef __cplusplus
}
#endif
@@ -17,3 +17,12 @@ DPDK_2.2 {
rte_ring_free;
} DPDK_2.0;
+
+EXPERIMENTAL {
+ global:
+
+ rte_ring_preenq_callback_register;
+ rte_ring_preenq_callback_unregister;
+ rte_ring_pstdeq_callback_register;
+ rte_ring_pstdeq_callback_unregister;
+};