[dpdk-dev,RFCv2,3/4] ring: allow common ring to use 8 or 16 byte values

Message ID 1485254377-20098-4-git-send-email-bruce.richardson@intel.com
State RFC, archived
Headers show

Checks

Context Check Description
ci/Intel compilation fail Compilation issues
ci/checkpatch warning coding style issues

Commit Message

Bruce Richardson Jan. 24, 2017, 10:39 a.m.
Change the common ring enqueue/dequeue functions to support enqueuing and
dequeuing 16B values. Add the element size as parameter to all common ring
functions that need it, and pass that as parameter from the rte_ring
functions.

Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
---
 lib/librte_ring/rte_common_ring.c |  14 ++--
 lib/librte_ring/rte_common_ring.h | 135 ++++++++++++++++++++++++++++----------
 lib/librte_ring/rte_ring.c        |   7 +-
 lib/librte_ring/rte_ring.h        |  16 ++---
 4 files changed, 122 insertions(+), 50 deletions(-)

Patch

diff --git a/lib/librte_ring/rte_common_ring.c b/lib/librte_ring/rte_common_ring.c
index a0c4b5a..eb04de4 100644
--- a/lib/librte_ring/rte_common_ring.c
+++ b/lib/librte_ring/rte_common_ring.c
@@ -101,7 +101,7 @@  EAL_REGISTER_TAILQ(rte_common_ring_tailq)
 
 /* return the size of memory occupied by a ring */
 ssize_t
-rte_common_ring_get_memsize(unsigned count)
+rte_common_ring_get_memsize(unsigned count, unsigned int elem_sz)
 {
 	ssize_t sz;
 
@@ -113,14 +113,14 @@  rte_common_ring_get_memsize(unsigned count)
 		return -EINVAL;
 	}
 
-	sz = sizeof(struct rte_ring) + count * sizeof(void *);
+	sz = sizeof(struct rte_ring) + count * elem_sz;
 	sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
 	return sz;
 }
 
 int
 rte_common_ring_init(struct rte_ring *r, const char *name, unsigned count,
-	unsigned flags)
+	unsigned flags, unsigned int elem_sz)
 {
 	int ret;
 
@@ -146,6 +146,7 @@  rte_common_ring_init(struct rte_ring *r, const char *name, unsigned count,
 	if (ret < 0 || ret >= (int)sizeof(r->name))
 		return -ENAMETOOLONG;
 	r->flags = flags;
+	r->elem_sz = elem_sz;
 	r->prod.watermark = count;
 	r->prod.sp_enqueue = !!(flags & RING_F_SP_ENQ);
 	r->cons.sc_dequeue = !!(flags & RING_F_SC_DEQ);
@@ -160,7 +161,7 @@  rte_common_ring_init(struct rte_ring *r, const char *name, unsigned count,
 /* create the ring */
 struct rte_ring *
 rte_common_ring_create(const char *name, unsigned count, int socket_id,
-		unsigned flags)
+		unsigned flags, unsigned int elem_sz)
 {
 	char mz_name[RTE_MEMZONE_NAMESIZE];
 	struct rte_ring *r;
@@ -173,7 +174,7 @@  rte_common_ring_create(const char *name, unsigned count, int socket_id,
 
 	ring_list = RTE_TAILQ_CAST(rte_common_ring_tailq.head, rte_common_ring_list);
 
-	ring_size = rte_common_ring_get_memsize(count);
+	ring_size = rte_common_ring_get_memsize(count, elem_sz);
 	if (ring_size < 0) {
 		rte_errno = ring_size;
 		return NULL;
@@ -203,7 +204,7 @@  rte_common_ring_create(const char *name, unsigned count, int socket_id,
 		r = mz->addr;
 		/* no need to check return value here, we already checked the
 		 * arguments above */
-		rte_common_ring_init(r, name, count, flags);
+		rte_common_ring_init(r, name, count, flags, elem_sz);
 
 		te->data = (void *) r;
 		r->memzone = mz;
@@ -293,6 +294,7 @@  rte_common_ring_dump(FILE *f, const struct rte_ring *r)
 
 	fprintf(f, "ring <%s>@%p\n", r->name, r);
 	fprintf(f, "  flags=%x\n", r->flags);
+	fprintf(f, "  elem_sz=%u\n", r->elem_sz);
 	fprintf(f, "  size=%"PRIu32"\n", r->prod.size);
 	fprintf(f, "  ct=%"PRIu32"\n", r->cons.tail);
 	fprintf(f, "  ch=%"PRIu32"\n", r->cons.head);
diff --git a/lib/librte_ring/rte_common_ring.h b/lib/librte_ring/rte_common_ring.h
index f2c1c46..314d53b 100644
--- a/lib/librte_ring/rte_common_ring.h
+++ b/lib/librte_ring/rte_common_ring.h
@@ -101,6 +101,7 @@  extern "C" {
 #include <rte_atomic.h>
 #include <rte_branch_prediction.h>
 #include <rte_memzone.h>
+#include <rte_vect.h>
 
 #define RTE_TAILQ_RING_NAME "RTE_RING"
 
@@ -157,6 +158,7 @@  struct rte_ring {
 	 */
 	char name[RTE_MEMZONE_NAMESIZE];    /**< Name of the ring. */
 	int flags;                       /**< Flags supplied at creation. */
+	unsigned int elem_sz;            /**< Size of a ring entry */
 	const struct rte_memzone *memzone;
 			/**< Memzone, if any, containing the rte_ring */
 
@@ -232,7 +234,7 @@  struct rte_ring {
  *   - The memory size needed for the ring on success.
  *   - -EINVAL if count is not a power of 2.
  */
-ssize_t rte_common_ring_get_memsize(unsigned count);
+ssize_t rte_common_ring_get_memsize(unsigned count, unsigned int elem_sz);
 
 /**
  * Initialize a ring structure.
@@ -269,7 +271,7 @@  ssize_t rte_common_ring_get_memsize(unsigned count);
  *   0 on success, or a negative value on error.
  */
 int rte_common_ring_init(struct rte_ring *r, const char *name, unsigned count,
-	unsigned flags);
+	unsigned flags, unsigned int elem_sz);
 
 /**
  * Create a new ring named *name* in memory.
@@ -311,7 +313,8 @@  int rte_common_ring_init(struct rte_ring *r, const char *name, unsigned count,
  *    - ENOMEM - no appropriate memory area found in which to create memzone
  */
 struct rte_ring *rte_common_ring_create(const char *name, unsigned count,
-				 int socket_id, unsigned flags);
+				 int socket_id, unsigned flags,
+				 unsigned int elem_sz);
 /**
  * De-allocate all memory used by the ring.
  *
@@ -354,25 +357,50 @@  void rte_common_ring_dump(FILE *f, const struct rte_ring *r);
  * Placed here since identical code needed in both
  * single and multi producer enqueue functions */
 #define ENQUEUE_PTRS() do { \
+	void * const *objs = obj_table; \
 	const uint32_t size = r->prod.size; \
 	uint32_t idx = prod_head & mask; \
 	if (likely(idx + n < size)) { \
 		for (i = 0; i < (n & ((~(unsigned)0x3))); i+=4, idx+=4) { \
-			r->ring[idx] = obj_table[i]; \
-			r->ring[idx+1] = obj_table[i+1]; \
-			r->ring[idx+2] = obj_table[i+2]; \
-			r->ring[idx+3] = obj_table[i+3]; \
+			r->ring[idx] = objs[i]; \
+			r->ring[idx+1] = objs[i+1]; \
+			r->ring[idx+2] = objs[i+2]; \
+			r->ring[idx+3] = objs[i+3]; \
 		} \
 		switch (n & 0x3) { \
-			case 3: r->ring[idx++] = obj_table[i++]; \
-			case 2: r->ring[idx++] = obj_table[i++]; \
-			case 1: r->ring[idx++] = obj_table[i++]; \
+			case 3: r->ring[idx++] = objs[i++]; \
+			case 2: r->ring[idx++] = objs[i++]; \
+			case 1: r->ring[idx++] = objs[i++]; \
 		} \
 	} else { \
 		for (i = 0; idx < size; i++, idx++)\
-			r->ring[idx] = obj_table[i]; \
+			r->ring[idx] = objs[i]; \
 		for (idx = 0; i < n; i++, idx++) \
-			r->ring[idx] = obj_table[i]; \
+			r->ring[idx] = objs[i]; \
+	} \
+} while(0)
+#define ENQUEUE_16B() do { \
+	rte_xmm_t *ring = (void *)r->ring; \
+	const rte_xmm_t *objs = obj_table; \
+	const uint32_t size = r->prod.size; \
+	uint32_t idx = prod_head & mask; \
+	if (likely(idx + n < size)) { \
+		for (i = 0; i < (n & ((~(unsigned)0x3))); i+=4, idx+=4) { \
+			ring[idx] = objs[i]; \
+			ring[idx+1] = objs[i+1]; \
+			ring[idx+2] = objs[i+2]; \
+			ring[idx+3] = objs[i+3]; \
+		} \
+		switch (n & 0x3) { \
+			case 3: ring[idx++] = objs[i++]; \
+			case 2: ring[idx++] = objs[i++]; \
+			case 1: ring[idx++] = objs[i++]; \
+		} \
+	} else { \
+		for (i = 0; idx < size; i++, idx++)\
+			ring[idx] = objs[i]; \
+		for (idx = 0; i < n; i++, idx++) \
+			ring[idx] = objs[i]; \
 	} \
 } while(0)
 
@@ -380,25 +408,50 @@  void rte_common_ring_dump(FILE *f, const struct rte_ring *r);
  * Placed here since identical code needed in both
  * single and multi consumer dequeue functions */
 #define DEQUEUE_PTRS() do { \
+	void **objs = obj_table; \
+	uint32_t idx = cons_head & mask; \
+	const uint32_t size = r->cons.size; \
+	if (likely(idx + n < size)) { \
+		for (i = 0; i < (n & (~(unsigned)0x3)); i+=4, idx+=4) {\
+			objs[i] = r->ring[idx]; \
+			objs[i+1] = r->ring[idx+1]; \
+			objs[i+2] = r->ring[idx+2]; \
+			objs[i+3] = r->ring[idx+3]; \
+		} \
+		switch (n & 0x3) { \
+			case 3: objs[i++] = r->ring[idx++]; \
+			case 2: objs[i++] = r->ring[idx++]; \
+			case 1: objs[i++] = r->ring[idx++]; \
+		} \
+	} else { \
+		for (i = 0; idx < size; i++, idx++) \
+			objs[i] = r->ring[idx]; \
+		for (idx = 0; i < n; i++, idx++) \
+			objs[i] = r->ring[idx]; \
+	} \
+} while (0)
+#define DEQUEUE_16B() do { \
+	rte_xmm_t *ring = (void *)r->ring; \
+	rte_xmm_t *objs = obj_table; \
 	uint32_t idx = cons_head & mask; \
 	const uint32_t size = r->cons.size; \
 	if (likely(idx + n < size)) { \
 		for (i = 0; i < (n & (~(unsigned)0x3)); i+=4, idx+=4) {\
-			obj_table[i] = r->ring[idx]; \
-			obj_table[i+1] = r->ring[idx+1]; \
-			obj_table[i+2] = r->ring[idx+2]; \
-			obj_table[i+3] = r->ring[idx+3]; \
+			objs[i] = ring[idx]; \
+			objs[i+1] = ring[idx+1]; \
+			objs[i+2] = ring[idx+2]; \
+			objs[i+3] = ring[idx+3]; \
 		} \
 		switch (n & 0x3) { \
-			case 3: obj_table[i++] = r->ring[idx++]; \
-			case 2: obj_table[i++] = r->ring[idx++]; \
-			case 1: obj_table[i++] = r->ring[idx++]; \
+			case 3: objs[i++] = ring[idx++]; \
+			case 2: objs[i++] = ring[idx++]; \
+			case 1: objs[i++] = ring[idx++]; \
 		} \
 	} else { \
 		for (i = 0; idx < size; i++, idx++) \
-			obj_table[i] = r->ring[idx]; \
+			objs[i] = ring[idx]; \
 		for (idx = 0; i < n; i++, idx++) \
-			obj_table[i] = r->ring[idx]; \
+			objs[i] = ring[idx]; \
 	} \
 } while (0)
 
@@ -428,8 +481,9 @@  void rte_common_ring_dump(FILE *f, const struct rte_ring *r);
  *   - n: Actual number of objects enqueued.
  */
 static inline int __attribute__((always_inline))
-__rte_common_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
-			 unsigned n, enum rte_ring_queue_behavior behavior)
+__rte_common_ring_mp_do_enqueue(struct rte_ring *r, const void *obj_table,
+			 unsigned n, unsigned int elem_sz,
+			 enum rte_ring_queue_behavior behavior)
 {
 	uint32_t prod_head, prod_next;
 	uint32_t cons_tail, free_entries;
@@ -480,7 +534,10 @@  __rte_common_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
 	} while (unlikely(success == 0));
 
 	/* write entries in ring */
-	ENQUEUE_PTRS();
+	switch (elem_sz) {
+		case sizeof(void *): ENQUEUE_PTRS(); break;
+		case sizeof(rte_xmm_t): ENQUEUE_16B(); break;
+	}
 	rte_smp_wmb();
 
 	/* if we exceed the watermark */
@@ -537,8 +594,9 @@  __rte_common_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
  *   - n: Actual number of objects enqueued.
  */
 static inline int __attribute__((always_inline))
-__rte_common_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table,
-			 unsigned n, enum rte_ring_queue_behavior behavior)
+__rte_common_ring_sp_do_enqueue(struct rte_ring *r, const void *obj_table,
+			 unsigned n, unsigned int elem_sz,
+			 enum rte_ring_queue_behavior behavior)
 {
 	uint32_t prod_head, cons_tail;
 	uint32_t prod_next, free_entries;
@@ -575,7 +633,10 @@  __rte_common_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table,
 	r->prod.head = prod_next;
 
 	/* write entries in ring */
-	ENQUEUE_PTRS();
+	switch (elem_sz) {
+		case sizeof(void *): ENQUEUE_PTRS(); break;
+		case sizeof(rte_xmm_t): ENQUEUE_16B(); break;
+	}
 	rte_smp_wmb();
 
 	/* if we exceed the watermark */
@@ -621,8 +682,9 @@  __rte_common_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table,
  */
 
 static inline int __attribute__((always_inline))
-__rte_common_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table,
-		 unsigned n, enum rte_ring_queue_behavior behavior)
+__rte_common_ring_mc_do_dequeue(struct rte_ring *r, void *obj_table,
+		 unsigned n, unsigned int elem_sz,
+		 enum rte_ring_queue_behavior behavior)
 {
 	uint32_t cons_head, prod_tail;
 	uint32_t cons_next, entries;
@@ -671,7 +733,10 @@  __rte_common_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table,
 	} while (unlikely(success == 0));
 
 	/* copy in table */
-	DEQUEUE_PTRS();
+	switch (elem_sz) {
+		case sizeof(void *): DEQUEUE_PTRS(); break;
+		case sizeof(rte_xmm_t): DEQUEUE_16B(); break;
+	}
 	rte_smp_rmb();
 
 	/*
@@ -720,8 +785,9 @@  __rte_common_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table,
  *   - n: Actual number of objects dequeued.
  */
 static inline int __attribute__((always_inline))
-__rte_common_ring_sc_do_dequeue(struct rte_ring *r, void **obj_table,
-		 unsigned n, enum rte_ring_queue_behavior behavior)
+__rte_common_ring_sc_do_dequeue(struct rte_ring *r, void *obj_table,
+		 unsigned n, unsigned int elem_sz,
+		 enum rte_ring_queue_behavior behavior)
 {
 	uint32_t cons_head, prod_tail;
 	uint32_t cons_next, entries;
@@ -755,7 +821,10 @@  __rte_common_ring_sc_do_dequeue(struct rte_ring *r, void **obj_table,
 	r->cons.head = cons_next;
 
 	/* copy in table */
-	DEQUEUE_PTRS();
+	switch (elem_sz) {
+		case sizeof(void *): DEQUEUE_PTRS(); break;
+		case sizeof(rte_xmm_t): DEQUEUE_16B(); break;
+	}
 	rte_smp_rmb();
 
 	__RING_STAT_ADD(r, deq_success, n);
diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c
index 16ddc39..0fd20f3 100644
--- a/lib/librte_ring/rte_ring.c
+++ b/lib/librte_ring/rte_ring.c
@@ -36,14 +36,14 @@ 
 ssize_t
 rte_ring_get_memsize(unsigned count)
 {
-	return rte_common_ring_get_memsize(count);
+	return rte_common_ring_get_memsize(count, sizeof(void *));
 }
 
 int
 rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
 	unsigned flags)
 {
-	return rte_common_ring_init(r, name, count, flags);
+	return rte_common_ring_init(r, name, count, flags, sizeof(void *));
 }
 
 
@@ -51,7 +51,8 @@  struct rte_ring *
 rte_ring_create(const char *name, unsigned count, int socket_id,
 		unsigned flags)
 {
-	return rte_common_ring_create(name, count, socket_id, flags);
+	return rte_common_ring_create(name, count, socket_id, flags,
+			sizeof(void *));
 }
 
 void
diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
index 993796f..3b934da 100644
--- a/lib/librte_ring/rte_ring.h
+++ b/lib/librte_ring/rte_ring.h
@@ -215,7 +215,7 @@  static inline int __attribute__((always_inline))
 rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
 			 unsigned n)
 {
-	return __rte_common_ring_mp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
+	return __rte_common_ring_mp_do_enqueue(r, obj_table, n, sizeof(void *), RTE_RING_QUEUE_FIXED);
 }
 
 /**
@@ -237,7 +237,7 @@  static inline int __attribute__((always_inline))
 rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
 			 unsigned n)
 {
-	return __rte_common_ring_sp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
+	return __rte_common_ring_sp_do_enqueue(r, obj_table, n, sizeof(void *), RTE_RING_QUEUE_FIXED);
 }
 
 /**
@@ -356,7 +356,7 @@  rte_ring_enqueue(struct rte_ring *r, void *obj)
 static inline int __attribute__((always_inline))
 rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
 {
-	return __rte_common_ring_mc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
+	return __rte_common_ring_mc_do_dequeue(r, obj_table, n, sizeof(void *), RTE_RING_QUEUE_FIXED);
 }
 
 /**
@@ -377,7 +377,7 @@  rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
 static inline int __attribute__((always_inline))
 rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
 {
-	return __rte_common_ring_sc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
+	return __rte_common_ring_sc_do_dequeue(r, obj_table, n, sizeof(void *), RTE_RING_QUEUE_FIXED);
 }
 
 /**
@@ -568,7 +568,7 @@  static inline unsigned __attribute__((always_inline))
 rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
 			 unsigned n)
 {
-	return __rte_common_ring_mp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
+	return __rte_common_ring_mp_do_enqueue(r, obj_table, n, sizeof(void *), RTE_RING_QUEUE_VARIABLE);
 }
 
 /**
@@ -587,7 +587,7 @@  static inline unsigned __attribute__((always_inline))
 rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
 			 unsigned n)
 {
-	return __rte_common_ring_sp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
+	return __rte_common_ring_sp_do_enqueue(r, obj_table, n, sizeof(void *), RTE_RING_QUEUE_VARIABLE);
 }
 
 /**
@@ -636,7 +636,7 @@  rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
 static inline unsigned __attribute__((always_inline))
 rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n)
 {
-	return __rte_common_ring_mc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
+	return __rte_common_ring_mc_do_dequeue(r, obj_table, n, sizeof(void *), RTE_RING_QUEUE_VARIABLE);
 }
 
 /**
@@ -656,7 +656,7 @@  rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n)
 static inline unsigned __attribute__((always_inline))
 rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n)
 {
-	return __rte_common_ring_sc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
+	return __rte_common_ring_sc_do_dequeue(r, obj_table, n, sizeof(void *), RTE_RING_QUEUE_VARIABLE);
 }
 
 /**