[RFC,2/6] ring: rework ring layout to allow new sync schemes

Message ID 20200224113515.1744-3-konstantin.ananyev@intel.com (mailing list archive)
State Superseded, archived
Delegated to: Thomas Monjalon
Headers
Series New sync modes for ring |

Checks

Context Check Description
ci/checkpatch success coding style OK
ci/Intel-compilation success Compilation OK

Commit Message

Ananyev, Konstantin Feb. 24, 2020, 11:35 a.m. UTC
  Change from *single* to *sync_type* to allow different
synchronisation schemes to be applied.
Change layout to make sure that *sync_type* and *tail*
will always reside on same offsets.

Signed-off-by: Konstantin Ananyev <konstantin.ananyev@intel.com>
---
 app/test/test_pdump.c           |  6 +--
 lib/librte_pdump/rte_pdump.c    |  2 +-
 lib/librte_port/rte_port_ring.c | 12 +++---
 lib/librte_ring/rte_ring.c      |  6 ++-
 lib/librte_ring/rte_ring.h      | 76 ++++++++++++++++++++++++---------
 lib/librte_ring/rte_ring_elem.h |  8 ++--
 6 files changed, 71 insertions(+), 39 deletions(-)
  

Patch

diff --git a/app/test/test_pdump.c b/app/test/test_pdump.c
index ad183184c..6a1180bcb 100644
--- a/app/test/test_pdump.c
+++ b/app/test/test_pdump.c
@@ -57,8 +57,7 @@  run_pdump_client_tests(void)
 	if (ret < 0)
 		return -1;
 	mp->flags = 0x0000;
-	ring_client = rte_ring_create("SR0", RING_SIZE, rte_socket_id(),
-				      RING_F_SP_ENQ | RING_F_SC_DEQ);
+	ring_client = rte_ring_create("SR0", RING_SIZE, rte_socket_id(), 0);
 	if (ring_client == NULL) {
 		printf("rte_ring_create SR0 failed");
 		return -1;
@@ -71,9 +70,6 @@  run_pdump_client_tests(void)
 	}
 	rte_eth_dev_probing_finish(eth_dev);
 
-	ring_client->prod.single = 0;
-	ring_client->cons.single = 0;
-
 	printf("\n***** flags = RTE_PDUMP_FLAG_TX *****\n");
 
 	for (itr = 0; itr < NUM_ITR; itr++) {
diff --git a/lib/librte_pdump/rte_pdump.c b/lib/librte_pdump/rte_pdump.c
index 8a01ac510..65364f2c5 100644
--- a/lib/librte_pdump/rte_pdump.c
+++ b/lib/librte_pdump/rte_pdump.c
@@ -380,7 +380,7 @@  pdump_validate_ring_mp(struct rte_ring *ring, struct rte_mempool *mp)
 		rte_errno = EINVAL;
 		return -1;
 	}
-	if (ring->prod.single || ring->cons.single) {
+	if (rte_ring_prod_single(ring) || rte_ring_cons_single(ring)) {
 		PDUMP_LOG(ERR, "ring with either SP or SC settings"
 		" is not valid for pdump, should have MP and MC settings\n");
 		rte_errno = EINVAL;
diff --git a/lib/librte_port/rte_port_ring.c b/lib/librte_port/rte_port_ring.c
index 47fcdd06a..2f6c050fa 100644
--- a/lib/librte_port/rte_port_ring.c
+++ b/lib/librte_port/rte_port_ring.c
@@ -44,8 +44,8 @@  rte_port_ring_reader_create_internal(void *params, int socket_id,
 	/* Check input parameters */
 	if ((conf == NULL) ||
 		(conf->ring == NULL) ||
-		(conf->ring->cons.single && is_multi) ||
-		(!(conf->ring->cons.single) && !is_multi)) {
+		(rte_ring_cons_single(conf->ring) && is_multi) ||
+		(!rte_ring_cons_single(conf->ring) && !is_multi)) {
 		RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__);
 		return NULL;
 	}
@@ -171,8 +171,8 @@  rte_port_ring_writer_create_internal(void *params, int socket_id,
 	/* Check input parameters */
 	if ((conf == NULL) ||
 		(conf->ring == NULL) ||
-		(conf->ring->prod.single && is_multi) ||
-		(!(conf->ring->prod.single) && !is_multi) ||
+		(rte_ring_prod_single(conf->ring) && is_multi) ||
+		(!rte_ring_prod_single(conf->ring) && !is_multi) ||
 		(conf->tx_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX)) {
 		RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__);
 		return NULL;
@@ -440,8 +440,8 @@  rte_port_ring_writer_nodrop_create_internal(void *params, int socket_id,
 	/* Check input parameters */
 	if ((conf == NULL) ||
 		(conf->ring == NULL) ||
-		(conf->ring->prod.single && is_multi) ||
-		(!(conf->ring->prod.single) && !is_multi) ||
+		(rte_ring_prod_single(conf->ring) && is_multi) ||
+		(!rte_ring_prod_single(conf->ring) && !is_multi) ||
 		(conf->tx_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX)) {
 		RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__);
 		return NULL;
diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c
index 77e5de099..fa5733907 100644
--- a/lib/librte_ring/rte_ring.c
+++ b/lib/librte_ring/rte_ring.c
@@ -106,8 +106,10 @@  rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
 	if (ret < 0 || ret >= (int)sizeof(r->name))
 		return -ENAMETOOLONG;
 	r->flags = flags;
-	r->prod.single = (flags & RING_F_SP_ENQ) ? __IS_SP : __IS_MP;
-	r->cons.single = (flags & RING_F_SC_DEQ) ? __IS_SC : __IS_MC;
+	r->prod.sync_type = (flags & RING_F_SP_ENQ) ?
+		RTE_RING_SYNC_ST : RTE_RING_SYNC_MT;
+	r->cons.sync_type = (flags & RING_F_SC_DEQ) ?
+		RTE_RING_SYNC_ST : RTE_RING_SYNC_MT;
 
 	if (flags & RING_F_EXACT_SZ) {
 		r->size = rte_align32pow2(count + 1);
diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
index 18fc5d845..aa8c628eb 100644
--- a/lib/librte_ring/rte_ring.h
+++ b/lib/librte_ring/rte_ring.h
@@ -61,11 +61,22 @@  enum rte_ring_queue_behavior {
 #define RTE_RING_NAMESIZE (RTE_MEMZONE_NAMESIZE - \
 			   sizeof(RTE_RING_MZ_PREFIX) + 1)
 
-/* structure to hold a pair of head/tail values and other metadata */
+/** prod/cons sync types */
+enum {
+	RTE_RING_SYNC_MT,     /**< multi-thread safe (default mode) */
+	RTE_RING_SYNC_ST,     /**< single thread only */
+};
+
+/**
+ * structure to hold a pair of head/tail values and other metadata.
+ * used by RTE_RING_SYNC_MT, RTE_RING_SYNC_ST sync types.
+ * Depending on sync_type format of that structure might be different,
+ * but offset for *sync_type* and *tail* values should remain the same.
+ */
 struct rte_ring_headtail {
-	volatile uint32_t head;  /**< Prod/consumer head. */
-	volatile uint32_t tail;  /**< Prod/consumer tail. */
-	uint32_t single;         /**< True if single prod/cons */
+	uint32_t sync_type;                      /**< sync type of prod/cons */
+	volatile uint32_t tail __rte_aligned(8); /**< prod/consumer tail. */
+	volatile uint32_t head;                  /**< prod/consumer head. */
 };
 
 /**
@@ -116,11 +127,10 @@  struct rte_ring {
 #define RING_F_EXACT_SZ 0x0004
 #define RTE_RING_SZ_MASK  (0x7fffffffU) /**< Ring size mask */
 
-/* @internal defines for passing to the enqueue dequeue worker functions */
-#define __IS_SP 1
-#define __IS_MP 0
-#define __IS_SC 1
-#define __IS_MC 0
+#define __IS_SP RTE_RING_SYNC_ST
+#define __IS_MP RTE_RING_SYNC_MT
+#define __IS_SC RTE_RING_SYNC_ST
+#define __IS_MC RTE_RING_SYNC_MT
 
 /**
  * Calculate the memory size needed for a ring
@@ -420,7 +430,7 @@  rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
 			 unsigned int n, unsigned int *free_space)
 {
 	return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
-			__IS_MP, free_space);
+			RTE_RING_SYNC_MT, free_space);
 }
 
 /**
@@ -443,7 +453,7 @@  rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
 			 unsigned int n, unsigned int *free_space)
 {
 	return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
-			__IS_SP, free_space);
+			RTE_RING_SYNC_ST, free_space);
 }
 
 /**
@@ -470,7 +480,7 @@  rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
 		      unsigned int n, unsigned int *free_space)
 {
 	return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
-			r->prod.single, free_space);
+			r->prod.sync_type, free_space);
 }
 
 /**
@@ -554,7 +564,7 @@  rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table,
 		unsigned int n, unsigned int *available)
 {
 	return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
-			__IS_MC, available);
+			RTE_RING_SYNC_MT, available);
 }
 
 /**
@@ -578,7 +588,7 @@  rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table,
 		unsigned int n, unsigned int *available)
 {
 	return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
-			__IS_SC, available);
+			RTE_RING_SYNC_ST, available);
 }
 
 /**
@@ -605,7 +615,7 @@  rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n,
 		unsigned int *available)
 {
 	return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
-				r->cons.single, available);
+				r->cons.sync_type, available);
 }
 
 /**
@@ -777,6 +787,30 @@  rte_ring_get_capacity(const struct rte_ring *r)
 	return r->capacity;
 }
 
+static inline uint32_t
+rte_ring_get_prod_sync_type(const struct rte_ring *r)
+{
+	return r->prod.sync_type;
+}
+
+static inline int
+rte_ring_prod_single(const struct rte_ring *r)
+{
+	return (rte_ring_get_prod_sync_type(r) == RTE_RING_SYNC_ST);
+}
+
+static inline uint32_t
+rte_ring_get_cons_sync_type(const struct rte_ring *r)
+{
+	return r->cons.sync_type;
+}
+
+static inline int
+rte_ring_cons_single(const struct rte_ring *r)
+{
+	return (rte_ring_get_cons_sync_type(r) == RTE_RING_SYNC_ST);
+}
+
 /**
  * Dump the status of all rings on the console
  *
@@ -820,7 +854,7 @@  rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
 			 unsigned int n, unsigned int *free_space)
 {
 	return __rte_ring_do_enqueue(r, obj_table, n,
-			RTE_RING_QUEUE_VARIABLE, __IS_MP, free_space);
+			RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_MT, free_space);
 }
 
 /**
@@ -843,7 +877,7 @@  rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
 			 unsigned int n, unsigned int *free_space)
 {
 	return __rte_ring_do_enqueue(r, obj_table, n,
-			RTE_RING_QUEUE_VARIABLE, __IS_SP, free_space);
+			RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_ST, free_space);
 }
 
 /**
@@ -870,7 +904,7 @@  rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
 		      unsigned int n, unsigned int *free_space)
 {
 	return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE,
-			r->prod.single, free_space);
+			r->prod.sync_type, free_space);
 }
 
 /**
@@ -898,7 +932,7 @@  rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table,
 		unsigned int n, unsigned int *available)
 {
 	return __rte_ring_do_dequeue(r, obj_table, n,
-			RTE_RING_QUEUE_VARIABLE, __IS_MC, available);
+			RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_MT, available);
 }
 
 /**
@@ -923,7 +957,7 @@  rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table,
 		unsigned int n, unsigned int *available)
 {
 	return __rte_ring_do_dequeue(r, obj_table, n,
-			RTE_RING_QUEUE_VARIABLE, __IS_SC, available);
+			RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_ST, available);
 }
 
 /**
@@ -951,7 +985,7 @@  rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table,
 {
 	return __rte_ring_do_dequeue(r, obj_table, n,
 				RTE_RING_QUEUE_VARIABLE,
-				r->cons.single, available);
+				r->cons.sync_type, available);
 }
 
 #ifdef __cplusplus
diff --git a/lib/librte_ring/rte_ring_elem.h b/lib/librte_ring/rte_ring_elem.h
index 3976757ed..ff7a28ea5 100644
--- a/lib/librte_ring/rte_ring_elem.h
+++ b/lib/librte_ring/rte_ring_elem.h
@@ -570,7 +570,7 @@  rte_ring_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table,
 		unsigned int esize, unsigned int n, unsigned int *free_space)
 {
 	return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
-			RTE_RING_QUEUE_FIXED, r->prod.single, free_space);
+			RTE_RING_QUEUE_FIXED, r->prod.sync_type, free_space);
 }
 
 /**
@@ -734,7 +734,7 @@  rte_ring_dequeue_bulk_elem(struct rte_ring *r, void *obj_table,
 		unsigned int esize, unsigned int n, unsigned int *available)
 {
 	return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
-			RTE_RING_QUEUE_FIXED, r->cons.single, available);
+			RTE_RING_QUEUE_FIXED, r->cons.sync_type, available);
 }
 
 /**
@@ -902,7 +902,7 @@  rte_ring_enqueue_burst_elem(struct rte_ring *r, const void *obj_table,
 		unsigned int esize, unsigned int n, unsigned int *free_space)
 {
 	return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
-			RTE_RING_QUEUE_VARIABLE, r->prod.single, free_space);
+			RTE_RING_QUEUE_VARIABLE, r->prod.sync_type, free_space);
 }
 
 /**
@@ -995,7 +995,7 @@  rte_ring_dequeue_burst_elem(struct rte_ring *r, void *obj_table,
 {
 	return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
 				RTE_RING_QUEUE_VARIABLE,
-				r->cons.single, available);
+				r->cons.sync_type, available);
 }
 
 #ifdef __cplusplus