From patchwork Wed Mar 6 15:03:39 2019 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: "Eads, Gage" X-Patchwork-Id: 50863 X-Patchwork-Delegate: thomas@monjalon.net Return-Path: X-Original-To: patchwork@dpdk.org Delivered-To: patchwork@dpdk.org Received: from [92.243.14.124] (localhost [127.0.0.1]) by dpdk.org (Postfix) with ESMTP id 88D7D6C9B; Wed, 6 Mar 2019 16:04:16 +0100 (CET) Received: from mga12.intel.com (mga12.intel.com [192.55.52.136]) by dpdk.org (Postfix) with ESMTP id C3DBE5F11 for ; Wed, 6 Mar 2019 16:04:09 +0100 (CET) X-Amp-Result: SKIPPED(no attachment in message) X-Amp-File-Uploaded: False Received: from orsmga002.jf.intel.com ([10.7.209.21]) by fmsmga106.fm.intel.com with ESMTP/TLS/DHE-RSA-AES256-GCM-SHA384; 06 Mar 2019 07:04:08 -0800 X-ExtLoop1: 1 X-IronPort-AV: E=Sophos;i="5.58,448,1544515200"; d="scan'208";a="139634880" Received: from txasoft-yocto.an.intel.com ([10.123.72.192]) by orsmga002.jf.intel.com with ESMTP; 06 Mar 2019 07:04:07 -0800 From: Gage Eads To: dev@dpdk.org Cc: olivier.matz@6wind.com, arybchenko@solarflare.com, bruce.richardson@intel.com, konstantin.ananyev@intel.com, stephen@networkplumber.org, jerinj@marvell.com, mczekaj@marvell.com, nd@arm.com, Ola.Liljedahl@arm.com Date: Wed, 6 Mar 2019 09:03:39 -0600 Message-Id: <20190306150342.2894-4-gage.eads@intel.com> X-Mailer: git-send-email 2.13.6 In-Reply-To: <20190306150342.2894-1-gage.eads@intel.com> References: <20190305174019.9693-1-gage.eads@intel.com> <20190306150342.2894-1-gage.eads@intel.com> Subject: [dpdk-dev] [PATCH v6 3/6] ring: add a lock-free implementation X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: DPDK patches and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dev-bounces@dpdk.org Sender: "dev" This commit adds support for lock-free circular ring enqueue and dequeue functions. The ring is supported on 32- and 64-bit architectures, however it uses a 128-bit compare-and-swap instruction when run on a 64-bit architecture, and thus is currently limited to x86_64. The algorithm is based on Ola Liljedahl's lfring, modified to fit within the rte ring API. With no contention, an enqueue of n pointers uses (1 + n) CAS operations and a dequeue of n pointers uses 1. This algorithm has worse average-case performance than the regular rte ring (particularly a highly-contended ring with large bulk accesses), however: - For applications with preemptible pthreads, the regular rte ring's worst-case performance (i.e. one thread being preempted in the update_tail() critical section) is much worse than the lock-free ring's. - Software caching can mitigate the average case performance for ring-based algorithms. For example, a lock-free ring based mempool (a likely use case for this ring) with per-thread caching. To avoid the ABA problem, each ring entry contains a modification counter. On a 64-bit architecture, the chance of ABA occurring are effectively zero; a 64-bit counter will take many years to wrap at current CPU frequencies. On a 32-bit architectures, a lock-free ring must be at least 1024-entries deep; assuming 100 cycles per ring entry access, this guarantees the ring's modification counters will wrap on the order of days. The lock-free ring is enabled via a new flag, RING_F_LF. Because the ring's memsize is now a function of its flags (the lock-free ring requires 128b for each entry), this commit adds a new argument ('flags') to rte_ring_get_memsize(). An API deprecation notice will be sent in a separate commit. For ease-of-use, existing ring enqueue and dequeue functions work on both regular and lock-free rings. This introduces an additional branch in the datapath, but this should be a highly predictable branch. ring_perf_autotest shows a negligible performance impact; it's hard to distinguish a real difference versus system noise. | ring_perf_autotest cycles with branch - Test | ring_perf_autotest cycles without ------------------------------------------------------------------ SP/SC single enq/dequeue | 0.33 MP/MC single enq/dequeue | -4.00 SP/SC burst enq/dequeue (size 8) | 0.00 MP/MC burst enq/dequeue (size 8) | 0.00 SP/SC burst enq/dequeue (size 32) | 0.00 MP/MC burst enq/dequeue (size 32) | 0.00 SC empty dequeue | 1.00 MC empty dequeue | 0.00 Single lcore: SP/SC bulk enq/dequeue (size 8) | 0.49 MP/MC bulk enq/dequeue (size 8) | 0.08 SP/SC bulk enq/dequeue (size 32) | 0.07 MP/MC bulk enq/dequeue (size 32) | 0.09 Two physical cores: SP/SC bulk enq/dequeue (size 8) | 0.19 MP/MC bulk enq/dequeue (size 8) | -0.37 SP/SC bulk enq/dequeue (size 32) | 0.09 MP/MC bulk enq/dequeue (size 32) | -0.05 Two NUMA nodes: SP/SC bulk enq/dequeue (size 8) | -1.96 MP/MC bulk enq/dequeue (size 8) | 0.88 SP/SC bulk enq/dequeue (size 32) | 0.10 MP/MC bulk enq/dequeue (size 32) | 0.46 Test setup: x86_64 build with default config, dual-socket Xeon E5-2699 v4, running on isolcpus cores with a tickless scheduler. Each test run three times and the results averaged. Signed-off-by: Gage Eads --- lib/librte_ring/rte_ring.c | 92 +++++++-- lib/librte_ring/rte_ring.h | 308 ++++++++++++++++++++++++++--- lib/librte_ring/rte_ring_c11_mem.h | 366 ++++++++++++++++++++++++++++++++++- lib/librte_ring/rte_ring_generic.h | 354 +++++++++++++++++++++++++++++++++ lib/librte_ring/rte_ring_version.map | 7 + 5 files changed, 1080 insertions(+), 47 deletions(-) diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c index d215acecc..d4a176f57 100644 --- a/lib/librte_ring/rte_ring.c +++ b/lib/librte_ring/rte_ring.c @@ -45,9 +45,9 @@ EAL_REGISTER_TAILQ(rte_ring_tailq) /* return the size of memory occupied by a ring */ ssize_t -rte_ring_get_memsize(unsigned count) +rte_ring_get_memsize_v1905(unsigned int count, unsigned int flags) { - ssize_t sz; + ssize_t sz, elt_sz; /* count must be a power of 2 */ if ((!POWEROF2(count)) || (count > RTE_RING_SZ_MASK )) { @@ -57,10 +57,23 @@ rte_ring_get_memsize(unsigned count) return -EINVAL; } - sz = sizeof(struct rte_ring) + count * sizeof(void *); + elt_sz = (flags & RING_F_LF) ? 2 * sizeof(void *) : sizeof(void *); + + sz = sizeof(struct rte_ring) + count * elt_sz; sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE); return sz; } +BIND_DEFAULT_SYMBOL(rte_ring_get_memsize, _v1905, 19.05); +MAP_STATIC_SYMBOL(ssize_t rte_ring_get_memsize(unsigned int count, + unsigned int flags), + rte_ring_get_memsize_v1905); + +ssize_t +rte_ring_get_memsize_v20(unsigned int count) +{ + return rte_ring_get_memsize_v1905(count, 0); +} +VERSION_SYMBOL(rte_ring_get_memsize, _v20, 2.0); int rte_ring_init(struct rte_ring *r, const char *name, unsigned count, @@ -75,6 +88,8 @@ rte_ring_init(struct rte_ring *r, const char *name, unsigned count, RTE_CACHE_LINE_MASK) != 0); RTE_BUILD_BUG_ON((offsetof(struct rte_ring, prod) & RTE_CACHE_LINE_MASK) != 0); + RTE_BUILD_BUG_ON(sizeof(struct rte_ring_lf_entry) != + 2 * sizeof(void *)); /* init the ring structure */ memset(r, 0, sizeof(*r)); @@ -82,8 +97,6 @@ rte_ring_init(struct rte_ring *r, const char *name, unsigned count, if (ret < 0 || ret >= (int)sizeof(r->name)) return -ENAMETOOLONG; r->flags = flags; - r->prod.single = (flags & RING_F_SP_ENQ) ? __IS_SP : __IS_MP; - r->cons.single = (flags & RING_F_SC_DEQ) ? __IS_SC : __IS_MC; if (flags & RING_F_EXACT_SZ) { r->size = rte_align32pow2(count + 1); @@ -100,12 +113,46 @@ rte_ring_init(struct rte_ring *r, const char *name, unsigned count, r->mask = count - 1; r->capacity = r->mask; } - r->prod.head = r->cons.head = 0; - r->prod.tail = r->cons.tail = 0; + + r->log2_size = rte_log2_u64(r->size); + + if (flags & RING_F_LF) { + uint32_t i; + + r->prod_ptr.single = + (flags & RING_F_SP_ENQ) ? __IS_SP : __IS_MP; + r->cons_ptr.single = + (flags & RING_F_SC_DEQ) ? __IS_SC : __IS_MC; + r->prod_ptr.head = r->cons_ptr.head = 0; + r->prod_ptr.tail = r->cons_ptr.tail = 0; + + for (i = 0; i < r->size; i++) { + struct rte_ring_lf_entry *ring_ptr, *base; + + base = (struct rte_ring_lf_entry *)&r->ring; + + ring_ptr = &base[i & r->mask]; + + ring_ptr->cnt = 0; + } + } else { + r->prod.single = (flags & RING_F_SP_ENQ) ? __IS_SP : __IS_MP; + r->cons.single = (flags & RING_F_SC_DEQ) ? __IS_SC : __IS_MC; + r->prod.head = r->cons.head = 0; + r->prod.tail = r->cons.tail = 0; + } return 0; } +/* If a ring entry is written on average every M cycles, then a ring entry is + * reused every M*count cycles, and a ring entry's counter repeats every + * M*count*2^32 cycles. If M=100 on a 2GHz system, then a 1024-entry ring's + * counters would repeat every 2.37 days. The likelihood of ABA occurring is + * considered sufficiently low for 1024-entry and larger rings. + */ +#define MIN_32_BIT_LF_RING_SIZE 1024 + /* create the ring */ struct rte_ring * rte_ring_create(const char *name, unsigned count, int socket_id, @@ -123,11 +170,25 @@ rte_ring_create(const char *name, unsigned count, int socket_id, ring_list = RTE_TAILQ_CAST(rte_ring_tailq.head, rte_ring_list); +#ifdef RTE_ARCH_64 +#if !defined(RTE_ARCH_X86_64) + printf("This platform does not support the atomic operation required for RING_F_LF\n"); + rte_errno = EINVAL; + return NULL; +#endif +#else + if ((flags & RING_F_LF) && count < MIN_32_BIT_LF_RING_SIZE) { + printf("RING_F_LF is only supported on 32-bit platforms for rings with at least 1024 entries.\n"); + rte_errno = EINVAL; + return NULL; + } +#endif + /* for an exact size ring, round up from count to a power of two */ if (flags & RING_F_EXACT_SZ) count = rte_align32pow2(count + 1); - ring_size = rte_ring_get_memsize(count); + ring_size = rte_ring_get_memsize(count, flags); if (ring_size < 0) { rte_errno = ring_size; return NULL; @@ -227,10 +288,17 @@ rte_ring_dump(FILE *f, const struct rte_ring *r) fprintf(f, " flags=%x\n", r->flags); fprintf(f, " size=%"PRIu32"\n", r->size); fprintf(f, " capacity=%"PRIu32"\n", r->capacity); - fprintf(f, " ct=%"PRIu32"\n", r->cons.tail); - fprintf(f, " ch=%"PRIu32"\n", r->cons.head); - fprintf(f, " pt=%"PRIu32"\n", r->prod.tail); - fprintf(f, " ph=%"PRIu32"\n", r->prod.head); + if (r->flags & RING_F_LF) { + fprintf(f, " ct=%"PRIuPTR"\n", r->cons_ptr.tail); + fprintf(f, " ch=%"PRIuPTR"\n", r->cons_ptr.head); + fprintf(f, " pt=%"PRIuPTR"\n", r->prod_ptr.tail); + fprintf(f, " ph=%"PRIuPTR"\n", r->prod_ptr.head); + } else { + fprintf(f, " ct=%"PRIu32"\n", r->cons.tail); + fprintf(f, " ch=%"PRIu32"\n", r->cons.head); + fprintf(f, " pt=%"PRIu32"\n", r->prod.tail); + fprintf(f, " ph=%"PRIu32"\n", r->prod.head); + } fprintf(f, " used=%u\n", rte_ring_count(r)); fprintf(f, " avail=%u\n", rte_ring_free_count(r)); } diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h index f16d77b8a..200d7b2a0 100644 --- a/lib/librte_ring/rte_ring.h +++ b/lib/librte_ring/rte_ring.h @@ -20,7 +20,7 @@ * * - FIFO (First In First Out) * - Maximum size is fixed; the pointers are stored in a table. - * - Lockless implementation. + * - Lockless (and optionally, non-blocking/lock-free) implementation. * - Multi- or single-consumer dequeue. * - Multi- or single-producer enqueue. * - Bulk dequeue. @@ -98,6 +98,7 @@ struct rte_ring { const struct rte_memzone *memzone; /**< Memzone, if any, containing the rte_ring */ uint32_t size; /**< Size of ring. */ + uint32_t log2_size; /**< log2(size of ring) */ uint32_t mask; /**< Mask (size-1) of ring. */ uint32_t capacity; /**< Usable size of ring */ @@ -133,6 +134,18 @@ struct rte_ring { */ #define RING_F_EXACT_SZ 0x0004 #define RTE_RING_SZ_MASK (0x7fffffffU) /**< Ring size mask */ +/** + * The ring uses lock-free enqueue and dequeue functions. These functions + * do not have the "non-preemptive" constraint of a regular rte ring, and thus + * are suited for applications using preemptible pthreads. However, the + * lock-free functions have worse average-case performance than their regular + * rte ring counterparts. When used as the handler for a mempool, per-thread + * caching can mitigate the performance difference by reducing the number (and + * contention) of ring accesses. + * + * This flag is only supported on 32-bit and x86_64 platforms. + */ +#define RING_F_LF 0x0008 /* @internal defines for passing to the enqueue dequeue worker functions */ #define __IS_SP 1 @@ -150,11 +163,15 @@ struct rte_ring { * * @param count * The number of elements in the ring (must be a power of 2). + * @param flags + * The flags the ring will be created with. * @return * - The memory size needed for the ring on success. * - -EINVAL if count is not a power of 2. */ -ssize_t rte_ring_get_memsize(unsigned count); +ssize_t rte_ring_get_memsize(unsigned int count, unsigned int flags); +ssize_t rte_ring_get_memsize_v20(unsigned int count); +ssize_t rte_ring_get_memsize_v1905(unsigned int count, unsigned int flags); /** * Initialize a ring structure. @@ -187,6 +204,10 @@ ssize_t rte_ring_get_memsize(unsigned count); * - RING_F_SC_DEQ: If this flag is set, the default behavior when * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()`` * is "single-consumer". Otherwise, it is "multi-consumers". + * - RING_F_EXACT_SZ: If this flag is set, count can be a non-power-of-2 + * number, but up to half the ring space may be wasted. + * - RING_F_LF: If this flag is set, the ring uses lock-free variants of the + * dequeue and enqueue functions. * @return * 0 on success, or a negative value on error. */ @@ -222,12 +243,17 @@ int rte_ring_init(struct rte_ring *r, const char *name, unsigned count, * - RING_F_SC_DEQ: If this flag is set, the default behavior when * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()`` * is "single-consumer". Otherwise, it is "multi-consumers". + * - RING_F_EXACT_SZ: If this flag is set, count can be a non-power-of-2 + * number, but up to half the ring space may be wasted. + * - RING_F_LF: If this flag is set, the ring uses lock-free variants of the + * dequeue and enqueue functions. * @return * On success, the pointer to the new allocated ring. NULL on error with * rte_errno set appropriately. Possible errno values include: * - E_RTE_NO_CONFIG - function could not get pointer to rte_config structure * - E_RTE_SECONDARY - function was called from a secondary process instance - * - EINVAL - count provided is not a power of 2 + * - EINVAL - count provided is not a power of 2, or RING_F_LF is used on an + * unsupported platform * - ENOSPC - the maximum number of memzones has already been allocated * - EEXIST - a memzone with the same name already exists * - ENOMEM - no appropriate memory area found in which to create memzone @@ -283,6 +309,50 @@ void rte_ring_dump(FILE *f, const struct rte_ring *r); } \ } while (0) +/* The actual enqueue of pointers on the lock-free ring, used by the + * single-producer lock-free enqueue function. + */ +#define ENQUEUE_PTRS_LF(r, base, prod_head, obj_table, n) do { \ + unsigned int i; \ + const uint32_t size = (r)->size; \ + size_t idx = prod_head & (r)->mask; \ + size_t new_cnt = prod_head + size; \ + struct rte_ring_lf_entry *ring = (struct rte_ring_lf_entry *)base; \ + unsigned int mask = ~0x3; \ + if (likely(idx + n < size)) { \ + for (i = 0; i < (n & mask); i += 4, idx += 4) { \ + ring[idx].ptr = obj_table[i]; \ + ring[idx].cnt = (new_cnt + i) >> r->log2_size; \ + ring[idx + 1].ptr = obj_table[i + 1]; \ + ring[idx + 1].cnt = (new_cnt + i + 1) >> r->log2_size; \ + ring[idx + 2].ptr = obj_table[i + 2]; \ + ring[idx + 2].cnt = (new_cnt + i + 2) >> r->log2_size; \ + ring[idx + 3].ptr = obj_table[i + 3]; \ + ring[idx + 3].cnt = (new_cnt + i + 3) >> r->log2_size; \ + } \ + switch (n & 0x3) { \ + case 3: \ + ring[idx].cnt = (new_cnt + i) >> r->log2_size; \ + ring[idx++].ptr = obj_table[i++]; /* fallthrough */ \ + case 2: \ + ring[idx].cnt = (new_cnt + i) >> r->log2_size; \ + ring[idx++].ptr = obj_table[i++]; /* fallthrough */ \ + case 1: \ + ring[idx].cnt = (new_cnt + i) >> r->log2_size; \ + ring[idx++].ptr = obj_table[i++]; \ + } \ + } else { \ + for (i = 0; idx < size; i++, idx++) { \ + ring[idx].cnt = (new_cnt + i) >> r->log2_size; \ + ring[idx].ptr = obj_table[i]; \ + } \ + for (idx = 0; i < n; i++, idx++) { \ + ring[idx].cnt = (new_cnt + i) >> r->log2_size; \ + ring[idx].ptr = obj_table[i]; \ + } \ + } \ +} while (0) + /* the actual copy of pointers on the ring to obj_table. * Placed here since identical code needed in both * single and multi consumer dequeue functions */ @@ -314,6 +384,43 @@ void rte_ring_dump(FILE *f, const struct rte_ring *r); } \ } while (0) +/* The actual copy of pointers on the lock-free ring to obj_table. */ +#define DEQUEUE_PTRS_LF(r, base, cons_head, obj_table, n) do { \ + unsigned int i; \ + size_t idx = cons_head & (r)->mask; \ + const uint32_t size = (r)->size; \ + struct rte_ring_lf_entry *ring = (struct rte_ring_lf_entry *)base; \ + unsigned int mask = ~0x3; \ + if (likely(idx + n < size)) { \ + for (i = 0; i < (n & mask); i += 4, idx += 4) {\ + obj_table[i] = ring[idx].ptr; \ + obj_table[i + 1] = ring[idx + 1].ptr; \ + obj_table[i + 2] = ring[idx + 2].ptr; \ + obj_table[i + 3] = ring[idx + 3].ptr; \ + } \ + switch (n & 0x3) { \ + case 3: \ + obj_table[i++] = ring[idx++].ptr; /* fallthrough */ \ + case 2: \ + obj_table[i++] = ring[idx++].ptr; /* fallthrough */ \ + case 1: \ + obj_table[i++] = ring[idx++].ptr; \ + } \ + } else { \ + for (i = 0; idx < size; i++, idx++) \ + obj_table[i] = ring[idx].ptr; \ + for (idx = 0; i < n; i++, idx++) \ + obj_table[i] = ring[idx].ptr; \ + } \ +} while (0) + + +/* @internal 128-bit structure used by the lock-free ring */ +struct rte_ring_lf_entry { + void *ptr; /**< Data pointer */ + uintptr_t cnt; /**< Modification counter */ +}; + /* Between load and load. there might be cpu reorder in weak model * (powerpc/arm). * There are 2 choices for the users @@ -330,6 +437,70 @@ void rte_ring_dump(FILE *f, const struct rte_ring *r); #endif /** + * @internal Enqueue several objects on the lock-free ring + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @param behavior + * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items to the ring + * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring + * @param is_sp + * Indicates whether to use single producer or multi-producer head update + * @param free_space + * returns the amount of space after the enqueue operation has finished + * @return + * Actual number of objects enqueued. + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. + */ +static __rte_always_inline unsigned int +__rte_ring_do_lf_enqueue(struct rte_ring *r, void * const *obj_table, + unsigned int n, enum rte_ring_queue_behavior behavior, + unsigned int is_sp, unsigned int *free_space) +{ + if (is_sp) + return __rte_ring_do_lf_enqueue_sp(r, obj_table, n, + behavior, free_space); + else + return __rte_ring_do_lf_enqueue_mp(r, obj_table, n, + behavior, free_space); +} + +/** + * @internal Dequeue several objects from the lock-free ring + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to pull from the ring. + * @param behavior + * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from the ring + * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring + * @param available + * returns the number of remaining ring entries after the dequeue has finished + * @return + * - Actual number of objects dequeued. + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. + */ +static __rte_always_inline unsigned int +__rte_ring_do_lf_dequeue(struct rte_ring *r, void **obj_table, + unsigned int n, enum rte_ring_queue_behavior behavior, + unsigned int is_sc, unsigned int *available) +{ + if (is_sc) + return __rte_ring_do_lf_dequeue_sc(r, obj_table, n, + behavior, available); + else + return __rte_ring_do_lf_dequeue_mc(r, obj_table, n, + behavior, available); +} + +/** * @internal Enqueue several objects on the ring * * @param r @@ -436,8 +607,14 @@ static __rte_always_inline unsigned int rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, unsigned int n, unsigned int *free_space) { - return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED, - __IS_MP, free_space); + if (r->flags & RING_F_LF) + return __rte_ring_do_lf_enqueue(r, obj_table, n, + RTE_RING_QUEUE_FIXED, __IS_MP, + free_space); + else + return __rte_ring_do_enqueue(r, obj_table, n, + RTE_RING_QUEUE_FIXED, __IS_MP, + free_space); } /** @@ -459,8 +636,14 @@ static __rte_always_inline unsigned int rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, unsigned int n, unsigned int *free_space) { - return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED, - __IS_SP, free_space); + if (r->flags & RING_F_LF) + return __rte_ring_do_lf_enqueue(r, obj_table, n, + RTE_RING_QUEUE_FIXED, __IS_SP, + free_space); + else + return __rte_ring_do_enqueue(r, obj_table, n, + RTE_RING_QUEUE_FIXED, __IS_SP, + free_space); } /** @@ -486,8 +669,14 @@ static __rte_always_inline unsigned int rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table, unsigned int n, unsigned int *free_space) { - return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED, - r->prod.single, free_space); + if (r->flags & RING_F_LF) + return __rte_ring_do_lf_enqueue(r, obj_table, n, + RTE_RING_QUEUE_FIXED, + r->prod_ptr.single, free_space); + else + return __rte_ring_do_enqueue(r, obj_table, n, + RTE_RING_QUEUE_FIXED, + r->prod.single, free_space); } /** @@ -570,8 +759,14 @@ static __rte_always_inline unsigned int rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n, unsigned int *available) { - return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED, - __IS_MC, available); + if (r->flags & RING_F_LF) + return __rte_ring_do_lf_dequeue(r, obj_table, n, + RTE_RING_QUEUE_FIXED, __IS_MC, + available); + else + return __rte_ring_do_dequeue(r, obj_table, n, + RTE_RING_QUEUE_FIXED, __IS_MC, + available); } /** @@ -594,8 +789,14 @@ static __rte_always_inline unsigned int rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n, unsigned int *available) { - return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED, - __IS_SC, available); + if (r->flags & RING_F_LF) + return __rte_ring_do_lf_dequeue(r, obj_table, n, + RTE_RING_QUEUE_FIXED, __IS_SC, + available); + else + return __rte_ring_do_dequeue(r, obj_table, n, + RTE_RING_QUEUE_FIXED, __IS_SC, + available); } /** @@ -621,8 +822,14 @@ static __rte_always_inline unsigned int rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n, unsigned int *available) { - return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED, - r->cons.single, available); + if (r->flags & RING_F_LF) + return __rte_ring_do_lf_dequeue(r, obj_table, n, + RTE_RING_QUEUE_FIXED, + r->cons_ptr.single, available); + else + return __rte_ring_do_dequeue(r, obj_table, n, + RTE_RING_QUEUE_FIXED, + r->cons.single, available); } /** @@ -697,9 +904,13 @@ rte_ring_dequeue(struct rte_ring *r, void **obj_p) static inline unsigned rte_ring_count(const struct rte_ring *r) { - uint32_t prod_tail = r->prod.tail; - uint32_t cons_tail = r->cons.tail; - uint32_t count = (prod_tail - cons_tail) & r->mask; + uint32_t count; + + if (r->flags & RING_F_LF) + count = (r->prod_ptr.tail - r->cons_ptr.tail) & r->mask; + else + count = (r->prod.tail - r->cons.tail) & r->mask; + return (count > r->capacity) ? r->capacity : count; } @@ -819,8 +1030,14 @@ static __rte_always_inline unsigned rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table, unsigned int n, unsigned int *free_space) { - return __rte_ring_do_enqueue(r, obj_table, n, - RTE_RING_QUEUE_VARIABLE, __IS_MP, free_space); + if (r->flags & RING_F_LF) + return __rte_ring_do_lf_enqueue(r, obj_table, n, + RTE_RING_QUEUE_VARIABLE, + __IS_MP, free_space); + else + return __rte_ring_do_enqueue(r, obj_table, n, + RTE_RING_QUEUE_VARIABLE, + __IS_MP, free_space); } /** @@ -842,8 +1059,14 @@ static __rte_always_inline unsigned rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table, unsigned int n, unsigned int *free_space) { - return __rte_ring_do_enqueue(r, obj_table, n, - RTE_RING_QUEUE_VARIABLE, __IS_SP, free_space); + if (r->flags & RING_F_LF) + return __rte_ring_do_lf_enqueue(r, obj_table, n, + RTE_RING_QUEUE_VARIABLE, + __IS_SP, free_space); + else + return __rte_ring_do_enqueue(r, obj_table, n, + RTE_RING_QUEUE_VARIABLE, + __IS_SP, free_space); } /** @@ -869,8 +1092,14 @@ static __rte_always_inline unsigned rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table, unsigned int n, unsigned int *free_space) { - return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE, - r->prod.single, free_space); + if (r->flags & RING_F_LF) + return __rte_ring_do_lf_enqueue(r, obj_table, n, + RTE_RING_QUEUE_VARIABLE, + r->prod_ptr.single, free_space); + else + return __rte_ring_do_enqueue(r, obj_table, n, + RTE_RING_QUEUE_VARIABLE, + r->prod.single, free_space); } /** @@ -897,8 +1126,14 @@ static __rte_always_inline unsigned rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned int n, unsigned int *available) { - return __rte_ring_do_dequeue(r, obj_table, n, - RTE_RING_QUEUE_VARIABLE, __IS_MC, available); + if (r->flags & RING_F_LF) + return __rte_ring_do_lf_dequeue(r, obj_table, n, + RTE_RING_QUEUE_VARIABLE, + __IS_MC, available); + else + return __rte_ring_do_dequeue(r, obj_table, n, + RTE_RING_QUEUE_VARIABLE, + __IS_MC, available); } /** @@ -922,8 +1157,14 @@ static __rte_always_inline unsigned rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned int n, unsigned int *available) { - return __rte_ring_do_dequeue(r, obj_table, n, - RTE_RING_QUEUE_VARIABLE, __IS_SC, available); + if (r->flags & RING_F_LF) + return __rte_ring_do_lf_dequeue(r, obj_table, n, + RTE_RING_QUEUE_VARIABLE, + __IS_SC, available); + else + return __rte_ring_do_dequeue(r, obj_table, n, + RTE_RING_QUEUE_VARIABLE, + __IS_SC, available); } /** @@ -949,9 +1190,14 @@ static __rte_always_inline unsigned rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned int n, unsigned int *available) { - return __rte_ring_do_dequeue(r, obj_table, n, - RTE_RING_QUEUE_VARIABLE, - r->cons.single, available); + if (r->flags & RING_F_LF) + return __rte_ring_do_lf_dequeue(r, obj_table, n, + RTE_RING_QUEUE_VARIABLE, + r->cons_ptr.single, available); + else + return __rte_ring_do_dequeue(r, obj_table, n, + RTE_RING_QUEUE_VARIABLE, + r->cons.single, available); } #ifdef __cplusplus diff --git a/lib/librte_ring/rte_ring_c11_mem.h b/lib/librte_ring/rte_ring_c11_mem.h index 545caf257..a672d161e 100644 --- a/lib/librte_ring/rte_ring_c11_mem.h +++ b/lib/librte_ring/rte_ring_c11_mem.h @@ -221,8 +221,8 @@ __rte_ring_move_prod_head_ptr(struct rte_ring *r, unsigned int is_sp, /* Ensure the head is read before tail */ __atomic_thread_fence(__ATOMIC_ACQUIRE); - /* load-acquire synchronize with store-release of ht->tail - * in update_tail. + /* load-acquire synchronize with store-release of tail in + * __rte_ring_do_lf_dequeue_{sc, mc}. */ cons_tail = __atomic_load_n(&r->cons_ptr.tail, __ATOMIC_ACQUIRE); @@ -247,6 +247,7 @@ __rte_ring_move_prod_head_ptr(struct rte_ring *r, unsigned int is_sp, 0, __ATOMIC_RELAXED, __ATOMIC_RELAXED); } while (unlikely(success == 0)); + return n; } @@ -293,8 +294,8 @@ __rte_ring_move_cons_head_ptr(struct rte_ring *r, unsigned int is_sc, /* Ensure the head is read before tail */ __atomic_thread_fence(__ATOMIC_ACQUIRE); - /* this load-acquire synchronize with store-release of ht->tail - * in update_tail. + /* load-acquire synchronize with store-release of tail in + * __rte_ring_do_lf_enqueue_{sp, mp}. */ prod_tail = __atomic_load_n(&r->prod_ptr.tail, __ATOMIC_ACQUIRE); @@ -318,6 +319,363 @@ __rte_ring_move_cons_head_ptr(struct rte_ring *r, unsigned int is_sc, 0, __ATOMIC_RELAXED, __ATOMIC_RELAXED); } while (unlikely(success == 0)); + + return n; +} + +/** + * @internal + * Enqueue several objects on the lock-free ring (single-producer only) + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @param behavior + * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items to the ring + * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring + * @param free_space + * returns the amount of space after the enqueue operation has finished + * @return + * Actual number of objects enqueued. + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. + */ +static __rte_always_inline unsigned int +__rte_ring_do_lf_enqueue_sp(struct rte_ring *r, void * const *obj_table, + unsigned int n, + enum rte_ring_queue_behavior behavior, + unsigned int *free_space) +{ + uint32_t free_entries; + uintptr_t head, next; + + n = __rte_ring_move_prod_head_ptr(r, 1, n, behavior, + &head, &next, &free_entries); + if (n == 0) + goto end; + + ENQUEUE_PTRS_LF(r, &r->ring, head, obj_table, n); + + __atomic_store_n(&r->prod_ptr.tail, + r->prod_ptr.tail + n, + __ATOMIC_RELEASE); +end: + if (free_space != NULL) + *free_space = free_entries - n; + return n; +} + +/* This macro defines the number of times an enqueueing thread can fail to find + * a free ring slot before reloading its producer tail index. + */ +#define ENQ_RETRY_LIMIT 32 + +/** + * @internal + * Get the next producer tail index. + * + * @param r + * A pointer to the ring structure. + * @param idx + * The local tail index + * @return + * If the ring's tail is ahead of the local tail, return the shared tail. + * Else, return tail + 1. + */ +static __rte_always_inline uintptr_t +__rte_ring_reload_tail(struct rte_ring *r, uintptr_t idx) +{ + uintptr_t fresh = __atomic_load_n(&r->prod_ptr.tail, __ATOMIC_RELAXED); + + if ((intptr_t)(idx - fresh) < 0) + idx = fresh; /* fresh is after idx, use it instead */ + else + idx++; /* Continue with next slot */ + + return idx; +} + +/** + * @internal + * Update the ring's producer tail index. If another thread already updated + * the index beyond the caller's tail value, do nothing. + * + * @param r + * A pointer to the ring structure. + * @param idx + * The local tail index + * @return + * If the shared tail is ahead of the local tail, return the shared tail. + * Else, return tail + 1. + */ +static __rte_always_inline uintptr_t +__rte_ring_lf_update_tail(struct rte_ring *r, uintptr_t val) +{ + volatile uintptr_t *loc = &r->prod_ptr.tail; + uintptr_t old = __atomic_load_n(loc, __ATOMIC_RELAXED); + + do { + /* Check if the tail has already been updated. */ + if ((intptr_t)(val - old) < 0) + return old; + + /* Else val >= old, need to update *loc */ + } while (!__atomic_compare_exchange_n(loc, &old, val, + 1, __ATOMIC_RELEASE, + __ATOMIC_RELAXED)); + + return val; +} + +/** + * @internal + * Enqueue several objects on the lock-free ring (multi-producer safe) + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @param behavior + * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items to the ring + * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring + * @param free_space + * returns the amount of space after the enqueue operation has finished + * @return + * Actual number of objects enqueued. + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. + */ +static __rte_always_inline unsigned int +__rte_ring_do_lf_enqueue_mp(struct rte_ring *r, void * const *obj_table, + unsigned int n, + enum rte_ring_queue_behavior behavior, + unsigned int *free_space) +{ +#if !defined(ALLOW_EXPERIMENTAL_API) + RTE_SET_USED(r); + RTE_SET_USED(obj_table); + RTE_SET_USED(n); + RTE_SET_USED(behavior); + RTE_SET_USED(free_space); + printf("[%s()] RING_F_LF requires an experimental API." + " Recompile with ALLOW_EXPERIMENTAL_API to use it.\n" + , __func__); + return 0; +#else + struct rte_ring_lf_entry *base; + uintptr_t head, next, tail; + unsigned int i; + uint32_t avail; + + /* Atomically update the prod head to reserve n slots. The prod tail + * is modified at the end of the function. + */ + n = __rte_ring_move_prod_head_ptr(r, 0, n, behavior, + &head, &next, &avail); + + tail = __atomic_load_n(&r->prod_ptr.tail, __ATOMIC_RELAXED); + head = __atomic_load_n(&r->cons_ptr.tail, __ATOMIC_ACQUIRE); + + if (unlikely(n == 0)) + goto end; + + base = (struct rte_ring_lf_entry *)&r->ring; + + for (i = 0; i < n; i++) { + unsigned int retries = 0; + int success = 0; + + /* Enqueue to the tail entry. If another thread wins the race, + * retry with the new tail. + */ + do { + struct rte_ring_lf_entry old_value, new_value; + struct rte_ring_lf_entry *ring_ptr; + + ring_ptr = &base[tail & r->mask]; + + old_value = *ring_ptr; + + if (old_value.cnt != (tail >> r->log2_size)) { + /* This slot has already been used. Depending + * on how far behind this thread is, either go + * to the next slot or reload the tail. + */ + uintptr_t prev_tail; + + prev_tail = (tail + r->size) >> r->log2_size; + + if (old_value.cnt != prev_tail || + ++retries == ENQ_RETRY_LIMIT) { + /* This thread either fell 2+ laps + * behind or hit the retry limit, so + * reload the tail index. + */ + tail = __rte_ring_reload_tail(r, tail); + retries = 0; + } else { + /* Slot already used, try the next. */ + tail++; + + } + + continue; + } + + /* Found a free slot, try to enqueue next element. */ + new_value.ptr = obj_table[i]; + new_value.cnt = (tail + r->size) >> r->log2_size; + +#ifdef RTE_ARCH_64 + success = rte_atomic128_cmp_exchange( + (rte_int128_t *)ring_ptr, + (rte_int128_t *)&old_value, + (rte_int128_t *)&new_value, + 1, __ATOMIC_RELEASE, + __ATOMIC_RELAXED); +#else + success = __atomic_compare_exchange( + (uint64_t *)ring_ptr, + &old_value, + &new_value, + 1, __ATOMIC_RELEASE, + __ATOMIC_RELAXED); +#endif + } while (success == 0); + + /* Only increment tail if the CAS succeeds, since it can + * spuriously fail on some architectures. + */ + tail++; + } + +end: + tail = __rte_ring_lf_update_tail(r, tail); + + if (free_space != NULL) + *free_space = avail - n; + return n; +#endif +} + +/** + * @internal + * Dequeue several objects from the lock-free ring (single-consumer only) + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to pull from the ring. + * @param behavior + * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from the ring + * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring + * @param available + * returns the number of remaining ring entries after the dequeue has finished + * @return + * - Actual number of objects dequeued. + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. + */ +static __rte_always_inline unsigned int +__rte_ring_do_lf_dequeue_sc(struct rte_ring *r, void **obj_table, + unsigned int n, + enum rte_ring_queue_behavior behavior, + unsigned int *available) +{ + uintptr_t cons_tail, prod_tail, avail; + + cons_tail = __atomic_load_n(&r->cons_ptr.tail, __ATOMIC_RELAXED); + prod_tail = __atomic_load_n(&r->prod_ptr.tail, __ATOMIC_ACQUIRE); + + avail = prod_tail - cons_tail; + + /* Set the actual entries for dequeue */ + if (unlikely(avail < n)) + n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : avail; + + if (unlikely(n == 0)) + goto end; + + DEQUEUE_PTRS_LF(r, &r->ring, cons_tail, obj_table, n); + + /* Use a read barrier and store-relaxed so we don't unnecessarily order + * writes. + */ + rte_smp_rmb(); + + __atomic_store_n(&r->cons_ptr.tail, cons_tail + n, __ATOMIC_RELAXED); +end: + if (available != NULL) + *available = avail - n; + + return n; +} + +/** + * @internal + * Dequeue several objects from the lock-free ring (multi-consumer safe) + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to pull from the ring. + * @param behavior + * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from the ring + * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring + * @param available + * returns the number of remaining ring entries after the dequeue has finished + * @return + * - Actual number of objects dequeued. + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. + */ +static __rte_always_inline unsigned int +__rte_ring_do_lf_dequeue_mc(struct rte_ring *r, void **obj_table, + unsigned int n, + enum rte_ring_queue_behavior behavior, + unsigned int *available) +{ + uintptr_t cons_tail, prod_tail, avail; + + cons_tail = __atomic_load_n(&r->cons_ptr.tail, __ATOMIC_RELAXED); + + do { + /* Load tail on every iteration to avoid spurious queue empty + * situations. + */ + prod_tail = __atomic_load_n(&r->prod_ptr.tail, + __ATOMIC_ACQUIRE); + + avail = prod_tail - cons_tail; + + /* Set the actual entries for dequeue */ + if (unlikely(avail < n)) + n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : avail; + + if (unlikely(n == 0)) + goto end; + + DEQUEUE_PTRS_LF(r, &r->ring, cons_tail, obj_table, n); + + /* Use a read barrier and store-relaxed so we don't + * unnecessarily order writes. + */ + rte_smp_rmb(); + + } while (!__atomic_compare_exchange_n(&r->cons_ptr.tail, + &cons_tail, cons_tail + n, + 0, __ATOMIC_RELAXED, + __ATOMIC_RELAXED)); + +end: + if (available != NULL) + *available = avail - n; + return n; } diff --git a/lib/librte_ring/rte_ring_generic.h b/lib/librte_ring/rte_ring_generic.h index 6a0e1bbfb..944b353f4 100644 --- a/lib/librte_ring/rte_ring_generic.h +++ b/lib/librte_ring/rte_ring_generic.h @@ -297,4 +297,358 @@ __rte_ring_move_cons_head_ptr(struct rte_ring *r, unsigned int is_sc, return n; } +/** + * @internal + * Enqueue several objects on the lock-free ring (single-producer only) + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @param behavior + * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items to the ring + * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring + * @param free_space + * returns the amount of space after the enqueue operation has finished + * @return + * Actual number of objects enqueued. + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. + */ +static __rte_always_inline unsigned int +__rte_ring_do_lf_enqueue_sp(struct rte_ring *r, void * const *obj_table, + unsigned int n, + enum rte_ring_queue_behavior behavior, + unsigned int *free_space) +{ + uint32_t free_entries; + uintptr_t head, next; + + n = __rte_ring_move_prod_head_ptr(r, 1, n, behavior, + &head, &next, &free_entries); + if (n == 0) + goto end; + + ENQUEUE_PTRS_LF(r, &r->ring, head, obj_table, n); + + rte_smp_wmb(); + + r->prod_ptr.tail += n; +end: + if (free_space != NULL) + *free_space = free_entries - n; + return n; +} + +/* This macro defines the number of times an enqueueing thread can fail to find + * a free ring slot before reloading its producer tail index. + */ +#define ENQ_RETRY_LIMIT 32 + +/** + * @internal + * Get the next producer tail index. + * + * @param r + * A pointer to the ring structure. + * @param idx + * The local tail index + * @return + * If the ring's tail is ahead of the local tail, return the shared tail. + * Else, return tail + 1. + */ +static __rte_always_inline uintptr_t +__rte_ring_reload_tail(struct rte_ring *r, uintptr_t idx) +{ + uintptr_t fresh = r->prod_ptr.tail; + + if ((intptr_t)(idx - fresh) < 0) + /* fresh is after idx, use it instead */ + idx = fresh; + else + /* Continue with next slot */ + idx++; + + return idx; +} + +/** + * @internal + * Update the ring's producer tail index. If another thread already updated + * the index beyond the caller's tail value, do nothing. + * + * @param r + * A pointer to the ring structure. + * @param idx + * The local tail index + * @return + * If the shared tail is ahead of the local tail, return the shared tail. + * Else, return tail + 1. + */ +static __rte_always_inline uintptr_t +__rte_ring_lf_update_tail(struct rte_ring *r, uintptr_t val) +{ + volatile uintptr_t *loc = &r->prod_ptr.tail; + uintptr_t old = *loc; + + do { + /* Check if the tail has already been updated. */ + if ((intptr_t)(val - old) < 0) + return old; + + /* Else val >= old, need to update *loc */ + } while (!__sync_bool_compare_and_swap(loc, old, val)); + + return val; +} + +/** + * @internal + * Enqueue several objects on the lock-free ring (multi-producer safe) + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @param behavior + * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items to the ring + * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring + * @param free_space + * returns the amount of space after the enqueue operation has finished + * @return + * Actual number of objects enqueued. + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. + */ +static __rte_always_inline unsigned int +__rte_ring_do_lf_enqueue_mp(struct rte_ring *r, void * const *obj_table, + unsigned int n, + enum rte_ring_queue_behavior behavior, + unsigned int *free_space) +{ +#if !defined(ALLOW_EXPERIMENTAL_API) + RTE_SET_USED(r); + RTE_SET_USED(obj_table); + RTE_SET_USED(n); + RTE_SET_USED(behavior); + RTE_SET_USED(free_space); + printf("[%s()] RING_F_LF requires an experimental API." + " Recompile with ALLOW_EXPERIMENTAL_API to use it.\n" + , __func__); + return 0; +#else + struct rte_ring_lf_entry *base; + uintptr_t head, next, tail; + unsigned int i; + uint32_t avail; + + /* Atomically update the prod head to reserve n slots. The prod tail + * is modified at the end of the function. + */ + n = __rte_ring_move_prod_head_ptr(r, 0, n, behavior, + &head, &next, &avail); + + tail = r->prod_ptr.tail; + + rte_smp_rmb(); + + head = r->cons_ptr.tail; + + if (unlikely(n == 0)) + goto end; + + base = (struct rte_ring_lf_entry *)&r->ring; + + for (i = 0; i < n; i++) { + unsigned int retries = 0; + int success = 0; + + /* Enqueue to the tail entry. If another thread wins the race, + * retry with the new tail. + */ + do { + struct rte_ring_lf_entry old_value, new_value; + struct rte_ring_lf_entry *ring_ptr; + + ring_ptr = &base[tail & r->mask]; + + old_value = *ring_ptr; + + if (old_value.cnt != (tail >> r->log2_size)) { + /* This slot has already been used. Depending + * on how far behind this thread is, either go + * to the next slot or reload the tail. + */ + uintptr_t prev_tail; + + prev_tail = (tail + r->size) >> r->log2_size; + + if (old_value.cnt != prev_tail || + ++retries == ENQ_RETRY_LIMIT) { + /* This thread either fell 2+ laps + * behind or hit the retry limit, so + * reload the tail index. + */ + tail = __rte_ring_reload_tail(r, tail); + retries = 0; + } else { + /* Slot already used, try the next. */ + tail++; + + } + + continue; + } + + /* Found a free slot, try to enqueue next element. */ + new_value.ptr = obj_table[i]; + new_value.cnt = (tail + r->size) >> r->log2_size; + +#ifdef RTE_ARCH_64 + success = rte_atomic128_cmp_exchange( + (rte_int128_t *)ring_ptr, + (rte_int128_t *)&old_value, + (rte_int128_t *)&new_value, + 1, __ATOMIC_RELEASE, + __ATOMIC_RELAXED); +#else + uint64_t *old_ptr = (uint64_t *)&old_value; + uint64_t *new_ptr = (uint64_t *)&new_value; + + success = rte_atomic64_cmpset( + (volatile uint64_t *)ring_ptr, + *old_ptr, *new_ptr); +#endif + } while (success == 0); + + /* Only increment tail if the CAS succeeds, since it can + * spuriously fail on some architectures. + */ + tail++; + } + +end: + + tail = __rte_ring_lf_update_tail(r, tail); + + if (free_space != NULL) + *free_space = avail - n; + return n; +#endif +} + +/** + * @internal + * Dequeue several objects from the lock-free ring (single-consumer only) + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to pull from the ring. + * @param behavior + * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from the ring + * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring + * @param available + * returns the number of remaining ring entries after the dequeue has finished + * @return + * - Actual number of objects dequeued. + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. + */ +static __rte_always_inline unsigned int +__rte_ring_do_lf_dequeue_sc(struct rte_ring *r, void **obj_table, + unsigned int n, + enum rte_ring_queue_behavior behavior, + unsigned int *available) +{ + uintptr_t cons_tail, prod_tail, avail; + + cons_tail = r->cons_ptr.tail; + + rte_smp_rmb(); + + prod_tail = r->prod_ptr.tail; + + avail = prod_tail - cons_tail; + + /* Set the actual entries for dequeue */ + if (unlikely(avail < n)) + n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : avail; + + if (unlikely(n == 0)) + goto end; + + DEQUEUE_PTRS_LF(r, &r->ring, cons_tail, obj_table, n); + + rte_smp_rmb(); + + r->cons_ptr.tail += n; +end: + if (available != NULL) + *available = avail - n; + + return n; +} + +/** + * @internal + * Dequeue several objects from the lock-free ring (multi-consumer safe) + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to pull from the ring. + * @param behavior + * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from the ring + * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring + * @param available + * returns the number of remaining ring entries after the dequeue has finished + * @return + * - Actual number of objects dequeued. + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. + */ +static __rte_always_inline unsigned int +__rte_ring_do_lf_dequeue_mc(struct rte_ring *r, void **obj_table, + unsigned int n, + enum rte_ring_queue_behavior behavior, + unsigned int *available) +{ + uintptr_t cons_tail, prod_tail, avail; + + cons_tail = r->cons_ptr.tail; + + do { + rte_smp_rmb(); + + /* Load tail on every iteration to avoid spurious queue empty + * situations. + */ + prod_tail = r->prod_ptr.tail; + + avail = prod_tail - cons_tail; + + /* Set the actual entries for dequeue */ + if (unlikely(avail < n)) + n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : avail; + + if (unlikely(n == 0)) + goto end; + + DEQUEUE_PTRS_LF(r, &r->ring, cons_tail, obj_table, n); + + } while (!__sync_bool_compare_and_swap(&r->cons_ptr.tail, + cons_tail, cons_tail + n)); + +end: + if (available != NULL) + *available = avail - n; + + return n; +} + #endif /* _RTE_RING_GENERIC_H_ */ diff --git a/lib/librte_ring/rte_ring_version.map b/lib/librte_ring/rte_ring_version.map index d935efd0d..8969467af 100644 --- a/lib/librte_ring/rte_ring_version.map +++ b/lib/librte_ring/rte_ring_version.map @@ -17,3 +17,10 @@ DPDK_2.2 { rte_ring_free; } DPDK_2.0; + +DPDK_19.05 { + global: + + rte_ring_get_memsize; + +} DPDK_2.2;