From patchwork Thu Jan 10 21:01:17 2019
X-Patchwork-Submitter: "Eads, Gage"
X-Patchwork-Id: 49639
X-Patchwork-Delegate: thomas@monjalon.net
From: Gage Eads
To: dev@dpdk.org
Cc: olivier.matz@6wind.com, arybchenko@solarflare.com,
 bruce.richardson@intel.com, konstantin.ananyev@intel.com
Date: Thu, 10 Jan 2019 15:01:17 -0600
Message-Id: <20190110210122.24889-2-gage.eads@intel.com>
In-Reply-To: <20190110210122.24889-1-gage.eads@intel.com>
References: <20190110210122.24889-1-gage.eads@intel.com>
Subject: [dpdk-dev] [PATCH 1/6] ring: change head and tail to pointer-width size

For 64-bit architectures, doubling the head and tail index widths greatly
increases the time it takes for them to wrap around (with current CPU speeds,
it won't happen within the author's lifetime). This is important in avoiding
the ABA problem -- in which a thread mistakes reading the same tail index in
two accesses to mean that the ring was not modified in the intervening time --
in the upcoming non-blocking ring implementation. Using a 64-bit index makes
the possibility of this occurring effectively zero.
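To put that wrap-around time in perspective, here is a rough, illustrative
calculation (not part of the patch; the rate of one index increment per
nanosecond is an assumption, roughly one enqueue per cycle on a 1 GHz core):

	#include <stdint.h>
	#include <stdio.h>

	int main(void)
	{
		/* Assumed steady-state rate: one head/tail increment per ns. */
		const double ops_per_sec = 1e9;
		const double secs_per_year = 365.25 * 24 * 3600;

		/* A 32-bit index wraps in seconds; 64-bit takes centuries. */
		printf("32-bit index wraps after ~%.1f seconds\n",
		       (double)UINT32_MAX / ops_per_sec);
		printf("64-bit index wraps after ~%.0f years\n",
		       (double)UINT64_MAX / ops_per_sec / secs_per_year);
		return 0;
	}

At that rate a 32-bit index wraps in a few seconds, while a 64-bit index takes
on the order of 500 years, which is why the ABA window is treated as
effectively zero.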
I tested this commit's performance impact with an x86_64 build on a
dual-socket Xeon E5-2699 v4 using ring_perf_autotest, and the change made no
significant difference -- the few differences appear to be system noise. (The
test ran on isolcpus cores using a tickless scheduler, but some variation was
still observed.) Each test was run three times and the results were averaged:

                                   | 64b head/tail cycle cost minus
 Test                              | 32b head/tail cycle cost
 ------------------------------------------------------------------
 SP/SC single enq/dequeue          | 0.33
 MP/MC single enq/dequeue          | 0.00
 SP/SC burst enq/dequeue (size 8)  | 0.00
 MP/MC burst enq/dequeue (size 8)  | 1.00
 SP/SC burst enq/dequeue (size 32) | 0.00
 MP/MC burst enq/dequeue (size 32) | -1.00
 SC empty dequeue                  | 0.01
 MC empty dequeue                  | 0.00

 Single lcore:
 SP/SC bulk enq/dequeue (size 8)   | -0.36
 MP/MC bulk enq/dequeue (size 8)   | 0.99
 SP/SC bulk enq/dequeue (size 32)  | -0.40
 MP/MC bulk enq/dequeue (size 32)  | -0.57

 Two physical cores:
 SP/SC bulk enq/dequeue (size 8)   | -0.49
 MP/MC bulk enq/dequeue (size 8)   | 0.19
 SP/SC bulk enq/dequeue (size 32)  | -0.28
 MP/MC bulk enq/dequeue (size 32)  | -0.62

 Two NUMA nodes:
 SP/SC bulk enq/dequeue (size 8)   | 3.25
 MP/MC bulk enq/dequeue (size 8)   | 1.87
 SP/SC bulk enq/dequeue (size 32)  | -0.44
 MP/MC bulk enq/dequeue (size 32)  | -1.10

An earlier version of this patch changed the head and tail indexes to
uint64_t, but that caused a performance drop on 32-bit builds. With uintptr_t,
no performance difference is observed on an i686 build.

Signed-off-by: Gage Eads
---
 lib/librte_eventdev/rte_event_ring.h |  6 +++---
 lib/librte_ring/rte_ring.c           | 10 +++++-----
 lib/librte_ring/rte_ring.h           | 20 ++++++++++----------
 lib/librte_ring/rte_ring_generic.h   | 16 +++++++++-------
 4 files changed, 27 insertions(+), 25 deletions(-)

diff --git a/lib/librte_eventdev/rte_event_ring.h b/lib/librte_eventdev/rte_event_ring.h
index 827a3209e..eae70f904 100644
--- a/lib/librte_eventdev/rte_event_ring.h
+++ b/lib/librte_eventdev/rte_event_ring.h
@@ -1,5 +1,5 @@
 /* SPDX-License-Identifier: BSD-3-Clause
- * Copyright(c) 2016-2017 Intel Corporation
+ * Copyright(c) 2016-2019 Intel Corporation
  */

 /**
@@ -88,7 +88,7 @@ rte_event_ring_enqueue_burst(struct rte_event_ring *r,
 		const struct rte_event *events,
 		unsigned int n, uint16_t *free_space)
 {
-	uint32_t prod_head, prod_next;
+	uintptr_t prod_head, prod_next;
 	uint32_t free_entries;

 	n = __rte_ring_move_prod_head(&r->r, r->r.prod.single, n,
@@ -129,7 +129,7 @@ rte_event_ring_dequeue_burst(struct rte_event_ring *r,
 		struct rte_event *events,
 		unsigned int n, uint16_t *available)
 {
-	uint32_t cons_head, cons_next;
+	uintptr_t cons_head, cons_next;
 	uint32_t entries;

 	n = __rte_ring_move_cons_head(&r->r, r->r.cons.single, n,
diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c
index d215acecc..b15ee0eb3 100644
--- a/lib/librte_ring/rte_ring.c
+++ b/lib/librte_ring/rte_ring.c
@@ -1,6 +1,6 @@
 /* SPDX-License-Identifier: BSD-3-Clause
  *
- * Copyright (c) 2010-2015 Intel Corporation
+ * Copyright (c) 2010-2019 Intel Corporation
  * Copyright (c) 2007,2008 Kip Macy kmacy@freebsd.org
  * All rights reserved.
* Derived from FreeBSD's bufring.h @@ -227,10 +227,10 @@ rte_ring_dump(FILE *f, const struct rte_ring *r) fprintf(f, " flags=%x\n", r->flags); fprintf(f, " size=%"PRIu32"\n", r->size); fprintf(f, " capacity=%"PRIu32"\n", r->capacity); - fprintf(f, " ct=%"PRIu32"\n", r->cons.tail); - fprintf(f, " ch=%"PRIu32"\n", r->cons.head); - fprintf(f, " pt=%"PRIu32"\n", r->prod.tail); - fprintf(f, " ph=%"PRIu32"\n", r->prod.head); + fprintf(f, " ct=%"PRIuPTR"\n", r->cons.tail); + fprintf(f, " ch=%"PRIuPTR"\n", r->cons.head); + fprintf(f, " pt=%"PRIuPTR"\n", r->prod.tail); + fprintf(f, " ph=%"PRIuPTR"\n", r->prod.head); fprintf(f, " used=%u\n", rte_ring_count(r)); fprintf(f, " avail=%u\n", rte_ring_free_count(r)); } diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h index af5444a9f..12af64e13 100644 --- a/lib/librte_ring/rte_ring.h +++ b/lib/librte_ring/rte_ring.h @@ -1,6 +1,6 @@ /* SPDX-License-Identifier: BSD-3-Clause * - * Copyright (c) 2010-2017 Intel Corporation + * Copyright (c) 2010-2019 Intel Corporation * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org * All rights reserved. * Derived from FreeBSD's bufring.h @@ -65,8 +65,8 @@ struct rte_memzone; /* forward declaration, so as not to require memzone.h */ /* structure to hold a pair of head/tail values and other metadata */ struct rte_ring_headtail { - volatile uint32_t head; /**< Prod/consumer head. */ - volatile uint32_t tail; /**< Prod/consumer tail. */ + volatile uintptr_t head; /**< Prod/consumer head. */ + volatile uintptr_t tail; /**< Prod/consumer tail. */ uint32_t single; /**< True if single prod/cons */ }; @@ -242,7 +242,7 @@ void rte_ring_dump(FILE *f, const struct rte_ring *r); #define ENQUEUE_PTRS(r, ring_start, prod_head, obj_table, n, obj_type) do { \ unsigned int i; \ const uint32_t size = (r)->size; \ - uint32_t idx = prod_head & (r)->mask; \ + uintptr_t idx = prod_head & (r)->mask; \ obj_type *ring = (obj_type *)ring_start; \ if (likely(idx + n < size)) { \ for (i = 0; i < (n & ((~(unsigned)0x3))); i+=4, idx+=4) { \ @@ -272,7 +272,7 @@ void rte_ring_dump(FILE *f, const struct rte_ring *r); * single and multi consumer dequeue functions */ #define DEQUEUE_PTRS(r, ring_start, cons_head, obj_table, n, obj_type) do { \ unsigned int i; \ - uint32_t idx = cons_head & (r)->mask; \ + uintptr_t idx = cons_head & (r)->mask; \ const uint32_t size = (r)->size; \ obj_type *ring = (obj_type *)ring_start; \ if (likely(idx + n < size)) { \ @@ -338,7 +338,7 @@ __rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table, unsigned int n, enum rte_ring_queue_behavior behavior, unsigned int is_sp, unsigned int *free_space) { - uint32_t prod_head, prod_next; + uintptr_t prod_head, prod_next; uint32_t free_entries; n = __rte_ring_move_prod_head(r, is_sp, n, behavior, @@ -380,7 +380,7 @@ __rte_ring_do_dequeue(struct rte_ring *r, void **obj_table, unsigned int n, enum rte_ring_queue_behavior behavior, unsigned int is_sc, unsigned int *available) { - uint32_t cons_head, cons_next; + uintptr_t cons_head, cons_next; uint32_t entries; n = __rte_ring_move_cons_head(r, (int)is_sc, n, behavior, @@ -681,9 +681,9 @@ rte_ring_dequeue(struct rte_ring *r, void **obj_p) static inline unsigned rte_ring_count(const struct rte_ring *r) { - uint32_t prod_tail = r->prod.tail; - uint32_t cons_tail = r->cons.tail; - uint32_t count = (prod_tail - cons_tail) & r->mask; + uintptr_t prod_tail = r->prod.tail; + uintptr_t cons_tail = r->cons.tail; + uintptr_t count = (prod_tail - cons_tail) & r->mask; return (count > r->capacity) ? 
r->capacity : count; } diff --git a/lib/librte_ring/rte_ring_generic.h b/lib/librte_ring/rte_ring_generic.h index ea7dbe5b9..3fd1150f6 100644 --- a/lib/librte_ring/rte_ring_generic.h +++ b/lib/librte_ring/rte_ring_generic.h @@ -1,6 +1,6 @@ /* SPDX-License-Identifier: BSD-3-Clause * - * Copyright (c) 2010-2017 Intel Corporation + * Copyright (c) 2010-2019 Intel Corporation * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org * All rights reserved. * Derived from FreeBSD's bufring.h @@ -11,7 +11,7 @@ #define _RTE_RING_GENERIC_H_ static __rte_always_inline void -update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val, +update_tail(struct rte_ring_headtail *ht, uintptr_t old_val, uintptr_t new_val, uint32_t single, uint32_t enqueue) { if (enqueue) @@ -55,7 +55,7 @@ update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val, static __rte_always_inline unsigned int __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp, unsigned int n, enum rte_ring_queue_behavior behavior, - uint32_t *old_head, uint32_t *new_head, + uintptr_t *old_head, uintptr_t *new_head, uint32_t *free_entries) { const uint32_t capacity = r->capacity; @@ -93,7 +93,8 @@ __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp, if (is_sp) r->prod.head = *new_head, success = 1; else - success = rte_atomic32_cmpset(&r->prod.head, + /* Built-in used to handle variable-sized head index. */ + success = __sync_bool_compare_and_swap(&r->prod.head, *old_head, *new_head); } while (unlikely(success == 0)); return n; @@ -125,7 +126,7 @@ __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp, static __rte_always_inline unsigned int __rte_ring_move_cons_head(struct rte_ring *r, unsigned int is_sc, unsigned int n, enum rte_ring_queue_behavior behavior, - uint32_t *old_head, uint32_t *new_head, + uintptr_t *old_head, uintptr_t *new_head, uint32_t *entries) { unsigned int max = n; @@ -161,8 +162,9 @@ __rte_ring_move_cons_head(struct rte_ring *r, unsigned int is_sc, if (is_sc) r->cons.head = *new_head, success = 1; else - success = rte_atomic32_cmpset(&r->cons.head, *old_head, - *new_head); + /* Built-in used to handle variable-sized head index. */ + success = __sync_bool_compare_and_swap(&r->cons.head, + *old_head, *new_head); } while (unlikely(success == 0)); return n; } From patchwork Thu Jan 10 21:01:18 2019 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: "Eads, Gage" X-Patchwork-Id: 49641 X-Patchwork-Delegate: thomas@monjalon.net Return-Path: X-Original-To: patchwork@dpdk.org Delivered-To: patchwork@dpdk.org Received: from [92.243.14.124] (localhost [127.0.0.1]) by dpdk.org (Postfix) with ESMTP id 67E6A1B93F; Thu, 10 Jan 2019 22:02:40 +0100 (CET) Received: from mga02.intel.com (mga02.intel.com [134.134.136.20]) by dpdk.org (Postfix) with ESMTP id 99CAA1B90D for ; Thu, 10 Jan 2019 22:02:34 +0100 (CET) X-Amp-Result: SKIPPED(no attachment in message) X-Amp-File-Uploaded: False Received: from fmsmga003.fm.intel.com ([10.253.24.29]) by orsmga101.jf.intel.com with ESMTP/TLS/DHE-RSA-AES256-GCM-SHA384; 10 Jan 2019 13:02:33 -0800 X-ExtLoop1: 1 X-IronPort-AV: E=Sophos;i="5.56,462,1539673200"; d="scan'208";a="124971953" Received: from txasoft-yocto.an.intel.com (HELO txasoft-yocto.an.intel.com.) 
From: Gage Eads
To: dev@dpdk.org
Cc: olivier.matz@6wind.com, arybchenko@solarflare.com,
 bruce.richardson@intel.com, konstantin.ananyev@intel.com
Date: Thu, 10 Jan 2019 15:01:18 -0600
Message-Id: <20190110210122.24889-3-gage.eads@intel.com>
In-Reply-To: <20190110210122.24889-1-gage.eads@intel.com>
References: <20190110210122.24889-1-gage.eads@intel.com>
Subject: [dpdk-dev] [PATCH 2/6] ring: add a non-blocking implementation

This commit adds support for non-blocking circular ring enqueue and dequeue
functions. The ring uses a 128-bit compare-and-swap instruction, and thus is
limited to x86_64.

The algorithm is based on the original rte ring (derived from FreeBSD's
bufring.h) and inspired by Michael and Scott's non-blocking concurrent queue.
Importantly, it adds a modification counter to each ring entry to ensure only
one thread can write to an unused entry.

-----
Algorithm:

Multi-producer non-blocking enqueue:
1. Move the producer head index 'n' locations forward, effectively reserving
   'n' locations.
2. For each pointer:
 a. Read the producer tail index, then ring[tail]. If ring[tail]'s
    modification counter isn't 'tail', retry.
 b. Construct the new entry: {pointer, tail + ring size}
 c. Compare-and-swap the old entry with the new. If unsuccessful, the next
    loop iteration will try to enqueue this pointer again.
 d. Compare-and-swap the tail index with 'tail + 1', whether or not step 2c
    succeeded. This guarantees threads can make forward progress.
(A condensed code sketch of this enqueue loop follows the Discussion section
below.)

Multi-consumer non-blocking dequeue:
1. Move the consumer head index 'n' locations forward, effectively reserving
   'n' pointers to be dequeued.
2. Copy 'n' pointers into the caller's object table (ignoring the
   modification counter), starting from ring[tail], then compare-and-swap the
   tail index with 'tail + n'. If unsuccessful, repeat step 2.

-----
Discussion:

There are two cases where the ABA problem is mitigated:
1. Enqueueing a pointer to the ring: without a modification counter tied to
   the tail index, the index could become stale by the time the enqueue
   happens, causing it to overwrite valid data. Tying the counter to the tail
   index gives us an expected value (as opposed to, say, a monotonically
   incrementing counter). Since the counter will eventually wrap, there is
   potential for the ABA problem. However, using a 64-bit counter makes this
   likelihood effectively zero.
2. Updating a tail index: the ABA problem can occur if the thread is
   preempted and the tail index wraps around. However, using 64-bit indexes
   makes this likelihood effectively zero.

With no contention, an enqueue of n pointers uses (1 + 2n) CAS operations and
a dequeue of n pointers uses 2. This algorithm has worse average-case
performance than the regular rte ring (particularly for a highly contended
ring with large bulk accesses), however:
- For applications with preemptible pthreads, the regular rte ring's
  worst-case performance (i.e. one thread being preempted in the
  update_tail() critical section) is much worse than the non-blocking ring's.
- Software caching can mitigate the average-case performance cost for
  ring-based algorithms. For example, a non-blocking ring-based mempool (a
  likely use case for this ring) can use per-thread caching.
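For readers following along, here is a condensed, illustrative sketch of the
multi-producer enqueue loop described in steps 2a-2d above. It handles a
single object, omits the head-reservation step (step 1), bulk handling, and
error paths, and assumes the nb_ring_entry layout, rte_atomic128_cmpset(),
and __sync_bool_compare_and_swap() usage from the patch below; it is not the
patch code itself.

	/* Sketch only: caller is assumed to have reserved space by moving
	 * the producer head (step 1).
	 */
	static void
	nb_enqueue_one_sketch(struct rte_ring *r, struct nb_ring_entry *slots,
			      void *obj)
	{
		int done = 0;

		while (!done) {
			uintptr_t tail = r->prod.tail;	/* 2a: read tail */
			struct nb_ring_entry *slot = &slots[tail & r->mask];
			struct nb_ring_entry old_ent = *slot;
			struct nb_ring_entry new_ent;

			new_ent.ptr = obj;
			new_ent.cnt = tail + r->size;	/* 2b: new counter */

			/* 2a/2c: write only if the slot's counter still equals
			 * tail; the 128-bit CAS updates pointer and counter
			 * atomically.
			 */
			if (old_ent.cnt == tail &&
			    rte_atomic128_cmpset((volatile void *)slot,
						 (uint64_t *)&old_ent,
						 (uint64_t *)&new_ent))
				done = 1;

			/* 2d: advance the tail whether or not 2c succeeded,
			 * so no thread waits on the one that won the slot.
			 */
			__sync_bool_compare_and_swap(&r->prod.tail, tail,
						     tail + 1);
		}
	}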
The non-blocking ring is enabled via a new flag, RING_F_NB. Because the
ring's memsize is now a function of its flags (the non-blocking ring requires
128b for each entry), this commit adds a new argument ('flags') to
rte_ring_get_memsize().

For ease-of-use, existing ring enqueue and dequeue functions work on both
regular and non-blocking rings. This introduces an additional branch in the
datapath, but it should be a highly predictable one. ring_perf_autotest shows
a negligible performance impact; it's hard to distinguish a real difference
from system noise.

                                   | ring_perf_autotest cycles with branch -
 Test                              | ring_perf_autotest cycles without
 ------------------------------------------------------------------
 SP/SC single enq/dequeue          | 0.33
 MP/MC single enq/dequeue          | -4.00
 SP/SC burst enq/dequeue (size 8)  | 0.00
 MP/MC burst enq/dequeue (size 8)  | 0.00
 SP/SC burst enq/dequeue (size 32) | 0.00
 MP/MC burst enq/dequeue (size 32) | 0.00
 SC empty dequeue                  | 1.00
 MC empty dequeue                  | 0.00

 Single lcore:
 SP/SC bulk enq/dequeue (size 8)   | 0.49
 MP/MC bulk enq/dequeue (size 8)   | 0.08
 SP/SC bulk enq/dequeue (size 32)  | 0.07
 MP/MC bulk enq/dequeue (size 32)  | 0.09

 Two physical cores:
 SP/SC bulk enq/dequeue (size 8)   | 0.19
 MP/MC bulk enq/dequeue (size 8)   | -0.37
 SP/SC bulk enq/dequeue (size 32)  | 0.09
 MP/MC bulk enq/dequeue (size 32)  | -0.05

 Two NUMA nodes:
 SP/SC bulk enq/dequeue (size 8)   | -1.96
 MP/MC bulk enq/dequeue (size 8)   | 0.88
 SP/SC bulk enq/dequeue (size 32)  | 0.10
 MP/MC bulk enq/dequeue (size 32)  | 0.46

Test setup: x86_64 build with default config, dual-socket Xeon E5-2699 v4,
running on isolcpus cores with a tickless scheduler. Each test was run three
times and the results were averaged.

Signed-off-by: Gage Eads
---
 lib/librte_ring/rte_ring.c           |  43 ++-
 lib/librte_ring/rte_ring.h           | 535 +++++++++++++++++++++++++++++++++--
 lib/librte_ring/rte_ring_version.map |   7 +
 3 files changed, 554 insertions(+), 31 deletions(-)

diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c
index b15ee0eb3..bd1282eac 100644
--- a/lib/librte_ring/rte_ring.c
+++ b/lib/librte_ring/rte_ring.c
@@ -45,9 +45,9 @@ EAL_REGISTER_TAILQ(rte_ring_tailq)

 /* return the size of memory occupied by a ring */
 ssize_t
-rte_ring_get_memsize(unsigned count)
+rte_ring_get_memsize_v1902(unsigned int count, unsigned int flags)
 {
-	ssize_t sz;
+	ssize_t sz, elt_sz;

 	/* count must be a power of 2 */
 	if ((!POWEROF2(count)) || (count > RTE_RING_SZ_MASK )) {
@@ -57,10 +57,23 @@ rte_ring_get_memsize(unsigned count)
 		return -EINVAL;
 	}

-	sz = sizeof(struct rte_ring) + count * sizeof(void *);
+	elt_sz = (flags & RING_F_NB) ?
2 * sizeof(void *) : sizeof(void *); + + sz = sizeof(struct rte_ring) + count * elt_sz; sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE); return sz; } +BIND_DEFAULT_SYMBOL(rte_ring_get_memsize, _v1902, 19.02); +MAP_STATIC_SYMBOL(ssize_t rte_ring_get_memsize(unsigned int count, + unsigned int flags), + rte_ring_get_memsize_v1902); + +ssize_t +rte_ring_get_memsize_v20(unsigned int count) +{ + return rte_ring_get_memsize_v1902(count, 0); +} +VERSION_SYMBOL(rte_ring_get_memsize, _v20, 2.0); int rte_ring_init(struct rte_ring *r, const char *name, unsigned count, @@ -103,6 +116,20 @@ rte_ring_init(struct rte_ring *r, const char *name, unsigned count, r->prod.head = r->cons.head = 0; r->prod.tail = r->cons.tail = 0; + if (flags & RING_F_NB) { + uint64_t i; + + for (i = 0; i < r->size; i++) { + struct nb_ring_entry *ring_ptr, *base; + + base = ((struct nb_ring_entry *) &r[1]); + + ring_ptr = &base[i & r->mask]; + + ring_ptr->cnt = i; + } + } + return 0; } @@ -123,11 +150,19 @@ rte_ring_create(const char *name, unsigned count, int socket_id, ring_list = RTE_TAILQ_CAST(rte_ring_tailq.head, rte_ring_list); +#if !defined(RTE_ARCH_X86_64) + if (flags & RING_F_NB) { + printf("RING_F_NB is only supported on x86-64 platforms\n"); + rte_errno = EINVAL; + return NULL; + } +#endif + /* for an exact size ring, round up from count to a power of two */ if (flags & RING_F_EXACT_SZ) count = rte_align32pow2(count + 1); - ring_size = rte_ring_get_memsize(count); + ring_size = rte_ring_get_memsize(count, flags); if (ring_size < 0) { rte_errno = ring_size; return NULL; diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h index 12af64e13..95bcdc4db 100644 --- a/lib/librte_ring/rte_ring.h +++ b/lib/librte_ring/rte_ring.h @@ -117,6 +117,18 @@ struct rte_ring { */ #define RING_F_EXACT_SZ 0x0004 #define RTE_RING_SZ_MASK (0x7fffffffU) /**< Ring size mask */ +/** + * The ring uses non-blocking enqueue and dequeue functions. These functions + * do not have the "non-preemptive" constraint of a regular rte ring, and thus + * are suited for applications using preemptible pthreads. However, the + * non-blocking functions have worse average-case performance than their + * regular rte ring counterparts. When used as the handler for a mempool, + * per-thread caching can mitigate the performance difference by reducing the + * number (and contention) of ring accesses. + * + * This flag is only supported on x86_64 platforms. + */ +#define RING_F_NB 0x0008 /* @internal defines for passing to the enqueue dequeue worker functions */ #define __IS_SP 1 @@ -134,11 +146,15 @@ struct rte_ring { * * @param count * The number of elements in the ring (must be a power of 2). + * @param flags + * The flags the ring will be created with. * @return * - The memory size needed for the ring on success. * - -EINVAL if count is not a power of 2. */ -ssize_t rte_ring_get_memsize(unsigned count); +ssize_t rte_ring_get_memsize(unsigned int count, unsigned int flags); +ssize_t rte_ring_get_memsize_v20(unsigned int count); +ssize_t rte_ring_get_memsize_v1902(unsigned int count, unsigned int flags); /** * Initialize a ring structure. @@ -171,6 +187,10 @@ ssize_t rte_ring_get_memsize(unsigned count); * - RING_F_SC_DEQ: If this flag is set, the default behavior when * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()`` * is "single-consumer". Otherwise, it is "multi-consumers". + * - RING_F_EXACT_SZ: If this flag is set, count can be a non-power-of-2 + * number, but up to half the ring space may be wasted. 
+ * - RING_F_NB: (x86_64 only) If this flag is set, the ring uses + * non-blocking variants of the dequeue and enqueue functions. * @return * 0 on success, or a negative value on error. */ @@ -206,12 +226,17 @@ int rte_ring_init(struct rte_ring *r, const char *name, unsigned count, * - RING_F_SC_DEQ: If this flag is set, the default behavior when * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()`` * is "single-consumer". Otherwise, it is "multi-consumers". + * - RING_F_EXACT_SZ: If this flag is set, count can be a non-power-of-2 + * number, but up to half the ring space may be wasted. + * - RING_F_NB: (x86_64 only) If this flag is set, the ring uses + * non-blocking variants of the dequeue and enqueue functions. * @return * On success, the pointer to the new allocated ring. NULL on error with * rte_errno set appropriately. Possible errno values include: * - E_RTE_NO_CONFIG - function could not get pointer to rte_config structure * - E_RTE_SECONDARY - function was called from a secondary process instance - * - EINVAL - count provided is not a power of 2 + * - EINVAL - count provided is not a power of 2, or RING_F_NB is used on an + * unsupported platform * - ENOSPC - the maximum number of memzones has already been allocated * - EEXIST - a memzone with the same name already exists * - ENOMEM - no appropriate memory area found in which to create memzone @@ -267,6 +292,50 @@ void rte_ring_dump(FILE *f, const struct rte_ring *r); } \ } while (0) +/* The actual enqueue of pointers on the ring. + * Used only by the single-producer non-blocking enqueue function, but + * out-lined here for code readability. + */ +#define ENQUEUE_PTRS_NB(r, ring_start, prod_head, obj_table, n) do { \ + unsigned int i; \ + const uint32_t size = (r)->size; \ + uintptr_t idx = prod_head & (r)->mask; \ + uintptr_t new_cnt = prod_head + size; \ + struct nb_ring_entry *ring = (struct nb_ring_entry *)ring_start; \ + if (likely(idx + n < size)) { \ + for (i = 0; i < (n & ((~(unsigned)0x3))); i += 4, idx += 4) { \ + ring[idx].ptr = obj_table[i]; \ + ring[idx].cnt = new_cnt + i; \ + ring[idx + 1].ptr = obj_table[i + 1]; \ + ring[idx + 1].cnt = new_cnt + i + 1; \ + ring[idx + 2].ptr = obj_table[i + 2]; \ + ring[idx + 2].cnt = new_cnt + i + 2; \ + ring[idx + 3].ptr = obj_table[i + 3]; \ + ring[idx + 3].cnt = new_cnt + i + 3; \ + } \ + switch (n & 0x3) { \ + case 3: \ + ring[idx].cnt = new_cnt + i; \ + ring[idx++].ptr = obj_table[i++]; /* fallthrough */ \ + case 2: \ + ring[idx].cnt = new_cnt + i; \ + ring[idx++].ptr = obj_table[i++]; /* fallthrough */ \ + case 1: \ + ring[idx].cnt = new_cnt + i; \ + ring[idx++].ptr = obj_table[i++]; \ + } \ + } else { \ + for (i = 0; idx < size; i++, idx++) { \ + ring[idx].cnt = new_cnt + i; \ + ring[idx].ptr = obj_table[i]; \ + } \ + for (idx = 0; i < n; i++, idx++) { \ + ring[idx].cnt = new_cnt + i; \ + ring[idx].ptr = obj_table[i]; \ + } \ + } \ +} while (0) + /* the actual copy of pointers on the ring to obj_table. * Placed here since identical code needed in both * single and multi consumer dequeue functions */ @@ -298,6 +367,39 @@ void rte_ring_dump(FILE *f, const struct rte_ring *r); } \ } while (0) +/* The actual copy of pointers on the ring to obj_table. + * Placed here since identical code needed in both + * single and multi consumer non-blocking dequeue functions. 
+ */ +#define DEQUEUE_PTRS_NB(r, ring_start, cons_head, obj_table, n) do { \ + unsigned int i; \ + uintptr_t idx = cons_head & (r)->mask; \ + const uint32_t size = (r)->size; \ + struct nb_ring_entry *ring = (struct nb_ring_entry *)ring_start; \ + if (likely(idx + n < size)) { \ + for (i = 0; i < (n & (~(unsigned)0x3)); i += 4, idx += 4) {\ + obj_table[i] = ring[idx].ptr; \ + obj_table[i + 1] = ring[idx + 1].ptr; \ + obj_table[i + 2] = ring[idx + 2].ptr; \ + obj_table[i + 3] = ring[idx + 3].ptr; \ + } \ + switch (n & 0x3) { \ + case 3: \ + obj_table[i++] = ring[idx++].ptr; /* fallthrough */ \ + case 2: \ + obj_table[i++] = ring[idx++].ptr; /* fallthrough */ \ + case 1: \ + obj_table[i++] = ring[idx++].ptr; \ + } \ + } else { \ + for (i = 0; idx < size; i++, idx++) \ + obj_table[i] = ring[idx].ptr; \ + for (idx = 0; i < n; i++, idx++) \ + obj_table[i] = ring[idx].ptr; \ + } \ +} while (0) + + /* Between load and load. there might be cpu reorder in weak model * (powerpc/arm). * There are 2 choices for the users @@ -313,6 +415,314 @@ void rte_ring_dump(FILE *f, const struct rte_ring *r); #include "rte_ring_generic.h" #endif +/* @internal 128-bit structure used by the non-blocking ring */ +struct nb_ring_entry { + void *ptr; /**< Data pointer */ + uint64_t cnt; /**< Modification counter */ +}; + +/* The non-blocking ring algorithm is based on the original rte ring (derived + * from FreeBSD's bufring.h) and inspired by Michael and Scott's non-blocking + * concurrent queue. + */ + +/** + * @internal + * Enqueue several objects on the non-blocking ring (single-producer only) + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @param behavior + * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items to the ring + * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring + * @param free_space + * returns the amount of space after the enqueue operation has finished + * @return + * Actual number of objects enqueued. + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. + */ +static __rte_always_inline unsigned int +__rte_ring_do_nb_enqueue_sp(struct rte_ring *r, void * const *obj_table, + unsigned int n, + enum rte_ring_queue_behavior behavior, + unsigned int *free_space) +{ + uintptr_t head, next; + uint32_t free_entries; + + n = __rte_ring_move_prod_head(r, 1, n, behavior, + &head, &next, &free_entries); + if (n == 0) + goto end; + + ENQUEUE_PTRS_NB(r, &r[1], head, obj_table, n); + + r->prod.tail += n; + +end: + if (free_space != NULL) + *free_space = free_entries - n; + return n; +} + +/** + * @internal + * Enqueue several objects on the non-blocking ring (multi-producer safe) + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @param behavior + * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items to the ring + * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring + * @param free_space + * returns the amount of space after the enqueue operation has finished + * @return + * Actual number of objects enqueued. + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. 
+ */ +static __rte_always_inline unsigned int +__rte_ring_do_nb_enqueue_mp(struct rte_ring *r, void * const *obj_table, + unsigned int n, + enum rte_ring_queue_behavior behavior, + unsigned int *free_space) +{ +#ifdef RTE_ARCH_X86_64 + uintptr_t head, next, tail; + uint32_t free_entries; + unsigned int i; + + n = __rte_ring_move_prod_head(r, 0, n, behavior, + &head, &next, &free_entries); + if (n == 0) + goto end; + + for (i = 0; i < n; /* i incremented if enqueue succeeds */) { + struct nb_ring_entry old_value, new_value; + struct nb_ring_entry *ring_ptr; + + /* Enqueue to the tail entry. If another thread wins the race, + * retry with the new tail. + */ + tail = r->prod.tail; + + ring_ptr = &((struct nb_ring_entry *)&r[1])[tail & r->mask]; + + old_value = *ring_ptr; + + /* If the tail entry's modification counter doesn't match the + * producer tail index, it's already been updated. + */ + if ((old_value.cnt) != tail) + continue; + + /* Prepare the new entry. The cnt field mitigates the ABA + * problem on the ring write. + */ + new_value.ptr = obj_table[i]; + new_value.cnt = tail + r->size; + + if (rte_atomic128_cmpset((volatile void *)ring_ptr, + (uint64_t *)&old_value, + (uint64_t *)&new_value)) + i++; + + /* Every thread attempts the cmpset, so they don't have to wait + * for the thread that successfully enqueued to the ring. + * Using a 64-bit tail mitigates the ABA problem here. + * + * Built-in used to handle variable-sized tail index. + */ + __sync_bool_compare_and_swap(&r->prod.tail, tail, tail + 1); + } + +end: + if (free_space != NULL) + *free_space = free_entries - n; + return n; +#else + RTE_SET_USED(r); + RTE_SET_USED(obj_table); + RTE_SET_USED(n); + RTE_SET_USED(behavior); + RTE_SET_USED(free_space); + return 0; +#endif +} + +/** + * @internal Enqueue several objects on the non-blocking ring + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @param behavior + * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items to the ring + * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring + * @param is_sp + * Indicates whether to use single producer or multi-producer head update + * @param free_space + * returns the amount of space after the enqueue operation has finished + * @return + * Actual number of objects enqueued. + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. + */ +static __rte_always_inline unsigned int +__rte_ring_do_nb_enqueue(struct rte_ring *r, void * const *obj_table, + unsigned int n, enum rte_ring_queue_behavior behavior, + unsigned int is_sp, unsigned int *free_space) +{ + if (is_sp) + return __rte_ring_do_nb_enqueue_sp(r, obj_table, n, + behavior, free_space); + else + return __rte_ring_do_nb_enqueue_mp(r, obj_table, n, + behavior, free_space); +} + +/** + * @internal + * Dequeue several objects from the non-blocking ring (single-consumer only) + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to pull from the ring. + * @param behavior + * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from the ring + * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring + * @param available + * returns the number of remaining ring entries after the dequeue has finished + * @return + * - Actual number of objects dequeued. 
+ * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. + */ +static __rte_always_inline unsigned int +__rte_ring_do_nb_dequeue_sc(struct rte_ring *r, void **obj_table, + unsigned int n, + enum rte_ring_queue_behavior behavior, + unsigned int *available) +{ + uintptr_t head, next; + uint32_t entries; + + n = __rte_ring_move_cons_head(r, 1, n, behavior, + &head, &next, &entries); + if (n == 0) + goto end; + + DEQUEUE_PTRS_NB(r, &r[1], head, obj_table, n); + + r->cons.tail += n; + +end: + if (available != NULL) + *available = entries - n; + return n; +} + +/** + * @internal + * Dequeue several objects from the non-blocking ring (multi-consumer safe) + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to pull from the ring. + * @param behavior + * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from the ring + * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring + * @param available + * returns the number of remaining ring entries after the dequeue has finished + * @return + * - Actual number of objects dequeued. + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. + */ +static __rte_always_inline unsigned int +__rte_ring_do_nb_dequeue_mc(struct rte_ring *r, void **obj_table, + unsigned int n, + enum rte_ring_queue_behavior behavior, + unsigned int *available) +{ + uintptr_t head, next; + uint32_t entries; + + n = __rte_ring_move_cons_head(r, 0, n, behavior, + &head, &next, &entries); + if (n == 0) + goto end; + + while (1) { + uintptr_t tail = r->cons.tail; + + /* Dequeue from the cons tail onwards. If multiple threads read + * the same pointers, the thread that successfully performs the + * CAS will keep them and the other(s) will retry. + */ + DEQUEUE_PTRS_NB(r, &r[1], tail, obj_table, n); + + next = tail + n; + + /* Built-in used to handle variable-sized tail index. */ + if (__sync_bool_compare_and_swap(&r->cons.tail, tail, next)) { + /* There is potential for the ABA problem here, but + * that is mitigated by the large (64-bit) tail. + */ + break; + } + } + +end: + if (available != NULL) + *available = entries - n; + return n; +} + +/** + * @internal Dequeue several objects from the non-blocking ring + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to pull from the ring. + * @param behavior + * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from the ring + * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring + * @param available + * returns the number of remaining ring entries after the dequeue has finished + * @return + * - Actual number of objects dequeued. + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. 
+ */ +static __rte_always_inline unsigned int +__rte_ring_do_nb_dequeue(struct rte_ring *r, void **obj_table, + unsigned int n, enum rte_ring_queue_behavior behavior, + unsigned int is_sc, unsigned int *available) +{ + if (is_sc) + return __rte_ring_do_nb_dequeue_sc(r, obj_table, n, + behavior, available); + else + return __rte_ring_do_nb_dequeue_mc(r, obj_table, n, + behavior, available); +} + /** * @internal Enqueue several objects on the ring * @@ -420,8 +830,14 @@ static __rte_always_inline unsigned int rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, unsigned int n, unsigned int *free_space) { - return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED, - __IS_MP, free_space); + if (r->flags & RING_F_NB) + return __rte_ring_do_nb_enqueue(r, obj_table, n, + RTE_RING_QUEUE_FIXED, __IS_MP, + free_space); + else + return __rte_ring_do_enqueue(r, obj_table, n, + RTE_RING_QUEUE_FIXED, __IS_MP, + free_space); } /** @@ -443,8 +859,14 @@ static __rte_always_inline unsigned int rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, unsigned int n, unsigned int *free_space) { - return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED, - __IS_SP, free_space); + if (r->flags & RING_F_NB) + return __rte_ring_do_nb_enqueue(r, obj_table, n, + RTE_RING_QUEUE_FIXED, __IS_SP, + free_space); + else + return __rte_ring_do_enqueue(r, obj_table, n, + RTE_RING_QUEUE_FIXED, __IS_SP, + free_space); } /** @@ -470,8 +892,14 @@ static __rte_always_inline unsigned int rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table, unsigned int n, unsigned int *free_space) { - return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED, - r->prod.single, free_space); + if (r->flags & RING_F_NB) + return __rte_ring_do_nb_enqueue(r, obj_table, n, + RTE_RING_QUEUE_FIXED, + r->prod.single, free_space); + else + return __rte_ring_do_enqueue(r, obj_table, n, + RTE_RING_QUEUE_FIXED, + r->prod.single, free_space); } /** @@ -554,8 +982,14 @@ static __rte_always_inline unsigned int rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n, unsigned int *available) { - return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED, - __IS_MC, available); + if (r->flags & RING_F_NB) + return __rte_ring_do_nb_dequeue(r, obj_table, n, + RTE_RING_QUEUE_FIXED, __IS_MC, + available); + else + return __rte_ring_do_dequeue(r, obj_table, n, + RTE_RING_QUEUE_FIXED, __IS_MC, + available); } /** @@ -578,8 +1012,14 @@ static __rte_always_inline unsigned int rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n, unsigned int *available) { - return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED, - __IS_SC, available); + if (r->flags & RING_F_NB) + return __rte_ring_do_nb_dequeue(r, obj_table, n, + RTE_RING_QUEUE_FIXED, __IS_SC, + available); + else + return __rte_ring_do_dequeue(r, obj_table, n, + RTE_RING_QUEUE_FIXED, __IS_SC, + available); } /** @@ -605,8 +1045,14 @@ static __rte_always_inline unsigned int rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n, unsigned int *available) { - return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED, - r->cons.single, available); + if (r->flags & RING_F_NB) + return __rte_ring_do_nb_dequeue(r, obj_table, n, + RTE_RING_QUEUE_FIXED, + r->cons.single, available); + else + return __rte_ring_do_dequeue(r, obj_table, n, + RTE_RING_QUEUE_FIXED, + r->cons.single, available); } /** @@ -803,8 +1249,14 @@ static __rte_always_inline unsigned 
rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table, unsigned int n, unsigned int *free_space) { - return __rte_ring_do_enqueue(r, obj_table, n, - RTE_RING_QUEUE_VARIABLE, __IS_MP, free_space); + if (r->flags & RING_F_NB) + return __rte_ring_do_nb_enqueue(r, obj_table, n, + RTE_RING_QUEUE_VARIABLE, + __IS_MP, free_space); + else + return __rte_ring_do_enqueue(r, obj_table, n, + RTE_RING_QUEUE_VARIABLE, + __IS_MP, free_space); } /** @@ -826,8 +1278,14 @@ static __rte_always_inline unsigned rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table, unsigned int n, unsigned int *free_space) { - return __rte_ring_do_enqueue(r, obj_table, n, - RTE_RING_QUEUE_VARIABLE, __IS_SP, free_space); + if (r->flags & RING_F_NB) + return __rte_ring_do_nb_enqueue(r, obj_table, n, + RTE_RING_QUEUE_VARIABLE, + __IS_SP, free_space); + else + return __rte_ring_do_enqueue(r, obj_table, n, + RTE_RING_QUEUE_VARIABLE, + __IS_SP, free_space); } /** @@ -853,8 +1311,14 @@ static __rte_always_inline unsigned rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table, unsigned int n, unsigned int *free_space) { - return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE, - r->prod.single, free_space); + if (r->flags & RING_F_NB) + return __rte_ring_do_nb_enqueue(r, obj_table, n, + RTE_RING_QUEUE_VARIABLE, + r->prod.single, free_space); + else + return __rte_ring_do_enqueue(r, obj_table, n, + RTE_RING_QUEUE_VARIABLE, + r->prod.single, free_space); } /** @@ -881,8 +1345,14 @@ static __rte_always_inline unsigned rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned int n, unsigned int *available) { - return __rte_ring_do_dequeue(r, obj_table, n, - RTE_RING_QUEUE_VARIABLE, __IS_MC, available); + if (r->flags & RING_F_NB) + return __rte_ring_do_nb_dequeue(r, obj_table, n, + RTE_RING_QUEUE_VARIABLE, + __IS_MC, available); + else + return __rte_ring_do_dequeue(r, obj_table, n, + RTE_RING_QUEUE_VARIABLE, + __IS_MC, available); } /** @@ -906,8 +1376,14 @@ static __rte_always_inline unsigned rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned int n, unsigned int *available) { - return __rte_ring_do_dequeue(r, obj_table, n, - RTE_RING_QUEUE_VARIABLE, __IS_SC, available); + if (r->flags & RING_F_NB) + return __rte_ring_do_nb_dequeue(r, obj_table, n, + RTE_RING_QUEUE_VARIABLE, + __IS_SC, available); + else + return __rte_ring_do_dequeue(r, obj_table, n, + RTE_RING_QUEUE_VARIABLE, + __IS_SC, available); } /** @@ -933,9 +1409,14 @@ static __rte_always_inline unsigned rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned int n, unsigned int *available) { - return __rte_ring_do_dequeue(r, obj_table, n, - RTE_RING_QUEUE_VARIABLE, - r->cons.single, available); + if (r->flags & RING_F_NB) + return __rte_ring_do_nb_dequeue(r, obj_table, n, + RTE_RING_QUEUE_VARIABLE, + r->cons.single, available); + else + return __rte_ring_do_dequeue(r, obj_table, n, + RTE_RING_QUEUE_VARIABLE, + r->cons.single, available); } #ifdef __cplusplus diff --git a/lib/librte_ring/rte_ring_version.map b/lib/librte_ring/rte_ring_version.map index d935efd0d..8969467af 100644 --- a/lib/librte_ring/rte_ring_version.map +++ b/lib/librte_ring/rte_ring_version.map @@ -17,3 +17,10 @@ DPDK_2.2 { rte_ring_free; } DPDK_2.0; + +DPDK_19.05 { + global: + + rte_ring_get_memsize; + +} DPDK_2.2; From patchwork Thu Jan 10 21:01:19 2019 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: "Eads, Gage" X-Patchwork-Id: 
49640 X-Patchwork-Delegate: thomas@monjalon.net Return-Path: X-Original-To: patchwork@dpdk.org Delivered-To: patchwork@dpdk.org Received: from [92.243.14.124] (localhost [127.0.0.1]) by dpdk.org (Postfix) with ESMTP id 1150F1B937; Thu, 10 Jan 2019 22:02:39 +0100 (CET) Received: from mga02.intel.com (mga02.intel.com [134.134.136.20]) by dpdk.org (Postfix) with ESMTP id F0B7E1B90A for ; Thu, 10 Jan 2019 22:02:34 +0100 (CET) X-Amp-Result: SKIPPED(no attachment in message) X-Amp-File-Uploaded: False Received: from fmsmga003.fm.intel.com ([10.253.24.29]) by orsmga101.jf.intel.com with ESMTP/TLS/DHE-RSA-AES256-GCM-SHA384; 10 Jan 2019 13:02:33 -0800 X-ExtLoop1: 1 X-IronPort-AV: E=Sophos;i="5.56,462,1539673200"; d="scan'208";a="124971957" Received: from txasoft-yocto.an.intel.com (HELO txasoft-yocto.an.intel.com.) ([10.123.72.192]) by FMSMGA003.fm.intel.com with ESMTP; 10 Jan 2019 13:02:33 -0800 From: Gage Eads To: dev@dpdk.org Cc: olivier.matz@6wind.com, arybchenko@solarflare.com, bruce.richardson@intel.com, konstantin.ananyev@intel.com Date: Thu, 10 Jan 2019 15:01:19 -0600 Message-Id: <20190110210122.24889-4-gage.eads@intel.com> X-Mailer: git-send-email 2.13.6 In-Reply-To: <20190110210122.24889-1-gage.eads@intel.com> References: <20190110210122.24889-1-gage.eads@intel.com> Subject: [dpdk-dev] [PATCH 3/6] test_ring: add non-blocking ring autotest X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: DPDK patches and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dev-bounces@dpdk.org Sender: "dev" ring_nb_autotest re-uses the ring_autotest code by wrapping its top-level function with one that takes a 'flags' argument. Signed-off-by: Gage Eads --- test/test/test_ring.c | 57 ++++++++++++++++++++++++++++++++------------------- 1 file changed, 36 insertions(+), 21 deletions(-) diff --git a/test/test/test_ring.c b/test/test/test_ring.c index aaf1e70ad..ff410d978 100644 --- a/test/test/test_ring.c +++ b/test/test/test_ring.c @@ -1,5 +1,5 @@ /* SPDX-License-Identifier: BSD-3-Clause - * Copyright(c) 2010-2014 Intel Corporation + * Copyright(c) 2010-2019 Intel Corporation */ #include @@ -601,18 +601,20 @@ test_ring_burst_basic(struct rte_ring *r) * it will always fail to create ring with a wrong ring size number in this function */ static int -test_ring_creation_with_wrong_size(void) +test_ring_creation_with_wrong_size(unsigned int flags) { struct rte_ring * rp = NULL; /* Test if ring size is not power of 2 */ - rp = rte_ring_create("test_bad_ring_size", RING_SIZE + 1, SOCKET_ID_ANY, 0); + rp = rte_ring_create("test_bad_ring_size", RING_SIZE + 1, + SOCKET_ID_ANY, flags); if (NULL != rp) { return -1; } /* Test if ring size is exceeding the limit */ - rp = rte_ring_create("test_bad_ring_size", (RTE_RING_SZ_MASK + 1), SOCKET_ID_ANY, 0); + rp = rte_ring_create("test_bad_ring_size", (RTE_RING_SZ_MASK + 1), + SOCKET_ID_ANY, flags); if (NULL != rp) { return -1; } @@ -623,11 +625,11 @@ test_ring_creation_with_wrong_size(void) * it tests if it would always fail to create ring with an used ring name */ static int -test_ring_creation_with_an_used_name(void) +test_ring_creation_with_an_used_name(unsigned int flags) { struct rte_ring * rp; - rp = rte_ring_create("test", RING_SIZE, SOCKET_ID_ANY, 0); + rp = rte_ring_create("test", RING_SIZE, SOCKET_ID_ANY, flags); if (NULL != rp) return -1; @@ -639,10 +641,10 @@ test_ring_creation_with_an_used_name(void) * function to fail correctly */ static int -test_create_count_odd(void) 
+test_create_count_odd(unsigned int flags) { struct rte_ring *r = rte_ring_create("test_ring_count", - 4097, SOCKET_ID_ANY, 0 ); + 4097, SOCKET_ID_ANY, flags); if(r != NULL){ return -1; } @@ -665,7 +667,7 @@ test_lookup_null(void) * it tests some more basic ring operations */ static int -test_ring_basic_ex(void) +test_ring_basic_ex(unsigned int flags) { int ret = -1; unsigned i; @@ -679,7 +681,7 @@ test_ring_basic_ex(void) } rp = rte_ring_create("test_ring_basic_ex", RING_SIZE, SOCKET_ID_ANY, - RING_F_SP_ENQ | RING_F_SC_DEQ); + RING_F_SP_ENQ | RING_F_SC_DEQ | flags); if (rp == NULL) { printf("test_ring_basic_ex fail to create ring\n"); goto fail_test; @@ -737,7 +739,7 @@ test_ring_basic_ex(void) } static int -test_ring_with_exact_size(void) +test_ring_with_exact_size(unsigned int flags) { struct rte_ring *std_ring = NULL, *exact_sz_ring = NULL; void *ptr_array[16]; @@ -746,13 +748,13 @@ test_ring_with_exact_size(void) int ret = -1; std_ring = rte_ring_create("std", ring_sz, rte_socket_id(), - RING_F_SP_ENQ | RING_F_SC_DEQ); + RING_F_SP_ENQ | RING_F_SC_DEQ | flags); if (std_ring == NULL) { printf("%s: error, can't create std ring\n", __func__); goto end; } exact_sz_ring = rte_ring_create("exact sz", ring_sz, rte_socket_id(), - RING_F_SP_ENQ | RING_F_SC_DEQ | RING_F_EXACT_SZ); + RING_F_SP_ENQ | RING_F_SC_DEQ | RING_F_EXACT_SZ | flags); if (exact_sz_ring == NULL) { printf("%s: error, can't create exact size ring\n", __func__); goto end; @@ -808,17 +810,17 @@ test_ring_with_exact_size(void) } static int -test_ring(void) +__test_ring(unsigned int flags) { struct rte_ring *r = NULL; /* some more basic operations */ - if (test_ring_basic_ex() < 0) + if (test_ring_basic_ex(flags) < 0) goto test_fail; rte_atomic32_init(&synchro); - r = rte_ring_create("test", RING_SIZE, SOCKET_ID_ANY, 0); + r = rte_ring_create("test", RING_SIZE, SOCKET_ID_ANY, flags); if (r == NULL) goto test_fail; @@ -837,27 +839,27 @@ test_ring(void) goto test_fail; /* basic operations */ - if ( test_create_count_odd() < 0){ + if (test_create_count_odd(flags) < 0) { printf("Test failed to detect odd count\n"); goto test_fail; } else printf("Test detected odd count\n"); - if ( test_lookup_null() < 0){ + if (test_lookup_null() < 0) { printf("Test failed to detect NULL ring lookup\n"); goto test_fail; } else printf("Test detected NULL ring lookup\n"); /* test of creating ring with wrong size */ - if (test_ring_creation_with_wrong_size() < 0) + if (test_ring_creation_with_wrong_size(flags) < 0) goto test_fail; /* test of creation ring with an used name */ - if (test_ring_creation_with_an_used_name() < 0) + if (test_ring_creation_with_an_used_name(flags) < 0) goto test_fail; - if (test_ring_with_exact_size() < 0) + if (test_ring_with_exact_size(flags) < 0) goto test_fail; /* dump the ring status */ @@ -873,4 +875,17 @@ test_ring(void) return -1; } +static int +test_ring(void) +{ + return __test_ring(0); +} + +static int +test_nb_ring(void) +{ + return __test_ring(RING_F_NB); +} + REGISTER_TEST_COMMAND(ring_autotest, test_ring); +REGISTER_TEST_COMMAND(ring_nb_autotest, test_nb_ring); From patchwork Thu Jan 10 21:01:20 2019 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: "Eads, Gage" X-Patchwork-Id: 49642 X-Patchwork-Delegate: thomas@monjalon.net Return-Path: X-Original-To: patchwork@dpdk.org Delivered-To: patchwork@dpdk.org Received: from [92.243.14.124] (localhost [127.0.0.1]) by dpdk.org (Postfix) with ESMTP id 913C31B946; Thu, 10 Jan 2019 22:02:41 +0100 (CET) 
Received: from mga02.intel.com (mga02.intel.com [134.134.136.20]) by dpdk.org (Postfix) with ESMTP id 8A7D31B90D for ; Thu, 10 Jan 2019 22:02:35 +0100 (CET) X-Amp-Result: SKIPPED(no attachment in message) X-Amp-File-Uploaded: False Received: from fmsmga003.fm.intel.com ([10.253.24.29]) by orsmga101.jf.intel.com with ESMTP/TLS/DHE-RSA-AES256-GCM-SHA384; 10 Jan 2019 13:02:34 -0800 X-ExtLoop1: 1 X-IronPort-AV: E=Sophos;i="5.56,462,1539673200"; d="scan'208";a="124971961" Received: from txasoft-yocto.an.intel.com (HELO txasoft-yocto.an.intel.com.) ([10.123.72.192]) by FMSMGA003.fm.intel.com with ESMTP; 10 Jan 2019 13:02:33 -0800 From: Gage Eads To: dev@dpdk.org Cc: olivier.matz@6wind.com, arybchenko@solarflare.com, bruce.richardson@intel.com, konstantin.ananyev@intel.com Date: Thu, 10 Jan 2019 15:01:20 -0600 Message-Id: <20190110210122.24889-5-gage.eads@intel.com> X-Mailer: git-send-email 2.13.6 In-Reply-To: <20190110210122.24889-1-gage.eads@intel.com> References: <20190110210122.24889-1-gage.eads@intel.com> Subject: [dpdk-dev] [PATCH 4/6] test_ring_perf: add non-blocking ring perf test X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: DPDK patches and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dev-bounces@dpdk.org Sender: "dev" nb_ring_perf_autotest re-uses the ring_perf_autotest code by wrapping its top-level function with one that takes a 'flags' argument. Signed-off-by: Gage Eads --- test/test/test_ring_perf.c | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/test/test/test_ring_perf.c b/test/test/test_ring_perf.c index ebb3939f5..380c4b4a1 100644 --- a/test/test/test_ring_perf.c +++ b/test/test/test_ring_perf.c @@ -1,5 +1,5 @@ /* SPDX-License-Identifier: BSD-3-Clause - * Copyright(c) 2010-2014 Intel Corporation + * Copyright(c) 2010-2019 Intel Corporation */ @@ -363,12 +363,12 @@ test_bulk_enqueue_dequeue(struct rte_ring *r) } static int -test_ring_perf(void) +__test_ring_perf(unsigned int flags) { struct lcore_pair cores; struct rte_ring *r = NULL; - r = rte_ring_create(RING_NAME, RING_SIZE, rte_socket_id(), 0); + r = rte_ring_create(RING_NAME, RING_SIZE, rte_socket_id(), flags); if (r == NULL) return -1; @@ -398,4 +398,17 @@ test_ring_perf(void) return 0; } +static int +test_ring_perf(void) +{ + return __test_ring_perf(0); +} + +static int +test_nb_ring_perf(void) +{ + return __test_ring_perf(RING_F_NB); +} + REGISTER_TEST_COMMAND(ring_perf_autotest, test_ring_perf); +REGISTER_TEST_COMMAND(ring_nb_perf_autotest, test_nb_ring_perf); From patchwork Thu Jan 10 21:01:21 2019 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: "Eads, Gage" X-Patchwork-Id: 49643 X-Patchwork-Delegate: thomas@monjalon.net Return-Path: X-Original-To: patchwork@dpdk.org Delivered-To: patchwork@dpdk.org Received: from [92.243.14.124] (localhost [127.0.0.1]) by dpdk.org (Postfix) with ESMTP id 352031B956; Thu, 10 Jan 2019 22:02:43 +0100 (CET) Received: from mga02.intel.com (mga02.intel.com [134.134.136.20]) by dpdk.org (Postfix) with ESMTP id A8E4A1B91F for ; Thu, 10 Jan 2019 22:02:35 +0100 (CET) X-Amp-Result: SKIPPED(no attachment in message) X-Amp-File-Uploaded: False Received: from fmsmga003.fm.intel.com ([10.253.24.29]) by orsmga101.jf.intel.com with ESMTP/TLS/DHE-RSA-AES256-GCM-SHA384; 10 Jan 2019 13:02:34 -0800 X-ExtLoop1: 1 X-IronPort-AV: E=Sophos;i="5.56,462,1539673200"; d="scan'208";a="124971967" Received: from 
txasoft-yocto.an.intel.com (HELO txasoft-yocto.an.intel.com.) ([10.123.72.192]) by FMSMGA003.fm.intel.com with ESMTP; 10 Jan 2019 13:02:34 -0800 From: Gage Eads To: dev@dpdk.org Cc: olivier.matz@6wind.com, arybchenko@solarflare.com, bruce.richardson@intel.com, konstantin.ananyev@intel.com Date: Thu, 10 Jan 2019 15:01:21 -0600 Message-Id: <20190110210122.24889-6-gage.eads@intel.com> X-Mailer: git-send-email 2.13.6 In-Reply-To: <20190110210122.24889-1-gage.eads@intel.com> References: <20190110210122.24889-1-gage.eads@intel.com> Subject: [dpdk-dev] [PATCH 5/6] mempool/ring: add non-blocking ring handlers X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: DPDK patches and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dev-bounces@dpdk.org Sender: "dev" These handlers allow an application to create a mempool based on the non-blocking ring, with any combination of single/multi producer/consumer. Signed-off-by: Gage Eads Acked-by: Andrew Rybchenko --- drivers/mempool/ring/rte_mempool_ring.c | 58 +++++++++++++++++++++++++++++++-- 1 file changed, 55 insertions(+), 3 deletions(-) diff --git a/drivers/mempool/ring/rte_mempool_ring.c b/drivers/mempool/ring/rte_mempool_ring.c index bc123fc52..013dac3bc 100644 --- a/drivers/mempool/ring/rte_mempool_ring.c +++ b/drivers/mempool/ring/rte_mempool_ring.c @@ -1,5 +1,5 @@ /* SPDX-License-Identifier: BSD-3-Clause - * Copyright(c) 2010-2016 Intel Corporation + * Copyright(c) 2010-2019 Intel Corporation */ #include @@ -47,11 +47,11 @@ common_ring_get_count(const struct rte_mempool *mp) static int -common_ring_alloc(struct rte_mempool *mp) +__common_ring_alloc(struct rte_mempool *mp, int rg_flags) { - int rg_flags = 0, ret; char rg_name[RTE_RING_NAMESIZE]; struct rte_ring *r; + int ret; ret = snprintf(rg_name, sizeof(rg_name), RTE_MEMPOOL_MZ_FORMAT, mp->name); @@ -82,6 +82,18 @@ common_ring_alloc(struct rte_mempool *mp) return 0; } +static int +common_ring_alloc(struct rte_mempool *mp) +{ + return __common_ring_alloc(mp, 0); +} + +static int +common_ring_alloc_nb(struct rte_mempool *mp) +{ + return __common_ring_alloc(mp, RING_F_NB); +} + static void common_ring_free(struct rte_mempool *mp) { @@ -130,7 +142,47 @@ static const struct rte_mempool_ops ops_sp_mc = { .get_count = common_ring_get_count, }; +static const struct rte_mempool_ops ops_mp_mc_nb = { + .name = "ring_mp_mc_nb", + .alloc = common_ring_alloc_nb, + .free = common_ring_free, + .enqueue = common_ring_mp_enqueue, + .dequeue = common_ring_mc_dequeue, + .get_count = common_ring_get_count, +}; + +static const struct rte_mempool_ops ops_sp_sc_nb = { + .name = "ring_sp_sc_nb", + .alloc = common_ring_alloc_nb, + .free = common_ring_free, + .enqueue = common_ring_sp_enqueue, + .dequeue = common_ring_sc_dequeue, + .get_count = common_ring_get_count, +}; + +static const struct rte_mempool_ops ops_mp_sc_nb = { + .name = "ring_mp_sc_nb", + .alloc = common_ring_alloc_nb, + .free = common_ring_free, + .enqueue = common_ring_mp_enqueue, + .dequeue = common_ring_sc_dequeue, + .get_count = common_ring_get_count, +}; + +static const struct rte_mempool_ops ops_sp_mc_nb = { + .name = "ring_sp_mc_nb", + .alloc = common_ring_alloc_nb, + .free = common_ring_free, + .enqueue = common_ring_sp_enqueue, + .dequeue = common_ring_mc_dequeue, + .get_count = common_ring_get_count, +}; + MEMPOOL_REGISTER_OPS(ops_mp_mc); MEMPOOL_REGISTER_OPS(ops_sp_sc); MEMPOOL_REGISTER_OPS(ops_mp_sc); MEMPOOL_REGISTER_OPS(ops_sp_mc); +MEMPOOL_REGISTER_OPS(ops_mp_mc_nb); 
+MEMPOOL_REGISTER_OPS(ops_sp_sc_nb); +MEMPOOL_REGISTER_OPS(ops_mp_sc_nb); +MEMPOOL_REGISTER_OPS(ops_sp_mc_nb); From patchwork Thu Jan 10 21:01:22 2019 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: "Eads, Gage" X-Patchwork-Id: 49644 X-Patchwork-Delegate: thomas@monjalon.net Return-Path: X-Original-To: patchwork@dpdk.org Delivered-To: patchwork@dpdk.org Received: from [92.243.14.124] (localhost [127.0.0.1]) by dpdk.org (Postfix) with ESMTP id 1D7EF1B95C; Thu, 10 Jan 2019 22:02:44 +0100 (CET) Received: from mga02.intel.com (mga02.intel.com [134.134.136.20]) by dpdk.org (Postfix) with ESMTP id 06C7F1B922 for ; Thu, 10 Jan 2019 22:02:35 +0100 (CET) X-Amp-Result: SKIPPED(no attachment in message) X-Amp-File-Uploaded: False Received: from fmsmga003.fm.intel.com ([10.253.24.29]) by orsmga101.jf.intel.com with ESMTP/TLS/DHE-RSA-AES256-GCM-SHA384; 10 Jan 2019 13:02:35 -0800 X-ExtLoop1: 1 X-IronPort-AV: E=Sophos;i="5.56,462,1539673200"; d="scan'208";a="124971974" Received: from txasoft-yocto.an.intel.com (HELO txasoft-yocto.an.intel.com.) ([10.123.72.192]) by FMSMGA003.fm.intel.com with ESMTP; 10 Jan 2019 13:02:34 -0800 From: Gage Eads To: dev@dpdk.org Cc: olivier.matz@6wind.com, arybchenko@solarflare.com, bruce.richardson@intel.com, konstantin.ananyev@intel.com Date: Thu, 10 Jan 2019 15:01:22 -0600 Message-Id: <20190110210122.24889-7-gage.eads@intel.com> X-Mailer: git-send-email 2.13.6 In-Reply-To: <20190110210122.24889-1-gage.eads@intel.com> References: <20190110210122.24889-1-gage.eads@intel.com> Subject: [dpdk-dev] [PATCH 6/6] doc: add NB ring comment to EAL "known issues" X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: DPDK patches and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dev-bounces@dpdk.org Sender: "dev" This comment makes users aware of the non-blocking ring option and its caveats. Signed-off-by: Gage Eads --- doc/guides/prog_guide/env_abstraction_layer.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/guides/prog_guide/env_abstraction_layer.rst b/doc/guides/prog_guide/env_abstraction_layer.rst index 9497b879c..b6ac236d6 100644 --- a/doc/guides/prog_guide/env_abstraction_layer.rst +++ b/doc/guides/prog_guide/env_abstraction_layer.rst @@ -541,7 +541,7 @@ Known Issues 5. It MUST not be used by multi-producer/consumer pthreads, whose scheduling policies are SCHED_FIFO or SCHED_RR. - Alternatively, x86_64 applications can use the non-blocking stack mempool handler. When considering this handler, note that: + Alternatively, x86_64 applications can use the non-blocking ring or stack mempool handlers. When considering one of them, note that: - it is limited to the x86_64 platform, because it uses an instruction (16-byte compare-and-swap) that is not available on other platforms. - it has worse average-case performance than the non-preemptive rte_ring, but software caching (e.g. the mempool cache) can mitigate this by reducing the number of handler operations.
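To close, a minimal usage sketch (illustrative, not part of the series) of how
an application could opt in to the non-blocking ring and the "ring_mp_mc_nb"
mempool handler registered above, assuming the standard rte_ring and
rte_mempool APIs; names, sizes, and the helper function are hypothetical:

	#include <rte_lcore.h>
	#include <rte_mempool.h>
	#include <rte_ring.h>

	static int
	create_nb_ring_and_pool(void)
	{
		/* A ring created directly with the new flag (x86_64 only). */
		struct rte_ring *r = rte_ring_create("nb_ring", 1024,
						     rte_socket_id(),
						     RING_F_NB);
		if (r == NULL)
			return -1;

		/* A mempool bound to the non-blocking MP/MC ring handler. */
		struct rte_mempool *mp = rte_mempool_create_empty("nb_pool",
								  8192, 2048,
								  256, 0,
								  rte_socket_id(),
								  0);
		if (mp == NULL ||
		    rte_mempool_set_ops_byname(mp, "ring_mp_mc_nb", NULL) != 0 ||
		    rte_mempool_populate_default(mp) < 0)
			return -1;

		return 0;
	}

On non-x86_64 targets the ring creation fails with rte_errno set to EINVAL,
matching the check added to rte_ring_create() earlier in this series.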