@@ -45,9 +45,9 @@ EAL_REGISTER_TAILQ(rte_ring_tailq)
/* return the size of memory occupied by a ring */
ssize_t
-rte_ring_get_memsize(unsigned count)
+rte_ring_get_memsize_v1905(unsigned int count, unsigned int flags)
{
- ssize_t sz;
+ ssize_t sz, elt_sz;
/* count must be a power of 2 */
if ((!POWEROF2(count)) || (count > RTE_RING_SZ_MASK )) {
@@ -57,10 +57,23 @@ rte_ring_get_memsize(unsigned count)
return -EINVAL;
}
- sz = sizeof(struct rte_ring) + count * sizeof(void *);
+ elt_sz = (flags & RING_F_NB) ? 2 * sizeof(void *) : sizeof(void *);
+
+ sz = sizeof(struct rte_ring) + count * elt_sz;
sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
return sz;
}
+BIND_DEFAULT_SYMBOL(rte_ring_get_memsize, _v1905, 19.05);
+MAP_STATIC_SYMBOL(ssize_t rte_ring_get_memsize(unsigned int count,
+ unsigned int flags),
+ rte_ring_get_memsize_v1905);
+
+ssize_t
+rte_ring_get_memsize_v20(unsigned int count)
+{
+ return rte_ring_get_memsize_v1905(count, 0);
+}
+VERSION_SYMBOL(rte_ring_get_memsize, _v20, 2.0);
int
rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
@@ -103,6 +116,20 @@ rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
r->prod.head = r->cons.head = 0;
r->prod.tail = r->cons.tail = 0;
+ if (flags & RING_F_NB) {
+ uint64_t i;
+
+ for (i = 0; i < r->size; i++) {
+ struct nb_ring_entry *ring_ptr, *base;
+
+ base = ((struct nb_ring_entry *)&r[1]);
+
+ ring_ptr = &base[i & r->mask];
+
+ ring_ptr->cnt = i;
+ }
+ }
+
return 0;
}
@@ -123,11 +150,19 @@ rte_ring_create(const char *name, unsigned count, int socket_id,
ring_list = RTE_TAILQ_CAST(rte_ring_tailq.head, rte_ring_list);
+#if !defined(RTE_ARCH_X86_64)
+ if (flags & RING_F_NB) {
+ printf("RING_F_NB is only supported on x86-64 platforms\n");
+ rte_errno = EINVAL;
+ return NULL;
+ }
+#endif
+
/* for an exact size ring, round up from count to a power of two */
if (flags & RING_F_EXACT_SZ)
count = rte_align32pow2(count + 1);
- ring_size = rte_ring_get_memsize(count);
+ ring_size = rte_ring_get_memsize(count, flags);
if (ring_size < 0) {
rte_errno = ring_size;
return NULL;
@@ -124,6 +124,18 @@ struct rte_ring {
*/
#define RING_F_EXACT_SZ 0x0004
#define RTE_RING_SZ_MASK (0x7fffffffU) /**< Ring size mask */
+/**
+ * The ring uses non-blocking enqueue and dequeue functions. These functions
+ * do not have the "non-preemptive" constraint of a regular rte ring, and thus
+ * are suited for applications using preemptible pthreads. However, the
+ * non-blocking functions have worse average-case performance than their
+ * regular rte ring counterparts. When used as the handler for a mempool,
+ * per-thread caching can mitigate the performance difference by reducing the
+ * number (and contention) of ring accesses.
+ *
+ * This flag is only supported on x86_64 platforms.
+ */
+#define RING_F_NB 0x0008
/* @internal defines for passing to the enqueue dequeue worker functions */
#define __IS_SP 1
@@ -141,11 +153,15 @@ struct rte_ring {
*
* @param count
* The number of elements in the ring (must be a power of 2).
+ * @param flags
+ * The flags the ring will be created with.
* @return
* - The memory size needed for the ring on success.
* - -EINVAL if count is not a power of 2.
*/
-ssize_t rte_ring_get_memsize(unsigned count);
+ssize_t rte_ring_get_memsize(unsigned int count, unsigned int flags);
+ssize_t rte_ring_get_memsize_v20(unsigned int count);
+ssize_t rte_ring_get_memsize_v1905(unsigned int count, unsigned int flags);
/**
* Initialize a ring structure.
@@ -178,6 +194,10 @@ ssize_t rte_ring_get_memsize(unsigned count);
* - RING_F_SC_DEQ: If this flag is set, the default behavior when
* using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
* is "single-consumer". Otherwise, it is "multi-consumers".
+ * - RING_F_EXACT_SZ: If this flag is set, count can be a non-power-of-2
+ * number, but up to half the ring space may be wasted.
+ * - RING_F_NB: (x86_64 only) If this flag is set, the ring uses
+ * non-blocking variants of the dequeue and enqueue functions.
* @return
* 0 on success, or a negative value on error.
*/
@@ -213,12 +233,17 @@ int rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
* - RING_F_SC_DEQ: If this flag is set, the default behavior when
* using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
* is "single-consumer". Otherwise, it is "multi-consumers".
+ * - RING_F_EXACT_SZ: If this flag is set, count can be a non-power-of-2
+ * number, but up to half the ring space may be wasted.
+ * - RING_F_NB: (x86_64 only) If this flag is set, the ring uses
+ * non-blocking variants of the dequeue and enqueue functions.
* @return
* On success, the pointer to the new allocated ring. NULL on error with
* rte_errno set appropriately. Possible errno values include:
* - E_RTE_NO_CONFIG - function could not get pointer to rte_config structure
* - E_RTE_SECONDARY - function was called from a secondary process instance
- * - EINVAL - count provided is not a power of 2
+ * - EINVAL - count provided is not a power of 2, or RING_F_NB is used on an
+ * unsupported platform
* - ENOSPC - the maximum number of memzones has already been allocated
* - EEXIST - a memzone with the same name already exists
* - ENOMEM - no appropriate memory area found in which to create memzone
@@ -274,6 +299,50 @@ void rte_ring_dump(FILE *f, const struct rte_ring *r);
} \
} while (0)
+/* The actual enqueue of pointers on the ring.
+ * Used only by the single-producer non-blocking enqueue function, but
+ * out-lined here for code readability.
+ */
+#define ENQUEUE_PTRS_NB(r, ring_start, prod_head, obj_table, n) do { \
+ unsigned int i; \
+ const uint32_t size = (r)->size; \
+ size_t idx = prod_head & (r)->mask; \
+ size_t new_cnt = prod_head + size; \
+ struct nb_ring_entry *ring = (struct nb_ring_entry *)ring_start; \
+ if (likely(idx + n < size)) { \
+ for (i = 0; i < (n & ((~(unsigned)0x3))); i += 4, idx += 4) { \
+ ring[idx].ptr = obj_table[i]; \
+ ring[idx].cnt = new_cnt + i; \
+ ring[idx + 1].ptr = obj_table[i + 1]; \
+ ring[idx + 1].cnt = new_cnt + i + 1; \
+ ring[idx + 2].ptr = obj_table[i + 2]; \
+ ring[idx + 2].cnt = new_cnt + i + 2; \
+ ring[idx + 3].ptr = obj_table[i + 3]; \
+ ring[idx + 3].cnt = new_cnt + i + 3; \
+ } \
+ switch (n & 0x3) { \
+ case 3: \
+ ring[idx].cnt = new_cnt + i; \
+ ring[idx++].ptr = obj_table[i++]; /* fallthrough */ \
+ case 2: \
+ ring[idx].cnt = new_cnt + i; \
+ ring[idx++].ptr = obj_table[i++]; /* fallthrough */ \
+ case 1: \
+ ring[idx].cnt = new_cnt + i; \
+ ring[idx++].ptr = obj_table[i++]; \
+ } \
+ } else { \
+ for (i = 0; idx < size; i++, idx++) { \
+ ring[idx].cnt = new_cnt + i; \
+ ring[idx].ptr = obj_table[i]; \
+ } \
+ for (idx = 0; i < n; i++, idx++) { \
+ ring[idx].cnt = new_cnt + i; \
+ ring[idx].ptr = obj_table[i]; \
+ } \
+ } \
+} while (0)
+
/* the actual copy of pointers on the ring to obj_table.
* Placed here since identical code needed in both
* single and multi consumer dequeue functions */
@@ -305,6 +374,39 @@ void rte_ring_dump(FILE *f, const struct rte_ring *r);
} \
} while (0)
+/* The actual copy of pointers on the ring to obj_table.
+ * Placed here since identical code needed in both
+ * single and multi consumer non-blocking dequeue functions.
+ */
+#define DEQUEUE_PTRS_NB(r, ring_start, cons_head, obj_table, n) do { \
+ unsigned int i; \
+ size_t idx = cons_head & (r)->mask; \
+ const uint32_t size = (r)->size; \
+ struct nb_ring_entry *ring = (struct nb_ring_entry *)ring_start; \
+ if (likely(idx + n < size)) { \
+ for (i = 0; i < (n & (~(unsigned)0x3)); i += 4, idx += 4) {\
+ obj_table[i] = ring[idx].ptr; \
+ obj_table[i + 1] = ring[idx + 1].ptr; \
+ obj_table[i + 2] = ring[idx + 2].ptr; \
+ obj_table[i + 3] = ring[idx + 3].ptr; \
+ } \
+ switch (n & 0x3) { \
+ case 3: \
+ obj_table[i++] = ring[idx++].ptr; /* fallthrough */ \
+ case 2: \
+ obj_table[i++] = ring[idx++].ptr; /* fallthrough */ \
+ case 1: \
+ obj_table[i++] = ring[idx++].ptr; \
+ } \
+ } else { \
+ for (i = 0; idx < size; i++, idx++) \
+ obj_table[i] = ring[idx].ptr; \
+ for (idx = 0; i < n; i++, idx++) \
+ obj_table[i] = ring[idx].ptr; \
+ } \
+} while (0)
+
+
/* Between load and load. there might be cpu reorder in weak model
* (powerpc/arm).
* There are 2 choices for the users
@@ -320,6 +422,314 @@ void rte_ring_dump(FILE *f, const struct rte_ring *r);
#include "rte_ring_generic.h"
#endif
+/* @internal 128-bit structure used by the non-blocking ring */
+struct nb_ring_entry {
+ void *ptr; /**< Data pointer */
+ uint64_t cnt; /**< Modification counter */
+};
+
+/* The non-blocking ring algorithm is based on the original rte ring (derived
+ * from FreeBSD's bufring.h) and inspired by Michael and Scott's non-blocking
+ * concurrent queue.
+ */
+
+/**
+ * @internal
+ * Enqueue several objects on the non-blocking ring (single-producer only)
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of void * pointers (objects).
+ * @param n
+ * The number of objects to add in the ring from the obj_table.
+ * @param behavior
+ * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items to the ring
+ * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
+ * @param free_space
+ * returns the amount of space after the enqueue operation has finished
+ * @return
+ * Actual number of objects enqueued.
+ * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_nb_enqueue_sp(struct rte_ring *r, void * const *obj_table,
+ unsigned int n,
+ enum rte_ring_queue_behavior behavior,
+ unsigned int *free_space)
+{
+ uint32_t free_entries;
+ size_t head, next;
+
+ n = __rte_ring_move_prod_head(r, 1, n, behavior,
+ &head, &next, &free_entries);
+ if (n == 0)
+ goto end;
+
+ ENQUEUE_PTRS_NB(r, &r[1], head, obj_table, n);
+
+ r->prod.tail += n;
+
+end:
+ if (free_space != NULL)
+ *free_space = free_entries - n;
+ return n;
+}
+
+/**
+ * @internal
+ * Enqueue several objects on the non-blocking ring (multi-producer safe)
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of void * pointers (objects).
+ * @param n
+ * The number of objects to add in the ring from the obj_table.
+ * @param behavior
+ * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items to the ring
+ * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
+ * @param free_space
+ * returns the amount of space after the enqueue operation has finished
+ * @return
+ * Actual number of objects enqueued.
+ * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_nb_enqueue_mp(struct rte_ring *r, void * const *obj_table,
+ unsigned int n,
+ enum rte_ring_queue_behavior behavior,
+ unsigned int *free_space)
+{
+#ifdef RTE_ARCH_X86_64
+ size_t head, next, tail;
+ uint32_t free_entries;
+ unsigned int i;
+
+ n = __rte_ring_move_prod_head(r, 0, n, behavior,
+ &head, &next, &free_entries);
+ if (n == 0)
+ goto end;
+
+ for (i = 0; i < n; /* i incremented if enqueue succeeds */) {
+ struct nb_ring_entry old_value, new_value;
+ struct nb_ring_entry *ring_ptr;
+
+ /* Enqueue to the tail entry. If another thread wins the race,
+ * retry with the new tail.
+ */
+ tail = r->prod.tail;
+
+ ring_ptr = &((struct nb_ring_entry *)&r[1])[tail & r->mask];
+
+ old_value = *ring_ptr;
+
+ /* If the tail entry's modification counter doesn't match the
+ * producer tail index, it's already been updated.
+ */
+ if (old_value.cnt != tail)
+ continue;
+
+ /* Prepare the new entry. The cnt field mitigates the ABA
+ * problem on the ring write.
+ */
+ new_value.ptr = obj_table[i];
+ new_value.cnt = tail + r->size;
+
+ if (rte_atomic128_cmpset((volatile void *)ring_ptr,
+ (uint64_t *)&old_value,
+ (uint64_t *)&new_value))
+ i++;
+
+ /* Every thread attempts the cmpset, so they don't have to wait
+ * for the thread that successfully enqueued to the ring.
+ * Using a 64-bit tail mitigates the ABA problem here.
+ *
+ * Built-in used to handle variable-sized tail index.
+ */
+ __sync_bool_compare_and_swap(&r->prod.tail, tail, tail + 1);
+ }
+
+end:
+ if (free_space != NULL)
+ *free_space = free_entries - n;
+ return n;
+#else
+ RTE_SET_USED(r);
+ RTE_SET_USED(obj_table);
+ RTE_SET_USED(n);
+ RTE_SET_USED(behavior);
+ RTE_SET_USED(free_space);
+ return 0;
+#endif
+}
+
+/**
+ * @internal Enqueue several objects on the non-blocking ring
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of void * pointers (objects).
+ * @param n
+ * The number of objects to add in the ring from the obj_table.
+ * @param behavior
+ * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items to the ring
+ * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
+ * @param is_sp
+ * Indicates whether to use single producer or multi-producer head update
+ * @param free_space
+ * returns the amount of space after the enqueue operation has finished
+ * @return
+ * Actual number of objects enqueued.
+ * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_nb_enqueue(struct rte_ring *r, void * const *obj_table,
+ unsigned int n, enum rte_ring_queue_behavior behavior,
+ unsigned int is_sp, unsigned int *free_space)
+{
+ if (is_sp)
+ return __rte_ring_do_nb_enqueue_sp(r, obj_table, n,
+ behavior, free_space);
+ else
+ return __rte_ring_do_nb_enqueue_mp(r, obj_table, n,
+ behavior, free_space);
+}
+
+/**
+ * @internal
+ * Dequeue several objects from the non-blocking ring (single-consumer only)
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of void * pointers (objects).
+ * @param n
+ * The number of objects to pull from the ring.
+ * @param behavior
+ * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from the ring
+ * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring
+ * @param available
+ * returns the number of remaining ring entries after the dequeue has finished
+ * @return
+ * - Actual number of objects dequeued.
+ * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_nb_dequeue_sc(struct rte_ring *r, void **obj_table,
+ unsigned int n,
+ enum rte_ring_queue_behavior behavior,
+ unsigned int *available)
+{
+ size_t head, next;
+ uint32_t entries;
+
+ n = __rte_ring_move_cons_head(r, 1, n, behavior,
+ &head, &next, &entries);
+ if (n == 0)
+ goto end;
+
+ DEQUEUE_PTRS_NB(r, &r[1], head, obj_table, n);
+
+ r->cons.tail += n;
+
+end:
+ if (available != NULL)
+ *available = entries - n;
+ return n;
+}
+
+/**
+ * @internal
+ * Dequeue several objects from the non-blocking ring (multi-consumer safe)
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of void * pointers (objects).
+ * @param n
+ * The number of objects to pull from the ring.
+ * @param behavior
+ * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from the ring
+ * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring
+ * @param available
+ * returns the number of remaining ring entries after the dequeue has finished
+ * @return
+ * - Actual number of objects dequeued.
+ * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_nb_dequeue_mc(struct rte_ring *r, void **obj_table,
+ unsigned int n,
+ enum rte_ring_queue_behavior behavior,
+ unsigned int *available)
+{
+ size_t head, next;
+ uint32_t entries;
+
+ n = __rte_ring_move_cons_head(r, 0, n, behavior,
+ &head, &next, &entries);
+ if (n == 0)
+ goto end;
+
+ while (1) {
+ size_t tail = r->cons.tail;
+
+ /* Dequeue from the cons tail onwards. If multiple threads read
+ * the same pointers, the thread that successfully performs the
+ * CAS will keep them and the other(s) will retry.
+ */
+ DEQUEUE_PTRS_NB(r, &r[1], tail, obj_table, n);
+
+ next = tail + n;
+
+ /* Built-in used to handle variable-sized tail index. */
+ if (__sync_bool_compare_and_swap(&r->cons.tail, tail, next)) {
+ /* There is potential for the ABA problem here, but
+ * that is mitigated by the large (64-bit) tail.
+ */
+ break;
+ }
+ }
+
+end:
+ if (available != NULL)
+ *available = entries - n;
+ return n;
+}
+
+/**
+ * @internal Dequeue several objects from the non-blocking ring
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of void * pointers (objects).
+ * @param n
+ * The number of objects to pull from the ring.
+ * @param behavior
+ * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from the ring
+ * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring
+ * @param available
+ * returns the number of remaining ring entries after the dequeue has finished
+ * @return
+ * - Actual number of objects dequeued.
+ * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_nb_dequeue(struct rte_ring *r, void **obj_table,
+ unsigned int n, enum rte_ring_queue_behavior behavior,
+ unsigned int is_sc, unsigned int *available)
+{
+ if (is_sc)
+ return __rte_ring_do_nb_dequeue_sc(r, obj_table, n,
+ behavior, available);
+ else
+ return __rte_ring_do_nb_dequeue_mc(r, obj_table, n,
+ behavior, available);
+}
+
/**
* @internal Enqueue several objects on the ring
*
@@ -427,8 +837,14 @@ static __rte_always_inline unsigned int
rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
unsigned int n, unsigned int *free_space)
{
- return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
- __IS_MP, free_space);
+ if (r->flags & RING_F_NB)
+ return __rte_ring_do_nb_enqueue(r, obj_table, n,
+ RTE_RING_QUEUE_FIXED, __IS_MP,
+ free_space);
+ else
+ return __rte_ring_do_enqueue(r, obj_table, n,
+ RTE_RING_QUEUE_FIXED, __IS_MP,
+ free_space);
}
/**
@@ -450,8 +866,14 @@ static __rte_always_inline unsigned int
rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
unsigned int n, unsigned int *free_space)
{
- return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
- __IS_SP, free_space);
+ if (r->flags & RING_F_NB)
+ return __rte_ring_do_nb_enqueue(r, obj_table, n,
+ RTE_RING_QUEUE_FIXED, __IS_SP,
+ free_space);
+ else
+ return __rte_ring_do_enqueue(r, obj_table, n,
+ RTE_RING_QUEUE_FIXED, __IS_SP,
+ free_space);
}
/**
@@ -477,8 +899,14 @@ static __rte_always_inline unsigned int
rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
unsigned int n, unsigned int *free_space)
{
- return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
- r->prod.single, free_space);
+ if (r->flags & RING_F_NB)
+ return __rte_ring_do_nb_enqueue(r, obj_table, n,
+ RTE_RING_QUEUE_FIXED,
+ r->prod.single, free_space);
+ else
+ return __rte_ring_do_enqueue(r, obj_table, n,
+ RTE_RING_QUEUE_FIXED,
+ r->prod.single, free_space);
}
/**
@@ -561,8 +989,14 @@ static __rte_always_inline unsigned int
rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table,
unsigned int n, unsigned int *available)
{
- return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
- __IS_MC, available);
+ if (r->flags & RING_F_NB)
+ return __rte_ring_do_nb_dequeue(r, obj_table, n,
+ RTE_RING_QUEUE_FIXED, __IS_MC,
+ available);
+ else
+ return __rte_ring_do_dequeue(r, obj_table, n,
+ RTE_RING_QUEUE_FIXED, __IS_MC,
+ available);
}
/**
@@ -585,8 +1019,14 @@ static __rte_always_inline unsigned int
rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table,
unsigned int n, unsigned int *available)
{
- return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
- __IS_SC, available);
+ if (r->flags & RING_F_NB)
+ return __rte_ring_do_nb_dequeue(r, obj_table, n,
+ RTE_RING_QUEUE_FIXED, __IS_SC,
+ available);
+ else
+ return __rte_ring_do_dequeue(r, obj_table, n,
+ RTE_RING_QUEUE_FIXED, __IS_SC,
+ available);
}
/**
@@ -612,8 +1052,14 @@ static __rte_always_inline unsigned int
rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n,
unsigned int *available)
{
- return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
- r->cons.single, available);
+ if (r->flags & RING_F_NB)
+ return __rte_ring_do_nb_dequeue(r, obj_table, n,
+ RTE_RING_QUEUE_FIXED,
+ r->cons.single, available);
+ else
+ return __rte_ring_do_dequeue(r, obj_table, n,
+ RTE_RING_QUEUE_FIXED,
+ r->cons.single, available);
}
/**
@@ -810,8 +1256,14 @@ static __rte_always_inline unsigned
rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
unsigned int n, unsigned int *free_space)
{
- return __rte_ring_do_enqueue(r, obj_table, n,
- RTE_RING_QUEUE_VARIABLE, __IS_MP, free_space);
+ if (r->flags & RING_F_NB)
+ return __rte_ring_do_nb_enqueue(r, obj_table, n,
+ RTE_RING_QUEUE_VARIABLE,
+ __IS_MP, free_space);
+ else
+ return __rte_ring_do_enqueue(r, obj_table, n,
+ RTE_RING_QUEUE_VARIABLE,
+ __IS_MP, free_space);
}
/**
@@ -833,8 +1285,14 @@ static __rte_always_inline unsigned
rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
unsigned int n, unsigned int *free_space)
{
- return __rte_ring_do_enqueue(r, obj_table, n,
- RTE_RING_QUEUE_VARIABLE, __IS_SP, free_space);
+ if (r->flags & RING_F_NB)
+ return __rte_ring_do_nb_enqueue(r, obj_table, n,
+ RTE_RING_QUEUE_VARIABLE,
+ __IS_SP, free_space);
+ else
+ return __rte_ring_do_enqueue(r, obj_table, n,
+ RTE_RING_QUEUE_VARIABLE,
+ __IS_SP, free_space);
}
/**
@@ -860,8 +1318,14 @@ static __rte_always_inline unsigned
rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
unsigned int n, unsigned int *free_space)
{
- return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE,
- r->prod.single, free_space);
+ if (r->flags & RING_F_NB)
+ return __rte_ring_do_nb_enqueue(r, obj_table, n,
+ RTE_RING_QUEUE_VARIABLE,
+ r->prod.single, free_space);
+ else
+ return __rte_ring_do_enqueue(r, obj_table, n,
+ RTE_RING_QUEUE_VARIABLE,
+ r->prod.single, free_space);
}
/**
@@ -888,8 +1352,14 @@ static __rte_always_inline unsigned
rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table,
unsigned int n, unsigned int *available)
{
- return __rte_ring_do_dequeue(r, obj_table, n,
- RTE_RING_QUEUE_VARIABLE, __IS_MC, available);
+ if (r->flags & RING_F_NB)
+ return __rte_ring_do_nb_dequeue(r, obj_table, n,
+ RTE_RING_QUEUE_VARIABLE,
+ __IS_MC, available);
+ else
+ return __rte_ring_do_dequeue(r, obj_table, n,
+ RTE_RING_QUEUE_VARIABLE,
+ __IS_MC, available);
}
/**
@@ -913,8 +1383,14 @@ static __rte_always_inline unsigned
rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table,
unsigned int n, unsigned int *available)
{
- return __rte_ring_do_dequeue(r, obj_table, n,
- RTE_RING_QUEUE_VARIABLE, __IS_SC, available);
+ if (r->flags & RING_F_NB)
+ return __rte_ring_do_nb_dequeue(r, obj_table, n,
+ RTE_RING_QUEUE_VARIABLE,
+ __IS_SC, available);
+ else
+ return __rte_ring_do_dequeue(r, obj_table, n,
+ RTE_RING_QUEUE_VARIABLE,
+ __IS_SC, available);
}
/**
@@ -940,9 +1416,14 @@ static __rte_always_inline unsigned
rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table,
unsigned int n, unsigned int *available)
{
- return __rte_ring_do_dequeue(r, obj_table, n,
- RTE_RING_QUEUE_VARIABLE,
- r->cons.single, available);
+ if (r->flags & RING_F_NB)
+ return __rte_ring_do_nb_dequeue(r, obj_table, n,
+ RTE_RING_QUEUE_VARIABLE,
+ r->cons.single, available);
+ else
+ return __rte_ring_do_dequeue(r, obj_table, n,
+ RTE_RING_QUEUE_VARIABLE,
+ r->cons.single, available);
}
#ifdef __cplusplus
@@ -17,3 +17,10 @@ DPDK_2.2 {
rte_ring_free;
} DPDK_2.0;
+
+DPDK_19.05 {
+ global:
+
+ rte_ring_get_memsize;
+
+} DPDK_2.2;