From patchwork Thu Jun 6 18:33:54 2019 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: "Varghese, Vipin" X-Patchwork-Id: 54521 X-Patchwork-Delegate: thomas@monjalon.net Return-Path: X-Original-To: patchwork@dpdk.org Delivered-To: patchwork@dpdk.org Received: from [92.243.14.124] (localhost [127.0.0.1]) by dpdk.org (Postfix) with ESMTP id 156F91B9A5; Thu, 6 Jun 2019 20:33:24 +0200 (CEST) Received: from mga18.intel.com (mga18.intel.com [134.134.136.126]) by dpdk.org (Postfix) with ESMTP id C16161B958 for ; Thu, 6 Jun 2019 20:33:21 +0200 (CEST) X-Amp-Result: SKIPPED(no attachment in message) X-Amp-File-Uploaded: False Received: from orsmga005.jf.intel.com ([10.7.209.41]) by orsmga106.jf.intel.com with ESMTP/TLS/DHE-RSA-AES256-GCM-SHA384; 06 Jun 2019 11:33:20 -0700 X-ExtLoop1: 1 Received: from unknown (HELO saesrv02-S2600CWR.intel.com) ([10.224.122.203]) by orsmga005.jf.intel.com with ESMTP; 06 Jun 2019 11:33:18 -0700 From: Vipin Varghese To: olivier.matz@6wind.com, reshma.pattan@intel.com, keith.wiles@intel.com, dev@dpdk.org Cc: sanjay.padubidri@intel.com, Vipin Varghese Date: Fri, 7 Jun 2019 00:03:54 +0530 Message-Id: <20190606183355.56734-1-vipin.varghese@intel.com> X-Mailer: git-send-email 2.17.1 Subject: [dpdk-dev] [PATCH v1 1/2] lib/ring: add enqueue-dequeue callabck X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: DPDK patches and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dev-bounces@dpdk.org Sender: "dev" Add callback event handler for enqueue dequeue operation on ring. The pre-enqueue and post-dequeue operation on ring is selected to invoke user callback handler. Signed-off-by: Vipin Varghese --- config/common_base | 1 + lib/librte_ring/Makefile | 1 + lib/librte_ring/meson.build | 2 + lib/librte_ring/rte_ring.c | 187 +++++++++++++++++++++++++++ lib/librte_ring/rte_ring.h | 117 ++++++++++++++++- lib/librte_ring/rte_ring_version.map | 9 ++ 6 files changed, 316 insertions(+), 1 deletion(-) diff --git a/config/common_base b/config/common_base index ec29455d2..022734f19 100644 --- a/config/common_base +++ b/config/common_base @@ -500,6 +500,7 @@ CONFIG_RTE_LIBRTE_PMD_PCAP=n CONFIG_RTE_LIBRTE_PMD_RING=y CONFIG_RTE_PMD_RING_MAX_RX_RINGS=16 CONFIG_RTE_PMD_RING_MAX_TX_RINGS=16 +CONFIG_RTE_RING_ENQDEQ_CALLBACKS=n # # Compile SOFTNIC PMD diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile index 21a36770d..4f086e687 100644 --- a/lib/librte_ring/Makefile +++ b/lib/librte_ring/Makefile @@ -6,6 +6,7 @@ include $(RTE_SDK)/mk/rte.vars.mk # library name LIB = librte_ring.a +CFLAGS += -DALLOW_EXPERIMENTAL_API CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3 LDLIBS += -lrte_eal diff --git a/lib/librte_ring/meson.build b/lib/librte_ring/meson.build index ab8b0b469..b92dcf027 100644 --- a/lib/librte_ring/meson.build +++ b/lib/librte_ring/meson.build @@ -2,6 +2,8 @@ # Copyright(c) 2017 Intel Corporation version = 2 +allow_experimental_apis = true + sources = files('rte_ring.c') headers = files('rte_ring.h', 'rte_ring_c11_mem.h', diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c index b89ecf999..ee740c401 100644 --- a/lib/librte_ring/rte_ring.c +++ b/lib/librte_ring/rte_ring.c @@ -43,6 +43,11 @@ EAL_REGISTER_TAILQ(rte_ring_tailq) /* true if x is a power of 2 */ #define POWEROF2(x) ((((x)-1) & (x)) == 0) +/* spinlock for pre-enqueue callback */ +rte_spinlock_t rte_ring_preenq_cb_lock = RTE_SPINLOCK_INITIALIZER; +/* spinlock for post-dequeue callback */ +rte_spinlock_t rte_ring_pstdeq_cb_lock = RTE_SPINLOCK_INITIALIZER; + /* return the size of memory occupied by a ring */ ssize_t rte_ring_get_memsize(unsigned count) @@ -103,6 +108,9 @@ rte_ring_init(struct rte_ring *r, const char *name, unsigned count, r->prod.head = r->cons.head = 0; r->prod.tail = r->cons.tail = 0; + TAILQ_INIT(&(r->enq_cbs)); + TAILQ_INIT(&(r->deq_cbs)); + return 0; } @@ -220,6 +228,185 @@ rte_ring_free(struct rte_ring *r) rte_free(te); } +int __rte_experimental +rte_ring_preenq_callback_register(struct rte_ring *r, + rte_ring_cb_fn cb_fn, void *cb_arg) +{ +#ifndef RTE_RING_ENQDEQ_CALLBACKS + rte_errno = ENOTSUP; + return -ENOTSUP; +#endif + + struct rte_ring_callback *user_cb; + rte_spinlock_t *lock = &rte_ring_preenq_cb_lock; + + if (!cb_fn) + return -EINVAL; + + if (!rte_ring_get_capacity(r)) { + RTE_LOG(ERR, RING, "Invalid ring=%p", r); + return -EINVAL; + } + + rte_spinlock_lock(lock); + + TAILQ_FOREACH(user_cb, &(r->enq_cbs), next) { + if ((void *) user_cb->cb_fn == (void *) cb_fn && + user_cb->cb_arg == cb_arg) + break; + } + + /* create a new callback. */ + if (user_cb == NULL) { + user_cb = rte_zmalloc("RING_USER_CALLBACK", + sizeof(struct rte_ring_callback), 0); + if (user_cb != NULL) { + user_cb->cb_fn = cb_fn; + user_cb->cb_arg = cb_arg; + TAILQ_INSERT_TAIL(&(r->enq_cbs), user_cb, next); + } + } + + rte_spinlock_unlock(lock); + + return (user_cb == NULL) ? -ENOMEM : 0; +} + +int __rte_experimental +rte_ring_pstdeq_callback_register(struct rte_ring *r, + rte_ring_cb_fn cb_fn, void *cb_arg) +{ +#ifndef RTE_RING_ENQDEQ_CALLBACKS + rte_errno = ENOTSUP; + return -ENOTSUP; +#endif + + struct rte_ring_callback *user_cb; + rte_spinlock_t *lock = &rte_ring_pstdeq_cb_lock; + + if (!cb_fn) + return -EINVAL; + + if (!rte_ring_get_capacity(r)) { + RTE_LOG(ERR, RING, "Invalid ring=%p", r); + return -EINVAL; + } + + rte_spinlock_lock(lock); + + TAILQ_FOREACH(user_cb, &(r->deq_cbs), next) { + if ((void *) user_cb->cb_fn == (void *) cb_fn && + user_cb->cb_arg == cb_arg) + break; + } + + /* create a new callback. */ + if (user_cb == NULL) { + user_cb = rte_zmalloc("RING_USER_CALLBACK", + sizeof(struct rte_ring_callback), 0); + if (user_cb != NULL) { + user_cb->cb_fn = cb_fn; + user_cb->cb_arg = cb_arg; + TAILQ_INSERT_TAIL(&(r->deq_cbs), user_cb, next); + } + } + + rte_spinlock_unlock(lock); + + return (user_cb == NULL) ? -ENOMEM : 0; +} + +int __rte_experimental +rte_ring_preenq_callback_unregister(struct rte_ring *r, + rte_ring_cb_fn cb_fn, void *cb_arg) +{ +#ifndef RTE_RING_ENQDEQ_CALLBACKS + rte_errno = ENOTSUP; + return -ENOTSUP; +#endif + + int ret = 0; + struct rte_ring_callback *cb, *next; + rte_spinlock_t *lock = &rte_ring_preenq_cb_lock; + + if (!cb_fn) + return -EINVAL; + + if (!rte_ring_get_capacity(r)) { + RTE_LOG(ERR, RING, "Invalid ring=%p", r); + return -EINVAL; + } + + rte_spinlock_lock(lock); + + ret = -EINVAL; + for (cb = TAILQ_FIRST(&r->enq_cbs); cb != NULL; cb = next) { + next = TAILQ_NEXT(cb, next); + + if (cb->cb_fn != cb_fn || cb->cb_arg != cb_arg) + continue; + + if (cb->active == 0) { + TAILQ_REMOVE(&(r->enq_cbs), cb, next); + rte_free(cb); + ret = 0; + } else { + ret = -EAGAIN; + } + } + + rte_spinlock_unlock(lock); + + return ret; +} + +int __rte_experimental +rte_ring_pstdeq_callback_unregister(struct rte_ring *r, + rte_ring_cb_fn cb_fn, void *cb_arg) +{ +#ifndef RTE_RING_ENQDEQ_CALLBACKS + rte_errno = ENOTSUP; + return -ENOTSUP; +#endif + + int ret = 0; + struct rte_ring_callback *cb, *next; + rte_spinlock_t *lock = &rte_ring_pstdeq_cb_lock; + + if (!cb_fn) + return -EINVAL; + + if (!rte_ring_get_capacity(r)) { + RTE_LOG(ERR, RING, "Invalid ring=%p", r); + return -EINVAL; + } + + rte_spinlock_lock(lock); + + ret = -EINVAL; + for (cb = TAILQ_FIRST(&r->deq_cbs); cb != NULL; cb = next) { + next = TAILQ_NEXT(cb, next); + + if (cb->cb_fn != cb_fn || cb->cb_arg != cb_arg) + continue; + + if (cb->active == 0) { + TAILQ_REMOVE(&(r->deq_cbs), cb, next); + rte_free(cb); + ret = 0; + } else { + ret = -EAGAIN; + } + } + + rte_spinlock_unlock(lock); + + return ret; + + return 0; +} + + /* dump the status of the ring on the console */ void rte_ring_dump(FILE *f, const struct rte_ring *r) diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h index e265e9479..fb0f3efb5 100644 --- a/lib/librte_ring/rte_ring.h +++ b/lib/librte_ring/rte_ring.h @@ -63,6 +63,11 @@ enum rte_ring_queue_behavior { struct rte_memzone; /* forward declaration, so as not to require memzone.h */ +struct rte_ring_callback; + +TAILQ_HEAD(rte_ring_enq_cb_list, rte_ring_callback); +TAILQ_HEAD(rte_ring_deq_cb_list, rte_ring_callback); + /* structure to hold a pair of head/tail values and other metadata */ struct rte_ring_headtail { volatile uint32_t head; /**< Prod/consumer head. */ @@ -103,6 +108,20 @@ struct rte_ring { /** Ring consumer status. */ struct rte_ring_headtail cons __rte_cache_aligned; char pad2 __rte_cache_aligned; /**< empty cache line */ + + struct rte_ring_enq_cb_list enq_cbs; + struct rte_ring_deq_cb_list deq_cbs; +}; + +typedef unsigned int (*rte_ring_cb_fn)(struct rte_ring *r, + void *obj_table, unsigned int n, + void *cb_arg); + +struct rte_ring_callback { + TAILQ_ENTRY(rte_ring_callback) next; /* Callbacks list */ + rte_ring_cb_fn cb_fn; /* Callback address */ + void *cb_arg; /* Parameter for callback */ + uint32_t active; /* Callback is executing */ }; #define RING_F_SP_ENQ 0x0001 /**< The default enqueue is "single-producer". */ @@ -850,9 +869,23 @@ rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table, * - n: Actual number of objects enqueued. */ static __rte_always_inline unsigned -rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table, +rte_ring_enqueue_burst(struct rte_ring *r, void **obj_table, unsigned int n, unsigned int *free_space) { +#ifdef RTE_RING_ENQDEQ_CALLBACKS + struct rte_ring_callback *cb = NULL; + + TAILQ_FOREACH(cb, &(r->enq_cbs), next) { + if (cb->cb_fn == NULL) + continue; + + cb->active = 1; + n = cb->cb_fn(r, obj_table, n, cb->cb_arg); + cb->active = 0; + } + +#endif + return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE, r->prod.single, free_space); } @@ -881,6 +914,20 @@ static __rte_always_inline unsigned rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned int n, unsigned int *available) { +#ifdef RTE_RING_ENQDEQ_CALLBACKS + struct rte_ring_callback *cb = NULL; + + TAILQ_FOREACH(cb, &(r->deq_cbs), next) { + if (cb->cb_fn == NULL) + continue; + + cb->active = 1; + n = cb->cb_fn(r, obj_table, n, cb->cb_arg); + cb->active = 0; + } + +#endif + return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE, __IS_MC, available); } @@ -938,6 +985,74 @@ rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table, r->cons.single, available); } +/** + * Register user callback function with argument for pre-enqueue + * to the ring. + * + * @param r + * A pointer to the ring structure. + * @param cb_fn + * User callback function to be invoked. + * @param cb_arg + * user callback arguments to shared in callback process. + * @return + * 0 on success, or a negative value on error. + */ +int __rte_experimental +rte_ring_preenq_callback_register(struct rte_ring *r, + rte_ring_cb_fn cb_fn, void *cb_arg); + +/** + * Register user callback function with argument for post-dequeue + * to the ring. + * + * @param r + * A pointer to the ring structure. + * @param cb_fn + * User callback function to be invoked. + * @param cb_arg + * user callback arguments to shared in callback process. + * @return + * 0 on success, or a negative value on error. + */ +int __rte_experimental +rte_ring_pstdeq_callback_register(struct rte_ring *r, + rte_ring_cb_fn cb_fn, void *cb_arg); + +/** + * Unregister user callback function with argument for pre-enqueue + * to the ring. + * + * @param r + * A pointer to the ring structure. + * @param cb_fn + * User callback function to be invoked. + * @param cb_arg + * user callback arguments to shared in callback process. + * @return + * 0 on success, or a negative value on error. + */ +int __rte_experimental +rte_ring_preenq_callback_unregister(struct rte_ring *r, + rte_ring_cb_fn cb_fn, void *cb_arg); + +/** + * Unregister user callback function with argument for post-dequeue + * to the ring. + * + * @param r + * A pointer to the ring structure. + * @param cb_fn + * User callback function to be invoked. + * @param cb_arg + * user callback arguments to shared in callback process. + * @return + * 0 on success, or a negative value on error. + */ +int __rte_experimental +rte_ring_pstdeq_callback_unregister(struct rte_ring *r, + rte_ring_cb_fn cb_fn, void *cb_arg); + #ifdef __cplusplus } #endif diff --git a/lib/librte_ring/rte_ring_version.map b/lib/librte_ring/rte_ring_version.map index d935efd0d..101814fad 100644 --- a/lib/librte_ring/rte_ring_version.map +++ b/lib/librte_ring/rte_ring_version.map @@ -17,3 +17,12 @@ DPDK_2.2 { rte_ring_free; } DPDK_2.0; + +EXPERIMENTAL { + global: + + rte_ring_preenq_callback_register; + rte_ring_preenq_callback_unregister; + rte_ring_pstdeq_callback_register; + rte_ring_pstdeq_callback_unregister; +};