[v2,1/5] ring: change head and tail to pointer-width size
diff mbox series

Message ID 20190115235227.14013-2-gage.eads@intel.com
State Superseded, archived
Delegated to: Thomas Monjalon
Headers show
Series
  • Add non-blocking ring
Related show

Checks

Context Check Description
ci/checkpatch success coding style OK

Commit Message

Eads, Gage Jan. 15, 2019, 11:52 p.m. UTC
For 64-bit architectures, doubling the head and tail index widths greatly
increases the time it takes for them to wrap-around (with current CPU
speeds, it won't happen within the author's lifetime). This is important in
avoiding the ABA problem -- in which a thread mistakes reading the same
tail index in two accesses to mean that the ring was not modified in the
intervening time -- in the upcoming non-blocking ring implementation. Using
a 64-bit index makes the possibility of this occurring effectively zero.

I tested this commit's performance impact with an x86_64 build on a
dual-socket Xeon E5-2699 v4 using ring_perf_autotest, and the change made
no significant difference -- the few differences appear to be system noise.
(The test ran on isolcpus cores using a tickless scheduler, but some
variation was stll observed.) Each test was run three times and the results
were averaged:

                                  | 64b head/tail cycle cost minus
             Test                 |     32b head/tail cycle cost
------------------------------------------------------------------
SP/SC single enq/dequeue          | 0.33
MP/MC single enq/dequeue          | 0.00
SP/SC burst enq/dequeue (size 8)  | 0.00
MP/MC burst enq/dequeue (size 8)  | 1.00
SP/SC burst enq/dequeue (size 32) | 0.00
MP/MC burst enq/dequeue (size 32) | -1.00
SC empty dequeue                  | 0.01
MC empty dequeue                  | 0.00

Single lcore:
SP/SC bulk enq/dequeue (size 8)   | -0.36
MP/MC bulk enq/dequeue (size 8)   | 0.99
SP/SC bulk enq/dequeue (size 32)  | -0.40
MP/MC bulk enq/dequeue (size 32)  | -0.57

Two physical cores:
SP/SC bulk enq/dequeue (size 8)   | -0.49
MP/MC bulk enq/dequeue (size 8)   | 0.19
SP/SC bulk enq/dequeue (size 32)  | -0.28
MP/MC bulk enq/dequeue (size 32)  | -0.62

Two NUMA nodes:
SP/SC bulk enq/dequeue (size 8)   | 3.25
MP/MC bulk enq/dequeue (size 8)   | 1.87
SP/SC bulk enq/dequeue (size 32)  | -0.44
MP/MC bulk enq/dequeue (size 32)  | -1.10

An earlier version of this patch changed the head and tail indexes to
uint64_t, but that caused a performance drop on 32-bit builds. With
size_t, no performance difference is observed on an i686 build.

Signed-off-by: Gage Eads <gage.eads@intel.com>
---
 lib/librte_eventdev/rte_event_ring.h |  6 +++---
 lib/librte_ring/Makefile             |  2 +-
 lib/librte_ring/meson.build          |  2 +-
 lib/librte_ring/rte_ring.c           | 10 +++++-----
 lib/librte_ring/rte_ring.h           | 29 ++++++++++++++++++-----------
 lib/librte_ring/rte_ring_generic.h   | 16 +++++++++-------
 6 files changed, 37 insertions(+), 28 deletions(-)

Patch
diff mbox series

diff --git a/lib/librte_eventdev/rte_event_ring.h b/lib/librte_eventdev/rte_event_ring.h
index 827a3209e..da8886c08 100644
--- a/lib/librte_eventdev/rte_event_ring.h
+++ b/lib/librte_eventdev/rte_event_ring.h
@@ -1,5 +1,5 @@ 
 /* SPDX-License-Identifier: BSD-3-Clause
- * Copyright(c) 2016-2017 Intel Corporation
+ * Copyright(c) 2016-2019 Intel Corporation
  */
 
 /**
@@ -88,7 +88,7 @@  rte_event_ring_enqueue_burst(struct rte_event_ring *r,
 		const struct rte_event *events,
 		unsigned int n, uint16_t *free_space)
 {
-	uint32_t prod_head, prod_next;
+	size_t prod_head, prod_next;
 	uint32_t free_entries;
 
 	n = __rte_ring_move_prod_head(&r->r, r->r.prod.single, n,
@@ -129,7 +129,7 @@  rte_event_ring_dequeue_burst(struct rte_event_ring *r,
 		struct rte_event *events,
 		unsigned int n, uint16_t *available)
 {
-	uint32_t cons_head, cons_next;
+	size_t cons_head, cons_next;
 	uint32_t entries;
 
 	n = __rte_ring_move_cons_head(&r->r, r->r.cons.single, n,
diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile
index 21a36770d..c106f9908 100644
--- a/lib/librte_ring/Makefile
+++ b/lib/librte_ring/Makefile
@@ -11,7 +11,7 @@  LDLIBS += -lrte_eal
 
 EXPORT_MAP := rte_ring_version.map
 
-LIBABIVER := 2
+LIBABIVER := 3
 
 # all source are stored in SRCS-y
 SRCS-$(CONFIG_RTE_LIBRTE_RING) := rte_ring.c
diff --git a/lib/librte_ring/meson.build b/lib/librte_ring/meson.build
index ab8b0b469..3603fdb7a 100644
--- a/lib/librte_ring/meson.build
+++ b/lib/librte_ring/meson.build
@@ -1,7 +1,7 @@ 
 # SPDX-License-Identifier: BSD-3-Clause
 # Copyright(c) 2017 Intel Corporation
 
-version = 2
+version = 3
 sources = files('rte_ring.c')
 headers = files('rte_ring.h',
 		'rte_ring_c11_mem.h',
diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c
index d215acecc..b15ee0eb3 100644
--- a/lib/librte_ring/rte_ring.c
+++ b/lib/librte_ring/rte_ring.c
@@ -1,6 +1,6 @@ 
 /* SPDX-License-Identifier: BSD-3-Clause
  *
- * Copyright (c) 2010-2015 Intel Corporation
+ * Copyright (c) 2010-2019 Intel Corporation
  * Copyright (c) 2007,2008 Kip Macy kmacy@freebsd.org
  * All rights reserved.
  * Derived from FreeBSD's bufring.h
@@ -227,10 +227,10 @@  rte_ring_dump(FILE *f, const struct rte_ring *r)
 	fprintf(f, "  flags=%x\n", r->flags);
 	fprintf(f, "  size=%"PRIu32"\n", r->size);
 	fprintf(f, "  capacity=%"PRIu32"\n", r->capacity);
-	fprintf(f, "  ct=%"PRIu32"\n", r->cons.tail);
-	fprintf(f, "  ch=%"PRIu32"\n", r->cons.head);
-	fprintf(f, "  pt=%"PRIu32"\n", r->prod.tail);
-	fprintf(f, "  ph=%"PRIu32"\n", r->prod.head);
+	fprintf(f, "  ct=%"PRIuPTR"\n", r->cons.tail);
+	fprintf(f, "  ch=%"PRIuPTR"\n", r->cons.head);
+	fprintf(f, "  pt=%"PRIuPTR"\n", r->prod.tail);
+	fprintf(f, "  ph=%"PRIuPTR"\n", r->prod.head);
 	fprintf(f, "  used=%u\n", rte_ring_count(r));
 	fprintf(f, "  avail=%u\n", rte_ring_free_count(r));
 }
diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
index af5444a9f..213c50708 100644
--- a/lib/librte_ring/rte_ring.h
+++ b/lib/librte_ring/rte_ring.h
@@ -1,6 +1,6 @@ 
 /* SPDX-License-Identifier: BSD-3-Clause
  *
- * Copyright (c) 2010-2017 Intel Corporation
+ * Copyright (c) 2010-2019 Intel Corporation
  * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
  * All rights reserved.
  * Derived from FreeBSD's bufring.h
@@ -63,11 +63,18 @@  enum rte_ring_queue_behavior {
 
 struct rte_memzone; /* forward declaration, so as not to require memzone.h */
 
+/* Verify that size_t is the same as uintptr_t, which rte_ring (among other
+ * components) assumes.
+ */
+#if UINTPTR_MAX != SIZE_MAX
+#error "DPDK requires sizeof(size_t) == sizeof(uintptr_t)"
+#endif
+
 /* structure to hold a pair of head/tail values and other metadata */
 struct rte_ring_headtail {
-	volatile uint32_t head;  /**< Prod/consumer head. */
-	volatile uint32_t tail;  /**< Prod/consumer tail. */
-	uint32_t single;         /**< True if single prod/cons */
+	volatile size_t head;  /**< Prod/consumer head. */
+	volatile size_t tail;  /**< Prod/consumer tail. */
+	uint32_t single;       /**< True if single prod/cons */
 };
 
 /**
@@ -242,7 +249,7 @@  void rte_ring_dump(FILE *f, const struct rte_ring *r);
 #define ENQUEUE_PTRS(r, ring_start, prod_head, obj_table, n, obj_type) do { \
 	unsigned int i; \
 	const uint32_t size = (r)->size; \
-	uint32_t idx = prod_head & (r)->mask; \
+	size_t idx = prod_head & (r)->mask; \
 	obj_type *ring = (obj_type *)ring_start; \
 	if (likely(idx + n < size)) { \
 		for (i = 0; i < (n & ((~(unsigned)0x3))); i+=4, idx+=4) { \
@@ -272,7 +279,7 @@  void rte_ring_dump(FILE *f, const struct rte_ring *r);
  * single and multi consumer dequeue functions */
 #define DEQUEUE_PTRS(r, ring_start, cons_head, obj_table, n, obj_type) do { \
 	unsigned int i; \
-	uint32_t idx = cons_head & (r)->mask; \
+	size_t idx = cons_head & (r)->mask; \
 	const uint32_t size = (r)->size; \
 	obj_type *ring = (obj_type *)ring_start; \
 	if (likely(idx + n < size)) { \
@@ -338,7 +345,7 @@  __rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table,
 		 unsigned int n, enum rte_ring_queue_behavior behavior,
 		 unsigned int is_sp, unsigned int *free_space)
 {
-	uint32_t prod_head, prod_next;
+	size_t prod_head, prod_next;
 	uint32_t free_entries;
 
 	n = __rte_ring_move_prod_head(r, is_sp, n, behavior,
@@ -380,7 +387,7 @@  __rte_ring_do_dequeue(struct rte_ring *r, void **obj_table,
 		 unsigned int n, enum rte_ring_queue_behavior behavior,
 		 unsigned int is_sc, unsigned int *available)
 {
-	uint32_t cons_head, cons_next;
+	size_t cons_head, cons_next;
 	uint32_t entries;
 
 	n = __rte_ring_move_cons_head(r, (int)is_sc, n, behavior,
@@ -681,9 +688,9 @@  rte_ring_dequeue(struct rte_ring *r, void **obj_p)
 static inline unsigned
 rte_ring_count(const struct rte_ring *r)
 {
-	uint32_t prod_tail = r->prod.tail;
-	uint32_t cons_tail = r->cons.tail;
-	uint32_t count = (prod_tail - cons_tail) & r->mask;
+	size_t prod_tail = r->prod.tail;
+	size_t cons_tail = r->cons.tail;
+	size_t count = (prod_tail - cons_tail) & r->mask;
 	return (count > r->capacity) ? r->capacity : count;
 }
 
diff --git a/lib/librte_ring/rte_ring_generic.h b/lib/librte_ring/rte_ring_generic.h
index ea7dbe5b9..31e38adcc 100644
--- a/lib/librte_ring/rte_ring_generic.h
+++ b/lib/librte_ring/rte_ring_generic.h
@@ -1,6 +1,6 @@ 
 /* SPDX-License-Identifier: BSD-3-Clause
  *
- * Copyright (c) 2010-2017 Intel Corporation
+ * Copyright (c) 2010-2019 Intel Corporation
  * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
  * All rights reserved.
  * Derived from FreeBSD's bufring.h
@@ -11,7 +11,7 @@ 
 #define _RTE_RING_GENERIC_H_
 
 static __rte_always_inline void
-update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val,
+update_tail(struct rte_ring_headtail *ht, size_t old_val, size_t new_val,
 		uint32_t single, uint32_t enqueue)
 {
 	if (enqueue)
@@ -55,7 +55,7 @@  update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val,
 static __rte_always_inline unsigned int
 __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
 		unsigned int n, enum rte_ring_queue_behavior behavior,
-		uint32_t *old_head, uint32_t *new_head,
+		size_t *old_head, size_t *new_head,
 		uint32_t *free_entries)
 {
 	const uint32_t capacity = r->capacity;
@@ -93,7 +93,8 @@  __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
 		if (is_sp)
 			r->prod.head = *new_head, success = 1;
 		else
-			success = rte_atomic32_cmpset(&r->prod.head,
+			/* Built-in used to handle variable-sized head index. */
+			success = __sync_bool_compare_and_swap(&r->prod.head,
 					*old_head, *new_head);
 	} while (unlikely(success == 0));
 	return n;
@@ -125,7 +126,7 @@  __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
 static __rte_always_inline unsigned int
 __rte_ring_move_cons_head(struct rte_ring *r, unsigned int is_sc,
 		unsigned int n, enum rte_ring_queue_behavior behavior,
-		uint32_t *old_head, uint32_t *new_head,
+		size_t *old_head, size_t *new_head,
 		uint32_t *entries)
 {
 	unsigned int max = n;
@@ -161,8 +162,9 @@  __rte_ring_move_cons_head(struct rte_ring *r, unsigned int is_sc,
 		if (is_sc)
 			r->cons.head = *new_head, success = 1;
 		else
-			success = rte_atomic32_cmpset(&r->cons.head, *old_head,
-					*new_head);
+			/* Built-in used to handle variable-sized head index. */
+			success = __sync_bool_compare_and_swap(&r->cons.head,
+					*old_head, *new_head);
 	} while (unlikely(success == 0));
 	return n;
 }