@@ -45,9 +45,9 @@ EAL_REGISTER_TAILQ(rte_ring_tailq)
/* return the size of memory occupied by a ring */
ssize_t
-rte_ring_get_memsize(unsigned count)
+rte_ring_get_memsize_v1905(unsigned int count, unsigned int flags)
{
- ssize_t sz;
+ ssize_t sz, elt_sz;
/* count must be a power of 2 */
if ((!POWEROF2(count)) || (count > RTE_RING_SZ_MASK )) {
@@ -57,10 +57,23 @@ rte_ring_get_memsize(unsigned count)
return -EINVAL;
}
- sz = sizeof(struct rte_ring) + count * sizeof(void *);
+ elt_sz = (flags & RING_F_NB) ? 2 * sizeof(void *) : sizeof(void *);
+
+ sz = sizeof(struct rte_ring) + count * elt_sz;
sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
return sz;
}
+BIND_DEFAULT_SYMBOL(rte_ring_get_memsize, _v1905, 19.05);
+MAP_STATIC_SYMBOL(ssize_t rte_ring_get_memsize(unsigned int count,
+ unsigned int flags),
+ rte_ring_get_memsize_v1905);
+
+ssize_t
+rte_ring_get_memsize_v20(unsigned int count)
+{
+ return rte_ring_get_memsize_v1905(count, 0);
+}
+VERSION_SYMBOL(rte_ring_get_memsize, _v20, 2.0);
int
rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
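For illustration, a minimal sketch (not part of the diff) of how the new two-argument rte_ring_get_memsize() pairs with rte_ring_init() for a caller-allocated non-blocking ring; the name, count, and use of rte_malloc() are assumptions:

#include <rte_ring.h>
#include <rte_malloc.h>

/* Sketch: size and initialize a caller-allocated RING_F_NB ring.
 * Assumes the two-argument rte_ring_get_memsize() added above;
 * count must be a power of 2 since RING_F_EXACT_SZ is not used here.
 */
static struct rte_ring *
make_nb_ring(unsigned int count)
{
	ssize_t sz = rte_ring_get_memsize(count, RING_F_NB);
	struct rte_ring *r;

	if (sz < 0)
		return NULL;

	r = rte_malloc(NULL, sz, RTE_CACHE_LINE_SIZE);
	if (r == NULL)
		return NULL;

	if (rte_ring_init(r, "nb_ring", count, RING_F_NB) < 0) {
		rte_free(r);
		return NULL;
	}
	return r;
}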
@@ -82,8 +95,6 @@ rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
if (ret < 0 || ret >= (int)sizeof(r->name))
return -ENAMETOOLONG;
r->flags = flags;
- r->prod.single = (flags & RING_F_SP_ENQ) ? __IS_SP : __IS_MP;
- r->cons.single = (flags & RING_F_SC_DEQ) ? __IS_SC : __IS_MC;
if (flags & RING_F_EXACT_SZ) {
r->size = rte_align32pow2(count + 1);
@@ -100,8 +111,30 @@ rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
r->mask = count - 1;
r->capacity = r->mask;
}
- r->prod.head = r->cons.head = 0;
- r->prod.tail = r->cons.tail = 0;
+
+ if (flags & RING_F_NB) {
+ uint64_t i;
+
+ r->prod_64.single = (flags & RING_F_SP_ENQ) ? __IS_SP : __IS_MP;
+ r->cons_64.single = (flags & RING_F_SC_DEQ) ? __IS_SC : __IS_MC;
+ r->prod_64.head = r->cons_64.head = 0;
+ r->prod_64.tail = r->cons_64.tail = 0;
+
+ for (i = 0; i < r->size; i++) {
+ struct nb_ring_entry *ring_ptr, *base;
+
+ base = ((struct nb_ring_entry *)&r[1]);
+
+ ring_ptr = &base[i & r->mask];
+
+ ring_ptr->cnt = i;
+ }
+ } else {
+ r->prod.single = (flags & RING_F_SP_ENQ) ? __IS_SP : __IS_MP;
+ r->cons.single = (flags & RING_F_SC_DEQ) ? __IS_SC : __IS_MC;
+ r->prod.head = r->cons.head = 0;
+ r->prod.tail = r->cons.tail = 0;
+ }
return 0;
}
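The initialization loop above establishes the invariant the non-blocking enqueue relies on: a slot is empty for the current lap exactly when its modification counter equals the producer tail index, and a successful enqueue advances that counter by r->size. A sketch of the check (the helper below is hypothetical, not in the patch):

/* Hypothetical helper: true if the slot addressed by 'tail' has not yet
 * been filled for the current lap (cnt == tail); after an enqueue the
 * slot's cnt becomes tail + r->size.
 */
static inline int
nb_slot_is_free(const struct rte_ring *r, uint64_t tail)
{
	const struct nb_ring_entry *base = (const struct nb_ring_entry *)&r[1];

	return base[tail & r->mask].cnt == tail;
}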
@@ -123,11 +156,19 @@ rte_ring_create(const char *name, unsigned count, int socket_id,
ring_list = RTE_TAILQ_CAST(rte_ring_tailq.head, rte_ring_list);
+#if !defined(RTE_ARCH_X86_64)
+ if (flags & RING_F_NB) {
+ printf("RING_F_NB is only supported on x86-64 platforms\n");
+ rte_errno = EINVAL;
+ return NULL;
+ }
+#endif
+
/* for an exact size ring, round up from count to a power of two */
if (flags & RING_F_EXACT_SZ)
count = rte_align32pow2(count + 1);
- ring_size = rte_ring_get_memsize(count);
+ ring_size = rte_ring_get_memsize(count, flags);
if (ring_size < 0) {
rte_errno = ring_size;
return NULL;
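For completeness, a hedged usage sketch of creating a non-blocking ring through rte_ring_create(); the name, size, and flag combination are illustrative:

/* Sketch: single-producer/single-consumer non-blocking ring. */
struct rte_ring *r;

r = rte_ring_create("nb_ring", 1024, rte_socket_id(),
		    RING_F_NB | RING_F_SP_ENQ | RING_F_SC_DEQ);
if (r == NULL)
	printf("ring creation failed: %s\n", rte_strerror(rte_errno));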
@@ -227,10 +268,17 @@ rte_ring_dump(FILE *f, const struct rte_ring *r)
fprintf(f, " flags=%x\n", r->flags);
fprintf(f, " size=%"PRIu32"\n", r->size);
fprintf(f, " capacity=%"PRIu32"\n", r->capacity);
- fprintf(f, " ct=%"PRIu32"\n", r->cons.tail);
- fprintf(f, " ch=%"PRIu32"\n", r->cons.head);
- fprintf(f, " pt=%"PRIu32"\n", r->prod.tail);
- fprintf(f, " ph=%"PRIu32"\n", r->prod.head);
+ if (r->flags & RING_F_NB) {
+ fprintf(f, " ct=%"PRIu64"\n", r->cons_64.tail);
+ fprintf(f, " ch=%"PRIu64"\n", r->cons_64.head);
+ fprintf(f, " pt=%"PRIu64"\n", r->prod_64.tail);
+ fprintf(f, " ph=%"PRIu64"\n", r->prod_64.head);
+ } else {
+ fprintf(f, " ct=%"PRIu32"\n", r->cons.tail);
+ fprintf(f, " ch=%"PRIu32"\n", r->cons.head);
+ fprintf(f, " pt=%"PRIu32"\n", r->prod.tail);
+ fprintf(f, " ph=%"PRIu32"\n", r->prod.head);
+ }
fprintf(f, " used=%u\n", rte_ring_count(r));
fprintf(f, " avail=%u\n", rte_ring_free_count(r));
}
@@ -20,12 +20,16 @@
*
* - FIFO (First In First Out)
* - Maximum size is fixed; the pointers are stored in a table.
- * - Lockless implementation.
+ * - Lockless (and optionally, non-blocking) implementation.
* - Multi- or single-consumer dequeue.
* - Multi- or single-producer enqueue.
* - Bulk dequeue.
* - Bulk enqueue.
*
+ * The non-blocking ring algorithm is based on the original rte ring (derived
+ * from FreeBSD's bufring.h) and inspired by Michael and Scott's non-blocking
+ * concurrent queue.
+ *
* Note: the ring implementation is not preemptible. Refer to Programmer's
* guide/Environment Abstraction Layer/Multiple pthread/Known Issues/rte_ring
* for more information.
@@ -134,6 +138,18 @@ struct rte_ring {
*/
#define RING_F_EXACT_SZ 0x0004
#define RTE_RING_SZ_MASK (0x7fffffffU) /**< Ring size mask */
+/**
+ * The ring uses non-blocking enqueue and dequeue functions. These functions
+ * do not have the "non-preemptive" constraint of a regular rte ring, and thus
+ * are suited for applications using preemptible pthreads. However, the
+ * non-blocking functions have worse average-case performance than their
+ * regular rte ring counterparts. When used as the handler for a mempool,
+ * per-thread caching can mitigate the performance difference by reducing the
+ * number (and contention) of ring accesses.
+ *
+ * This flag is only supported on x86_64 platforms.
+ */
+#define RING_F_NB 0x0008
/* @internal defines for passing to the enqueue dequeue worker functions */
#define __IS_SP 1
@@ -151,11 +167,15 @@ struct rte_ring {
*
* @param count
* The number of elements in the ring (must be a power of 2).
+ * @param flags
+ * The flags the ring will be created with.
* @return
* - The memory size needed for the ring on success.
* - -EINVAL if count is not a power of 2.
*/
-ssize_t rte_ring_get_memsize(unsigned count);
+ssize_t rte_ring_get_memsize(unsigned int count, unsigned int flags);
+ssize_t rte_ring_get_memsize_v20(unsigned int count);
+ssize_t rte_ring_get_memsize_v1905(unsigned int count, unsigned int flags);
/**
* Initialize a ring structure.
@@ -188,6 +208,10 @@ ssize_t rte_ring_get_memsize(unsigned count);
* - RING_F_SC_DEQ: If this flag is set, the default behavior when
* using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
* is "single-consumer". Otherwise, it is "multi-consumers".
+ * - RING_F_EXACT_SZ: If this flag is set, count can be a non-power-of-2
+ * number, but up to half the ring space may be wasted.
+ * - RING_F_NB: (x86_64 only) If this flag is set, the ring uses
+ * non-blocking variants of the dequeue and enqueue functions.
* @return
* 0 on success, or a negative value on error.
*/
@@ -223,12 +247,17 @@ int rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
* - RING_F_SC_DEQ: If this flag is set, the default behavior when
* using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
* is "single-consumer". Otherwise, it is "multi-consumers".
+ * - RING_F_EXACT_SZ: If this flag is set, count can be a non-power-of-2
+ * number, but up to half the ring space may be wasted.
+ * - RING_F_NB: (x86_64 only) If this flag is set, the ring uses
+ * non-blocking variants of the dequeue and enqueue functions.
* @return
* On success, the pointer to the new allocated ring. NULL on error with
* rte_errno set appropriately. Possible errno values include:
* - E_RTE_NO_CONFIG - function could not get pointer to rte_config structure
* - E_RTE_SECONDARY - function was called from a secondary process instance
- * - EINVAL - count provided is not a power of 2
+ * - EINVAL - count provided is not a power of 2, or RING_F_NB is used on an
+ * unsupported platform
* - ENOSPC - the maximum number of memzones has already been allocated
* - EEXIST - a memzone with the same name already exists
* - ENOMEM - no appropriate memory area found in which to create memzone
@@ -284,6 +313,50 @@ void rte_ring_dump(FILE *f, const struct rte_ring *r);
} \
} while (0)
+/* The actual enqueue of pointers on the ring.
+ * Used only by the single-producer non-blocking enqueue function, but
+ * out-lined here for code readability.
+ */
+#define ENQUEUE_PTRS_NB(r, ring_start, prod_head, obj_table, n) do { \
+ unsigned int i; \
+ const uint32_t size = (r)->size; \
+ size_t idx = prod_head & (r)->mask; \
+ size_t new_cnt = prod_head + size; \
+ struct nb_ring_entry *ring = (struct nb_ring_entry *)ring_start; \
+ if (likely(idx + n < size)) { \
+ for (i = 0; i < (n & ((~(unsigned)0x3))); i += 4, idx += 4) { \
+ ring[idx].ptr = obj_table[i]; \
+ ring[idx].cnt = new_cnt + i; \
+ ring[idx + 1].ptr = obj_table[i + 1]; \
+ ring[idx + 1].cnt = new_cnt + i + 1; \
+ ring[idx + 2].ptr = obj_table[i + 2]; \
+ ring[idx + 2].cnt = new_cnt + i + 2; \
+ ring[idx + 3].ptr = obj_table[i + 3]; \
+ ring[idx + 3].cnt = new_cnt + i + 3; \
+ } \
+ switch (n & 0x3) { \
+ case 3: \
+ ring[idx].cnt = new_cnt + i; \
+ ring[idx++].ptr = obj_table[i++]; /* fallthrough */ \
+ case 2: \
+ ring[idx].cnt = new_cnt + i; \
+ ring[idx++].ptr = obj_table[i++]; /* fallthrough */ \
+ case 1: \
+ ring[idx].cnt = new_cnt + i; \
+ ring[idx++].ptr = obj_table[i++]; \
+ } \
+ } else { \
+ for (i = 0; idx < size; i++, idx++) { \
+ ring[idx].cnt = new_cnt + i; \
+ ring[idx].ptr = obj_table[i]; \
+ } \
+ for (idx = 0; i < n; i++, idx++) { \
+ ring[idx].cnt = new_cnt + i; \
+ ring[idx].ptr = obj_table[i]; \
+ } \
+ } \
+} while (0)
+
/* the actual copy of pointers on the ring to obj_table.
* Placed here since identical code needed in both
* single and multi consumer dequeue functions */
@@ -315,6 +388,45 @@ void rte_ring_dump(FILE *f, const struct rte_ring *r);
} \
} while (0)
+/* The actual copy of pointers on the ring to obj_table.
+ * Placed here since identical code needed in both
+ * single and multi consumer non-blocking dequeue functions.
+ */
+#define DEQUEUE_PTRS_NB(r, ring_start, cons_head, obj_table, n) do { \
+ unsigned int i; \
+ size_t idx = cons_head & (r)->mask; \
+ const uint32_t size = (r)->size; \
+ struct nb_ring_entry *ring = (struct nb_ring_entry *)ring_start; \
+ if (likely(idx + n < size)) { \
+ for (i = 0; i < (n & (~(unsigned)0x3)); i += 4, idx += 4) {\
+ obj_table[i] = ring[idx].ptr; \
+ obj_table[i + 1] = ring[idx + 1].ptr; \
+ obj_table[i + 2] = ring[idx + 2].ptr; \
+ obj_table[i + 3] = ring[idx + 3].ptr; \
+ } \
+ switch (n & 0x3) { \
+ case 3: \
+ obj_table[i++] = ring[idx++].ptr; /* fallthrough */ \
+ case 2: \
+ obj_table[i++] = ring[idx++].ptr; /* fallthrough */ \
+ case 1: \
+ obj_table[i++] = ring[idx++].ptr; \
+ } \
+ } else { \
+ for (i = 0; idx < size; i++, idx++) \
+ obj_table[i] = ring[idx].ptr; \
+ for (idx = 0; i < n; i++, idx++) \
+ obj_table[i] = ring[idx].ptr; \
+ } \
+} while (0)
+
+/** @internal 128-bit structure used by the non-blocking ring */
+struct nb_ring_entry {
+ void *ptr; /**< Data pointer */
+ uint64_t cnt; /**< Modification counter */
+};
+
/* Between load and load. there might be cpu reorder in weak model
* (powerpc/arm).
* There are 2 choices for the users
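The 128-bit compare-and-swap used by the multi-producer enqueue assumes struct nb_ring_entry occupies exactly two pointers' worth of space (16 bytes on the supported x86_64 targets). A compile-time guard along these lines would make that explicit (a sketch using C11 _Static_assert, not part of the patch):

/* Sketch: guard the size assumption behind rte_atomic128_cmpset(). */
_Static_assert(sizeof(struct nb_ring_entry) == 2 * sizeof(void *),
	       "nb_ring_entry must be 16 bytes for the 128-bit CAS");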
@@ -331,6 +443,70 @@ void rte_ring_dump(FILE *f, const struct rte_ring *r);
#endif
/**
+ * @internal Enqueue several objects on the non-blocking ring
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of void * pointers (objects).
+ * @param n
+ * The number of objects to add in the ring from the obj_table.
+ * @param behavior
+ * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items to the ring
+ * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
+ * @param is_sp
+ * Indicates whether to use single producer or multi-producer head update
+ * @param free_space
+ * returns the amount of space after the enqueue operation has finished
+ * @return
+ * Actual number of objects enqueued.
+ * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_nb_enqueue(struct rte_ring *r, void * const *obj_table,
+ unsigned int n, enum rte_ring_queue_behavior behavior,
+ unsigned int is_sp, unsigned int *free_space)
+{
+ if (is_sp)
+ return __rte_ring_do_nb_enqueue_sp(r, obj_table, n,
+ behavior, free_space);
+ else
+ return __rte_ring_do_nb_enqueue_mp(r, obj_table, n,
+ behavior, free_space);
+}
+
+/**
+ * @internal Dequeue several objects from the non-blocking ring
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of void * pointers (objects).
+ * @param n
+ * The number of objects to pull from the ring.
+ * @param behavior
+ * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from the ring
+ * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring
+ * @param is_sc
+ * Indicates whether to use single consumer or multi-consumer head update
+ * @param available
+ * returns the number of remaining ring entries after the dequeue has finished
+ * @return
+ * - Actual number of objects dequeued.
+ * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_nb_dequeue(struct rte_ring *r, void **obj_table,
+ unsigned int n, enum rte_ring_queue_behavior behavior,
+ unsigned int is_sc, unsigned int *available)
+{
+ if (is_sc)
+ return __rte_ring_do_nb_dequeue_sc(r, obj_table, n,
+ behavior, available);
+ else
+ return __rte_ring_do_nb_dequeue_mc(r, obj_table, n,
+ behavior, available);
+}
+
+/**
* @internal Enqueue several objects on the ring
*
* @param r
@@ -437,8 +613,14 @@ static __rte_always_inline unsigned int
rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
unsigned int n, unsigned int *free_space)
{
- return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
- __IS_MP, free_space);
+ if (r->flags & RING_F_NB)
+ return __rte_ring_do_nb_enqueue(r, obj_table, n,
+ RTE_RING_QUEUE_FIXED, __IS_MP,
+ free_space);
+ else
+ return __rte_ring_do_enqueue(r, obj_table, n,
+ RTE_RING_QUEUE_FIXED, __IS_MP,
+ free_space);
}
/**
@@ -460,8 +642,14 @@ static __rte_always_inline unsigned int
rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
unsigned int n, unsigned int *free_space)
{
- return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
- __IS_SP, free_space);
+ if (r->flags & RING_F_NB)
+ return __rte_ring_do_nb_enqueue(r, obj_table, n,
+ RTE_RING_QUEUE_FIXED, __IS_SP,
+ free_space);
+ else
+ return __rte_ring_do_enqueue(r, obj_table, n,
+ RTE_RING_QUEUE_FIXED, __IS_SP,
+ free_space);
}
/**
@@ -487,8 +675,14 @@ static __rte_always_inline unsigned int
rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
unsigned int n, unsigned int *free_space)
{
- return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
- r->prod.single, free_space);
+ if (r->flags & RING_F_NB)
+ return __rte_ring_do_nb_enqueue(r, obj_table, n,
+ RTE_RING_QUEUE_FIXED,
+ r->prod_64.single, free_space);
+ else
+ return __rte_ring_do_enqueue(r, obj_table, n,
+ RTE_RING_QUEUE_FIXED,
+ r->prod.single, free_space);
}
/**
@@ -571,8 +765,14 @@ static __rte_always_inline unsigned int
rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table,
unsigned int n, unsigned int *available)
{
- return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
- __IS_MC, available);
+ if (r->flags & RING_F_NB)
+ return __rte_ring_do_nb_dequeue(r, obj_table, n,
+ RTE_RING_QUEUE_FIXED, __IS_MC,
+ available);
+ else
+ return __rte_ring_do_dequeue(r, obj_table, n,
+ RTE_RING_QUEUE_FIXED, __IS_MC,
+ available);
}
/**
@@ -595,8 +795,14 @@ static __rte_always_inline unsigned int
rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table,
unsigned int n, unsigned int *available)
{
- return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
- __IS_SC, available);
+ if (r->flags & RING_F_NB)
+ return __rte_ring_do_nb_dequeue(r, obj_table, n,
+ RTE_RING_QUEUE_FIXED, __IS_SC,
+ available);
+ else
+ return __rte_ring_do_dequeue(r, obj_table, n,
+ RTE_RING_QUEUE_FIXED, __IS_SC,
+ available);
}
/**
@@ -622,8 +828,14 @@ static __rte_always_inline unsigned int
rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n,
unsigned int *available)
{
- return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
- r->cons.single, available);
+ if (r->flags & RING_F_NB)
+ return __rte_ring_do_nb_dequeue(r, obj_table, n,
+ RTE_RING_QUEUE_FIXED,
+ r->cons_64.single, available);
+ else
+ return __rte_ring_do_dequeue(r, obj_table, n,
+ RTE_RING_QUEUE_FIXED,
+ r->cons.single, available);
}
/**
@@ -698,9 +910,13 @@ rte_ring_dequeue(struct rte_ring *r, void **obj_p)
static inline unsigned
rte_ring_count(const struct rte_ring *r)
{
- uint32_t prod_tail = r->prod.tail;
- uint32_t cons_tail = r->cons.tail;
- uint32_t count = (prod_tail - cons_tail) & r->mask;
+ uint32_t count;
+
+ if (r->flags & RING_F_NB)
+ count = (r->prod_64.tail - r->cons_64.tail) & r->mask;
+ else
+ count = (r->prod.tail - r->cons.tail) & r->mask;
+
return (count > r->capacity) ? r->capacity : count;
}
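Because the wrappers above select the non-blocking path at run time from r->flags, call sites need no changes regardless of how the ring was created. A short sketch (hypothetical helper) that works for both blocking and RING_F_NB rings:

/* Hypothetical helper: pushes up to 'n' objects through 'r' and pulls them
 * back out; the same code path is used whether or not the ring was created
 * with RING_F_NB.
 */
static unsigned int
bounce_burst(struct rte_ring *r, void **objs, unsigned int n)
{
	unsigned int sent = rte_ring_enqueue_burst(r, objs, n, NULL);

	return rte_ring_dequeue_burst(r, objs, sent, NULL);
}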
@@ -820,8 +1036,14 @@ static __rte_always_inline unsigned
rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
unsigned int n, unsigned int *free_space)
{
- return __rte_ring_do_enqueue(r, obj_table, n,
- RTE_RING_QUEUE_VARIABLE, __IS_MP, free_space);
+ if (r->flags & RING_F_NB)
+ return __rte_ring_do_nb_enqueue(r, obj_table, n,
+ RTE_RING_QUEUE_VARIABLE,
+ __IS_MP, free_space);
+ else
+ return __rte_ring_do_enqueue(r, obj_table, n,
+ RTE_RING_QUEUE_VARIABLE,
+ __IS_MP, free_space);
}
/**
@@ -843,8 +1065,14 @@ static __rte_always_inline unsigned
rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
unsigned int n, unsigned int *free_space)
{
- return __rte_ring_do_enqueue(r, obj_table, n,
- RTE_RING_QUEUE_VARIABLE, __IS_SP, free_space);
+ if (r->flags & RING_F_NB)
+ return __rte_ring_do_nb_enqueue(r, obj_table, n,
+ RTE_RING_QUEUE_VARIABLE,
+ __IS_SP, free_space);
+ else
+ return __rte_ring_do_enqueue(r, obj_table, n,
+ RTE_RING_QUEUE_VARIABLE,
+ __IS_SP, free_space);
}
/**
@@ -870,8 +1098,14 @@ static __rte_always_inline unsigned
rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
unsigned int n, unsigned int *free_space)
{
- return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE,
- r->prod.single, free_space);
+ if (r->flags & RING_F_NB)
+ return __rte_ring_do_nb_enqueue(r, obj_table, n,
+ RTE_RING_QUEUE_VARIABLE,
+ r->prod_64.single, free_space);
+ else
+ return __rte_ring_do_enqueue(r, obj_table, n,
+ RTE_RING_QUEUE_VARIABLE,
+ r->prod.single, free_space);
}
/**
@@ -898,8 +1132,14 @@ static __rte_always_inline unsigned
rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table,
unsigned int n, unsigned int *available)
{
- return __rte_ring_do_dequeue(r, obj_table, n,
- RTE_RING_QUEUE_VARIABLE, __IS_MC, available);
+ if (r->flags & RING_F_NB)
+ return __rte_ring_do_nb_dequeue(r, obj_table, n,
+ RTE_RING_QUEUE_VARIABLE,
+ __IS_MC, available);
+ else
+ return __rte_ring_do_dequeue(r, obj_table, n,
+ RTE_RING_QUEUE_VARIABLE,
+ __IS_MC, available);
}
/**
@@ -923,8 +1163,14 @@ static __rte_always_inline unsigned
rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table,
unsigned int n, unsigned int *available)
{
- return __rte_ring_do_dequeue(r, obj_table, n,
- RTE_RING_QUEUE_VARIABLE, __IS_SC, available);
+ if (r->flags & RING_F_NB)
+ return __rte_ring_do_nb_dequeue(r, obj_table, n,
+ RTE_RING_QUEUE_VARIABLE,
+ __IS_SC, available);
+ else
+ return __rte_ring_do_dequeue(r, obj_table, n,
+ RTE_RING_QUEUE_VARIABLE,
+ __IS_SC, available);
}
/**
@@ -950,9 +1196,14 @@ static __rte_always_inline unsigned
rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table,
unsigned int n, unsigned int *available)
{
- return __rte_ring_do_dequeue(r, obj_table, n,
- RTE_RING_QUEUE_VARIABLE,
- r->cons.single, available);
+ if (r->flags & RING_F_NB)
+ return __rte_ring_do_nb_dequeue(r, obj_table, n,
+ RTE_RING_QUEUE_VARIABLE,
+ r->cons_64.single, available);
+ else
+ return __rte_ring_do_dequeue(r, obj_table, n,
+ RTE_RING_QUEUE_VARIABLE,
+ r->cons.single, available);
}
#ifdef __cplusplus
@@ -221,8 +221,8 @@ __rte_ring_move_prod_head_64(struct rte_ring *r, unsigned int is_sp,
/* Ensure the head is read before tail */
__atomic_thread_fence(__ATOMIC_ACQUIRE);
- /* load-acquire synchronize with store-release of ht->tail
- * in update_tail.
+ /* load-acquire synchronizes with the store-release of tail in
+ * do_nb_dequeue_{sc, mc}.
*/
cons_tail = __atomic_load_n(&r->cons_64.tail,
__ATOMIC_ACQUIRE);
@@ -252,6 +252,7 @@ __rte_ring_move_prod_head_64(struct rte_ring *r, unsigned int is_sp,
0, __ATOMIC_RELAXED,
__ATOMIC_RELAXED);
} while (unlikely(success == 0));
+
return n;
}
@@ -298,8 +299,8 @@ __rte_ring_move_cons_head_64(struct rte_ring *r, unsigned int is_sc,
/* Ensure the head is read before tail */
__atomic_thread_fence(__ATOMIC_ACQUIRE);
- /* this load-acquire synchronize with store-release of ht->tail
- * in update_tail.
+ /* load-acquire synchronizes with the store-release of tail in
+ * do_nb_enqueue_{sp, mp}.
*/
prod_tail = __atomic_load_n(&r->prod_64.tail,
__ATOMIC_ACQUIRE);
@@ -328,6 +329,279 @@ __rte_ring_move_cons_head_64(struct rte_ring *r, unsigned int is_sc,
0, __ATOMIC_RELAXED,
__ATOMIC_RELAXED);
} while (unlikely(success == 0));
+
+ return n;
+}
+
+/**
+ * @internal
+ * Enqueue several objects on the non-blocking ring (single-producer only)
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of void * pointers (objects).
+ * @param n
+ * The number of objects to add in the ring from the obj_table.
+ * @param behavior
+ * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items to the ring
+ * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
+ * @param free_space
+ * returns the amount of space after the enqueue operation has finished
+ * @return
+ * Actual number of objects enqueued.
+ * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_nb_enqueue_sp(struct rte_ring *r, void * const *obj_table,
+ unsigned int n,
+ enum rte_ring_queue_behavior behavior,
+ unsigned int *free_space)
+{
+ uint32_t free_entries;
+ uint64_t head, next;
+
+ n = __rte_ring_move_prod_head_64(r, 1, n, behavior,
+ &head, &next, &free_entries);
+ if (n == 0)
+ goto end;
+
+ ENQUEUE_PTRS_NB(r, &r[1], head, obj_table, n);
+
+ __atomic_store_n(&r->prod_64.tail,
+ r->prod_64.tail + n,
+ __ATOMIC_RELEASE);
+end:
+ if (free_space != NULL)
+ *free_space = free_entries - n;
+ return n;
+}
+
+/**
+ * @internal
+ * Enqueue several objects on the non-blocking ring (multi-producer safe)
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of void * pointers (objects).
+ * @param n
+ * The number of objects to add in the ring from the obj_table.
+ * @param behavior
+ * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items to the ring
+ * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
+ * @param free_space
+ * returns the amount of space after the enqueue operation has finished
+ * @return
+ * Actual number of objects enqueued.
+ * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_nb_enqueue_mp(struct rte_ring *r, void * const *obj_table,
+ unsigned int n,
+ enum rte_ring_queue_behavior behavior,
+ unsigned int *free_space)
+{
+#if !defined(RTE_ARCH_X86_64) || !defined(ALLOW_EXPERIMENTAL_API)
+ RTE_SET_USED(r);
+ RTE_SET_USED(obj_table);
+ RTE_SET_USED(n);
+ RTE_SET_USED(behavior);
+ RTE_SET_USED(free_space);
+#ifndef ALLOW_EXPERIMENTAL_API
+ printf("[%s()] RING_F_NB requires an experimental API."
+ " Recompile with ALLOW_EXPERIMENTAL_API to use it.\n"
+ , __func__);
+#endif
+ return 0;
+#endif
+#if defined(RTE_ARCH_X86_64) && defined(ALLOW_EXPERIMENTAL_API)
+ uint64_t head, next, tail;
+ uint32_t free_entries;
+ unsigned int i;
+
+ n = __rte_ring_move_prod_head_64(r, 0, n, behavior,
+ &head, &next, &free_entries);
+ if (n == 0)
+ goto end;
+
+ tail = __atomic_load_n(&r->prod_64.tail, __ATOMIC_RELAXED);
+
+ for (i = 0; i < n; /* i incremented if enqueue succeeds */) {
+ struct nb_ring_entry old_value, new_value;
+ struct nb_ring_entry *base, *ring_ptr;
+
+ /* Enqueue to the tail entry. If another thread wins the race,
+ * retry with the new tail.
+ */
+ base = (struct nb_ring_entry *)&r[1];
+
+ ring_ptr = &base[tail & r->mask];
+
+ old_value = *ring_ptr;
+
+ /* If the tail entry's modification counter doesn't match the
+ * producer tail index, it's already been updated.
+ *
+ * Attempt to update the tail here, so this thread doesn't
+ * depend on the forward progress of the thread that
+ * successfully enqueued.
+ */
+ if (old_value.cnt != tail) {
+ /* Use a release memmodel to ensure the tail entry is
+ * visible to dequeueing threads before updating the
+ * tail. (tail is updated on failure.)
+ */
+ __atomic_compare_exchange_n(&r->prod_64.tail,
+ &tail, tail + 1,
+ 0, __ATOMIC_RELEASE,
+ __ATOMIC_RELAXED);
+ continue;
+ }
+
+ /* Prepare the new entry. The cnt field mitigates the ABA
+ * problem on the ring write.
+ */
+ new_value.ptr = obj_table[i];
+ new_value.cnt = tail + r->size;
+
+ if (rte_atomic128_cmpset((volatile rte_int128_t *)ring_ptr,
+ (rte_int128_t *)&old_value,
+ (rte_int128_t *)&new_value,
+ 0, RTE_ATOMIC_RELAXED,
+ RTE_ATOMIC_RELAXED))
+ i++;
+
+ /* Use a release memmodel to ensure the tail entry is visible
+ * to dequeueing threads before updating the tail. Every thread
+ * attempts the cmpset, so they don't have to wait for the
+ * thread that successfully enqueued to the ring. Using a
+ * 64-bit tail mitigates the ABA problem here. (tail is updated
+ * on failure.)
+ */
+ __atomic_compare_exchange_n(&r->prod_64.tail,
+ &tail, tail + 1,
+ 0, __ATOMIC_RELEASE,
+ __ATOMIC_RELAXED);
+ }
+
+end:
+ if (free_space != NULL)
+ *free_space = free_entries - n;
+ return n;
+#endif
+}
+
+/**
+ * @internal
+ * Dequeue several objects from the non-blocking ring (single-consumer only)
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of void * pointers (objects).
+ * @param n
+ * The number of objects to pull from the ring.
+ * @param behavior
+ * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from the ring
+ * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring
+ * @param available
+ * returns the number of remaining ring entries after the dequeue has finished
+ * @return
+ * - Actual number of objects dequeued.
+ * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_nb_dequeue_sc(struct rte_ring *r, void **obj_table,
+ unsigned int n,
+ enum rte_ring_queue_behavior behavior,
+ unsigned int *available)
+{
+ uint64_t head, next;
+ uint32_t entries;
+
+ n = __rte_ring_move_cons_head_64(r, 1, n, behavior,
+ &head, &next, &entries);
+ if (n == 0)
+ goto end;
+
+ DEQUEUE_PTRS_NB(r, &r[1], head, obj_table, n);
+
+ __atomic_store_n(&r->cons_64.tail,
+ r->cons_64.tail + n,
+ __ATOMIC_RELEASE);
+end:
+ if (available != NULL)
+ *available = entries - n;
+ return n;
+}
+
+/**
+ * @internal
+ * Dequeue several objects from the non-blocking ring (multi-consumer safe)
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of void * pointers (objects).
+ * @param n
+ * The number of objects to pull from the ring.
+ * @param behavior
+ * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from the ring
+ * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring
+ * @param available
+ * returns the number of remaining ring entries after the dequeue has finished
+ * @return
+ * - Actual number of objects dequeued.
+ * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_nb_dequeue_mc(struct rte_ring *r, void **obj_table,
+ unsigned int n,
+ enum rte_ring_queue_behavior behavior,
+ unsigned int *available)
+{
+ uint64_t head, tail, next;
+ uint32_t entries;
+ int success;
+
+ n = __rte_ring_move_cons_head_64(r, 0, n, behavior,
+ &head, &next, &entries);
+ if (n == 0)
+ goto end;
+
+ /* The acquire-release synchronization on prod_64.tail ensures that
+ * this thread correctly observes the ring entries up to prod_64.tail.
+ * However, by the time this thread reads cons_64.tail, or if its CAS
+ * fails, cons_64.tail may have passed the previously read value of
+ * prod_64.tail. Acquire-release synchronization on cons_64.tail is
+ * necessary to ensure that dequeue threads always observe the correct
+ * values of the ring entries.
+ */
+ tail = __atomic_load_n(&r->cons_64.tail, __ATOMIC_ACQUIRE);
+ do {
+ /* Dequeue from the cons tail onwards. If multiple threads read
+ * the same pointers, the thread that successfully performs the
+ * CAS will keep them and the other(s) will retry.
+ */
+ DEQUEUE_PTRS_NB(r, &r[1], tail, obj_table, n);
+
+ next = tail + n;
+
+ /* There is potential for the ABA problem here, but that is
+ * mitigated by the large (64-bit) tail. Use a release memmodel
+ * to ensure the dequeue operations and CAS are properly
+ * ordered. (tail is updated on failure.)
+ */
+ success = __atomic_compare_exchange_n(&r->cons_64.tail,
+ &tail, next,
+ 0, __ATOMIC_RELEASE,
+ __ATOMIC_ACQUIRE);
+ } while (success == 0);
+
+end:
+ if (available != NULL)
+ *available = entries - n;
return n;
}
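To put the 64-bit ABA mitigation in perspective (an illustrative calculation, not from the patch): the tail advances once per element, so at 10^8 operations per second a 64-bit counter wraps only after about 2^64 / 10^8 ≈ 1.8 * 10^11 seconds, i.e. over 5,000 years, which is why a reused tail value within any thread's window of interest is not a practical concern.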
@@ -306,4 +306,273 @@ __rte_ring_move_cons_head_64(struct rte_ring *r, unsigned int is_sc,
return n;
}
+/**
+ * @internal
+ * Enqueue several objects on the non-blocking ring (single-producer only)
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of void * pointers (objects).
+ * @param n
+ * The number of objects to add in the ring from the obj_table.
+ * @param behavior
+ * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items to the ring
+ * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
+ * @param free_space
+ * returns the amount of space after the enqueue operation has finished
+ * @return
+ * Actual number of objects enqueued.
+ * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_nb_enqueue_sp(struct rte_ring *r, void * const *obj_table,
+ unsigned int n,
+ enum rte_ring_queue_behavior behavior,
+ unsigned int *free_space)
+{
+ uint32_t free_entries;
+ uint64_t head, next;
+
+ n = __rte_ring_move_prod_head_64(r, 1, n, behavior,
+ &head, &next, &free_entries);
+ if (n == 0)
+ goto end;
+
+ ENQUEUE_PTRS_NB(r, &r[1], head, obj_table, n);
+
+ rte_smp_wmb();
+
+ r->prod_64.tail += n;
+
+end:
+ if (free_space != NULL)
+ *free_space = free_entries - n;
+ return n;
+}
+
+/**
+ * @internal
+ * Enqueue several objects on the non-blocking ring (multi-producer safe)
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of void * pointers (objects).
+ * @param n
+ * The number of objects to add in the ring from the obj_table.
+ * @param behavior
+ * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items to the ring
+ * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
+ * @param free_space
+ * returns the amount of space after the enqueue operation has finished
+ * @return
+ * Actual number of objects enqueued.
+ * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_nb_enqueue_mp(struct rte_ring *r, void * const *obj_table,
+ unsigned int n,
+ enum rte_ring_queue_behavior behavior,
+ unsigned int *free_space)
+{
+#if !defined(RTE_ARCH_X86_64) || !defined(ALLOW_EXPERIMENTAL_API)
+ RTE_SET_USED(r);
+ RTE_SET_USED(obj_table);
+ RTE_SET_USED(n);
+ RTE_SET_USED(behavior);
+ RTE_SET_USED(free_space);
+#ifndef ALLOW_EXPERIMENTAL_API
+ printf("[%s()] RING_F_NB requires an experimental API."
+ " Recompile with ALLOW_EXPERIMENTAL_API to use it.\n"
+ , __func__);
+#endif
+ return 0;
+#endif
+#if defined(RTE_ARCH_X86_64) && defined(ALLOW_EXPERIMENTAL_API)
+ uint64_t head, next, tail;
+ uint32_t free_entries;
+ unsigned int i;
+
+ n = __rte_ring_move_prod_head_64(r, 0, n, behavior,
+ &head, &next, &free_entries);
+ if (n == 0)
+ goto end;
+
+ for (i = 0; i < n; /* i incremented if enqueue succeeds */) {
+ struct nb_ring_entry old_value, new_value;
+ struct nb_ring_entry *base, *ring_ptr;
+
+ /* Enqueue to the tail entry. If another thread wins the race,
+ * retry with the new tail.
+ */
+ tail = r->prod_64.tail;
+
+ base = (struct nb_ring_entry *)&r[1];
+
+ ring_ptr = &base[tail & r->mask];
+
+ old_value = *ring_ptr;
+
+ /* If the tail entry's modification counter doesn't match the
+ * producer tail index, it's already been updated.
+ *
+ * Attempt to update the tail here, so this thread doesn't
+ * depend on the forward progress of the thread that
+ * successfully enqueued.
+ */
+ if (old_value.cnt != tail) {
+ /* Ensure the tail entry is visible to dequeueing
+ * threads before updating the tail.
+ */
+ rte_smp_wmb();
+
+ rte_atomic64_cmpset(&r->prod_64.tail, tail, tail + 1);
+ continue;
+ }
+
+ /* Prepare the new entry. The cnt field mitigates the ABA
+ * problem on the ring write.
+ */
+ new_value.ptr = obj_table[i];
+ new_value.cnt = tail + r->size;
+
+ if (rte_atomic128_cmpset((volatile rte_int128_t *)ring_ptr,
+ (rte_int128_t *)&old_value,
+ (rte_int128_t *)&new_value,
+ 0, RTE_ATOMIC_RELAXED,
+ RTE_ATOMIC_RELAXED))
+ i++;
+
+ /* Ensure the tail entry is visible to dequeueing threads
+ * before updating the tail.
+ */
+ rte_smp_wmb();
+
+ /* Every thread attempts the cmpset, so they don't have to wait
+ * for the thread that successfully enqueued to the ring.
+ * Using a 64-bit tail mitigates the ABA problem here.
+ */
+ rte_atomic64_cmpset(&r->prod_64.tail, tail, tail + 1);
+ }
+
+end:
+ if (free_space != NULL)
+ *free_space = free_entries - n;
+ return n;
+#endif
+}
+
+/**
+ * @internal
+ * Dequeue several objects from the non-blocking ring (single-consumer only)
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of void * pointers (objects).
+ * @param n
+ * The number of objects to pull from the ring.
+ * @param behavior
+ * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from the ring
+ * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring
+ * @param available
+ * returns the number of remaining ring entries after the dequeue has finished
+ * @return
+ * - Actual number of objects dequeued.
+ * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_nb_dequeue_sc(struct rte_ring *r, void **obj_table,
+ unsigned int n,
+ enum rte_ring_queue_behavior behavior,
+ unsigned int *available)
+{
+ uint64_t head, next;
+ uint32_t entries;
+
+ n = __rte_ring_move_cons_head_64(r, 1, n, behavior,
+ &head, &next, &entries);
+ if (n == 0)
+ goto end;
+
+ DEQUEUE_PTRS_NB(r, &r[1], head, obj_table, n);
+
+ rte_smp_rmb();
+
+ r->cons_64.tail += n;
+
+end:
+ if (available != NULL)
+ *available = entries - n;
+ return n;
+}
+
+/**
+ * @internal
+ * Dequeue several objects from the non-blocking ring (multi-consumer safe)
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of void * pointers (objects).
+ * @param n
+ * The number of objects to pull from the ring.
+ * @param behavior
+ * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from the ring
+ * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring
+ * @param available
+ * returns the number of remaining ring entries after the dequeue has finished
+ * @return
+ * - Actual number of objects dequeued.
+ * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_nb_dequeue_mc(struct rte_ring *r, void **obj_table,
+ unsigned int n,
+ enum rte_ring_queue_behavior behavior,
+ unsigned int *available)
+{
+ uint64_t head, next;
+ uint32_t entries;
+ int success;
+
+ n = __rte_ring_move_cons_head_64(r, 0, n, behavior,
+ &head, &next, &entries);
+ if (n == 0)
+ goto end;
+
+ do {
+ uint64_t tail = r->cons_64.tail;
+
+ /* Ensure that the correct ring entry values are read by this
+ * thread.
+ */
+ rte_smp_rmb();
+
+ /* Dequeue from the cons tail onwards. If multiple threads read
+ * the same pointers, the thread that successfully performs the
+ * CAS will keep them and the other(s) will retry.
+ */
+ DEQUEUE_PTRS_NB(r, &r[1], tail, obj_table, n);
+
+ next = tail + n;
+
+ /* Ensure the dequeue operations and CAS are properly
+ * ordered.
+ */
+ rte_smp_rmb();
+
+ /* There is potential for the ABA problem here, but that is
+ * mitigated by the large (64-bit) tail.
+ */
+ success = rte_atomic64_cmpset(&r->cons_64.tail, tail, next);
+ } while (success == 0);
+
+end:
+ if (available != NULL)
+ *available = entries - n;
+ return n;
+}
+
#endif /* _RTE_RING_GENERIC_H_ */
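For context, both rte_ring_c11_mem.h and rte_ring_generic.h carry a copy of these functions because rte_ring.h selects between the two implementations at build time; the existing selection logic (reproduced for reference, not modified by this patch) looks like:

#ifdef RTE_USE_C11_MEM_MODEL
#include "rte_ring_c11_mem.h"
#else
#include "rte_ring_generic.h"
#endif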
@@ -17,3 +17,10 @@ DPDK_2.2 {
rte_ring_free;
} DPDK_2.0;
+
+DPDK_19.05 {
+ global:
+
+ rte_ring_get_memsize;
+
+} DPDK_2.2;