[v4,1/2] lib/ring: apis to support configurable element size

Message ID 20191009024709.38144-2-honnappa.nagarahalli@arm.com
State Superseded
Delegated to: David Marchand
Headers show
Series
  • [v4,1/2] lib/ring: apis to support configurable element size
Related show

Checks

Context Check Description
ci/iol-mellanox-Performance success Performance Testing PASS
ci/Intel-compilation fail Compilation issues
ci/iol-intel-Performance success Performance Testing PASS
ci/iol-compilation success Compile Testing PASS
ci/checkpatch warning coding style issues

Commit Message

Honnappa Nagarahalli Oct. 9, 2019, 2:47 a.m.
Current APIs assume ring elements to be pointers. However, in many
use cases, the size can be different. Add new APIs to support
configurable ring element sizes.

Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
Reviewed-by: Dharmik Thakkar <dharmik.thakkar@arm.com>
Reviewed-by: Gavin Hu <gavin.hu@arm.com>
Reviewed-by: Ruifeng Wang <ruifeng.wang@arm.com>
---
 lib/librte_ring/Makefile             |   3 +-
 lib/librte_ring/meson.build          |   3 +
 lib/librte_ring/rte_ring.c           |  45 +-
 lib/librte_ring/rte_ring.h           |   1 +
 lib/librte_ring/rte_ring_elem.h      | 946 +++++++++++++++++++++++++++
 lib/librte_ring/rte_ring_version.map |   2 +
 6 files changed, 991 insertions(+), 9 deletions(-)
 create mode 100644 lib/librte_ring/rte_ring_elem.h

Comments

Honnappa Nagarahalli Oct. 11, 2019, 7:21 p.m. | #1
Hi Bruce, Konstantin, Stephen,
	Appreciate if you could provide feedback on this.

Thanks,
Honnappa

> -----Original Message-----
> From: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
> Sent: Tuesday, October 8, 2019 9:47 PM
> To: olivier.matz@6wind.com; sthemmin@microsoft.com; jerinj@marvell.com;
> bruce.richardson@intel.com; david.marchand@redhat.com;
> pbhagavatula@marvell.com; konstantin.ananyev@intel.com; Honnappa
> Nagarahalli <Honnappa.Nagarahalli@arm.com>
> Cc: dev@dpdk.org; Dharmik Thakkar <Dharmik.Thakkar@arm.com>; Ruifeng
> Wang (Arm Technology China) <Ruifeng.Wang@arm.com>; Gavin Hu (Arm
> Technology China) <Gavin.Hu@arm.com>
> Subject: [PATCH v4 1/2] lib/ring: apis to support configurable element size
> 
> Current APIs assume ring elements to be pointers. However, in many use cases,
> the size can be different. Add new APIs to support configurable ring element
> sizes.
> 
> Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
> Reviewed-by: Dharmik Thakkar <dharmik.thakkar@arm.com>
> Reviewed-by: Gavin Hu <gavin.hu@arm.com>
> Reviewed-by: Ruifeng Wang <ruifeng.wang@arm.com>
> ---
>  lib/librte_ring/Makefile             |   3 +-
>  lib/librte_ring/meson.build          |   3 +
>  lib/librte_ring/rte_ring.c           |  45 +-
>  lib/librte_ring/rte_ring.h           |   1 +
>  lib/librte_ring/rte_ring_elem.h      | 946 +++++++++++++++++++++++++++
>  lib/librte_ring/rte_ring_version.map |   2 +
>  6 files changed, 991 insertions(+), 9 deletions(-)
>  create mode 100644 lib/librte_ring/rte_ring_elem.h
> 
> diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile index
> 21a36770d..515a967bb 100644
> --- a/lib/librte_ring/Makefile
> +++ b/lib/librte_ring/Makefile
> @@ -6,7 +6,7 @@ include $(RTE_SDK)/mk/rte.vars.mk
>  # library name
>  LIB = librte_ring.a
> 
> -CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3
> +CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3 -DALLOW_EXPERIMENTAL_API
>  LDLIBS += -lrte_eal
> 
>  EXPORT_MAP := rte_ring_version.map
> @@ -18,6 +18,7 @@ SRCS-$(CONFIG_RTE_LIBRTE_RING) := rte_ring.c
> 
>  # install includes
>  SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h \
> +					rte_ring_elem.h \
>  					rte_ring_generic.h \
>  					rte_ring_c11_mem.h
> 
> diff --git a/lib/librte_ring/meson.build b/lib/librte_ring/meson.build index
> ab8b0b469..74219840a 100644
> --- a/lib/librte_ring/meson.build
> +++ b/lib/librte_ring/meson.build
> @@ -6,3 +6,6 @@ sources = files('rte_ring.c')  headers = files('rte_ring.h',
>  		'rte_ring_c11_mem.h',
>  		'rte_ring_generic.h')
> +
> +# rte_ring_create_elem and rte_ring_get_memsize_elem are experimental
> +allow_experimental_apis = true
> diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c index
> d9b308036..6fed3648b 100644
> --- a/lib/librte_ring/rte_ring.c
> +++ b/lib/librte_ring/rte_ring.c
> @@ -33,6 +33,7 @@
>  #include <rte_tailq.h>
> 
>  #include "rte_ring.h"
> +#include "rte_ring_elem.h"
> 
>  TAILQ_HEAD(rte_ring_list, rte_tailq_entry);
> 
> @@ -46,23 +47,42 @@ EAL_REGISTER_TAILQ(rte_ring_tailq)
> 
>  /* return the size of memory occupied by a ring */
>  ssize_t
> -rte_ring_get_memsize(unsigned count)
> +rte_ring_get_memsize_elem(unsigned count, unsigned esize)
>  {
>  	ssize_t sz;
> 
> +	/* Supported esize values are 4/8/16.
> +	 * Others can be added on need basis.
> +	 */
> +	if ((esize != 4) && (esize != 8) && (esize != 16)) {
> +		RTE_LOG(ERR, RING,
> +			"Unsupported esize value. Supported values are 4, 8 and 16\n");
> +
> +		return -EINVAL;
> +	}
> +
>  	/* count must be a power of 2 */
>  	if ((!POWEROF2(count)) || (count > RTE_RING_SZ_MASK )) {
>  		RTE_LOG(ERR, RING,
> -			"Requested size is invalid, must be power of 2, and "
> -			"do not exceed the size limit %u\n",
> RTE_RING_SZ_MASK);
> +			"Requested number of elements is invalid, must be "
> +			"power of 2, and do not exceed the limit %u\n",
> +			RTE_RING_SZ_MASK);
> +
>  		return -EINVAL;
>  	}
> 
> -	sz = sizeof(struct rte_ring) + count * sizeof(void *);
> +	sz = sizeof(struct rte_ring) + count * esize;
>  	sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
>  	return sz;
>  }
> 
> +/* return the size of memory occupied by a ring */
> +ssize_t
> +rte_ring_get_memsize(unsigned count)
> +{
> +	return rte_ring_get_memsize_elem(count, sizeof(void *));
> +}
> +
>  void
>  rte_ring_reset(struct rte_ring *r)
>  {
> @@ -114,10 +134,10 @@ rte_ring_init(struct rte_ring *r, const char *name,
> unsigned count,
>  	return 0;
>  }
> 
> -/* create the ring */
> +/* create the ring for a given element size */
>  struct rte_ring *
> -rte_ring_create(const char *name, unsigned count, int socket_id,
> -		unsigned flags)
> +rte_ring_create_elem(const char *name, unsigned count, unsigned esize,
> +		int socket_id, unsigned flags)
>  {
>  	char mz_name[RTE_MEMZONE_NAMESIZE];
>  	struct rte_ring *r;
> @@ -135,7 +155,7 @@ rte_ring_create(const char *name, unsigned count,
> int socket_id,
>  	if (flags & RING_F_EXACT_SZ)
>  		count = rte_align32pow2(count + 1);
> 
> -	ring_size = rte_ring_get_memsize(count);
> +	ring_size = rte_ring_get_memsize_elem(count, esize);
>  	if (ring_size < 0) {
>  		rte_errno = ring_size;
>  		return NULL;
> @@ -182,6 +202,15 @@ rte_ring_create(const char *name, unsigned count,
> int socket_id,
>  	return r;
>  }
> 
> +/* create the ring */
> +struct rte_ring *
> +rte_ring_create(const char *name, unsigned count, int socket_id,
> +		unsigned flags)
> +{
> +	return rte_ring_create_elem(name, count, sizeof(void *), socket_id,
> +		flags);
> +}
> +
>  /* free the ring */
>  void
>  rte_ring_free(struct rte_ring *r)
> diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h index
> 2a9f768a1..18fc5d845 100644
> --- a/lib/librte_ring/rte_ring.h
> +++ b/lib/librte_ring/rte_ring.h
> @@ -216,6 +216,7 @@ int rte_ring_init(struct rte_ring *r, const char *name,
> unsigned count,
>   */
>  struct rte_ring *rte_ring_create(const char *name, unsigned count,
>  				 int socket_id, unsigned flags);
> +
>  /**
>   * De-allocate all memory used by the ring.
>   *
> diff --git a/lib/librte_ring/rte_ring_elem.h b/lib/librte_ring/rte_ring_elem.h
> new file mode 100644
> index 000000000..860f059ad
> --- /dev/null
> +++ b/lib/librte_ring/rte_ring_elem.h
> @@ -0,0 +1,946 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + *
> + * Copyright (c) 2019 Arm Limited
> + * Copyright (c) 2010-2017 Intel Corporation
> + * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
> + * All rights reserved.
> + * Derived from FreeBSD's bufring.h
> + * Used as BSD-3 Licensed with permission from Kip Macy.
> + */
> +
> +#ifndef _RTE_RING_ELEM_H_
> +#define _RTE_RING_ELEM_H_
> +
> +/**
> + * @file
> + * RTE Ring with flexible element size
> + */
> +
> +#ifdef __cplusplus
> +extern "C" {
> +#endif
> +
> +#include <stdio.h>
> +#include <stdint.h>
> +#include <sys/queue.h>
> +#include <errno.h>
> +#include <rte_common.h>
> +#include <rte_config.h>
> +#include <rte_memory.h>
> +#include <rte_lcore.h>
> +#include <rte_atomic.h>
> +#include <rte_branch_prediction.h>
> +#include <rte_memzone.h>
> +#include <rte_pause.h>
> +
> +#include "rte_ring.h"
> +
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Calculate the memory size needed for a ring with given element size
> + *
> + * This function returns the number of bytes needed for a ring, given
> + * the number of elements in it and the size of the element. This value
> + * is the sum of the size of the structure rte_ring and the size of the
> + * memory needed for storing the elements. The value is aligned to a
> +cache
> + * line size.
> + *
> + * @param count
> + *   The number of elements in the ring (must be a power of 2).
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + *   Currently, sizes 4, 8 and 16 are supported.
> + * @return
> + *   - The memory size needed for the ring on success.
> + *   - -EINVAL if count is not a power of 2.
> + */
> +__rte_experimental
> +ssize_t rte_ring_get_memsize_elem(unsigned count, unsigned esize);
> +
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Create a new ring named *name* that stores elements with given size.
> + *
> + * This function uses ``memzone_reserve()`` to allocate memory. Then it
> + * calls rte_ring_init() to initialize an empty ring.
> + *
> + * The new ring size is set to *count*, which must be a power of
> + * two. Water marking is disabled by default. The real usable ring size
> + * is *count-1* instead of *count* to differentiate a free ring from an
> + * empty ring.
> + *
> + * The ring is added in RTE_TAILQ_RING list.
> + *
> + * @param name
> + *   The name of the ring.
> + * @param count
> + *   The number of elements in the ring (must be a power of 2).
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + *   Currently, sizes 4, 8 and 16 are supported.
> + * @param socket_id
> + *   The *socket_id* argument is the socket identifier in case of
> + *   NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA
> + *   constraint for the reserved zone.
> + * @param flags
> + *   An OR of the following:
> + *    - RING_F_SP_ENQ: If this flag is set, the default behavior when
> + *      using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
> + *      is "single-producer". Otherwise, it is "multi-producers".
> + *    - RING_F_SC_DEQ: If this flag is set, the default behavior when
> + *      using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
> + *      is "single-consumer". Otherwise, it is "multi-consumers".
> + * @return
> + *   On success, the pointer to the new allocated ring. NULL on error with
> + *    rte_errno set appropriately. Possible errno values include:
> + *    - E_RTE_NO_CONFIG - function could not get pointer to rte_config
> structure
> + *    - E_RTE_SECONDARY - function was called from a secondary process
> instance
> + *    - EINVAL - count provided is not a power of 2
> + *    - ENOSPC - the maximum number of memzones has already been
> allocated
> + *    - EEXIST - a memzone with the same name already exists
> + *    - ENOMEM - no appropriate memory area found in which to create
> memzone
> + */
> +__rte_experimental
> +struct rte_ring *rte_ring_create_elem(const char *name, unsigned count,
> +				unsigned esize, int socket_id, unsigned flags);
> +
> +/* the actual enqueue of pointers on the ring.
> + * Placed here since identical code needed in both
> + * single and multi producer enqueue functions.
> + */
> +#define ENQUEUE_PTRS_ELEM(r, ring_start, prod_head, obj_table, esize, n) do { \
> +	if (esize == 4) \
> +		ENQUEUE_PTRS_32(r, ring_start, prod_head, obj_table, n); \
> +	else if (esize == 8) \
> +		ENQUEUE_PTRS_64(r, ring_start, prod_head, obj_table, n); \
> +	else if (esize == 16) \
> +		ENQUEUE_PTRS_128(r, ring_start, prod_head, obj_table, n); \
> +} while (0)
> +
> +#define ENQUEUE_PTRS_32(r, ring_start, prod_head, obj_table, n) do { \
> +	unsigned int i; \
> +	const uint32_t size = (r)->size; \
> +	uint32_t idx = prod_head & (r)->mask; \
> +	uint32_t *ring = (uint32_t *)ring_start; \
> +	uint32_t *obj = (uint32_t *)obj_table; \
> +	if (likely(idx + n < size)) { \
> +		for (i = 0; i < (n & ((~(unsigned)0x7))); i += 8, idx += 8) { \
> +			ring[idx] = obj[i]; \
> +			ring[idx + 1] = obj[i + 1]; \
> +			ring[idx + 2] = obj[i + 2]; \
> +			ring[idx + 3] = obj[i + 3]; \
> +			ring[idx + 4] = obj[i + 4]; \
> +			ring[idx + 5] = obj[i + 5]; \
> +			ring[idx + 6] = obj[i + 6]; \
> +			ring[idx + 7] = obj[i + 7]; \
> +		} \
> +		switch (n & 0x7) { \
> +		case 7: \
> +			ring[idx++] = obj[i++]; /* fallthrough */ \
> +		case 6: \
> +			ring[idx++] = obj[i++]; /* fallthrough */ \
> +		case 5: \
> +			ring[idx++] = obj[i++]; /* fallthrough */ \
> +		case 4: \
> +			ring[idx++] = obj[i++]; /* fallthrough */ \
> +		case 3: \
> +			ring[idx++] = obj[i++]; /* fallthrough */ \
> +		case 2: \
> +			ring[idx++] = obj[i++]; /* fallthrough */ \
> +		case 1: \
> +			ring[idx++] = obj[i++]; /* fallthrough */ \
> +		} \
> +	} else { \
> +		for (i = 0; idx < size; i++, idx++)\
> +			ring[idx] = obj[i]; \
> +		for (idx = 0; i < n; i++, idx++) \
> +			ring[idx] = obj[i]; \
> +	} \
> +} while (0)
> +
> +#define ENQUEUE_PTRS_64(r, ring_start, prod_head, obj_table, n) do { \
> +	unsigned int i; \
> +	const uint32_t size = (r)->size; \
> +	uint32_t idx = prod_head & (r)->mask; \
> +	uint64_t *ring = (uint64_t *)ring_start; \
> +	uint64_t *obj = (uint64_t *)obj_table; \
> +	if (likely(idx + n < size)) { \
> +		for (i = 0; i < (n & ((~(unsigned)0x3))); i += 4, idx += 4) { \
> +			ring[idx] = obj[i]; \
> +			ring[idx + 1] = obj[i + 1]; \
> +			ring[idx + 2] = obj[i + 2]; \
> +			ring[idx + 3] = obj[i + 3]; \
> +		} \
> +		switch (n & 0x3) { \
> +		case 3: \
> +			ring[idx++] = obj[i++]; /* fallthrough */ \
> +		case 2: \
> +			ring[idx++] = obj[i++]; /* fallthrough */ \
> +		case 1: \
> +			ring[idx++] = obj[i++]; \
> +		} \
> +	} else { \
> +		for (i = 0; idx < size; i++, idx++)\
> +			ring[idx] = obj[i]; \
> +		for (idx = 0; i < n; i++, idx++) \
> +			ring[idx] = obj[i]; \
> +	} \
> +} while (0)
> +
> +#define ENQUEUE_PTRS_128(r, ring_start, prod_head, obj_table, n) do { \
> +	unsigned int i; \
> +	const uint32_t size = (r)->size; \
> +	uint32_t idx = prod_head & (r)->mask; \
> +	__uint128_t *ring = (__uint128_t *)ring_start; \
> +	__uint128_t *obj = (__uint128_t *)obj_table; \
> +	if (likely(idx + n < size)) { \
> +		for (i = 0; i < (n >> 1); i += 2, idx += 2) { \
> +			ring[idx] = obj[i]; \
> +			ring[idx + 1] = obj[i + 1]; \
> +		} \
> +		switch (n & 0x1) { \
> +		case 1: \
> +			ring[idx++] = obj[i++]; \
> +		} \
> +	} else { \
> +		for (i = 0; idx < size; i++, idx++)\
> +			ring[idx] = obj[i]; \
> +		for (idx = 0; i < n; i++, idx++) \
> +			ring[idx] = obj[i]; \
> +	} \
> +} while (0)
> +
> +/* the actual copy of pointers on the ring to obj_table.
> + * Placed here since identical code needed in both
> + * single and multi consumer dequeue functions.
> + */
> +#define DEQUEUE_PTRS_ELEM(r, ring_start, cons_head, obj_table, esize, n) do { \
> +	if (esize == 4) \
> +		DEQUEUE_PTRS_32(r, ring_start, cons_head, obj_table, n); \
> +	else if (esize == 8) \
> +		DEQUEUE_PTRS_64(r, ring_start, cons_head, obj_table, n); \
> +	else if (esize == 16) \
> +		DEQUEUE_PTRS_128(r, ring_start, cons_head, obj_table, n); \
> +} while (0)
> +
> +#define DEQUEUE_PTRS_32(r, ring_start, cons_head, obj_table, n) do { \
> +	unsigned int i; \
> +	uint32_t idx = cons_head & (r)->mask; \
> +	const uint32_t size = (r)->size; \
> +	uint32_t *ring = (uint32_t *)ring_start; \
> +	uint32_t *obj = (uint32_t *)obj_table; \
> +	if (likely(idx + n < size)) { \
> +		for (i = 0; i < (n & (~(unsigned)0x7)); i += 8, idx += 8) {\
> +			obj[i] = ring[idx]; \
> +			obj[i + 1] = ring[idx + 1]; \
> +			obj[i + 2] = ring[idx + 2]; \
> +			obj[i + 3] = ring[idx + 3]; \
> +			obj[i + 4] = ring[idx + 4]; \
> +			obj[i + 5] = ring[idx + 5]; \
> +			obj[i + 6] = ring[idx + 6]; \
> +			obj[i + 7] = ring[idx + 7]; \
> +		} \
> +		switch (n & 0x7) { \
> +		case 7: \
> +			obj[i++] = ring[idx++]; /* fallthrough */ \
> +		case 6: \
> +			obj[i++] = ring[idx++]; /* fallthrough */ \
> +		case 5: \
> +			obj[i++] = ring[idx++]; /* fallthrough */ \
> +		case 4: \
> +			obj[i++] = ring[idx++]; /* fallthrough */ \
> +		case 3: \
> +			obj[i++] = ring[idx++]; /* fallthrough */ \
> +		case 2: \
> +			obj[i++] = ring[idx++]; /* fallthrough */ \
> +		case 1: \
> +			obj[i++] = ring[idx++]; /* fallthrough */ \
> +		} \
> +	} else { \
> +		for (i = 0; idx < size; i++, idx++) \
> +			obj[i] = ring[idx]; \
> +		for (idx = 0; i < n; i++, idx++) \
> +			obj[i] = ring[idx]; \
> +	} \
> +} while (0)
> +
> +#define DEQUEUE_PTRS_64(r, ring_start, cons_head, obj_table, n) do { \
> +	unsigned int i; \
> +	uint32_t idx = cons_head & (r)->mask; \
> +	const uint32_t size = (r)->size; \
> +	uint64_t *ring = (uint64_t *)ring_start; \
> +	uint64_t *obj = (uint64_t *)obj_table; \
> +	if (likely(idx + n < size)) { \
> +		for (i = 0; i < (n & (~(unsigned)0x3)); i += 4, idx += 4) {\
> +			obj[i] = ring[idx]; \
> +			obj[i + 1] = ring[idx + 1]; \
> +			obj[i + 2] = ring[idx + 2]; \
> +			obj[i + 3] = ring[idx + 3]; \
> +		} \
> +		switch (n & 0x3) { \
> +		case 3: \
> +			obj[i++] = ring[idx++]; /* fallthrough */ \
> +		case 2: \
> +			obj[i++] = ring[idx++]; /* fallthrough */ \
> +		case 1: \
> +			obj[i++] = ring[idx++]; \
> +		} \
> +	} else { \
> +		for (i = 0; idx < size; i++, idx++) \
> +			obj[i] = ring[idx]; \
> +		for (idx = 0; i < n; i++, idx++) \
> +			obj[i] = ring[idx]; \
> +	} \
> +} while (0)
> +
> +#define DEQUEUE_PTRS_128(r, ring_start, cons_head, obj_table, n) do { \
> +	unsigned int i; \
> +	uint32_t idx = cons_head & (r)->mask; \
> +	const uint32_t size = (r)->size; \
> +	__uint128_t *ring = (__uint128_t *)ring_start; \
> +	__uint128_t *obj = (__uint128_t *)obj_table; \
> +	if (likely(idx + n < size)) { \
> +		for (i = 0; i < (n >> 1); i += 2, idx += 2) { \
> +			obj[i] = ring[idx]; \
> +			obj[i + 1] = ring[idx + 1]; \
> +		} \
> +		switch (n & 0x1) { \
> +		case 1: \
> +			obj[i++] = ring[idx++]; /* fallthrough */ \
> +		} \
> +	} else { \
> +		for (i = 0; idx < size; i++, idx++) \
> +			obj[i] = ring[idx]; \
> +		for (idx = 0; i < n; i++, idx++) \
> +			obj[i] = ring[idx]; \
> +	} \
> +} while (0)
> +
> +/* Between load and load. there might be cpu reorder in weak model
> + * (powerpc/arm).
> + * There are 2 choices for the users
> + * 1.use rmb() memory barrier
> + * 2.use one-direction load_acquire/store_release barrier,defined by
> + * CONFIG_RTE_USE_C11_MEM_MODEL=y
> + * It depends on performance test results.
> + * By default, move common functions to rte_ring_generic.h
> + */
> +#ifdef RTE_USE_C11_MEM_MODEL
> +#include "rte_ring_c11_mem.h"
> +#else
> +#include "rte_ring_generic.h"
> +#endif
> +
> +/**
> + * @internal Enqueue several objects on the ring
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects).
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + *   Currently, sizes 4, 8 and 16 are supported. This should be the same
> + *   as passed while creating the ring, otherwise the results are undefined.
> + * @param n
> + *   The number of objects to add in the ring from the obj_table.
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
> + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from
> ring
> + * @param is_sp
> + *   Indicates whether to use single producer or multi-producer head update
> + * @param free_space
> + *   returns the amount of space after the enqueue operation has finished
> + * @return
> + *   Actual number of objects enqueued.
> + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_do_enqueue_elem(struct rte_ring *r, void * const obj_table,
> +		unsigned int esize, unsigned int n,
> +		enum rte_ring_queue_behavior behavior, unsigned int is_sp,
> +		unsigned int *free_space)
> +{
> +	uint32_t prod_head, prod_next;
> +	uint32_t free_entries;
> +
> +	n = __rte_ring_move_prod_head(r, is_sp, n, behavior,
> +			&prod_head, &prod_next, &free_entries);
> +	if (n == 0)
> +		goto end;
> +
> +	ENQUEUE_PTRS_ELEM(r, &r[1], prod_head, obj_table, esize, n);
> +
> +	update_tail(&r->prod, prod_head, prod_next, is_sp, 1);
> +end:
> +	if (free_space != NULL)
> +		*free_space = free_entries - n;
> +	return n;
> +}
> +
> +/**
> + * @internal Dequeue several objects from the ring
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects).
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + *   Currently, sizes 4, 8 and 16 are supported. This should be the same
> + *   as passed while creating the ring, otherwise the results are undefined.
> + * @param n
> + *   The number of objects to pull from the ring.
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
> + *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from
> ring
> + * @param is_sc
> + *   Indicates whether to use single consumer or multi-consumer head update
> + * @param available
> + *   returns the number of remaining ring entries after the dequeue has
> finished
> + * @return
> + *   - Actual number of objects dequeued.
> + *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_do_dequeue_elem(struct rte_ring *r, void *obj_table,
> +		unsigned int esize, unsigned int n,
> +		enum rte_ring_queue_behavior behavior, unsigned int is_sc,
> +		unsigned int *available)
> +{
> +	uint32_t cons_head, cons_next;
> +	uint32_t entries;
> +
> +	n = __rte_ring_move_cons_head(r, (int)is_sc, n, behavior,
> +			&cons_head, &cons_next, &entries);
> +	if (n == 0)
> +		goto end;
> +
> +	DEQUEUE_PTRS_ELEM(r, &r[1], cons_head, obj_table, esize, n);
> +
> +	update_tail(&r->cons, cons_head, cons_next, is_sc, 0);
> +
> +end:
> +	if (available != NULL)
> +		*available = entries - n;
> +	return n;
> +}
> +
> +/**
> + * Enqueue several objects on the ring (multi-producers safe).
> + *
> + * This function uses a "compare and set" instruction to move the
> + * producer index atomically.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects).
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + *   Currently, sizes 4, 8 and 16 are supported. This should be the same
> + *   as passed while creating the ring, otherwise the results are undefined.
> + * @param n
> + *   The number of objects to add in the ring from the obj_table.
> + * @param free_space
> + *   if non-NULL, returns the amount of space in the ring after the
> + *   enqueue operation has finished.
> + * @return
> + *   The number of objects enqueued, either 0 or n
> + */
> +static __rte_always_inline unsigned int
> +rte_ring_mp_enqueue_bulk_elem(struct rte_ring *r, void * const obj_table,
> +		unsigned int esize, unsigned int n, unsigned int *free_space)
> +{
> +	return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
> +			RTE_RING_QUEUE_FIXED, __IS_MP, free_space);
> +}
> +
> +/**
> + * Enqueue several objects on a ring (NOT multi-producers safe).
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects).
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + *   Currently, sizes 4, 8 and 16 are supported. This should be the same
> + *   as passed while creating the ring, otherwise the results are undefined.
> + * @param n
> + *   The number of objects to add in the ring from the obj_table.
> + * @param free_space
> + *   if non-NULL, returns the amount of space in the ring after the
> + *   enqueue operation has finished.
> + * @return
> + *   The number of objects enqueued, either 0 or n
> + */
> +static __rte_always_inline unsigned int
> +rte_ring_sp_enqueue_bulk_elem(struct rte_ring *r, void * const obj_table,
> +		unsigned int esize, unsigned int n, unsigned int *free_space) {
> +	return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
> +			RTE_RING_QUEUE_FIXED, __IS_SP, free_space); }
> +
> +/**
> + * Enqueue several objects on a ring.
> + *
> + * This function calls the multi-producer or the single-producer
> + * version depending on the default behavior that was specified at
> + * ring creation time (see flags).
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects).
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + *   Currently, sizes 4, 8 and 16 are supported. This should be the same
> + *   as passed while creating the ring, otherwise the results are undefined.
> + * @param n
> + *   The number of objects to add in the ring from the obj_table.
> + * @param free_space
> + *   if non-NULL, returns the amount of space in the ring after the
> + *   enqueue operation has finished.
> + * @return
> + *   The number of objects enqueued, either 0 or n
> + */
> +static __rte_always_inline unsigned int
> +rte_ring_enqueue_bulk_elem(struct rte_ring *r, void * const obj_table,
> +		unsigned int esize, unsigned int n, unsigned int *free_space) {
> +	return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
> +			RTE_RING_QUEUE_FIXED, r->prod.single, free_space); }
> +
> +/**
> + * Enqueue one object on a ring (multi-producers safe).
> + *
> + * This function uses a "compare and set" instruction to move the
> + * producer index atomically.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj
> + *   A pointer to the object to be added.
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + *   Currently, sizes 4, 8 and 16 are supported. This should be the same
> + *   as passed while creating the ring, otherwise the results are undefined.
> + * @return
> + *   - 0: Success; objects enqueued.
> + *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is
> enqueued.
> + */
> +static __rte_always_inline int
> +rte_ring_mp_enqueue_elem(struct rte_ring *r, void *obj, unsigned int
> +esize) {
> +	return rte_ring_mp_enqueue_bulk_elem(r, obj, esize, 1, NULL) ? 0 :
> +								-ENOBUFS;
> +}
> +
> +/**
> + * Enqueue one object on a ring (NOT multi-producers safe).
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj
> + *   A pointer to the object to be added.
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + *   Currently, sizes 4, 8 and 16 are supported. This should be the same
> + *   as passed while creating the ring, otherwise the results are undefined.
> + * @return
> + *   - 0: Success; objects enqueued.
> + *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is
> enqueued.
> + */
> +static __rte_always_inline int
> +rte_ring_sp_enqueue_elem(struct rte_ring *r, void *obj, unsigned int
> +esize) {
> +	return rte_ring_sp_enqueue_bulk_elem(r, obj, esize, 1, NULL) ? 0 :
> +								-ENOBUFS;
> +}
> +
> +/**
> + * Enqueue one object on a ring.
> + *
> + * This function calls the multi-producer or the single-producer
> + * version, depending on the default behaviour that was specified at
> + * ring creation time (see flags).
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj
> + *   A pointer to the object to be added.
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + *   Currently, sizes 4, 8 and 16 are supported. This should be the same
> + *   as passed while creating the ring, otherwise the results are undefined.
> + * @return
> + *   - 0: Success; objects enqueued.
> + *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is
> enqueued.
> + */
> +static __rte_always_inline int
> +rte_ring_enqueue_elem(struct rte_ring *r, void *obj, unsigned int
> +esize) {
> +	return rte_ring_enqueue_bulk_elem(r, obj, esize, 1, NULL) ? 0 :
> +								-ENOBUFS;
> +}
> +
> +/**
> + * Dequeue several objects from a ring (multi-consumers safe).
> + *
> + * This function uses a "compare and set" instruction to move the
> + * consumer index atomically.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects) that will be filled.
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + *   Currently, sizes 4, 8 and 16 are supported. This should be the same
> + *   as passed while creating the ring, otherwise the results are undefined.
> + * @param n
> + *   The number of objects to dequeue from the ring to the obj_table.
> + * @param available
> + *   If non-NULL, returns the number of remaining ring entries after the
> + *   dequeue has finished.
> + * @return
> + *   The number of objects dequeued, either 0 or n
> + */
> +static __rte_always_inline unsigned int
> +rte_ring_mc_dequeue_bulk_elem(struct rte_ring *r, void *obj_table,
> +		unsigned int esize, unsigned int n, unsigned int *available)
> +{
> +	return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
> +				RTE_RING_QUEUE_FIXED, __IS_MC, available);
> +}
> +
> +/**
> + * Dequeue several objects from a ring (NOT multi-consumers safe).
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects) that will be filled.
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + *   Currently, sizes 4, 8 and 16 are supported. This should be the same
> + *   as passed while creating the ring, otherwise the results are undefined.
> + * @param n
> + *   The number of objects to dequeue from the ring to the obj_table,
> + *   must be strictly positive.
> + * @param available
> + *   If non-NULL, returns the number of remaining ring entries after the
> + *   dequeue has finished.
> + * @return
> + *   The number of objects dequeued, either 0 or n
> + */
> +static __rte_always_inline unsigned int
> +rte_ring_sc_dequeue_bulk_elem(struct rte_ring *r, void *obj_table,
> +		unsigned int esize, unsigned int n, unsigned int *available) {
> +	return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
> +			RTE_RING_QUEUE_FIXED, __IS_SC, available); }
> +
> +/**
> + * Dequeue several objects from a ring.
> + *
> + * This function calls the multi-consumers or the single-consumer
> + * version, depending on the default behaviour that was specified at
> + * ring creation time (see flags).
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects) that will be filled.
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + *   Currently, sizes 4, 8 and 16 are supported. This should be the same
> + *   as passed while creating the ring, otherwise the results are undefined.
> + * @param n
> + *   The number of objects to dequeue from the ring to the obj_table.
> + * @param available
> + *   If non-NULL, returns the number of remaining ring entries after the
> + *   dequeue has finished.
> + * @return
> + *   The number of objects dequeued, either 0 or n
> + */
> +static __rte_always_inline unsigned int
> +rte_ring_dequeue_bulk_elem(struct rte_ring *r, void *obj_table,
> +		unsigned int esize, unsigned int n, unsigned int *available) {
> +	return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
> +			RTE_RING_QUEUE_FIXED, r->cons.single, available); }
> +
> +/**
> + * Dequeue one object from a ring (multi-consumers safe).
> + *
> + * This function uses a "compare and set" instruction to move the
> + * consumer index atomically.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_p
> + *   A pointer to a void * pointer (object) that will be filled.
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + *   Currently, sizes 4, 8 and 16 are supported. This should be the same
> + *   as passed while creating the ring, otherwise the results are undefined.
> + * @return
> + *   - 0: Success; objects dequeued.
> + *   - -ENOENT: Not enough entries in the ring to dequeue; no object is
> + *     dequeued.
> + */
> +static __rte_always_inline int
> +rte_ring_mc_dequeue_elem(struct rte_ring *r, void *obj_p,
> +				unsigned int esize)
> +{
> +	return rte_ring_mc_dequeue_bulk_elem(r, obj_p, esize, 1, NULL)  ? 0 :
> +								-ENOENT;
> +}
> +
> +/**
> + * Dequeue one object from a ring (NOT multi-consumers safe).
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_p
> + *   A pointer to a void * pointer (object) that will be filled.
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + *   Currently, sizes 4, 8 and 16 are supported. This should be the same
> + *   as passed while creating the ring, otherwise the results are undefined.
> + * @return
> + *   - 0: Success; objects dequeued.
> + *   - -ENOENT: Not enough entries in the ring to dequeue, no object is
> + *     dequeued.
> + */
> +static __rte_always_inline int
> +rte_ring_sc_dequeue_elem(struct rte_ring *r, void *obj_p,
> +				unsigned int esize)
> +{
> +	return rte_ring_sc_dequeue_bulk_elem(r, obj_p, esize, 1, NULL) ? 0 :
> +								-ENOENT;
> +}
> +
> +/**
> + * Dequeue one object from a ring.
> + *
> + * This function calls the multi-consumers or the single-consumer
> + * version depending on the default behaviour that was specified at
> + * ring creation time (see flags).
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_p
> + *   A pointer to a void * pointer (object) that will be filled.
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + *   Currently, sizes 4, 8 and 16 are supported. This should be the same
> + *   as passed while creating the ring, otherwise the results are undefined.
> + * @return
> + *   - 0: Success, objects dequeued.
> + *   - -ENOENT: Not enough entries in the ring to dequeue, no object is
> + *     dequeued.
> + */
> +static __rte_always_inline int
> +rte_ring_dequeue_elem(struct rte_ring *r, void *obj_p, unsigned int
> +esize) {
> +	return rte_ring_dequeue_bulk_elem(r, obj_p, esize, 1, NULL) ? 0 :
> +								-ENOENT;
> +}
> +
> +/**
> + * Enqueue several objects on the ring (multi-producers safe).
> + *
> + * This function uses a "compare and set" instruction to move the
> + * producer index atomically.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects).
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + *   Currently, sizes 4, 8 and 16 are supported. This should be the same
> + *   as passed while creating the ring, otherwise the results are undefined.
> + * @param n
> + *   The number of objects to add in the ring from the obj_table.
> + * @param free_space
> + *   if non-NULL, returns the amount of space in the ring after the
> + *   enqueue operation has finished.
> + * @return
> + *   - n: Actual number of objects enqueued.
> + */
> +static __rte_always_inline unsigned
> +rte_ring_mp_enqueue_burst_elem(struct rte_ring *r, void * const obj_table,
> +		unsigned int esize, unsigned int n, unsigned int *free_space) {
> +	return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
> +			RTE_RING_QUEUE_VARIABLE, __IS_MP, free_space); }
> +
> +/**
> + * Enqueue several objects on a ring (NOT multi-producers safe).
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects).
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + *   Currently, sizes 4, 8 and 16 are supported. This should be the same
> + *   as passed while creating the ring, otherwise the results are undefined.
> + * @param n
> + *   The number of objects to add in the ring from the obj_table.
> + * @param free_space
> + *   if non-NULL, returns the amount of space in the ring after the
> + *   enqueue operation has finished.
> + * @return
> + *   - n: Actual number of objects enqueued.
> + */
> +static __rte_always_inline unsigned
> +rte_ring_sp_enqueue_burst_elem(struct rte_ring *r, void * const obj_table,
> +		unsigned int esize, unsigned int n, unsigned int *free_space) {
> +	return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
> +			RTE_RING_QUEUE_VARIABLE, __IS_SP, free_space); }
> +
> +/**
> + * Enqueue several objects on a ring.
> + *
> + * This function calls the multi-producer or the single-producer
> + * version depending on the default behavior that was specified at
> + * ring creation time (see flags).
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects).
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + *   Currently, sizes 4, 8 and 16 are supported. This should be the same
> + *   as passed while creating the ring, otherwise the results are undefined.
> + * @param n
> + *   The number of objects to add in the ring from the obj_table.
> + * @param free_space
> + *   if non-NULL, returns the amount of space in the ring after the
> + *   enqueue operation has finished.
> + * @return
> + *   - n: Actual number of objects enqueued.
> + */
> +static __rte_always_inline unsigned
> +rte_ring_enqueue_burst_elem(struct rte_ring *r, void * const obj_table,
> +		unsigned int esize, unsigned int n, unsigned int *free_space) {
> +	return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
> +			RTE_RING_QUEUE_VARIABLE, r->prod.single,
> free_space); }
> +
> +/**
> + * Dequeue several objects from a ring (multi-consumers safe). When the
> +request
> + * objects are more than the available objects, only dequeue the actual
> +number
> + * of objects
> + *
> + * This function uses a "compare and set" instruction to move the
> + * consumer index atomically.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects) that will be filled.
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + *   Currently, sizes 4, 8 and 16 are supported. This should be the same
> + *   as passed while creating the ring, otherwise the results are undefined.
> + * @param n
> + *   The number of objects to dequeue from the ring to the obj_table.
> + * @param available
> + *   If non-NULL, returns the number of remaining ring entries after the
> + *   dequeue has finished.
> + * @return
> + *   - n: Actual number of objects dequeued, 0 if ring is empty
> + */
> +static __rte_always_inline unsigned
> +rte_ring_mc_dequeue_burst_elem(struct rte_ring *r, void *obj_table,
> +		unsigned int esize, unsigned int n, unsigned int *available) {
> +	return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
> +			RTE_RING_QUEUE_VARIABLE, __IS_MC, available); }
> +
> +/**
> + * Dequeue several objects from a ring (NOT multi-consumers safe).When
> +the
> + * request objects are more than the available objects, only dequeue
> +the
> + * actual number of objects
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects) that will be filled.
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + *   Currently, sizes 4, 8 and 16 are supported. This should be the same
> + *   as passed while creating the ring, otherwise the results are undefined.
> + * @param n
> + *   The number of objects to dequeue from the ring to the obj_table.
> + * @param available
> + *   If non-NULL, returns the number of remaining ring entries after the
> + *   dequeue has finished.
> + * @return
> + *   - n: Actual number of objects dequeued, 0 if ring is empty
> + */
> +static __rte_always_inline unsigned
> +rte_ring_sc_dequeue_burst_elem(struct rte_ring *r, void *obj_table,
> +		unsigned int esize, unsigned int n, unsigned int *available) {
> +	return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
> +			RTE_RING_QUEUE_VARIABLE, __IS_SC, available); }
> +
> +/**
> + * Dequeue multiple objects from a ring up to a maximum number.
> + *
> + * This function calls the multi-consumers or the single-consumer
> + * version, depending on the default behaviour that was specified at
> + * ring creation time (see flags).
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects) that will be filled.
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + *   Currently, sizes 4, 8 and 16 are supported. This should be the same
> + *   as passed while creating the ring, otherwise the results are undefined.
> + * @param n
> + *   The number of objects to dequeue from the ring to the obj_table.
> + * @param available
> + *   If non-NULL, returns the number of remaining ring entries after the
> + *   dequeue has finished.
> + * @return
> + *   - Number of objects dequeued
> + */
> +static __rte_always_inline unsigned
> +rte_ring_dequeue_burst_elem(struct rte_ring *r, void *obj_table,
> +		unsigned int esize, unsigned int n, unsigned int *available) {
> +	return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
> +				RTE_RING_QUEUE_VARIABLE,
> +				r->cons.single, available);
> +}
> +
> +#ifdef __cplusplus
> +}
> +#endif
> +
> +#endif /* _RTE_RING_ELEM_H_ */
> diff --git a/lib/librte_ring/rte_ring_version.map
> b/lib/librte_ring/rte_ring_version.map
> index 510c1386e..e410a7503 100644
> --- a/lib/librte_ring/rte_ring_version.map
> +++ b/lib/librte_ring/rte_ring_version.map
> @@ -21,6 +21,8 @@ DPDK_2.2 {
>  EXPERIMENTAL {
>  	global:
> 
> +	rte_ring_create_elem;
> +	rte_ring_get_memsize_elem;
>  	rte_ring_reset;
> 
>  };
> --
> 2.17.1
Ananyev, Konstantin Oct. 14, 2019, 7:41 p.m. | #2
> >
> > Current APIs assume ring elements to be pointers. However, in many use cases,
> > the size can be different. Add new APIs to support configurable ring element
> > sizes.
> >
> > Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
> > Reviewed-by: Dharmik Thakkar <dharmik.thakkar@arm.com>
> > Reviewed-by: Gavin Hu <gavin.hu@arm.com>
> > Reviewed-by: Ruifeng Wang <ruifeng.wang@arm.com>
> > ---
> >  lib/librte_ring/Makefile             |   3 +-
> >  lib/librte_ring/meson.build          |   3 +
> >  lib/librte_ring/rte_ring.c           |  45 +-
> >  lib/librte_ring/rte_ring.h           |   1 +
> >  lib/librte_ring/rte_ring_elem.h      | 946 +++++++++++++++++++++++++++
> >  lib/librte_ring/rte_ring_version.map |   2 +
> >  6 files changed, 991 insertions(+), 9 deletions(-)  create mode 100644
> > lib/librte_ring/rte_ring_elem.h
> >
> > diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile index
> > 21a36770d..515a967bb 100644
> > --- a/lib/librte_ring/Makefile
> > +++ b/lib/librte_ring/Makefile
> > @@ -6,7 +6,7 @@ include $(RTE_SDK)/mk/rte.vars.mk  # library name  LIB =
> > librte_ring.a
> >
> > -CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3
> > +CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3 -
> > DALLOW_EXPERIMENTAL_API
> >  LDLIBS += -lrte_eal
> >
> >  EXPORT_MAP := rte_ring_version.map
> > @@ -18,6 +18,7 @@ SRCS-$(CONFIG_RTE_LIBRTE_RING) := rte_ring.c
> >
> >  # install includes
> >  SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h \
> > +					rte_ring_elem.h \
> >  					rte_ring_generic.h \
> >  					rte_ring_c11_mem.h
> >
> > diff --git a/lib/librte_ring/meson.build b/lib/librte_ring/meson.build index
> > ab8b0b469..74219840a 100644
> > --- a/lib/librte_ring/meson.build
> > +++ b/lib/librte_ring/meson.build
> > @@ -6,3 +6,6 @@ sources = files('rte_ring.c')  headers = files('rte_ring.h',
> >  		'rte_ring_c11_mem.h',
> >  		'rte_ring_generic.h')
> > +
> > +# rte_ring_create_elem and rte_ring_get_memsize_elem are experimental
> > +allow_experimental_apis = true
> > diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c index
> > d9b308036..6fed3648b 100644
> > --- a/lib/librte_ring/rte_ring.c
> > +++ b/lib/librte_ring/rte_ring.c
> > @@ -33,6 +33,7 @@
> >  #include <rte_tailq.h>
> >
> >  #include "rte_ring.h"
> > +#include "rte_ring_elem.h"
> >
> >  TAILQ_HEAD(rte_ring_list, rte_tailq_entry);
> >
> > @@ -46,23 +47,42 @@ EAL_REGISTER_TAILQ(rte_ring_tailq)
> >
> >  /* return the size of memory occupied by a ring */  ssize_t -
> > rte_ring_get_memsize(unsigned count)
> > +rte_ring_get_memsize_elem(unsigned count, unsigned esize)
> >  {
> >  	ssize_t sz;
> >
> > +	/* Supported esize values are 4/8/16.
> > +	 * Others can be added on need basis.
> > +	 */
> > +	if ((esize != 4) && (esize != 8) && (esize != 16)) {
> > +		RTE_LOG(ERR, RING,
> > +			"Unsupported esize value. Supported values are 4, 8
> > and 16\n");
> > +
> > +		return -EINVAL;
> > +	}
> > +
> >  	/* count must be a power of 2 */
> >  	if ((!POWEROF2(count)) || (count > RTE_RING_SZ_MASK )) {
> >  		RTE_LOG(ERR, RING,
> > -			"Requested size is invalid, must be power of 2, and "
> > -			"do not exceed the size limit %u\n",
> > RTE_RING_SZ_MASK);
> > +			"Requested number of elements is invalid, must be "
> > +			"power of 2, and do not exceed the limit %u\n",
> > +			RTE_RING_SZ_MASK);
> > +
> >  		return -EINVAL;
> >  	}
> >
> > -	sz = sizeof(struct rte_ring) + count * sizeof(void *);
> > +	sz = sizeof(struct rte_ring) + count * esize;
> >  	sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
> >  	return sz;
> >  }
> >
> > +/* return the size of memory occupied by a ring */ ssize_t
> > +rte_ring_get_memsize(unsigned count) {
> > +	return rte_ring_get_memsize_elem(count, sizeof(void *)); }
> > +
> >  void
> >  rte_ring_reset(struct rte_ring *r)
> >  {
> > @@ -114,10 +134,10 @@ rte_ring_init(struct rte_ring *r, const char *name,
> > unsigned count,
> >  	return 0;
> >  }
> >
> > -/* create the ring */
> > +/* create the ring for a given element size */
> >  struct rte_ring *
> > -rte_ring_create(const char *name, unsigned count, int socket_id,
> > -		unsigned flags)
> > +rte_ring_create_elem(const char *name, unsigned count, unsigned esize,
> > +		int socket_id, unsigned flags)
> >  {
> >  	char mz_name[RTE_MEMZONE_NAMESIZE];
> >  	struct rte_ring *r;
> > @@ -135,7 +155,7 @@ rte_ring_create(const char *name, unsigned count,
> > int socket_id,
> >  	if (flags & RING_F_EXACT_SZ)
> >  		count = rte_align32pow2(count + 1);
> >
> > -	ring_size = rte_ring_get_memsize(count);
> > +	ring_size = rte_ring_get_memsize_elem(count, esize);
> >  	if (ring_size < 0) {
> >  		rte_errno = ring_size;
> >  		return NULL;
> > @@ -182,6 +202,15 @@ rte_ring_create(const char *name, unsigned count,
> > int socket_id,
> >  	return r;
> >  }
> >
> > +/* create the ring */
> > +struct rte_ring *
> > +rte_ring_create(const char *name, unsigned count, int socket_id,
> > +		unsigned flags)
> > +{
> > +	return rte_ring_create_elem(name, count, sizeof(void *), socket_id,
> > +		flags);
> > +}
> > +
> >  /* free the ring */
> >  void
> >  rte_ring_free(struct rte_ring *r)
> > diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h index
> > 2a9f768a1..18fc5d845 100644
> > --- a/lib/librte_ring/rte_ring.h
> > +++ b/lib/librte_ring/rte_ring.h
> > @@ -216,6 +216,7 @@ int rte_ring_init(struct rte_ring *r, const char *name,
> > unsigned count,
> >   */
> >  struct rte_ring *rte_ring_create(const char *name, unsigned count,
> >  				 int socket_id, unsigned flags);
> > +
> >  /**
> >   * De-allocate all memory used by the ring.
> >   *
> > diff --git a/lib/librte_ring/rte_ring_elem.h b/lib/librte_ring/rte_ring_elem.h
> > new file mode 100644 index 000000000..860f059ad
> > --- /dev/null
> > +++ b/lib/librte_ring/rte_ring_elem.h
> > @@ -0,0 +1,946 @@
> > +/* SPDX-License-Identifier: BSD-3-Clause
> > + *
> > + * Copyright (c) 2019 Arm Limited
> > + * Copyright (c) 2010-2017 Intel Corporation
> > + * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
> > + * All rights reserved.
> > + * Derived from FreeBSD's bufring.h
> > + * Used as BSD-3 Licensed with permission from Kip Macy.
> > + */
> > +
> > +#ifndef _RTE_RING_ELEM_H_
> > +#define _RTE_RING_ELEM_H_
> > +
> > +/**
> > + * @file
> > + * RTE Ring with flexible element size
> > + */
> > +
> > +#ifdef __cplusplus
> > +extern "C" {
> > +#endif
> > +
> > +#include <stdio.h>
> > +#include <stdint.h>
> > +#include <sys/queue.h>
> > +#include <errno.h>
> > +#include <rte_common.h>
> > +#include <rte_config.h>
> > +#include <rte_memory.h>
> > +#include <rte_lcore.h>
> > +#include <rte_atomic.h>
> > +#include <rte_branch_prediction.h>
> > +#include <rte_memzone.h>
> > +#include <rte_pause.h>
> > +
> > +#include "rte_ring.h"
> > +
> > +/**
> > + * @warning
> > + * @b EXPERIMENTAL: this API may change without prior notice
> > + *
> > + * Calculate the memory size needed for a ring with given element size
> > + *
> > + * This function returns the number of bytes needed for a ring, given
> > + * the number of elements in it and the size of the element. This value
> > + * is the sum of the size of the structure rte_ring and the size of the
> > + * memory needed for storing the elements. The value is aligned to a
> > +cache
> > + * line size.
> > + *
> > + * @param count
> > + *   The number of elements in the ring (must be a power of 2).
> > + * @param esize
> > + *   The size of ring element, in bytes. It must be a multiple of 4.
> > + *   Currently, sizes 4, 8 and 16 are supported.
> > + * @return
> > + *   - The memory size needed for the ring on success.
> > + *   - -EINVAL if count is not a power of 2.
> > + */
> > +__rte_experimental
> > +ssize_t rte_ring_get_memsize_elem(unsigned count, unsigned esize);
> > +
> > +/**
> > + * @warning
> > + * @b EXPERIMENTAL: this API may change without prior notice
> > + *
> > + * Create a new ring named *name* that stores elements with given size.
> > + *
> > + * This function uses ``memzone_reserve()`` to allocate memory. Then it
> > + * calls rte_ring_init() to initialize an empty ring.
> > + *
> > + * The new ring size is set to *count*, which must be a power of
> > + * two. Water marking is disabled by default. The real usable ring size
> > + * is *count-1* instead of *count* to differentiate a free ring from an
> > + * empty ring.
> > + *
> > + * The ring is added in RTE_TAILQ_RING list.
> > + *
> > + * @param name
> > + *   The name of the ring.
> > + * @param count
> > + *   The number of elements in the ring (must be a power of 2).
> > + * @param esize
> > + *   The size of ring element, in bytes. It must be a multiple of 4.
> > + *   Currently, sizes 4, 8 and 16 are supported.
> > + * @param socket_id
> > + *   The *socket_id* argument is the socket identifier in case of
> > + *   NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA
> > + *   constraint for the reserved zone.
> > + * @param flags
> > + *   An OR of the following:
> > + *    - RING_F_SP_ENQ: If this flag is set, the default behavior when
> > + *      using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
> > + *      is "single-producer". Otherwise, it is "multi-producers".
> > + *    - RING_F_SC_DEQ: If this flag is set, the default behavior when
> > + *      using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
> > + *      is "single-consumer". Otherwise, it is "multi-consumers".
> > + * @return
> > + *   On success, the pointer to the new allocated ring. NULL on error with
> > + *    rte_errno set appropriately. Possible errno values include:
> > + *    - E_RTE_NO_CONFIG - function could not get pointer to rte_config
> > structure
> > + *    - E_RTE_SECONDARY - function was called from a secondary process
> > instance
> > + *    - EINVAL - count provided is not a power of 2
> > + *    - ENOSPC - the maximum number of memzones has already been
> > allocated
> > + *    - EEXIST - a memzone with the same name already exists
> > + *    - ENOMEM - no appropriate memory area found in which to create
> > memzone
> > + */
> > +__rte_experimental
> > +struct rte_ring *rte_ring_create_elem(const char *name, unsigned count,
> > +				unsigned esize, int socket_id, unsigned flags);
> > +
> > +/* the actual enqueue of pointers on the ring.
> > + * Placed here since identical code needed in both
> > + * single and multi producer enqueue functions.
> > + */
> > +#define ENQUEUE_PTRS_ELEM(r, ring_start, prod_head, obj_table, esize, n)
> > do { \
> > +	if (esize == 4) \
> > +		ENQUEUE_PTRS_32(r, ring_start, prod_head, obj_table, n); \
> > +	else if (esize == 8) \
> > +		ENQUEUE_PTRS_64(r, ring_start, prod_head, obj_table, n); \
> > +	else if (esize == 16) \
> > +		ENQUEUE_PTRS_128(r, ring_start, prod_head, obj_table, n); \ }
> > while
> > +(0)
> > +
> > +#define ENQUEUE_PTRS_32(r, ring_start, prod_head, obj_table, n) do { \
> > +	unsigned int i; \
> > +	const uint32_t size = (r)->size; \
> > +	uint32_t idx = prod_head & (r)->mask; \
> > +	uint32_t *ring = (uint32_t *)ring_start; \
> > +	uint32_t *obj = (uint32_t *)obj_table; \
> > +	if (likely(idx + n < size)) { \
> > +		for (i = 0; i < (n & ((~(unsigned)0x7))); i += 8, idx += 8) { \
> > +			ring[idx] = obj[i]; \
> > +			ring[idx + 1] = obj[i + 1]; \
> > +			ring[idx + 2] = obj[i + 2]; \
> > +			ring[idx + 3] = obj[i + 3]; \
> > +			ring[idx + 4] = obj[i + 4]; \
> > +			ring[idx + 5] = obj[i + 5]; \
> > +			ring[idx + 6] = obj[i + 6]; \
> > +			ring[idx + 7] = obj[i + 7]; \
> > +		} \
> > +		switch (n & 0x7) { \
> > +		case 7: \
> > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > +		case 6: \
> > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > +		case 5: \
> > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > +		case 4: \
> > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > +		case 3: \
> > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > +		case 2: \
> > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > +		case 1: \
> > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > +		} \
> > +	} else { \
> > +		for (i = 0; idx < size; i++, idx++)\
> > +			ring[idx] = obj[i]; \
> > +		for (idx = 0; i < n; i++, idx++) \
> > +			ring[idx] = obj[i]; \
> > +	} \
> > +} while (0)
> > +
> > +#define ENQUEUE_PTRS_64(r, ring_start, prod_head, obj_table, n) do { \
> > +	unsigned int i; \
> > +	const uint32_t size = (r)->size; \
> > +	uint32_t idx = prod_head & (r)->mask; \
> > +	uint64_t *ring = (uint64_t *)ring_start; \
> > +	uint64_t *obj = (uint64_t *)obj_table; \
> > +	if (likely(idx + n < size)) { \
> > +		for (i = 0; i < (n & ((~(unsigned)0x3))); i += 4, idx += 4) { \
> > +			ring[idx] = obj[i]; \
> > +			ring[idx + 1] = obj[i + 1]; \
> > +			ring[idx + 2] = obj[i + 2]; \
> > +			ring[idx + 3] = obj[i + 3]; \
> > +		} \
> > +		switch (n & 0x3) { \
> > +		case 3: \
> > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > +		case 2: \
> > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > +		case 1: \
> > +			ring[idx++] = obj[i++]; \
> > +		} \
> > +	} else { \
> > +		for (i = 0; idx < size; i++, idx++)\
> > +			ring[idx] = obj[i]; \
> > +		for (idx = 0; i < n; i++, idx++) \
> > +			ring[idx] = obj[i]; \
> > +	} \
> > +} while (0)
> > +
> > +#define ENQUEUE_PTRS_128(r, ring_start, prod_head, obj_table, n) do { \
> > +	unsigned int i; \
> > +	const uint32_t size = (r)->size; \
> > +	uint32_t idx = prod_head & (r)->mask; \
> > +	__uint128_t *ring = (__uint128_t *)ring_start; \
> > +	__uint128_t *obj = (__uint128_t *)obj_table; \
> > +	if (likely(idx + n < size)) { \
> > +		for (i = 0; i < (n >> 1); i += 2, idx += 2) { \
> > +			ring[idx] = obj[i]; \
> > +			ring[idx + 1] = obj[i + 1]; \
> > +		} \
> > +		switch (n & 0x1) { \
> > +		case 1: \
> > +			ring[idx++] = obj[i++]; \
> > +		} \
> > +	} else { \
> > +		for (i = 0; idx < size; i++, idx++)\
> > +			ring[idx] = obj[i]; \
> > +		for (idx = 0; i < n; i++, idx++) \
> > +			ring[idx] = obj[i]; \
> > +	} \
> > +} while (0)
> > +
> > +/* the actual copy of pointers on the ring to obj_table.
> > + * Placed here since identical code needed in both
> > + * single and multi consumer dequeue functions.
> > + */
> > +#define DEQUEUE_PTRS_ELEM(r, ring_start, cons_head, obj_table, esize, n)
> > do { \
> > +	if (esize == 4) \
> > +		DEQUEUE_PTRS_32(r, ring_start, cons_head, obj_table, n); \
> > +	else if (esize == 8) \
> > +		DEQUEUE_PTRS_64(r, ring_start, cons_head, obj_table, n); \
> > +	else if (esize == 16) \
> > +		DEQUEUE_PTRS_128(r, ring_start, cons_head, obj_table, n); \ }
> > while
> > +(0)
> > +
> > +#define DEQUEUE_PTRS_32(r, ring_start, cons_head, obj_table, n) do { \
> > +	unsigned int i; \
> > +	uint32_t idx = cons_head & (r)->mask; \
> > +	const uint32_t size = (r)->size; \
> > +	uint32_t *ring = (uint32_t *)ring_start; \
> > +	uint32_t *obj = (uint32_t *)obj_table; \
> > +	if (likely(idx + n < size)) { \
> > +		for (i = 0; i < (n & (~(unsigned)0x7)); i += 8, idx += 8) {\
> > +			obj[i] = ring[idx]; \
> > +			obj[i + 1] = ring[idx + 1]; \
> > +			obj[i + 2] = ring[idx + 2]; \
> > +			obj[i + 3] = ring[idx + 3]; \
> > +			obj[i + 4] = ring[idx + 4]; \
> > +			obj[i + 5] = ring[idx + 5]; \
> > +			obj[i + 6] = ring[idx + 6]; \
> > +			obj[i + 7] = ring[idx + 7]; \
> > +		} \
> > +		switch (n & 0x7) { \
> > +		case 7: \
> > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > +		case 6: \
> > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > +		case 5: \
> > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > +		case 4: \
> > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > +		case 3: \
> > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > +		case 2: \
> > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > +		case 1: \
> > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > +		} \
> > +	} else { \
> > +		for (i = 0; idx < size; i++, idx++) \
> > +			obj[i] = ring[idx]; \
> > +		for (idx = 0; i < n; i++, idx++) \
> > +			obj[i] = ring[idx]; \
> > +	} \
> > +} while (0)
> > +
> > +#define DEQUEUE_PTRS_64(r, ring_start, cons_head, obj_table, n) do { \
> > +	unsigned int i; \
> > +	uint32_t idx = cons_head & (r)->mask; \
> > +	const uint32_t size = (r)->size; \
> > +	uint64_t *ring = (uint64_t *)ring_start; \
> > +	uint64_t *obj = (uint64_t *)obj_table; \
> > +	if (likely(idx + n < size)) { \
> > +		for (i = 0; i < (n & (~(unsigned)0x3)); i += 4, idx += 4) {\
> > +			obj[i] = ring[idx]; \
> > +			obj[i + 1] = ring[idx + 1]; \
> > +			obj[i + 2] = ring[idx + 2]; \
> > +			obj[i + 3] = ring[idx + 3]; \
> > +		} \
> > +		switch (n & 0x3) { \
> > +		case 3: \
> > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > +		case 2: \
> > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > +		case 1: \
> > +			obj[i++] = ring[idx++]; \
> > +		} \
> > +	} else { \
> > +		for (i = 0; idx < size; i++, idx++) \
> > +			obj[i] = ring[idx]; \
> > +		for (idx = 0; i < n; i++, idx++) \
> > +			obj[i] = ring[idx]; \
> > +	} \
> > +} while (0)
> > +
> > +#define DEQUEUE_PTRS_128(r, ring_start, cons_head, obj_table, n) do { \
> > +	unsigned int i; \
> > +	uint32_t idx = cons_head & (r)->mask; \
> > +	const uint32_t size = (r)->size; \
> > +	__uint128_t *ring = (__uint128_t *)ring_start; \
> > +	__uint128_t *obj = (__uint128_t *)obj_table; \
> > +	if (likely(idx + n < size)) { \
> > +		for (i = 0; i < (n >> 1); i += 2, idx += 2) { \
> > +			obj[i] = ring[idx]; \
> > +			obj[i + 1] = ring[idx + 1]; \
> > +		} \
> > +		switch (n & 0x1) { \
> > +		case 1: \
> > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > +		} \
> > +	} else { \
> > +		for (i = 0; idx < size; i++, idx++) \
> > +			obj[i] = ring[idx]; \
> > +		for (idx = 0; i < n; i++, idx++) \
> > +			obj[i] = ring[idx]; \
> > +	} \
> > +} while (0)
> > +
> > +/* Between load and load. there might be cpu reorder in weak model
> > + * (powerpc/arm).
> > + * There are 2 choices for the users
> > + * 1.use rmb() memory barrier
> > + * 2.use one-direction load_acquire/store_release barrier,defined by
> > + * CONFIG_RTE_USE_C11_MEM_MODEL=y
> > + * It depends on performance test results.
> > + * By default, move common functions to rte_ring_generic.h  */ #ifdef
> > +RTE_USE_C11_MEM_MODEL #include "rte_ring_c11_mem.h"
> > +#else
> > +#include "rte_ring_generic.h"
> > +#endif
> > +
> > +/**
> > + * @internal Enqueue several objects on the ring
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param obj_table
> > + *   A pointer to a table of void * pointers (objects).
> > + * @param esize
> > + *   The size of ring element, in bytes. It must be a multiple of 4.
> > + *   Currently, sizes 4, 8 and 16 are supported. This should be the same
> > + *   as passed while creating the ring, otherwise the results are undefined.
> > + * @param n
> > + *   The number of objects to add in the ring from the obj_table.
> > + * @param behavior
> > + *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
> > + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from
> > ring
> > + * @param is_sp
> > + *   Indicates whether to use single producer or multi-producer head update
> > + * @param free_space
> > + *   returns the amount of space after the enqueue operation has finished
> > + * @return
> > + *   Actual number of objects enqueued.
> > + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> > + */
> > +static __rte_always_inline unsigned int
> > +__rte_ring_do_enqueue_elem(struct rte_ring *r, void * const obj_table,
> > +		unsigned int esize, unsigned int n,
> > +		enum rte_ring_queue_behavior behavior, unsigned int is_sp,
> > +		unsigned int *free_space)


I like the idea of adding esize as an argument to the public API,
so the compiler can do its job of optimizing calls with constant esize.
Though I am not very happy with the rest of implementation:
1. It doesn't really provide configurable elem size - only 4/8/16B elems are supported.
2. A lot of code duplication with these 3 copies of ENQUEUE/DEQUEUE macros.

Looking at the ENQUEUE/DEQUEUE macros, I can see that the main loop always
does a 32B copy per iteration.
So I wonder: can we make a generic function that does a 32B copy per iteration
in the main loop, and copies the tail by 4B chunks?
That would avoid the copy duplication and would allow the user to have any elem
size (multiple of 4B) they want.
Something like this (note: I didn't test it, it's just a rough idea):

 static inline void
copy_elems(uint32_t du32[], const uint32_t su32[], uint32_t num, uint32_t esize)
{
        uint32_t i, sz;

        sz = (num * esize) / sizeof(uint32_t);

        for (i = 0; i < (sz & ~7); i += 8)
                memcpy(du32 + i, su32 + i, 8 * sizeof(uint32_t));

        switch (sz & 7) {
        case 7: du32[sz - 7] = su32[sz - 7]; /* fallthrough */
        case 6: du32[sz - 6] = su32[sz - 6]; /* fallthrough */
        case 5: du32[sz - 5] = su32[sz - 5]; /* fallthrough */
        case 4: du32[sz - 4] = su32[sz - 4]; /* fallthrough */
        case 3: du32[sz - 3] = su32[sz - 3]; /* fallthrough */
        case 2: du32[sz - 2] = su32[sz - 2]; /* fallthrough */
        case 1: du32[sz - 1] = su32[sz - 1]; /* fallthrough */
        }
}

static inline void
enqueue_elems(struct rte_ring *r, void *ring_start, uint32_t prod_head,
                void *obj_table, uint32_t num, uint32_t esize)
{
        uint32_t idx, n;
        uint32_t *du32;

        const uint32_t size = r->size;

        idx = prod_head & (r)->mask;

        du32 = ring_start + idx * sizeof(uint32_t);

        if (idx + num < size)
                copy_elems(du32, obj_table, num, esize);
        else {
                n = size - idx;
                copy_elems(du32, obj_table, n, esize);
                copy_elems(ring_start, obj_table + n * sizeof(uint32_t),
                        num - n, esize);
        }
}

And then, in that function, instead of ENQUEUE_PTRS_ELEM(), just:

enqueue_elems(r, &r[1], prod_head, obj_table, n, esize);

 
> > +{
> > +	uint32_t prod_head, prod_next;
> > +	uint32_t free_entries;
> > +
> > +	n = __rte_ring_move_prod_head(r, is_sp, n, behavior,
> > +			&prod_head, &prod_next, &free_entries);
> > +	if (n == 0)
> > +		goto end;
> > +
> > +	ENQUEUE_PTRS_ELEM(r, &r[1], prod_head, obj_table, esize, n);
> > +
> > +	update_tail(&r->prod, prod_head, prod_next, is_sp, 1);
> > +end:
> > +	if (free_space != NULL)
> > +		*free_space = free_entries - n;
> > +	return n;
> > +}
> > +
Honnappa Nagarahalli Oct. 14, 2019, 11:56 p.m. | #3
Hi Konstantin,
	Thank you for the feedback.

<snip>

> 
> > >
> > > Current APIs assume ring elements to be pointers. However, in many
> > > use cases, the size can be different. Add new APIs to support
> > > configurable ring element sizes.
> > >
> > > Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
> > > Reviewed-by: Dharmik Thakkar <dharmik.thakkar@arm.com>
> > > Reviewed-by: Gavin Hu <gavin.hu@arm.com>
> > > Reviewed-by: Ruifeng Wang <ruifeng.wang@arm.com>
> > > ---
> > >  lib/librte_ring/Makefile             |   3 +-
> > >  lib/librte_ring/meson.build          |   3 +
> > >  lib/librte_ring/rte_ring.c           |  45 +-
> > >  lib/librte_ring/rte_ring.h           |   1 +
> > >  lib/librte_ring/rte_ring_elem.h      | 946 +++++++++++++++++++++++++++
> > >  lib/librte_ring/rte_ring_version.map |   2 +
> > >  6 files changed, 991 insertions(+), 9 deletions(-)  create mode
> > > 100644 lib/librte_ring/rte_ring_elem.h
> > >
> > > diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile
> > > index 21a36770d..515a967bb 100644
> > > --- a/lib/librte_ring/Makefile
> > > +++ b/lib/librte_ring/Makefile
> > > @@ -6,7 +6,7 @@ include $(RTE_SDK)/mk/rte.vars.mk  # library name
> > > LIB = librte_ring.a
> > >
> > > -CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3
> > > +CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3 -
> > > DALLOW_EXPERIMENTAL_API
> > >  LDLIBS += -lrte_eal
> > >
> > >  EXPORT_MAP := rte_ring_version.map
> > > @@ -18,6 +18,7 @@ SRCS-$(CONFIG_RTE_LIBRTE_RING) := rte_ring.c
> > >
> > >  # install includes
> > >  SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h \
> > > +					rte_ring_elem.h \
> > >  					rte_ring_generic.h \
> > >  					rte_ring_c11_mem.h
> > >
> > > diff --git a/lib/librte_ring/meson.build
> > > b/lib/librte_ring/meson.build index ab8b0b469..74219840a 100644
> > > --- a/lib/librte_ring/meson.build
> > > +++ b/lib/librte_ring/meson.build
> > > @@ -6,3 +6,6 @@ sources = files('rte_ring.c')  headers = files('rte_ring.h',
> > >  		'rte_ring_c11_mem.h',
> > >  		'rte_ring_generic.h')
> > > +
> > > +# rte_ring_create_elem and rte_ring_get_memsize_elem are
> > > +experimental allow_experimental_apis = true
> > > diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c
> > > index d9b308036..6fed3648b 100644
> > > --- a/lib/librte_ring/rte_ring.c
> > > +++ b/lib/librte_ring/rte_ring.c
> > > @@ -33,6 +33,7 @@
> > >  #include <rte_tailq.h>
> > >
> > >  #include "rte_ring.h"
> > > +#include "rte_ring_elem.h"
> > >
> > >  TAILQ_HEAD(rte_ring_list, rte_tailq_entry);
> > >
> > > @@ -46,23 +47,42 @@ EAL_REGISTER_TAILQ(rte_ring_tailq)
> > >
> > >  /* return the size of memory occupied by a ring */  ssize_t -
> > > rte_ring_get_memsize(unsigned count)
> > > +rte_ring_get_memsize_elem(unsigned count, unsigned esize)
> > >  {
> > >  	ssize_t sz;
> > >
> > > +	/* Supported esize values are 4/8/16.
> > > +	 * Others can be added on need basis.
> > > +	 */
> > > +	if ((esize != 4) && (esize != 8) && (esize != 16)) {
> > > +		RTE_LOG(ERR, RING,
> > > +			"Unsupported esize value. Supported values are 4, 8
> > > and 16\n");
> > > +
> > > +		return -EINVAL;
> > > +	}
> > > +
> > >  	/* count must be a power of 2 */
> > >  	if ((!POWEROF2(count)) || (count > RTE_RING_SZ_MASK )) {
> > >  		RTE_LOG(ERR, RING,
> > > -			"Requested size is invalid, must be power of 2, and "
> > > -			"do not exceed the size limit %u\n",
> > > RTE_RING_SZ_MASK);
> > > +			"Requested number of elements is invalid, must be "
> > > +			"power of 2, and do not exceed the limit %u\n",
> > > +			RTE_RING_SZ_MASK);
> > > +
> > >  		return -EINVAL;
> > >  	}
> > >
> > > -	sz = sizeof(struct rte_ring) + count * sizeof(void *);
> > > +	sz = sizeof(struct rte_ring) + count * esize;
> > >  	sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
> > >  	return sz;
> > >  }
> > >
> > > +/* return the size of memory occupied by a ring */ ssize_t
> > > +rte_ring_get_memsize(unsigned count) {
> > > +	return rte_ring_get_memsize_elem(count, sizeof(void *)); }
> > > +
> > >  void
> > >  rte_ring_reset(struct rte_ring *r)
> > >  {
> > > @@ -114,10 +134,10 @@ rte_ring_init(struct rte_ring *r, const char
> > > *name, unsigned count,
> > >  	return 0;
> > >  }
> > >
> > > -/* create the ring */
> > > +/* create the ring for a given element size */
> > >  struct rte_ring *
> > > -rte_ring_create(const char *name, unsigned count, int socket_id,
> > > -		unsigned flags)
> > > +rte_ring_create_elem(const char *name, unsigned count, unsigned esize,
> > > +		int socket_id, unsigned flags)
> > >  {
> > >  	char mz_name[RTE_MEMZONE_NAMESIZE];
> > >  	struct rte_ring *r;
> > > @@ -135,7 +155,7 @@ rte_ring_create(const char *name, unsigned
> > > count, int socket_id,
> > >  	if (flags & RING_F_EXACT_SZ)
> > >  		count = rte_align32pow2(count + 1);
> > >
> > > -	ring_size = rte_ring_get_memsize(count);
> > > +	ring_size = rte_ring_get_memsize_elem(count, esize);
> > >  	if (ring_size < 0) {
> > >  		rte_errno = ring_size;
> > >  		return NULL;
> > > @@ -182,6 +202,15 @@ rte_ring_create(const char *name, unsigned
> > > count, int socket_id,
> > >  	return r;
> > >  }
> > >
> > > +/* create the ring */
> > > +struct rte_ring *
> > > +rte_ring_create(const char *name, unsigned count, int socket_id,
> > > +		unsigned flags)
> > > +{
> > > +	return rte_ring_create_elem(name, count, sizeof(void *), socket_id,
> > > +		flags);
> > > +}
> > > +
> > >  /* free the ring */
> > >  void
> > >  rte_ring_free(struct rte_ring *r)
> > > diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
> > > index
> > > 2a9f768a1..18fc5d845 100644
> > > --- a/lib/librte_ring/rte_ring.h
> > > +++ b/lib/librte_ring/rte_ring.h
> > > @@ -216,6 +216,7 @@ int rte_ring_init(struct rte_ring *r, const char
> > > *name, unsigned count,
> > >   */
> > >  struct rte_ring *rte_ring_create(const char *name, unsigned count,
> > >  				 int socket_id, unsigned flags);
> > > +
> > >  /**
> > >   * De-allocate all memory used by the ring.
> > >   *
> > > diff --git a/lib/librte_ring/rte_ring_elem.h
> > > b/lib/librte_ring/rte_ring_elem.h new file mode 100644 index
> > > 000000000..860f059ad
> > > --- /dev/null
> > > +++ b/lib/librte_ring/rte_ring_elem.h
> > > @@ -0,0 +1,946 @@
> > > +/* SPDX-License-Identifier: BSD-3-Clause
> > > + *
> > > + * Copyright (c) 2019 Arm Limited
> > > + * Copyright (c) 2010-2017 Intel Corporation
> > > + * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
> > > + * All rights reserved.
> > > + * Derived from FreeBSD's bufring.h
> > > + * Used as BSD-3 Licensed with permission from Kip Macy.
> > > + */
> > > +
> > > +#ifndef _RTE_RING_ELEM_H_
> > > +#define _RTE_RING_ELEM_H_
> > > +
> > > +/**
> > > + * @file
> > > + * RTE Ring with flexible element size  */
> > > +
> > > +#ifdef __cplusplus
> > > +extern "C" {
> > > +#endif
> > > +
> > > +#include <stdio.h>
> > > +#include <stdint.h>
> > > +#include <sys/queue.h>
> > > +#include <errno.h>
> > > +#include <rte_common.h>
> > > +#include <rte_config.h>
> > > +#include <rte_memory.h>
> > > +#include <rte_lcore.h>
> > > +#include <rte_atomic.h>
> > > +#include <rte_branch_prediction.h>
> > > +#include <rte_memzone.h>
> > > +#include <rte_pause.h>
> > > +
> > > +#include "rte_ring.h"
> > > +
> > > +/**
> > > + * @warning
> > > + * @b EXPERIMENTAL: this API may change without prior notice
> > > + *
> > > + * Calculate the memory size needed for a ring with given element
> > > +size
> > > + *
> > > + * This function returns the number of bytes needed for a ring,
> > > +given
> > > + * the number of elements in it and the size of the element. This
> > > +value
> > > + * is the sum of the size of the structure rte_ring and the size of
> > > +the
> > > + * memory needed for storing the elements. The value is aligned to
> > > +a cache
> > > + * line size.
> > > + *
> > > + * @param count
> > > + *   The number of elements in the ring (must be a power of 2).
> > > + * @param esize
> > > + *   The size of ring element, in bytes. It must be a multiple of 4.
> > > + *   Currently, sizes 4, 8 and 16 are supported.
> > > + * @return
> > > + *   - The memory size needed for the ring on success.
> > > + *   - -EINVAL if count is not a power of 2.
> > > + */
> > > +__rte_experimental
> > > +ssize_t rte_ring_get_memsize_elem(unsigned count, unsigned esize);
> > > +
> > > +/**
> > > + * @warning
> > > + * @b EXPERIMENTAL: this API may change without prior notice
> > > + *
> > > + * Create a new ring named *name* that stores elements with given size.
> > > + *
> > > + * This function uses ``memzone_reserve()`` to allocate memory.
> > > +Then it
> > > + * calls rte_ring_init() to initialize an empty ring.
> > > + *
> > > + * The new ring size is set to *count*, which must be a power of
> > > + * two. Water marking is disabled by default. The real usable ring
> > > +size
> > > + * is *count-1* instead of *count* to differentiate a free ring
> > > +from an
> > > + * empty ring.
> > > + *
> > > + * The ring is added in RTE_TAILQ_RING list.
> > > + *
> > > + * @param name
> > > + *   The name of the ring.
> > > + * @param count
> > > + *   The number of elements in the ring (must be a power of 2).
> > > + * @param esize
> > > + *   The size of ring element, in bytes. It must be a multiple of 4.
> > > + *   Currently, sizes 4, 8 and 16 are supported.
> > > + * @param socket_id
> > > + *   The *socket_id* argument is the socket identifier in case of
> > > + *   NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA
> > > + *   constraint for the reserved zone.
> > > + * @param flags
> > > + *   An OR of the following:
> > > + *    - RING_F_SP_ENQ: If this flag is set, the default behavior when
> > > + *      using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
> > > + *      is "single-producer". Otherwise, it is "multi-producers".
> > > + *    - RING_F_SC_DEQ: If this flag is set, the default behavior when
> > > + *      using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
> > > + *      is "single-consumer". Otherwise, it is "multi-consumers".
> > > + * @return
> > > + *   On success, the pointer to the new allocated ring. NULL on error with
> > > + *    rte_errno set appropriately. Possible errno values include:
> > > + *    - E_RTE_NO_CONFIG - function could not get pointer to rte_config
> > > structure
> > > + *    - E_RTE_SECONDARY - function was called from a secondary process
> > > instance
> > > + *    - EINVAL - count provided is not a power of 2
> > > + *    - ENOSPC - the maximum number of memzones has already been
> > > allocated
> > > + *    - EEXIST - a memzone with the same name already exists
> > > + *    - ENOMEM - no appropriate memory area found in which to create
> > > memzone
> > > + */
> > > +__rte_experimental
> > > +struct rte_ring *rte_ring_create_elem(const char *name, unsigned count,
> > > +				unsigned esize, int socket_id, unsigned flags);
> > > +
> > > +/* the actual enqueue of pointers on the ring.
> > > + * Placed here since identical code needed in both
> > > + * single and multi producer enqueue functions.
> > > + */
> > > +#define ENQUEUE_PTRS_ELEM(r, ring_start, prod_head, obj_table,
> > > +esize, n)
> > > do { \
> > > +	if (esize == 4) \
> > > +		ENQUEUE_PTRS_32(r, ring_start, prod_head, obj_table, n); \
> > > +	else if (esize == 8) \
> > > +		ENQUEUE_PTRS_64(r, ring_start, prod_head, obj_table, n); \
> > > +	else if (esize == 16) \
> > > +		ENQUEUE_PTRS_128(r, ring_start, prod_head, obj_table, n);
> \ }
> > > while
> > > +(0)
> > > +
> > > +#define ENQUEUE_PTRS_32(r, ring_start, prod_head, obj_table, n) do { \
> > > +	unsigned int i; \
> > > +	const uint32_t size = (r)->size; \
> > > +	uint32_t idx = prod_head & (r)->mask; \
> > > +	uint32_t *ring = (uint32_t *)ring_start; \
> > > +	uint32_t *obj = (uint32_t *)obj_table; \
> > > +	if (likely(idx + n < size)) { \
> > > +		for (i = 0; i < (n & ((~(unsigned)0x7))); i += 8, idx += 8) { \
> > > +			ring[idx] = obj[i]; \
> > > +			ring[idx + 1] = obj[i + 1]; \
> > > +			ring[idx + 2] = obj[i + 2]; \
> > > +			ring[idx + 3] = obj[i + 3]; \
> > > +			ring[idx + 4] = obj[i + 4]; \
> > > +			ring[idx + 5] = obj[i + 5]; \
> > > +			ring[idx + 6] = obj[i + 6]; \
> > > +			ring[idx + 7] = obj[i + 7]; \
> > > +		} \
> > > +		switch (n & 0x7) { \
> > > +		case 7: \
> > > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > > +		case 6: \
> > > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > > +		case 5: \
> > > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > > +		case 4: \
> > > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > > +		case 3: \
> > > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > > +		case 2: \
> > > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > > +		case 1: \
> > > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > > +		} \
> > > +	} else { \
> > > +		for (i = 0; idx < size; i++, idx++)\
> > > +			ring[idx] = obj[i]; \
> > > +		for (idx = 0; i < n; i++, idx++) \
> > > +			ring[idx] = obj[i]; \
> > > +	} \
> > > +} while (0)
> > > +
> > > +#define ENQUEUE_PTRS_64(r, ring_start, prod_head, obj_table, n) do { \
> > > +	unsigned int i; \
> > > +	const uint32_t size = (r)->size; \
> > > +	uint32_t idx = prod_head & (r)->mask; \
> > > +	uint64_t *ring = (uint64_t *)ring_start; \
> > > +	uint64_t *obj = (uint64_t *)obj_table; \
> > > +	if (likely(idx + n < size)) { \
> > > +		for (i = 0; i < (n & ((~(unsigned)0x3))); i += 4, idx += 4) { \
> > > +			ring[idx] = obj[i]; \
> > > +			ring[idx + 1] = obj[i + 1]; \
> > > +			ring[idx + 2] = obj[i + 2]; \
> > > +			ring[idx + 3] = obj[i + 3]; \
> > > +		} \
> > > +		switch (n & 0x3) { \
> > > +		case 3: \
> > > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > > +		case 2: \
> > > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > > +		case 1: \
> > > +			ring[idx++] = obj[i++]; \
> > > +		} \
> > > +	} else { \
> > > +		for (i = 0; idx < size; i++, idx++)\
> > > +			ring[idx] = obj[i]; \
> > > +		for (idx = 0; i < n; i++, idx++) \
> > > +			ring[idx] = obj[i]; \
> > > +	} \
> > > +} while (0)
> > > +
> > > +#define ENQUEUE_PTRS_128(r, ring_start, prod_head, obj_table, n) do
> { \
> > > +	unsigned int i; \
> > > +	const uint32_t size = (r)->size; \
> > > +	uint32_t idx = prod_head & (r)->mask; \
> > > +	__uint128_t *ring = (__uint128_t *)ring_start; \
> > > +	__uint128_t *obj = (__uint128_t *)obj_table; \
> > > +	if (likely(idx + n < size)) { \
> > > +		for (i = 0; i < (n >> 1); i += 2, idx += 2) { \
> > > +			ring[idx] = obj[i]; \
> > > +			ring[idx + 1] = obj[i + 1]; \
> > > +		} \
> > > +		switch (n & 0x1) { \
> > > +		case 1: \
> > > +			ring[idx++] = obj[i++]; \
> > > +		} \
> > > +	} else { \
> > > +		for (i = 0; idx < size; i++, idx++)\
> > > +			ring[idx] = obj[i]; \
> > > +		for (idx = 0; i < n; i++, idx++) \
> > > +			ring[idx] = obj[i]; \
> > > +	} \
> > > +} while (0)
> > > +
> > > +/* the actual copy of pointers on the ring to obj_table.
> > > + * Placed here since identical code needed in both
> > > + * single and multi consumer dequeue functions.
> > > + */
> > > +#define DEQUEUE_PTRS_ELEM(r, ring_start, cons_head, obj_table,
> > > +esize, n)
> > > do { \
> > > +	if (esize == 4) \
> > > +		DEQUEUE_PTRS_32(r, ring_start, cons_head, obj_table, n); \
> > > +	else if (esize == 8) \
> > > +		DEQUEUE_PTRS_64(r, ring_start, cons_head, obj_table, n); \
> > > +	else if (esize == 16) \
> > > +		DEQUEUE_PTRS_128(r, ring_start, cons_head, obj_table, n);
> \ }
> > > while
> > > +(0)
> > > +
> > > +#define DEQUEUE_PTRS_32(r, ring_start, cons_head, obj_table, n) do { \
> > > +	unsigned int i; \
> > > +	uint32_t idx = cons_head & (r)->mask; \
> > > +	const uint32_t size = (r)->size; \
> > > +	uint32_t *ring = (uint32_t *)ring_start; \
> > > +	uint32_t *obj = (uint32_t *)obj_table; \
> > > +	if (likely(idx + n < size)) { \
> > > +		for (i = 0; i < (n & (~(unsigned)0x7)); i += 8, idx += 8) {\
> > > +			obj[i] = ring[idx]; \
> > > +			obj[i + 1] = ring[idx + 1]; \
> > > +			obj[i + 2] = ring[idx + 2]; \
> > > +			obj[i + 3] = ring[idx + 3]; \
> > > +			obj[i + 4] = ring[idx + 4]; \
> > > +			obj[i + 5] = ring[idx + 5]; \
> > > +			obj[i + 6] = ring[idx + 6]; \
> > > +			obj[i + 7] = ring[idx + 7]; \
> > > +		} \
> > > +		switch (n & 0x7) { \
> > > +		case 7: \
> > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > +		case 6: \
> > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > +		case 5: \
> > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > +		case 4: \
> > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > +		case 3: \
> > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > +		case 2: \
> > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > +		case 1: \
> > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > +		} \
> > > +	} else { \
> > > +		for (i = 0; idx < size; i++, idx++) \
> > > +			obj[i] = ring[idx]; \
> > > +		for (idx = 0; i < n; i++, idx++) \
> > > +			obj[i] = ring[idx]; \
> > > +	} \
> > > +} while (0)
> > > +
> > > +#define DEQUEUE_PTRS_64(r, ring_start, cons_head, obj_table, n) do { \
> > > +	unsigned int i; \
> > > +	uint32_t idx = cons_head & (r)->mask; \
> > > +	const uint32_t size = (r)->size; \
> > > +	uint64_t *ring = (uint64_t *)ring_start; \
> > > +	uint64_t *obj = (uint64_t *)obj_table; \
> > > +	if (likely(idx + n < size)) { \
> > > +		for (i = 0; i < (n & (~(unsigned)0x3)); i += 4, idx += 4) {\
> > > +			obj[i] = ring[idx]; \
> > > +			obj[i + 1] = ring[idx + 1]; \
> > > +			obj[i + 2] = ring[idx + 2]; \
> > > +			obj[i + 3] = ring[idx + 3]; \
> > > +		} \
> > > +		switch (n & 0x3) { \
> > > +		case 3: \
> > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > +		case 2: \
> > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > +		case 1: \
> > > +			obj[i++] = ring[idx++]; \
> > > +		} \
> > > +	} else { \
> > > +		for (i = 0; idx < size; i++, idx++) \
> > > +			obj[i] = ring[idx]; \
> > > +		for (idx = 0; i < n; i++, idx++) \
> > > +			obj[i] = ring[idx]; \
> > > +	} \
> > > +} while (0)
> > > +
> > > +#define DEQUEUE_PTRS_128(r, ring_start, cons_head, obj_table, n) do
> { \
> > > +	unsigned int i; \
> > > +	uint32_t idx = cons_head & (r)->mask; \
> > > +	const uint32_t size = (r)->size; \
> > > +	__uint128_t *ring = (__uint128_t *)ring_start; \
> > > +	__uint128_t *obj = (__uint128_t *)obj_table; \
> > > +	if (likely(idx + n < size)) { \
> > > +		for (i = 0; i < (n >> 1); i += 2, idx += 2) { \
> > > +			obj[i] = ring[idx]; \
> > > +			obj[i + 1] = ring[idx + 1]; \
> > > +		} \
> > > +		switch (n & 0x1) { \
> > > +		case 1: \
> > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > +		} \
> > > +	} else { \
> > > +		for (i = 0; idx < size; i++, idx++) \
> > > +			obj[i] = ring[idx]; \
> > > +		for (idx = 0; i < n; i++, idx++) \
> > > +			obj[i] = ring[idx]; \
> > > +	} \
> > > +} while (0)
> > > +
> > > +/* Between load and load. there might be cpu reorder in weak model
> > > + * (powerpc/arm).
> > > + * There are 2 choices for the users
> > > + * 1.use rmb() memory barrier
> > > + * 2.use one-direction load_acquire/store_release barrier,defined
> > > +by
> > > + * CONFIG_RTE_USE_C11_MEM_MODEL=y
> > > + * It depends on performance test results.
> > > + * By default, move common functions to rte_ring_generic.h  */
> > > +#ifdef RTE_USE_C11_MEM_MODEL #include "rte_ring_c11_mem.h"
> > > +#else
> > > +#include "rte_ring_generic.h"
> > > +#endif
> > > +
> > > +/**
> > > + * @internal Enqueue several objects on the ring
> > > + *
> > > + * @param r
> > > + *   A pointer to the ring structure.
> > > + * @param obj_table
> > > + *   A pointer to a table of void * pointers (objects).
> > > + * @param esize
> > > + *   The size of ring element, in bytes. It must be a multiple of 4.
> > > + *   Currently, sizes 4, 8 and 16 are supported. This should be the same
> > > + *   as passed while creating the ring, otherwise the results are undefined.
> > > + * @param n
> > > + *   The number of objects to add in the ring from the obj_table.
> > > + * @param behavior
> > > + *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a
> ring
> > > + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible
> from
> > > ring
> > > + * @param is_sp
> > > + *   Indicates whether to use single producer or multi-producer head
> update
> > > + * @param free_space
> > > + *   returns the amount of space after the enqueue operation has
> finished
> > > + * @return
> > > + *   Actual number of objects enqueued.
> > > + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> > > + */
> > > +static __rte_always_inline unsigned int
> > > +__rte_ring_do_enqueue_elem(struct rte_ring *r, void * const obj_table,
> > > +		unsigned int esize, unsigned int n,
> > > +		enum rte_ring_queue_behavior behavior, unsigned int is_sp,
> > > +		unsigned int *free_space)
> 
> 
> I like the idea to add esize as an argument to the public API, so the compiler
> can do its job optimizing calls with constant esize.
> Though I am not very happy with the rest of the implementation:
> 1. It doesn't really provide configurable elem size - only 4/8/16B elems are
> supported.
Agree. I was thinking other sizes can be added on need basis.
However, I am wondering if we should just provide for 4B and then the users can use bulk operations to construct whatever they need? It would mean extra work for the users.

> 2. A lot of code duplication with these 3 copies of ENQUEUE/DEQUEUE
> macros.
> 
> Looking at ENQUEUE/DEQUEUE macros, I can see that main loop always does
> 32B copy per iteration.
Yes, I tried to keep it the same as the existing one (originally, I guess the intention was to allow for 256b vector instructions to be generated)

> So wonder can we make a generic function that would do 32B copy per
> iteration in a main loop, and copy tail  by 4B chunks?
> That would avoid copy duplication and will allow user to have any elem size
> (multiple of 4B) he wants.
> Something like that (note didn't test it, just a rough idea):
> 
>  static inline void
> copy_elems(uint32_t du32[], const uint32_t su32[], uint32_t num, uint32_t
> esize) {
>         uint32_t i, sz;
> 
>         sz = (num * esize) / sizeof(uint32_t);
If 'num' is a compile time constant, 'sz' will be a compile time constant. Otherwise, this will result in a multiplication operation. I have tried to avoid the multiplication operation and try to use shift and mask operations (just like how the rest of the ring code does).

> 
>         for (i = 0; i < (sz & ~7); i += 8)
>                 memcpy(du32 + i, su32 + i, 8 * sizeof(uint32_t));
I had used memcpy to start with (for the entire copy operation); the performance is not the same for 64b elements when compared with the existing ring APIs (some cases more and some cases less).

IMO, we have to keep the performance of the 64b and 128b the same as what we get with the existing ring and event-ring APIs. That would allow us to replace them with these new APIs. I suggest that we keep the macros in this patch for 64b and 128b.

For the rest of the sizes, we could put a for loop around 32b macro (this would allow for all sizes as well).

> 
>         switch (sz & 7) {
>         case 7: du32[sz - 7] = su32[sz - 7]; /* fallthrough */
>         case 6: du32[sz - 6] = su32[sz - 6]; /* fallthrough */
>         case 5: du32[sz - 5] = su32[sz - 5]; /* fallthrough */
>         case 4: du32[sz - 4] = su32[sz - 4]; /* fallthrough */
>         case 3: du32[sz - 3] = su32[sz - 3]; /* fallthrough */
>         case 2: du32[sz - 2] = su32[sz - 2]; /* fallthrough */
>         case 1: du32[sz - 1] = su32[sz - 1]; /* fallthrough */
>         }
> }
> 
> static inline void
> enqueue_elems(struct rte_ring *r, void *ring_start, uint32_t prod_head,
>                 void *obj_table, uint32_t num, uint32_t esize) {
>         uint32_t idx, n;
>         uint32_t *du32;
> 
>         const uint32_t size = r->size;
> 
>         idx = prod_head & (r)->mask;
> 
>         du32 = ring_start + idx * sizeof(uint32_t);
> 
>         if (idx + num < size)
>                 copy_elems(du32, obj_table, num, esize);
>         else {
>                 n = size - idx;
>                 copy_elems(du32, obj_table, n, esize);
>                 copy_elems(ring_start, obj_table + n * sizeof(uint32_t),
>                         num - n, esize);
>         }
> }
> 
> And then, in that function, instead of ENQUEUE_PTRS_ELEM(), just:
> 
> enqueue_elems(r, &r[1], prod_head, obj_table, n, esize);
> 
> 
> > > +{
> > > +	uint32_t prod_head, prod_next;
> > > +	uint32_t free_entries;
> > > +
> > > +	n = __rte_ring_move_prod_head(r, is_sp, n, behavior,
> > > +			&prod_head, &prod_next, &free_entries);
> > > +	if (n == 0)
> > > +		goto end;
> > > +
> > > +	ENQUEUE_PTRS_ELEM(r, &r[1], prod_head, obj_table, esize, n);
> > > +
> > > +	update_tail(&r->prod, prod_head, prod_next, is_sp, 1);
> > > +end:
> > > +	if (free_space != NULL)
> > > +		*free_space = free_entries - n;
> > > +	return n;
> > > +}
> > > +
Ananyev, Konstantin Oct. 15, 2019, 9:34 a.m. | #4
Hi Honnappa,
 
> > > >
> > > > Current APIs assume ring elements to be pointers. However, in many
> > > > use cases, the size can be different. Add new APIs to support
> > > > configurable ring element sizes.
> > > >
> > > > Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
> > > > Reviewed-by: Dharmik Thakkar <dharmik.thakkar@arm.com>
> > > > Reviewed-by: Gavin Hu <gavin.hu@arm.com>
> > > > Reviewed-by: Ruifeng Wang <ruifeng.wang@arm.com>
> > > > ---
> > > >  lib/librte_ring/Makefile             |   3 +-
> > > >  lib/librte_ring/meson.build          |   3 +
> > > >  lib/librte_ring/rte_ring.c           |  45 +-
> > > >  lib/librte_ring/rte_ring.h           |   1 +
> > > >  lib/librte_ring/rte_ring_elem.h      | 946 +++++++++++++++++++++++++++
> > > >  lib/librte_ring/rte_ring_version.map |   2 +
> > > >  6 files changed, 991 insertions(+), 9 deletions(-)  create mode
> > > > 100644 lib/librte_ring/rte_ring_elem.h
> > > >
> > > > diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile
> > > > index 21a36770d..515a967bb 100644
> > > > --- a/lib/librte_ring/Makefile
> > > > +++ b/lib/librte_ring/Makefile
> > > > @@ -6,7 +6,7 @@ include $(RTE_SDK)/mk/rte.vars.mk  # library name
> > > > LIB = librte_ring.a
> > > >
> > > > -CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3
> > > > +CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3 -
> > > > DALLOW_EXPERIMENTAL_API
> > > >  LDLIBS += -lrte_eal
> > > >
> > > >  EXPORT_MAP := rte_ring_version.map
> > > > @@ -18,6 +18,7 @@ SRCS-$(CONFIG_RTE_LIBRTE_RING) := rte_ring.c
> > > >
> > > >  # install includes
> > > >  SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h \
> > > > +					rte_ring_elem.h \
> > > >  					rte_ring_generic.h \
> > > >  					rte_ring_c11_mem.h
> > > >
> > > > diff --git a/lib/librte_ring/meson.build
> > > > b/lib/librte_ring/meson.build index ab8b0b469..74219840a 100644
> > > > --- a/lib/librte_ring/meson.build
> > > > +++ b/lib/librte_ring/meson.build
> > > > @@ -6,3 +6,6 @@ sources = files('rte_ring.c')  headers = files('rte_ring.h',
> > > >  		'rte_ring_c11_mem.h',
> > > >  		'rte_ring_generic.h')
> > > > +
> > > > +# rte_ring_create_elem and rte_ring_get_memsize_elem are
> > > > +experimental allow_experimental_apis = true
> > > > diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c
> > > > index d9b308036..6fed3648b 100644
> > > > --- a/lib/librte_ring/rte_ring.c
> > > > +++ b/lib/librte_ring/rte_ring.c
> > > > @@ -33,6 +33,7 @@
> > > >  #include <rte_tailq.h>
> > > >
> > > >  #include "rte_ring.h"
> > > > +#include "rte_ring_elem.h"
> > > >
> > > >  TAILQ_HEAD(rte_ring_list, rte_tailq_entry);
> > > >
> > > > @@ -46,23 +47,42 @@ EAL_REGISTER_TAILQ(rte_ring_tailq)
> > > >
> > > >  /* return the size of memory occupied by a ring */  ssize_t -
> > > > rte_ring_get_memsize(unsigned count)
> > > > +rte_ring_get_memsize_elem(unsigned count, unsigned esize)
> > > >  {
> > > >  	ssize_t sz;
> > > >
> > > > +	/* Supported esize values are 4/8/16.
> > > > +	 * Others can be added on need basis.
> > > > +	 */
> > > > +	if ((esize != 4) && (esize != 8) && (esize != 16)) {
> > > > +		RTE_LOG(ERR, RING,
> > > > +			"Unsupported esize value. Supported values are 4, 8
> > > > and 16\n");
> > > > +
> > > > +		return -EINVAL;
> > > > +	}
> > > > +
> > > >  	/* count must be a power of 2 */
> > > >  	if ((!POWEROF2(count)) || (count > RTE_RING_SZ_MASK )) {
> > > >  		RTE_LOG(ERR, RING,
> > > > -			"Requested size is invalid, must be power of 2, and "
> > > > -			"do not exceed the size limit %u\n",
> > > > RTE_RING_SZ_MASK);
> > > > +			"Requested number of elements is invalid, must be "
> > > > +			"power of 2, and do not exceed the limit %u\n",
> > > > +			RTE_RING_SZ_MASK);
> > > > +
> > > >  		return -EINVAL;
> > > >  	}
> > > >
> > > > -	sz = sizeof(struct rte_ring) + count * sizeof(void *);
> > > > +	sz = sizeof(struct rte_ring) + count * esize;
> > > >  	sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
> > > >  	return sz;
> > > >  }
> > > >
> > > > +/* return the size of memory occupied by a ring */ ssize_t
> > > > +rte_ring_get_memsize(unsigned count) {
> > > > +	return rte_ring_get_memsize_elem(count, sizeof(void *)); }
> > > > +
> > > >  void
> > > >  rte_ring_reset(struct rte_ring *r)
> > > >  {
> > > > @@ -114,10 +134,10 @@ rte_ring_init(struct rte_ring *r, const char
> > > > *name, unsigned count,
> > > >  	return 0;
> > > >  }
> > > >
> > > > -/* create the ring */
> > > > +/* create the ring for a given element size */
> > > >  struct rte_ring *
> > > > -rte_ring_create(const char *name, unsigned count, int socket_id,
> > > > -		unsigned flags)
> > > > +rte_ring_create_elem(const char *name, unsigned count, unsigned esize,
> > > > +		int socket_id, unsigned flags)
> > > >  {
> > > >  	char mz_name[RTE_MEMZONE_NAMESIZE];
> > > >  	struct rte_ring *r;
> > > > @@ -135,7 +155,7 @@ rte_ring_create(const char *name, unsigned
> > > > count, int socket_id,
> > > >  	if (flags & RING_F_EXACT_SZ)
> > > >  		count = rte_align32pow2(count + 1);
> > > >
> > > > -	ring_size = rte_ring_get_memsize(count);
> > > > +	ring_size = rte_ring_get_memsize_elem(count, esize);
> > > >  	if (ring_size < 0) {
> > > >  		rte_errno = ring_size;
> > > >  		return NULL;
> > > > @@ -182,6 +202,15 @@ rte_ring_create(const char *name, unsigned
> > > > count, int socket_id,
> > > >  	return r;
> > > >  }
> > > >
> > > > +/* create the ring */
> > > > +struct rte_ring *
> > > > +rte_ring_create(const char *name, unsigned count, int socket_id,
> > > > +		unsigned flags)
> > > > +{
> > > > +	return rte_ring_create_elem(name, count, sizeof(void *), socket_id,
> > > > +		flags);
> > > > +}
> > > > +
> > > >  /* free the ring */
> > > >  void
> > > >  rte_ring_free(struct rte_ring *r)
> > > > diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
> > > > index
> > > > 2a9f768a1..18fc5d845 100644
> > > > --- a/lib/librte_ring/rte_ring.h
> > > > +++ b/lib/librte_ring/rte_ring.h
> > > > @@ -216,6 +216,7 @@ int rte_ring_init(struct rte_ring *r, const char
> > > > *name, unsigned count,
> > > >   */
> > > >  struct rte_ring *rte_ring_create(const char *name, unsigned count,
> > > >  				 int socket_id, unsigned flags);
> > > > +
> > > >  /**
> > > >   * De-allocate all memory used by the ring.
> > > >   *
> > > > diff --git a/lib/librte_ring/rte_ring_elem.h
> > > > b/lib/librte_ring/rte_ring_elem.h new file mode 100644 index
> > > > 000000000..860f059ad
> > > > --- /dev/null
> > > > +++ b/lib/librte_ring/rte_ring_elem.h
> > > > @@ -0,0 +1,946 @@
> > > > +/* SPDX-License-Identifier: BSD-3-Clause
> > > > + *
> > > > + * Copyright (c) 2019 Arm Limited
> > > > + * Copyright (c) 2010-2017 Intel Corporation
> > > > + * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
> > > > + * All rights reserved.
> > > > + * Derived from FreeBSD's bufring.h
> > > > + * Used as BSD-3 Licensed with permission from Kip Macy.
> > > > + */
> > > > +
> > > > +#ifndef _RTE_RING_ELEM_H_
> > > > +#define _RTE_RING_ELEM_H_
> > > > +
> > > > +/**
> > > > + * @file
> > > > + * RTE Ring with flexible element size  */
> > > > +
> > > > +#ifdef __cplusplus
> > > > +extern "C" {
> > > > +#endif
> > > > +
> > > > +#include <stdio.h>
> > > > +#include <stdint.h>
> > > > +#include <sys/queue.h>
> > > > +#include <errno.h>
> > > > +#include <rte_common.h>
> > > > +#include <rte_config.h>
> > > > +#include <rte_memory.h>
> > > > +#include <rte_lcore.h>
> > > > +#include <rte_atomic.h>
> > > > +#include <rte_branch_prediction.h>
> > > > +#include <rte_memzone.h>
> > > > +#include <rte_pause.h>
> > > > +
> > > > +#include "rte_ring.h"
> > > > +
> > > > +/**
> > > > + * @warning
> > > > + * @b EXPERIMENTAL: this API may change without prior notice
> > > > + *
> > > > + * Calculate the memory size needed for a ring with given element
> > > > +size
> > > > + *
> > > > + * This function returns the number of bytes needed for a ring,
> > > > +given
> > > > + * the number of elements in it and the size of the element. This
> > > > +value
> > > > + * is the sum of the size of the structure rte_ring and the size of
> > > > +the
> > > > + * memory needed for storing the elements. The value is aligned to
> > > > +a cache
> > > > + * line size.
> > > > + *
> > > > + * @param count
> > > > + *   The number of elements in the ring (must be a power of 2).
> > > > + * @param esize
> > > > + *   The size of ring element, in bytes. It must be a multiple of 4.
> > > > + *   Currently, sizes 4, 8 and 16 are supported.
> > > > + * @return
> > > > + *   - The memory size needed for the ring on success.
> > > > + *   - -EINVAL if count is not a power of 2.
> > > > + */
> > > > +__rte_experimental
> > > > +ssize_t rte_ring_get_memsize_elem(unsigned count, unsigned esize);
> > > > +
> > > > +/**
> > > > + * @warning
> > > > + * @b EXPERIMENTAL: this API may change without prior notice
> > > > + *
> > > > + * Create a new ring named *name* that stores elements with given size.
> > > > + *
> > > > + * This function uses ``memzone_reserve()`` to allocate memory.
> > > > +Then it
> > > > + * calls rte_ring_init() to initialize an empty ring.
> > > > + *
> > > > + * The new ring size is set to *count*, which must be a power of
> > > > + * two. Water marking is disabled by default. The real usable ring
> > > > +size
> > > > + * is *count-1* instead of *count* to differentiate a free ring
> > > > +from an
> > > > + * empty ring.
> > > > + *
> > > > + * The ring is added in RTE_TAILQ_RING list.
> > > > + *
> > > > + * @param name
> > > > + *   The name of the ring.
> > > > + * @param count
> > > > + *   The number of elements in the ring (must be a power of 2).
> > > > + * @param esize
> > > > + *   The size of ring element, in bytes. It must be a multiple of 4.
> > > > + *   Currently, sizes 4, 8 and 16 are supported.
> > > > + * @param socket_id
> > > > + *   The *socket_id* argument is the socket identifier in case of
> > > > + *   NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA
> > > > + *   constraint for the reserved zone.
> > > > + * @param flags
> > > > + *   An OR of the following:
> > > > + *    - RING_F_SP_ENQ: If this flag is set, the default behavior when
> > > > + *      using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
> > > > + *      is "single-producer". Otherwise, it is "multi-producers".
> > > > + *    - RING_F_SC_DEQ: If this flag is set, the default behavior when
> > > > + *      using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
> > > > + *      is "single-consumer". Otherwise, it is "multi-consumers".
> > > > + * @return
> > > > + *   On success, the pointer to the new allocated ring. NULL on error with
> > > > + *    rte_errno set appropriately. Possible errno values include:
> > > > + *    - E_RTE_NO_CONFIG - function could not get pointer to rte_config
> > > > structure
> > > > + *    - E_RTE_SECONDARY - function was called from a secondary process
> > > > instance
> > > > + *    - EINVAL - count provided is not a power of 2
> > > > + *    - ENOSPC - the maximum number of memzones has already been
> > > > allocated
> > > > + *    - EEXIST - a memzone with the same name already exists
> > > > + *    - ENOMEM - no appropriate memory area found in which to create
> > > > memzone
> > > > + */
> > > > +__rte_experimental
> > > > +struct rte_ring *rte_ring_create_elem(const char *name, unsigned count,
> > > > +				unsigned esize, int socket_id, unsigned flags);
> > > > +
> > > > +/* the actual enqueue of pointers on the ring.
> > > > + * Placed here since identical code needed in both
> > > > + * single and multi producer enqueue functions.
> > > > + */
> > > > +#define ENQUEUE_PTRS_ELEM(r, ring_start, prod_head, obj_table,
> > > > +esize, n)
> > > > do { \
> > > > +	if (esize == 4) \
> > > > +		ENQUEUE_PTRS_32(r, ring_start, prod_head, obj_table, n); \
> > > > +	else if (esize == 8) \
> > > > +		ENQUEUE_PTRS_64(r, ring_start, prod_head, obj_table, n); \
> > > > +	else if (esize == 16) \
> > > > +		ENQUEUE_PTRS_128(r, ring_start, prod_head, obj_table, n);
> > \ }
> > > > while
> > > > +(0)
> > > > +
> > > > +#define ENQUEUE_PTRS_32(r, ring_start, prod_head, obj_table, n) do { \
> > > > +	unsigned int i; \
> > > > +	const uint32_t size = (r)->size; \
> > > > +	uint32_t idx = prod_head & (r)->mask; \
> > > > +	uint32_t *ring = (uint32_t *)ring_start; \
> > > > +	uint32_t *obj = (uint32_t *)obj_table; \
> > > > +	if (likely(idx + n < size)) { \
> > > > +		for (i = 0; i < (n & ((~(unsigned)0x7))); i += 8, idx += 8) { \
> > > > +			ring[idx] = obj[i]; \
> > > > +			ring[idx + 1] = obj[i + 1]; \
> > > > +			ring[idx + 2] = obj[i + 2]; \
> > > > +			ring[idx + 3] = obj[i + 3]; \
> > > > +			ring[idx + 4] = obj[i + 4]; \
> > > > +			ring[idx + 5] = obj[i + 5]; \
> > > > +			ring[idx + 6] = obj[i + 6]; \
> > > > +			ring[idx + 7] = obj[i + 7]; \
> > > > +		} \
> > > > +		switch (n & 0x7) { \
> > > > +		case 7: \
> > > > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > > > +		case 6: \
> > > > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > > > +		case 5: \
> > > > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > > > +		case 4: \
> > > > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > > > +		case 3: \
> > > > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > > > +		case 2: \
> > > > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > > > +		case 1: \
> > > > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > > > +		} \
> > > > +	} else { \
> > > > +		for (i = 0; idx < size; i++, idx++)\
> > > > +			ring[idx] = obj[i]; \
> > > > +		for (idx = 0; i < n; i++, idx++) \
> > > > +			ring[idx] = obj[i]; \
> > > > +	} \
> > > > +} while (0)
> > > > +
> > > > +#define ENQUEUE_PTRS_64(r, ring_start, prod_head, obj_table, n) do { \
> > > > +	unsigned int i; \
> > > > +	const uint32_t size = (r)->size; \
> > > > +	uint32_t idx = prod_head & (r)->mask; \
> > > > +	uint64_t *ring = (uint64_t *)ring_start; \
> > > > +	uint64_t *obj = (uint64_t *)obj_table; \
> > > > +	if (likely(idx + n < size)) { \
> > > > +		for (i = 0; i < (n & ((~(unsigned)0x3))); i += 4, idx += 4) { \
> > > > +			ring[idx] = obj[i]; \
> > > > +			ring[idx + 1] = obj[i + 1]; \
> > > > +			ring[idx + 2] = obj[i + 2]; \
> > > > +			ring[idx + 3] = obj[i + 3]; \
> > > > +		} \
> > > > +		switch (n & 0x3) { \
> > > > +		case 3: \
> > > > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > > > +		case 2: \
> > > > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > > > +		case 1: \
> > > > +			ring[idx++] = obj[i++]; \
> > > > +		} \
> > > > +	} else { \
> > > > +		for (i = 0; idx < size; i++, idx++)\
> > > > +			ring[idx] = obj[i]; \
> > > > +		for (idx = 0; i < n; i++, idx++) \
> > > > +			ring[idx] = obj[i]; \
> > > > +	} \
> > > > +} while (0)
> > > > +
> > > > +#define ENQUEUE_PTRS_128(r, ring_start, prod_head, obj_table, n) do
> > { \
> > > > +	unsigned int i; \
> > > > +	const uint32_t size = (r)->size; \
> > > > +	uint32_t idx = prod_head & (r)->mask; \
> > > > +	__uint128_t *ring = (__uint128_t *)ring_start; \
> > > > +	__uint128_t *obj = (__uint128_t *)obj_table; \
> > > > +	if (likely(idx + n < size)) { \
> > > > +		for (i = 0; i < (n >> 1); i += 2, idx += 2) { \
> > > > +			ring[idx] = obj[i]; \
> > > > +			ring[idx + 1] = obj[i + 1]; \
> > > > +		} \
> > > > +		switch (n & 0x1) { \
> > > > +		case 1: \
> > > > +			ring[idx++] = obj[i++]; \
> > > > +		} \
> > > > +	} else { \
> > > > +		for (i = 0; idx < size; i++, idx++)\
> > > > +			ring[idx] = obj[i]; \
> > > > +		for (idx = 0; i < n; i++, idx++) \
> > > > +			ring[idx] = obj[i]; \
> > > > +	} \
> > > > +} while (0)
> > > > +
> > > > +/* the actual copy of pointers on the ring to obj_table.
> > > > + * Placed here since identical code needed in both
> > > > + * single and multi consumer dequeue functions.
> > > > + */
> > > > +#define DEQUEUE_PTRS_ELEM(r, ring_start, cons_head, obj_table,
> > > > +esize, n)
> > > > do { \
> > > > +	if (esize == 4) \
> > > > +		DEQUEUE_PTRS_32(r, ring_start, cons_head, obj_table, n); \
> > > > +	else if (esize == 8) \
> > > > +		DEQUEUE_PTRS_64(r, ring_start, cons_head, obj_table, n); \
> > > > +	else if (esize == 16) \
> > > > +		DEQUEUE_PTRS_128(r, ring_start, cons_head, obj_table, n);
> > \ }
> > > > while
> > > > +(0)
> > > > +
> > > > +#define DEQUEUE_PTRS_32(r, ring_start, cons_head, obj_table, n) do { \
> > > > +	unsigned int i; \
> > > > +	uint32_t idx = cons_head & (r)->mask; \
> > > > +	const uint32_t size = (r)->size; \
> > > > +	uint32_t *ring = (uint32_t *)ring_start; \
> > > > +	uint32_t *obj = (uint32_t *)obj_table; \
> > > > +	if (likely(idx + n < size)) { \
> > > > +		for (i = 0; i < (n & (~(unsigned)0x7)); i += 8, idx += 8) {\
> > > > +			obj[i] = ring[idx]; \
> > > > +			obj[i + 1] = ring[idx + 1]; \
> > > > +			obj[i + 2] = ring[idx + 2]; \
> > > > +			obj[i + 3] = ring[idx + 3]; \
> > > > +			obj[i + 4] = ring[idx + 4]; \
> > > > +			obj[i + 5] = ring[idx + 5]; \
> > > > +			obj[i + 6] = ring[idx + 6]; \
> > > > +			obj[i + 7] = ring[idx + 7]; \
> > > > +		} \
> > > > +		switch (n & 0x7) { \
> > > > +		case 7: \
> > > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > > +		case 6: \
> > > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > > +		case 5: \
> > > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > > +		case 4: \
> > > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > > +		case 3: \
> > > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > > +		case 2: \
> > > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > > +		case 1: \
> > > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > > +		} \
> > > > +	} else { \
> > > > +		for (i = 0; idx < size; i++, idx++) \
> > > > +			obj[i] = ring[idx]; \
> > > > +		for (idx = 0; i < n; i++, idx++) \
> > > > +			obj[i] = ring[idx]; \
> > > > +	} \
> > > > +} while (0)
> > > > +
> > > > +#define DEQUEUE_PTRS_64(r, ring_start, cons_head, obj_table, n) do { \
> > > > +	unsigned int i; \
> > > > +	uint32_t idx = cons_head & (r)->mask; \
> > > > +	const uint32_t size = (r)->size; \
> > > > +	uint64_t *ring = (uint64_t *)ring_start; \
> > > > +	uint64_t *obj = (uint64_t *)obj_table; \
> > > > +	if (likely(idx + n < size)) { \
> > > > +		for (i = 0; i < (n & (~(unsigned)0x3)); i += 4, idx += 4) {\
> > > > +			obj[i] = ring[idx]; \
> > > > +			obj[i + 1] = ring[idx + 1]; \
> > > > +			obj[i + 2] = ring[idx + 2]; \
> > > > +			obj[i + 3] = ring[idx + 3]; \
> > > > +		} \
> > > > +		switch (n & 0x3) { \
> > > > +		case 3: \
> > > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > > +		case 2: \
> > > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > > +		case 1: \
> > > > +			obj[i++] = ring[idx++]; \
> > > > +		} \
> > > > +	} else { \
> > > > +		for (i = 0; idx < size; i++, idx++) \
> > > > +			obj[i] = ring[idx]; \
> > > > +		for (idx = 0; i < n; i++, idx++) \
> > > > +			obj[i] = ring[idx]; \
> > > > +	} \
> > > > +} while (0)
> > > > +
> > > > +#define DEQUEUE_PTRS_128(r, ring_start, cons_head, obj_table, n) do
> > { \
> > > > +	unsigned int i; \
> > > > +	uint32_t idx = cons_head & (r)->mask; \
> > > > +	const uint32_t size = (r)->size; \
> > > > +	__uint128_t *ring = (__uint128_t *)ring_start; \
> > > > +	__uint128_t *obj = (__uint128_t *)obj_table; \
> > > > +	if (likely(idx + n < size)) { \
> > > > +		for (i = 0; i < (n >> 1); i += 2, idx += 2) { \
> > > > +			obj[i] = ring[idx]; \
> > > > +			obj[i + 1] = ring[idx + 1]; \
> > > > +		} \
> > > > +		switch (n & 0x1) { \
> > > > +		case 1: \
> > > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > > +		} \
> > > > +	} else { \
> > > > +		for (i = 0; idx < size; i++, idx++) \
> > > > +			obj[i] = ring[idx]; \
> > > > +		for (idx = 0; i < n; i++, idx++) \
> > > > +			obj[i] = ring[idx]; \
> > > > +	} \
> > > > +} while (0)
> > > > +
> > > > +/* Between load and load. there might be cpu reorder in weak model
> > > > + * (powerpc/arm).
> > > > + * There are 2 choices for the users
> > > > + * 1.use rmb() memory barrier
> > > > + * 2.use one-direction load_acquire/store_release barrier,defined
> > > > +by
> > > > + * CONFIG_RTE_USE_C11_MEM_MODEL=y
> > > > + * It depends on performance test results.
> > > > + * By default, move common functions to rte_ring_generic.h  */
> > > > +#ifdef RTE_USE_C11_MEM_MODEL #include "rte_ring_c11_mem.h"
> > > > +#else
> > > > +#include "rte_ring_generic.h"
> > > > +#endif
> > > > +
> > > > +/**
> > > > + * @internal Enqueue several objects on the ring
> > > > + *
> > > > + * @param r
> > > > + *   A pointer to the ring structure.
> > > > + * @param obj_table
> > > > + *   A pointer to a table of void * pointers (objects).
> > > > + * @param esize
> > > > + *   The size of ring element, in bytes. It must be a multiple of 4.
> > > > + *   Currently, sizes 4, 8 and 16 are supported. This should be the same
> > > > + *   as passed while creating the ring, otherwise the results are undefined.
> > > > + * @param n
> > > > + *   The number of objects to add in the ring from the obj_table.
> > > > + * @param behavior
> > > > + *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a
> > ring
> > > > + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible
> > from
> > > > ring
> > > > + * @param is_sp
> > > > + *   Indicates whether to use single producer or multi-producer head
> > update
> > > > + * @param free_space
> > > > + *   returns the amount of space after the enqueue operation has
> > finished
> > > > + * @return
> > > > + *   Actual number of objects enqueued.
> > > > + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> > > > + */
> > > > +static __rte_always_inline unsigned int
> > > > +__rte_ring_do_enqueue_elem(struct rte_ring *r, void * const obj_table,
> > > > +		unsigned int esize, unsigned int n,
> > > > +		enum rte_ring_queue_behavior behavior, unsigned int is_sp,
> > > > +		unsigned int *free_space)
> >
> >
> > I like the idea to add esize as an argument to the public API, so the compiler
> > can do it's jib optimizing calls with constant esize.
> > Though I am not very happy with the rest of implementation:
> > 1. It doesn't really provide configurable elem size - only 4/8/16B elems are
> > supported.
> Agree. I was thinking other sizes can be added on need basis.
> However, I am wondering if we should just provide for 4B and then the users can use bulk operations to construct whatever they need?

I suppose it could be plan B... if there is no agreement on the generic case.
And for 4B elems, I guess you do have a particular use-case?

> It
> would mean extra work for the users.
> 
> > 2. A lot of code duplication with these 3 copies of ENQUEUE/DEQUEUE
> > macros.
> >
> > Looking at ENQUEUE/DEQUEUE macros, I can see that main loop always does
> > 32B copy per iteration.
> Yes, I tried to keep it the same as the existing one (originally, I guess the intention was to allow for 256b vector instructions to be
> generated)
> 
> > So wonder can we make a generic function that would do 32B copy per
> > iteration in a main loop, and copy tail  by 4B chunks?
> > That would avoid copy duplication and will allow user to have any elem size
> > (multiple of 4B) he wants.
> > Something like that (note didn't test it, just a rough idea):
> >
> >  static inline void
> > copy_elems(uint32_t du32[], const uint32_t su32[], uint32_t num, uint32_t
> > esize) {
> >         uint32_t i, sz;
> >
> >         sz = (num * esize) / sizeof(uint32_t);
> If 'num' is a compile time constant, 'sz' will be a compile time constant. Otherwise, this will result in a multiplication operation. 

Not always.
If esize is compile time constant, then for esize as power of 2 (4,8,16,...), it would be just one shift.
For other constant values it could be a 'mul' or in many cases just 2 shifts plus 'add' (if compiler is smart enough).
I.e., let's say for a 24B elem it would be either num * 6 or (num << 2) + (num << 1).
I suppose for non-power of 2 elems it might be ok to get such small perf hit.

>I have tried 
> to avoid the multiplication operation and try to use shift and mask operations (just like how the rest of the ring code does).
> 
> >
> >         for (i = 0; i < (sz & ~7); i += 8)
> >                 memcpy(du32 + i, su32 + i, 8 * sizeof(uint32_t));
> I had used memcpy to start with (for the entire copy operation), performance is not the same for 64b elements when compared with the
> existing ring APIs (some cases more and some cases less).

I remember that from one of your previous mails; that's why here I suggest using memcpy() with a fixed size inside a loop.
That way, for each iteration, the compiler will replace memcpy() with instructions to copy 32B in whatever way it considers optimal
(same as for the original macro, I think).

> 
> IMO, we have to keep the performance of the 64b and 128b the same as what we get with the existing ring and event-ring APIs. That would
> allow us to replace them with these new APIs. I suggest that we keep the macros in this patch for 64b and 128b.

I still think we probably can achieve that without duplicating macros, while still supporting arbitrary elem size.
See above.

> For the rest of the sizes, we could put a for loop around 32b macro (this would allow for all sizes as well).
> 
> >
> >         switch (sz & 7) {
> >         case 7: du32[sz - 7] = su32[sz - 7]; /* fallthrough */
> >         case 6: du32[sz - 6] = su32[sz - 6]; /* fallthrough */
> >         case 5: du32[sz - 5] = su32[sz - 5]; /* fallthrough */
> >         case 4: du32[sz - 4] = su32[sz - 4]; /* fallthrough */
> >         case 3: du32[sz - 3] = su32[sz - 3]; /* fallthrough */
> >         case 2: du32[sz - 2] = su32[sz - 2]; /* fallthrough */
> >         case 1: du32[sz - 1] = su32[sz - 1]; /* fallthrough */
> >         }
> > }
> >
> > static inline void
> > enqueue_elems(struct rte_ring *r, void *ring_start, uint32_t prod_head,
> >                 void *obj_table, uint32_t num, uint32_t esize) {
> >         uint32_t idx, n;
> >         uint32_t *du32;
> >
> >         const uint32_t size = r->size;
> >
> >         idx = prod_head & (r)->mask;
> >
> >         du32 = ring_start + idx * sizeof(uint32_t);
> >
> >         if (idx + num < size)
> >                 copy_elems(du32, obj_table, num, esize);
> >         else {
> >                 n = size - idx;
> >                 copy_elems(du32, obj_table, n, esize);
> >                 copy_elems(ring_start, obj_table + n * sizeof(uint32_t),
> >                         num - n, esize);
> >         }
> > }
> >
> > And then, in that function, instead of ENQUEUE_PTRS_ELEM(), just:
> >
> > enqueue_elems(r, &r[1], prod_head, obj_table, n, esize);
> >
> >
> > > > +{
> > > > +	uint32_t prod_head, prod_next;
> > > > +	uint32_t free_entries;
> > > > +
> > > > +	n = __rte_ring_move_prod_head(r, is_sp, n, behavior,
> > > > +			&prod_head, &prod_next, &free_entries);
> > > > +	if (n == 0)
> > > > +		goto end;
> > > > +
> > > > +	ENQUEUE_PTRS_ELEM(r, &r[1], prod_head, obj_table, esize, n);
> > > > +
> > > > +	update_tail(&r->prod, prod_head, prod_next, is_sp, 1);
> > > > +end:
> > > > +	if (free_space != NULL)
> > > > +		*free_space = free_entries - n;
> > > > +	return n;
> > > > +}
> > > > +
Honnappa Nagarahalli Oct. 17, 2019, 4:46 a.m. | #5
<snip>

> Hi Honnappa,
> 
> > > > >
> > > > > Current APIs assume ring elements to be pointers. However, in
> > > > > many use cases, the size can be different. Add new APIs to
> > > > > support configurable ring element sizes.
> > > > >
> > > > > Signed-off-by: Honnappa Nagarahalli
> > > > > <honnappa.nagarahalli@arm.com>
> > > > > Reviewed-by: Dharmik Thakkar <dharmik.thakkar@arm.com>
> > > > > Reviewed-by: Gavin Hu <gavin.hu@arm.com>
> > > > > Reviewed-by: Ruifeng Wang <ruifeng.wang@arm.com>
> > > > > ---
> > > > >  lib/librte_ring/Makefile             |   3 +-
> > > > >  lib/librte_ring/meson.build          |   3 +
> > > > >  lib/librte_ring/rte_ring.c           |  45 +-
> > > > >  lib/librte_ring/rte_ring.h           |   1 +
> > > > >  lib/librte_ring/rte_ring_elem.h      | 946
> +++++++++++++++++++++++++++
> > > > >  lib/librte_ring/rte_ring_version.map |   2 +
> > > > >  6 files changed, 991 insertions(+), 9 deletions(-)  create mode
> > > > > 100644 lib/librte_ring/rte_ring_elem.h
> > > > >
> > > > > diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile
> > > > > index 21a36770d..515a967bb 100644
> > > > > --- a/lib/librte_ring/Makefile
> > > > > +++ b/lib/librte_ring/Makefile

<snip>

> > > > > +
> > > > > +# rte_ring_create_elem and rte_ring_get_memsize_elem are
> > > > > +experimental allow_experimental_apis = true
> > > > > diff --git a/lib/librte_ring/rte_ring.c
> > > > > b/lib/librte_ring/rte_ring.c index d9b308036..6fed3648b 100644
> > > > > --- a/lib/librte_ring/rte_ring.c
> > > > > +++ b/lib/librte_ring/rte_ring.c
> > > > > @@ -33,6 +33,7 @@
> > > > >  #include <rte_tailq.h>
> > > > >
> > > > >  #include "rte_ring.h"
> > > > > +#include "rte_ring_elem.h"
> > > > >

<snip>

> > > > > diff --git a/lib/librte_ring/rte_ring_elem.h
> > > > > b/lib/librte_ring/rte_ring_elem.h new file mode 100644 index
> > > > > 000000000..860f059ad
> > > > > --- /dev/null
> > > > > +++ b/lib/librte_ring/rte_ring_elem.h
> > > > > @@ -0,0 +1,946 @@
> > > > > +/* SPDX-License-Identifier: BSD-3-Clause
> > > > > + *
> > > > > + * Copyright (c) 2019 Arm Limited
> > > > > + * Copyright (c) 2010-2017 Intel Corporation
> > > > > + * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
> > > > > + * All rights reserved.
> > > > > + * Derived from FreeBSD's bufring.h
> > > > > + * Used as BSD-3 Licensed with permission from Kip Macy.
> > > > > + */
> > > > > +
> > > > > +#ifndef _RTE_RING_ELEM_H_
> > > > > +#define _RTE_RING_ELEM_H_
> > > > > +

<snip>

> > > > > +
> > > > > +/* the actual enqueue of pointers on the ring.
> > > > > + * Placed here since identical code needed in both
> > > > > + * single and multi producer enqueue functions.
> > > > > + */
> > > > > +#define ENQUEUE_PTRS_ELEM(r, ring_start, prod_head, obj_table,
> > > > > +esize, n)
> > > > > do { \
> > > > > +	if (esize == 4) \
> > > > > +		ENQUEUE_PTRS_32(r, ring_start, prod_head,
> obj_table, n); \
> > > > > +	else if (esize == 8) \
> > > > > +		ENQUEUE_PTRS_64(r, ring_start, prod_head,
> obj_table, n); \
> > > > > +	else if (esize == 16) \
> > > > > +		ENQUEUE_PTRS_128(r, ring_start, prod_head,
> obj_table, n);
> > > \ }
> > > > > while
> > > > > +(0)
> > > > > +
> > > > > +#define ENQUEUE_PTRS_32(r, ring_start, prod_head, obj_table, n)
> do { \
> > > > > +	unsigned int i; \
> > > > > +	const uint32_t size = (r)->size; \
> > > > > +	uint32_t idx = prod_head & (r)->mask; \
> > > > > +	uint32_t *ring = (uint32_t *)ring_start; \
> > > > > +	uint32_t *obj = (uint32_t *)obj_table; \
> > > > > +	if (likely(idx + n < size)) { \
> > > > > +		for (i = 0; i < (n & ((~(unsigned)0x7))); i += 8, idx += 8)
> { \
> > > > > +			ring[idx] = obj[i]; \
> > > > > +			ring[idx + 1] = obj[i + 1]; \
> > > > > +			ring[idx + 2] = obj[i + 2]; \
> > > > > +			ring[idx + 3] = obj[i + 3]; \
> > > > > +			ring[idx + 4] = obj[i + 4]; \
> > > > > +			ring[idx + 5] = obj[i + 5]; \
> > > > > +			ring[idx + 6] = obj[i + 6]; \
> > > > > +			ring[idx + 7] = obj[i + 7]; \
> > > > > +		} \
> > > > > +		switch (n & 0x7) { \
> > > > > +		case 7: \
> > > > > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > > > > +		case 6: \
> > > > > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > > > > +		case 5: \
> > > > > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > > > > +		case 4: \
> > > > > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > > > > +		case 3: \
> > > > > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > > > > +		case 2: \
> > > > > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > > > > +		case 1: \
> > > > > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > > > > +		} \
> > > > > +	} else { \
> > > > > +		for (i = 0; idx < size; i++, idx++)\
> > > > > +			ring[idx] = obj[i]; \
> > > > > +		for (idx = 0; i < n; i++, idx++) \
> > > > > +			ring[idx] = obj[i]; \
> > > > > +	} \
> > > > > +} while (0)
> > > > > +
> > > > > +#define ENQUEUE_PTRS_64(r, ring_start, prod_head, obj_table, n)
> do { \
> > > > > +	unsigned int i; \
> > > > > +	const uint32_t size = (r)->size; \
> > > > > +	uint32_t idx = prod_head & (r)->mask; \
> > > > > +	uint64_t *ring = (uint64_t *)ring_start; \
> > > > > +	uint64_t *obj = (uint64_t *)obj_table; \
> > > > > +	if (likely(idx + n < size)) { \
> > > > > +		for (i = 0; i < (n & ((~(unsigned)0x3))); i += 4, idx += 4)
> { \
> > > > > +			ring[idx] = obj[i]; \
> > > > > +			ring[idx + 1] = obj[i + 1]; \
> > > > > +			ring[idx + 2] = obj[i + 2]; \
> > > > > +			ring[idx + 3] = obj[i + 3]; \
> > > > > +		} \
> > > > > +		switch (n & 0x3) { \
> > > > > +		case 3: \
> > > > > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > > > > +		case 2: \
> > > > > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > > > > +		case 1: \
> > > > > +			ring[idx++] = obj[i++]; \
> > > > > +		} \
> > > > > +	} else { \
> > > > > +		for (i = 0; idx < size; i++, idx++)\
> > > > > +			ring[idx] = obj[i]; \
> > > > > +		for (idx = 0; i < n; i++, idx++) \
> > > > > +			ring[idx] = obj[i]; \
> > > > > +	} \
> > > > > +} while (0)
> > > > > +
> > > > > +#define ENQUEUE_PTRS_128(r, ring_start, prod_head, obj_table,
> > > > > +n) do
> > > { \
> > > > > +	unsigned int i; \
> > > > > +	const uint32_t size = (r)->size; \
> > > > > +	uint32_t idx = prod_head & (r)->mask; \
> > > > > +	__uint128_t *ring = (__uint128_t *)ring_start; \
> > > > > +	__uint128_t *obj = (__uint128_t *)obj_table; \
> > > > > +	if (likely(idx + n < size)) { \
> > > > > +		for (i = 0; i < (n >> 1); i += 2, idx += 2) { \
> > > > > +			ring[idx] = obj[i]; \
> > > > > +			ring[idx + 1] = obj[i + 1]; \
> > > > > +		} \
> > > > > +		switch (n & 0x1) { \
> > > > > +		case 1: \
> > > > > +			ring[idx++] = obj[i++]; \
> > > > > +		} \
> > > > > +	} else { \
> > > > > +		for (i = 0; idx < size; i++, idx++)\
> > > > > +			ring[idx] = obj[i]; \
> > > > > +		for (idx = 0; i < n; i++, idx++) \
> > > > > +			ring[idx] = obj[i]; \
> > > > > +	} \
> > > > > +} while (0)
> > > > > +
> > > > > +/* the actual copy of pointers on the ring to obj_table.
> > > > > + * Placed here since identical code needed in both
> > > > > + * single and multi consumer dequeue functions.
> > > > > + */
> > > > > +#define DEQUEUE_PTRS_ELEM(r, ring_start, cons_head, obj_table,
> > > > > +esize, n)
> > > > > do { \
> > > > > +	if (esize == 4) \
> > > > > +		DEQUEUE_PTRS_32(r, ring_start, cons_head,
> obj_table, n); \
> > > > > +	else if (esize == 8) \
> > > > > +		DEQUEUE_PTRS_64(r, ring_start, cons_head,
> obj_table, n); \
> > > > > +	else if (esize == 16) \
> > > > > +		DEQUEUE_PTRS_128(r, ring_start, cons_head,
> obj_table, n);
> > > \ }
> > > > > while
> > > > > +(0)
> > > > > +
> > > > > +#define DEQUEUE_PTRS_32(r, ring_start, cons_head, obj_table, n) do
> { \
> > > > > +	unsigned int i; \
> > > > > +	uint32_t idx = cons_head & (r)->mask; \
> > > > > +	const uint32_t size = (r)->size; \
> > > > > +	uint32_t *ring = (uint32_t *)ring_start; \
> > > > > +	uint32_t *obj = (uint32_t *)obj_table; \
> > > > > +	if (likely(idx + n < size)) { \
> > > > > +		for (i = 0; i < (n & (~(unsigned)0x7)); i += 8, idx += 8)
> {\
> > > > > +			obj[i] = ring[idx]; \
> > > > > +			obj[i + 1] = ring[idx + 1]; \
> > > > > +			obj[i + 2] = ring[idx + 2]; \
> > > > > +			obj[i + 3] = ring[idx + 3]; \
> > > > > +			obj[i + 4] = ring[idx + 4]; \
> > > > > +			obj[i + 5] = ring[idx + 5]; \
> > > > > +			obj[i + 6] = ring[idx + 6]; \
> > > > > +			obj[i + 7] = ring[idx + 7]; \
> > > > > +		} \
> > > > > +		switch (n & 0x7) { \
> > > > > +		case 7: \
> > > > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > > > +		case 6: \
> > > > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > > > +		case 5: \
> > > > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > > > +		case 4: \
> > > > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > > > +		case 3: \
> > > > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > > > +		case 2: \
> > > > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > > > +		case 1: \
> > > > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > > > +		} \
> > > > > +	} else { \
> > > > > +		for (i = 0; idx < size; i++, idx++) \
> > > > > +			obj[i] = ring[idx]; \
> > > > > +		for (idx = 0; i < n; i++, idx++) \
> > > > > +			obj[i] = ring[idx]; \
> > > > > +	} \
> > > > > +} while (0)
> > > > > +
> > > > > +#define DEQUEUE_PTRS_64(r, ring_start, cons_head, obj_table, n) do
> { \
> > > > > +	unsigned int i; \
> > > > > +	uint32_t idx = cons_head & (r)->mask; \
> > > > > +	const uint32_t size = (r)->size; \
> > > > > +	uint64_t *ring = (uint64_t *)ring_start; \
> > > > > +	uint64_t *obj = (uint64_t *)obj_table; \
> > > > > +	if (likely(idx + n < size)) { \
> > > > > +		for (i = 0; i < (n & (~(unsigned)0x3)); i += 4, idx += 4)
> {\
> > > > > +			obj[i] = ring[idx]; \
> > > > > +			obj[i + 1] = ring[idx + 1]; \
> > > > > +			obj[i + 2] = ring[idx + 2]; \
> > > > > +			obj[i + 3] = ring[idx + 3]; \
> > > > > +		} \
> > > > > +		switch (n & 0x3) { \
> > > > > +		case 3: \
> > > > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > > > +		case 2: \
> > > > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > > > +		case 1: \
> > > > > +			obj[i++] = ring[idx++]; \
> > > > > +		} \
> > > > > +	} else { \
> > > > > +		for (i = 0; idx < size; i++, idx++) \
> > > > > +			obj[i] = ring[idx]; \
> > > > > +		for (idx = 0; i < n; i++, idx++) \
> > > > > +			obj[i] = ring[idx]; \
> > > > > +	} \
> > > > > +} while (0)
> > > > > +
> > > > > +#define DEQUEUE_PTRS_128(r, ring_start, cons_head, obj_table,
> > > > > +n) do
> > > { \
> > > > > +	unsigned int i; \
> > > > > +	uint32_t idx = cons_head & (r)->mask; \
> > > > > +	const uint32_t size = (r)->size; \
> > > > > +	__uint128_t *ring = (__uint128_t *)ring_start; \
> > > > > +	__uint128_t *obj = (__uint128_t *)obj_table; \
> > > > > +	if (likely(idx + n < size)) { \
> > > > > +		for (i = 0; i < (n >> 1); i += 2, idx += 2) { \
> > > > > +			obj[i] = ring[idx]; \
> > > > > +			obj[i + 1] = ring[idx + 1]; \
> > > > > +		} \
> > > > > +		switch (n & 0x1) { \
> > > > > +		case 1: \
> > > > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > > > +		} \
> > > > > +	} else { \
> > > > > +		for (i = 0; idx < size; i++, idx++) \
> > > > > +			obj[i] = ring[idx]; \
> > > > > +		for (idx = 0; i < n; i++, idx++) \
> > > > > +			obj[i] = ring[idx]; \
> > > > > +	} \
> > > > > +} while (0)
> > > > > +
> > > > > +/* Between load and load. there might be cpu reorder in weak
> > > > > +model
> > > > > + * (powerpc/arm).
> > > > > + * There are 2 choices for the users
> > > > > + * 1.use rmb() memory barrier
> > > > > + * 2.use one-direction load_acquire/store_release
> > > > > +barrier,defined by
> > > > > + * CONFIG_RTE_USE_C11_MEM_MODEL=y
> > > > > + * It depends on performance test results.
> > > > > + * By default, move common functions to rte_ring_generic.h  */
> > > > > +#ifdef RTE_USE_C11_MEM_MODEL #include "rte_ring_c11_mem.h"
> > > > > +#else
> > > > > +#include "rte_ring_generic.h"
> > > > > +#endif
> > > > > +
> > > > > +/**
> > > > > + * @internal Enqueue several objects on the ring
> > > > > + *
> > > > > + * @param r
> > > > > + *   A pointer to the ring structure.
> > > > > + * @param obj_table
> > > > > + *   A pointer to a table of void * pointers (objects).
> > > > > + * @param esize
> > > > > + *   The size of ring element, in bytes. It must be a multiple of 4.
> > > > > + *   Currently, sizes 4, 8 and 16 are supported. This should be the
> same
> > > > > + *   as passed while creating the ring, otherwise the results are
> undefined.
> > > > > + * @param n
> > > > > + *   The number of objects to add in the ring from the obj_table.
> > > > > + * @param behavior
> > > > > + *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items
> from a
> > > ring
> > > > > + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible
> > > from
> > > > > ring
> > > > > + * @param is_sp
> > > > > + *   Indicates whether to use single producer or multi-producer head
> > > update
> > > > > + * @param free_space
> > > > > + *   returns the amount of space after the enqueue operation has
> > > finished
> > > > > + * @return
> > > > > + *   Actual number of objects enqueued.
> > > > > + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> > > > > + */
> > > > > +static __rte_always_inline unsigned int
> > > > > +__rte_ring_do_enqueue_elem(struct rte_ring *r, void * const
> obj_table,
> > > > > +		unsigned int esize, unsigned int n,
> > > > > +		enum rte_ring_queue_behavior behavior, unsigned
> int is_sp,
> > > > > +		unsigned int *free_space)
> > >
> > >
> > > I like the idea to add esize as an argument to the public API, so
> > > the compiler can do its job optimizing calls with constant esize.
> > > Though I am not very happy with the rest of implementation:
> > > 1. It doesn't really provide configurable elem size - only 4/8/16B
> > > elems are supported.
> > Agree. I was thinking other sizes can be added on need basis.
> > However, I am wondering if we should just provide for 4B and then the
> users can use bulk operations to construct whatever they need?
> 
> I suppose it could be plan B... if there would be no agreement on generic case.
> And for 4B elems, I guess you do have a particular use-case?
Yes

> 
> > It
> > would mean extra work for the users.
> >
> > > 2. A lot of code duplication with these 3 copies of ENQUEUE/DEQUEUE
> > > macros.
> > >
> > > Looking at ENQUEUE/DEQUEUE macros, I can see that main loop always
> > > does 32B copy per iteration.
> > Yes, I tried to keep it the same as the existing one (originally, I
> > guess the intention was to allow for 256b vector instructions to be
> > generated)
> >
> > > So wonder can we make a generic function that would do 32B copy per
> > > iteration in a main loop, and copy tail  by 4B chunks?
> > > That would avoid copy duplication and will allow user to have any
> > > elem size (multiple of 4B) he wants.
> > > Something like that (note didn't test it, just a rough idea):
> > >
> > >  static inline void
> > > copy_elems(uint32_t du32[], const uint32_t su32[], uint32_t num,
> > > uint32_t
> > > esize) {
> > >         uint32_t i, sz;
> > >
> > >         sz = (num * esize) / sizeof(uint32_t);
> > If 'num' is a compile time constant, 'sz' will be a compile time constant.
> Otherwise, this will result in a multiplication operation.
> 
> Not always.
> If esize is compile time constant, then for esize as power of 2 (4,8,16,...), it
> would be just one shift.
> For other constant values it could be a 'mul' or in many cases just 2 shifts plus
> 'add' (if compiler is smart enough).
> I.E. let say for 24B elem is would be either num * 6 or (num << 2) + (num <<
> 1).
With num * 15 it has to be (num << 3) + (num << 2) + (num << 1) + num
Not sure if the compiler will do this.

> I suppose for non-power of 2 elems it might be ok to get such small perf hit.
Agree, should be ok not to focus on right now.

> 
> >I have tried
> > to avoid the multiplication operation and try to use shift and mask
> operations (just like how the rest of the ring code does).
> >
> > >
> > >         for (i = 0; i < (sz & ~7); i += 8)
> > >                 memcpy(du32 + i, su32 + i, 8 * sizeof(uint32_t));
> > I had used memcpy to start with (for the entire copy operation),
> > performance is not the same for 64b elements when compared with the
> existing ring APIs (some cases more and some cases less).
> 
> I remember that from one of your previous mails, that's why here I suggest to
> use in a loop memcpy() with fixed size.
> That way for each iteration compiler will replace memcpy() with instructions
> to copy 32B in a way he thinks is optimal (same as for original macro, I think).
I tried this. On x86 (Xeon(R) Gold 6132 CPU @ 2.60GHz), the results are as follows. The numbers in brackets are with the code on master.
gcc (Ubuntu 7.4.0-1ubuntu1~18.04.1) 7.4.0

RTE>>ring_perf_elem_autotest
### Testing single element and burst enq/deq ###
SP/SC single enq/dequeue: 5
MP/MC single enq/dequeue: 40 (35)
SP/SC burst enq/dequeue (size: 8): 2
MP/MC burst enq/dequeue (size: 8): 6
SP/SC burst enq/dequeue (size: 32): 1 (2)
MP/MC burst enq/dequeue (size: 32): 2

### Testing empty dequeue ###
SC empty dequeue: 2.11
MC empty dequeue: 1.41 (2.11)

### Testing using a single lcore ###
SP/SC bulk enq/dequeue (size: 8): 2.15 (2.86)
MP/MC bulk enq/dequeue (size: 8): 6.35 (6.91)
SP/SC bulk enq/dequeue (size: 32): 1.35 (2.06)
MP/MC bulk enq/dequeue (size: 32): 2.38 (2.95)

### Testing using two physical cores ###
SP/SC bulk enq/dequeue (size: 8): 73.81 (15.33)
MP/MC bulk enq/dequeue (size: 8): 75.10 (71.27)
SP/SC bulk enq/dequeue (size: 32): 21.14 (9.58)
MP/MC bulk enq/dequeue (size: 32): 25.74 (20.91)

### Testing using two NUMA nodes ###
SP/SC bulk enq/dequeue (size: 8): 164.32 (50.66)
MP/MC bulk enq/dequeue (size: 8): 176.02 (173.43)
SP/SC bulk enq/dequeue (size: 32): 50.78 (23)
MP/MC bulk enq/dequeue (size: 32): 63.17 (46.74)

On one of the Arm platform
MP/MC bulk enq/dequeue (size: 32): 0.37 (0.33) (~12% hit, the rest are ok)

On another Arm platform, all numbers are same or slightly better.

I can post the patch with this change if you want to run some benchmarks on your platform.
I have not used the same code you have suggested, instead I have used the same logic in a single macro with memcpy.

> 
> >
> > IMO, we have to keep the performance of the 64b and 128b the same as
> > what we get with the existing ring and event-ring APIs. That would allow us
> to replace them with these new APIs. I suggest that we keep the macros in
> this patch for 64b and 128b.
> 
> I still think we probably can achieve that without duplicating macros, while
> still supporting arbitrary elem size.
> See above.
> 
> > For the rest of the sizes, we could put a for loop around 32b macro (this
> would allow for all sizes as well).
> >
> > >
> > >         switch (sz & 7) {
> > >         case 7: du32[sz - 7] = su32[sz - 7]; /* fallthrough */
> > >         case 6: du32[sz - 6] = su32[sz - 6]; /* fallthrough */
> > >         case 5: du32[sz - 5] = su32[sz - 5]; /* fallthrough */
> > >         case 4: du32[sz - 4] = su32[sz - 4]; /* fallthrough */
> > >         case 3: du32[sz - 3] = su32[sz - 3]; /* fallthrough */
> > >         case 2: du32[sz - 2] = su32[sz - 2]; /* fallthrough */
> > >         case 1: du32[sz - 1] = su32[sz - 1]; /* fallthrough */
> > >         }
> > > }
> > >
> > > static inline void
> > > enqueue_elems(struct rte_ring *r, void *ring_start, uint32_t prod_head,
> > >                 void *obj_table, uint32_t num, uint32_t esize) {
> > >         uint32_t idx, n;
> > >         uint32_t *du32;
> > >
> > >         const uint32_t size = r->size;
> > >
> > >         idx = prod_head & (r)->mask;
> > >
> > >         du32 = ring_start + idx * sizeof(uint32_t);
> > >
> > >         if (idx + num < size)
> > >                 copy_elems(du32, obj_table, num, esize);
> > >         else {
> > >                 n = size - idx;
> > >                 copy_elems(du32, obj_table, n, esize);
> > >                 copy_elems(ring_start, obj_table + n * sizeof(uint32_t),
> > >                         num - n, esize);
> > >         }
> > > }
> > >
> > > And then, in that function, instead of ENQUEUE_PTRS_ELEM(), just:
> > >
> > > enqueue_elems(r, &r[1], prod_head, obj_table, n, esize);
> > >
> > >
> > > > > +{
> > > > > +	uint32_t prod_head, prod_next;
> > > > > +	uint32_t free_entries;
> > > > > +
> > > > > +	n = __rte_ring_move_prod_head(r, is_sp, n, behavior,
> > > > > +			&prod_head, &prod_next, &free_entries);
> > > > > +	if (n == 0)
> > > > > +		goto end;
> > > > > +
> > > > > +	ENQUEUE_PTRS_ELEM(r, &r[1], prod_head, obj_table, esize,
> n);
> > > > > +
> > > > > +	update_tail(&r->prod, prod_head, prod_next, is_sp, 1);
> > > > > +end:
> > > > > +	if (free_space != NULL)
> > > > > +		*free_space = free_entries - n;
> > > > > +	return n;
> > > > > +}
> > > > > +
Ananyev, Konstantin Oct. 17, 2019, 11:51 a.m. | #6
> > > > > > Current APIs assume ring elements to be pointers. However, in
> > > > > > many use cases, the size can be different. Add new APIs to
> > > > > > support configurable ring element sizes.
> > > > > >
> > > > > > Signed-off-by: Honnappa Nagarahalli
> > > > > > <honnappa.nagarahalli@arm.com>
> > > > > > Reviewed-by: Dharmik Thakkar <dharmik.thakkar@arm.com>
> > > > > > Reviewed-by: Gavin Hu <gavin.hu@arm.com>
> > > > > > Reviewed-by: Ruifeng Wang <ruifeng.wang@arm.com>
> > > > > > ---
> > > > > >  lib/librte_ring/Makefile             |   3 +-
> > > > > >  lib/librte_ring/meson.build          |   3 +
> > > > > >  lib/librte_ring/rte_ring.c           |  45 +-
> > > > > >  lib/librte_ring/rte_ring.h           |   1 +
> > > > > >  lib/librte_ring/rte_ring_elem.h      | 946
> > +++++++++++++++++++++++++++
> > > > > >  lib/librte_ring/rte_ring_version.map |   2 +
> > > > > >  6 files changed, 991 insertions(+), 9 deletions(-)  create mode
> > > > > > 100644 lib/librte_ring/rte_ring_elem.h
> > > > > >
> > > > > > diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile
> > > > > > index 21a36770d..515a967bb 100644
> > > > > > --- a/lib/librte_ring/Makefile
> > > > > > +++ b/lib/librte_ring/Makefile
> 
> <snip>
> 
> > > > > > +
> > > > > > +# rte_ring_create_elem and rte_ring_get_memsize_elem are
> > > > > > +experimental allow_experimental_apis = true
> > > > > > diff --git a/lib/librte_ring/rte_ring.c
> > > > > > b/lib/librte_ring/rte_ring.c index d9b308036..6fed3648b 100644
> > > > > > --- a/lib/librte_ring/rte_ring.c
> > > > > > +++ b/lib/librte_ring/rte_ring.c
> > > > > > @@ -33,6 +33,7 @@
> > > > > >  #include <rte_tailq.h>
> > > > > >
> > > > > >  #include "rte_ring.h"
> > > > > > +#include "rte_ring_elem.h"
> > > > > >
> 
> <snip>
> 
> > > > > > diff --git a/lib/librte_ring/rte_ring_elem.h
> > > > > > b/lib/librte_ring/rte_ring_elem.h new file mode 100644 index
> > > > > > 000000000..860f059ad
> > > > > > --- /dev/null
> > > > > > +++ b/lib/librte_ring/rte_ring_elem.h
> > > > > > @@ -0,0 +1,946 @@
> > > > > > +/* SPDX-License-Identifier: BSD-3-Clause
> > > > > > + *
> > > > > > + * Copyright (c) 2019 Arm Limited
> > > > > > + * Copyright (c) 2010-2017 Intel Corporation
> > > > > > + * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
> > > > > > + * All rights reserved.
> > > > > > + * Derived from FreeBSD's bufring.h
> > > > > > + * Used as BSD-3 Licensed with permission from Kip Macy.
> > > > > > + */
> > > > > > +
> > > > > > +#ifndef _RTE_RING_ELEM_H_
> > > > > > +#define _RTE_RING_ELEM_H_
> > > > > > +
> 
> <snip>
> 
> > > > > > +
> > > > > > +/* the actual enqueue of pointers on the ring.
> > > > > > + * Placed here since identical code needed in both
> > > > > > + * single and multi producer enqueue functions.
> > > > > > + */
> > > > > > +#define ENQUEUE_PTRS_ELEM(r, ring_start, prod_head, obj_table,
> > > > > > +esize, n)
> > > > > > do { \
> > > > > > +	if (esize == 4) \
> > > > > > +		ENQUEUE_PTRS_32(r, ring_start, prod_head,
> > obj_table, n); \
> > > > > > +	else if (esize == 8) \
> > > > > > +		ENQUEUE_PTRS_64(r, ring_start, prod_head,
> > obj_table, n); \
> > > > > > +	else if (esize == 16) \
> > > > > > +		ENQUEUE_PTRS_128(r, ring_start, prod_head,
> > obj_table, n);
> > > > \ }
> > > > > > while
> > > > > > +(0)
> > > > > > +
> > > > > > +#define ENQUEUE_PTRS_32(r, ring_start, prod_head, obj_table, n)
> > do { \
> > > > > > +	unsigned int i; \
> > > > > > +	const uint32_t size = (r)->size; \
> > > > > > +	uint32_t idx = prod_head & (r)->mask; \
> > > > > > +	uint32_t *ring = (uint32_t *)ring_start; \
> > > > > > +	uint32_t *obj = (uint32_t *)obj_table; \
> > > > > > +	if (likely(idx + n < size)) { \
> > > > > > +		for (i = 0; i < (n & ((~(unsigned)0x7))); i += 8, idx += 8)
> > { \
> > > > > > +			ring[idx] = obj[i]; \
> > > > > > +			ring[idx + 1] = obj[i + 1]; \
> > > > > > +			ring[idx + 2] = obj[i + 2]; \
> > > > > > +			ring[idx + 3] = obj[i + 3]; \
> > > > > > +			ring[idx + 4] = obj[i + 4]; \
> > > > > > +			ring[idx + 5] = obj[i + 5]; \
> > > > > > +			ring[idx + 6] = obj[i + 6]; \
> > > > > > +			ring[idx + 7] = obj[i + 7]; \
> > > > > > +		} \
> > > > > > +		switch (n & 0x7) { \
> > > > > > +		case 7: \
> > > > > > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > > > > > +		case 6: \
> > > > > > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > > > > > +		case 5: \
> > > > > > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > > > > > +		case 4: \
> > > > > > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > > > > > +		case 3: \
> > > > > > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > > > > > +		case 2: \
> > > > > > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > > > > > +		case 1: \
> > > > > > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > > > > > +		} \
> > > > > > +	} else { \
> > > > > > +		for (i = 0; idx < size; i++, idx++)\
> > > > > > +			ring[idx] = obj[i]; \
> > > > > > +		for (idx = 0; i < n; i++, idx++) \
> > > > > > +			ring[idx] = obj[i]; \
> > > > > > +	} \
> > > > > > +} while (0)
> > > > > > +
> > > > > > +#define ENQUEUE_PTRS_64(r, ring_start, prod_head, obj_table, n)
> > do { \
> > > > > > +	unsigned int i; \
> > > > > > +	const uint32_t size = (r)->size; \
> > > > > > +	uint32_t idx = prod_head & (r)->mask; \
> > > > > > +	uint64_t *ring = (uint64_t *)ring_start; \
> > > > > > +	uint64_t *obj = (uint64_t *)obj_table; \
> > > > > > +	if (likely(idx + n < size)) { \
> > > > > > +		for (i = 0; i < (n & ((~(unsigned)0x3))); i += 4, idx += 4)
> > { \
> > > > > > +			ring[idx] = obj[i]; \
> > > > > > +			ring[idx + 1] = obj[i + 1]; \
> > > > > > +			ring[idx + 2] = obj[i + 2]; \
> > > > > > +			ring[idx + 3] = obj[i + 3]; \
> > > > > > +		} \
> > > > > > +		switch (n & 0x3) { \
> > > > > > +		case 3: \
> > > > > > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > > > > > +		case 2: \
> > > > > > +			ring[idx++] = obj[i++]; /* fallthrough */ \
> > > > > > +		case 1: \
> > > > > > +			ring[idx++] = obj[i++]; \
> > > > > > +		} \
> > > > > > +	} else { \
> > > > > > +		for (i = 0; idx < size; i++, idx++)\
> > > > > > +			ring[idx] = obj[i]; \
> > > > > > +		for (idx = 0; i < n; i++, idx++) \
> > > > > > +			ring[idx] = obj[i]; \
> > > > > > +	} \
> > > > > > +} while (0)
> > > > > > +
> > > > > > +#define ENQUEUE_PTRS_128(r, ring_start, prod_head, obj_table,
> > > > > > +n) do
> > > > { \
> > > > > > +	unsigned int i; \
> > > > > > +	const uint32_t size = (r)->size; \
> > > > > > +	uint32_t idx = prod_head & (r)->mask; \
> > > > > > +	__uint128_t *ring = (__uint128_t *)ring_start; \
> > > > > > +	__uint128_t *obj = (__uint128_t *)obj_table; \
> > > > > > +	if (likely(idx + n < size)) { \
> > > > > > +		for (i = 0; i < (n >> 1); i += 2, idx += 2) { \
> > > > > > +			ring[idx] = obj[i]; \
> > > > > > +			ring[idx + 1] = obj[i + 1]; \
> > > > > > +		} \
> > > > > > +		switch (n & 0x1) { \
> > > > > > +		case 1: \
> > > > > > +			ring[idx++] = obj[i++]; \
> > > > > > +		} \
> > > > > > +	} else { \
> > > > > > +		for (i = 0; idx < size; i++, idx++)\
> > > > > > +			ring[idx] = obj[i]; \
> > > > > > +		for (idx = 0; i < n; i++, idx++) \
> > > > > > +			ring[idx] = obj[i]; \
> > > > > > +	} \
> > > > > > +} while (0)
> > > > > > +
> > > > > > +/* the actual copy of pointers on the ring to obj_table.
> > > > > > + * Placed here since identical code needed in both
> > > > > > + * single and multi consumer dequeue functions.
> > > > > > + */
> > > > > > +#define DEQUEUE_PTRS_ELEM(r, ring_start, cons_head, obj_table,
> > > > > > +esize, n)
> > > > > > do { \
> > > > > > +	if (esize == 4) \
> > > > > > +		DEQUEUE_PTRS_32(r, ring_start, cons_head,
> > obj_table, n); \
> > > > > > +	else if (esize == 8) \
> > > > > > +		DEQUEUE_PTRS_64(r, ring_start, cons_head,
> > obj_table, n); \
> > > > > > +	else if (esize == 16) \
> > > > > > +		DEQUEUE_PTRS_128(r, ring_start, cons_head,
> > obj_table, n);
> > > > \ }
> > > > > > while
> > > > > > +(0)
> > > > > > +
> > > > > > +#define DEQUEUE_PTRS_32(r, ring_start, cons_head, obj_table, n) do
> > { \
> > > > > > +	unsigned int i; \
> > > > > > +	uint32_t idx = cons_head & (r)->mask; \
> > > > > > +	const uint32_t size = (r)->size; \
> > > > > > +	uint32_t *ring = (uint32_t *)ring_start; \
> > > > > > +	uint32_t *obj = (uint32_t *)obj_table; \
> > > > > > +	if (likely(idx + n < size)) { \
> > > > > > +		for (i = 0; i < (n & (~(unsigned)0x7)); i += 8, idx += 8)
> > {\
> > > > > > +			obj[i] = ring[idx]; \
> > > > > > +			obj[i + 1] = ring[idx + 1]; \
> > > > > > +			obj[i + 2] = ring[idx + 2]; \
> > > > > > +			obj[i + 3] = ring[idx + 3]; \
> > > > > > +			obj[i + 4] = ring[idx + 4]; \
> > > > > > +			obj[i + 5] = ring[idx + 5]; \
> > > > > > +			obj[i + 6] = ring[idx + 6]; \
> > > > > > +			obj[i + 7] = ring[idx + 7]; \
> > > > > > +		} \
> > > > > > +		switch (n & 0x7) { \
> > > > > > +		case 7: \
> > > > > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > > > > +		case 6: \
> > > > > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > > > > +		case 5: \
> > > > > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > > > > +		case 4: \
> > > > > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > > > > +		case 3: \
> > > > > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > > > > +		case 2: \
> > > > > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > > > > +		case 1: \
> > > > > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > > > > +		} \
> > > > > > +	} else { \
> > > > > > +		for (i = 0; idx < size; i++, idx++) \
> > > > > > +			obj[i] = ring[idx]; \
> > > > > > +		for (idx = 0; i < n; i++, idx++) \
> > > > > > +			obj[i] = ring[idx]; \
> > > > > > +	} \
> > > > > > +} while (0)
> > > > > > +
> > > > > > +#define DEQUEUE_PTRS_64(r, ring_start, cons_head, obj_table, n) do
> > { \
> > > > > > +	unsigned int i; \
> > > > > > +	uint32_t idx = cons_head & (r)->mask; \
> > > > > > +	const uint32_t size = (r)->size; \
> > > > > > +	uint64_t *ring = (uint64_t *)ring_start; \
> > > > > > +	uint64_t *obj = (uint64_t *)obj_table; \
> > > > > > +	if (likely(idx + n < size)) { \
> > > > > > +		for (i = 0; i < (n & (~(unsigned)0x3)); i += 4, idx += 4)
> > {\
> > > > > > +			obj[i] = ring[idx]; \
> > > > > > +			obj[i + 1] = ring[idx + 1]; \
> > > > > > +			obj[i + 2] = ring[idx + 2]; \
> > > > > > +			obj[i + 3] = ring[idx + 3]; \
> > > > > > +		} \
> > > > > > +		switch (n & 0x3) { \
> > > > > > +		case 3: \
> > > > > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > > > > +		case 2: \
> > > > > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > > > > +		case 1: \
> > > > > > +			obj[i++] = ring[idx++]; \
> > > > > > +		} \
> > > > > > +	} else { \
> > > > > > +		for (i = 0; idx < size; i++, idx++) \
> > > > > > +			obj[i] = ring[idx]; \
> > > > > > +		for (idx = 0; i < n; i++, idx++) \
> > > > > > +			obj[i] = ring[idx]; \
> > > > > > +	} \
> > > > > > +} while (0)
> > > > > > +
> > > > > > +#define DEQUEUE_PTRS_128(r, ring_start, cons_head, obj_table,
> > > > > > +n) do
> > > > { \
> > > > > > +	unsigned int i; \
> > > > > > +	uint32_t idx = cons_head & (r)->mask; \
> > > > > > +	const uint32_t size = (r)->size; \
> > > > > > +	__uint128_t *ring = (__uint128_t *)ring_start; \
> > > > > > +	__uint128_t *obj = (__uint128_t *)obj_table; \
> > > > > > +	if (likely(idx + n < size)) { \
> > > > > > +		for (i = 0; i < (n >> 1); i += 2, idx += 2) { \
> > > > > > +			obj[i] = ring[idx]; \
> > > > > > +			obj[i + 1] = ring[idx + 1]; \
> > > > > > +		} \
> > > > > > +		switch (n & 0x1) { \
> > > > > > +		case 1: \
> > > > > > +			obj[i++] = ring[idx++]; /* fallthrough */ \
> > > > > > +		} \
> > > > > > +	} else { \
> > > > > > +		for (i = 0; idx < size; i++, idx++) \
> > > > > > +			obj[i] = ring[idx]; \
> > > > > > +		for (idx = 0; i < n; i++, idx++) \
> > > > > > +			obj[i] = ring[idx]; \
> > > > > > +	} \
> > > > > > +} while (0)
> > > > > > +
> > > > > > +/* Between load and load. there might be cpu reorder in weak
> > > > > > +model
> > > > > > + * (powerpc/arm).
> > > > > > + * There are 2 choices for the users
> > > > > > + * 1.use rmb() memory barrier
> > > > > > + * 2.use one-direction load_acquire/store_release
> > > > > > +barrier,defined by
> > > > > > + * CONFIG_RTE_USE_C11_MEM_MODEL=y
> > > > > > + * It depends on performance test results.
> > > > > > + * By default, move common functions to rte_ring_generic.h  */
> > > > > > +#ifdef RTE_USE_C11_MEM_MODEL #include "rte_ring_c11_mem.h"
> > > > > > +#else
> > > > > > +#include "rte_ring_generic.h"
> > > > > > +#endif
> > > > > > +
> > > > > > +/**
> > > > > > + * @internal Enqueue several objects on the ring
> > > > > > + *
> > > > > > + * @param r
> > > > > > + *   A pointer to the ring structure.
> > > > > > + * @param obj_table
> > > > > > + *   A pointer to a table of void * pointers (objects).
> > > > > > + * @param esize
> > > > > > + *   The size of ring element, in bytes. It must be a multiple of 4.
> > > > > > + *   Currently, sizes 4, 8 and 16 are supported. This should be the
> > same
> > > > > > + *   as passed while creating the ring, otherwise the results are
> > undefined.
> > > > > > + * @param n
> > > > > > + *   The number of objects to add in the ring from the obj_table.
> > > > > > + * @param behavior
> > > > > > + *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items
> > from a
> > > > ring
> > > > > > + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible
> > > > from
> > > > > > ring
> > > > > > + * @param is_sp
> > > > > > + *   Indicates whether to use single producer or multi-producer head
> > > > update
> > > > > > + * @param free_space
> > > > > > + *   returns the amount of space after the enqueue operation has
> > > > finished
> > > > > > + * @return
> > > > > > + *   Actual number of objects enqueued.
> > > > > > + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> > > > > > + */
> > > > > > +static __rte_always_inline unsigned int
> > > > > > +__rte_ring_do_enqueue_elem(struct rte_ring *r, void * const
> > obj_table,
> > > > > > +		unsigned int esize, unsigned int n,
> > > > > > +		enum rte_ring_queue_behavior behavior, unsigned
> > int is_sp,
> > > > > > +		unsigned int *free_space)
> > > >
> > > >
> > > > I like the idea to add esize as an argument to the public API, so
> > > > the compiler can do its job optimizing calls with constant esize.
> > > > Though I am not very happy with the rest of implementation:
> > > > 1. It doesn't really provide configurable elem size - only 4/8/16B
> > > > elems are supported.
> > > Agree. I was thinking other sizes can be added on need basis.
> > > However, I am wondering if we should just provide for 4B and then the
> > users can use bulk operations to construct whatever they need?
> >
> > I suppose it could be plan B... if there would be no agreement on generic case.
> > And for 4B elems, I guess you do have a particular use-case?
> Yes
> 
> >
> > > It
> > > would mean extra work for the users.
> > >
> > > > 2. A lot of code duplication with these 3 copies of ENQUEUE/DEQUEUE
> > > > macros.
> > > >
> > > > Looking at ENQUEUE/DEQUEUE macros, I can see that main loop always
> > > > does 32B copy per iteration.
> > > Yes, I tried to keep it the same as the existing one (originally, I
> > > guess the intention was to allow for 256b vector instructions to be
> > > generated)
> > >
> > > > So wonder can we make a generic function that would do 32B copy per
> > > > iteration in a main loop, and copy tail  by 4B chunks?
> > > > That would avoid copy duplication and will allow user to have any
> > > > elem size (multiple of 4B) he wants.
> > > > Something like that (note didn't test it, just a rough idea):
> > > >
> > > >  static inline void
> > > > copy_elems(uint32_t du32[], const uint32_t su32[], uint32_t num,
> > > > uint32_t
> > > > esize) {
> > > >         uint32_t i, sz;
> > > >
> > > >         sz = (num * esize) / sizeof(uint32_t);
> > > If 'num' is a compile time constant, 'sz' will be a compile time constant.
> > Otherwise, this will result in a multiplication operation.
> >
> > Not always.
> > If esize is compile time constant, then for esize as power of 2 (4,8,16,...), it
> > would be just one shift.
> > For other constant values it could be a 'mul' or in many cases just 2 shifts plus
> > 'add' (if compiler is smart enough).
> > I.E. let say for 24B elem is would be either num * 6 or (num << 2) + (num <<
> > 1).
> With num * 15 it has to be (num << 3) + (num << 2) + (num << 1) + num
> Not sure if the compiler will do this.

For 15, it can be just (num << 4) - num

> 
> > I suppose for non-power of 2 elems it might be ok to get such small perf hit.
> Agree, should be ok not to focus on right now.
> 
> >
> > >I have tried
> > > to avoid the multiplication operation and try to use shift and mask
> > operations (just like how the rest of the ring code does).
> > >
> > > >
> > > >         for (i = 0; i < (sz & ~7); i += 8)
> > > >                 memcpy(du32 + i, su32 + i, 8 * sizeof(uint32_t));
> > > I had used memcpy to start with (for the entire copy operation),
> > > performance is not the same for 64b elements when compared with the
> > existing ring APIs (some cases more and some cases less).
> >
> > I remember that from one of your previous mails, that's why here I suggest to
> > use in a loop memcpy() with fixed size.
> > That way for each iteration compiler will replace memcpy() with instructions
> > to copy 32B in a way he thinks is optimal (same as for original macro, I think).
> I tried this. On x86 (Xeon(R) Gold 6132 CPU @ 2.60GHz), the results are as follows. The numbers in brackets are with the code on master.
> gcc (Ubuntu 7.4.0-1ubuntu1~18.04.1) 7.4.0
> 
> RTE>>ring_perf_elem_autotest
> ### Testing single element and burst enq/deq ###
> SP/SC single enq/dequeue: 5
> MP/MC single enq/dequeue: 40 (35)
> SP/SC burst enq/dequeue (size: 8): 2
> MP/MC burst enq/dequeue (size: 8): 6
> SP/SC burst enq/dequeue (size: 32): 1 (2)
> MP/MC burst enq/dequeue (size: 32): 2
> 
> ### Testing empty dequeue ###
> SC empty dequeue: 2.11
> MC empty dequeue: 1.41 (2.11)
> 
> ### Testing using a single lcore ###
> SP/SC bulk enq/dequeue (size: 8): 2.15 (2.86)
> MP/MC bulk enq/dequeue (size: 8): 6.35 (6.91)
> SP/SC bulk enq/dequeue (size: 32): 1.35 (2.06)
> MP/MC bulk enq/dequeue (size: 32): 2.38 (2.95)
> 
> ### Testing using two physical cores ###
> SP/SC bulk enq/dequeue (size: 8): 73.81 (15.33)
> MP/MC bulk enq/dequeue (size: 8): 75.10 (71.27)
> SP/SC bulk enq/dequeue (size: 32): 21.14 (9.58)
> MP/MC bulk enq/dequeue (size: 32): 25.74 (20.91)
> 
> ### Testing using two NUMA nodes ###
> SP/SC bulk enq/dequeue (size: 8): 164.32 (50.66)
> MP/MC bulk enq/dequeue (size: 8): 176.02 (173.43)
> SP/SC bulk enq/dequeue (size: 32): 50.78 (23)
> MP/MC bulk enq/dequeue (size: 32): 63.17 (46.74)
> 
> On one of the Arm platform
> MP/MC bulk enq/dequeue (size: 32): 0.37 (0.33) (~12% hit, the rest are ok)

So it shows better numbers for one core, but worse on 2, right?

 
> On another Arm platform, all numbers are same or slightly better.
> 
> I can post the patch with this change if you want to run some benchmarks on your platform.

Sure, please do.
I'll try to run on my boxes.

> I have not used the same code you have suggested, instead I have used the same logic in a single macro with memcpy.
>
Honnappa Nagarahalli Oct. 17, 2019, 8:16 p.m. | #7
<snip>

+ David Christensen for Power architecture

> > >
> > > > It
> > > > would mean extra work for the users.
> > > >
> > > > > 2. A lot of code duplication with these 3 copies of
> > > > > ENQUEUE/DEQUEUE macros.
> > > > >
> > > > > Looking at ENQUEUE/DEQUEUE macros, I can see that main loop
> > > > > always does 32B copy per iteration.
> > > > Yes, I tried to keep it the same as the existing one (originally,
> > > > I guess the intention was to allow for 256b vector instructions to
> > > > be
> > > > generated)
> > > >
> > > > > So wonder can we make a generic function that would do 32B copy
> > > > > per iteration in a main loop, and copy tail  by 4B chunks?
> > > > > That would avoid copy duplication and will allow user to have
> > > > > any elem size (multiple of 4B) he wants.
> > > > > Something like that (note didn't test it, just a rough idea):
> > > > >
> > > > >  static inline void
> > > > > copy_elems(uint32_t du32[], const uint32_t su32[], uint32_t num,
> > > > > uint32_t
> > > > > esize) {
> > > > >         uint32_t i, sz;
> > > > >
> > > > >         sz = (num * esize) / sizeof(uint32_t);
> > > > If 'num' is a compile time constant, 'sz' will be a compile time constant.
> > > Otherwise, this will result in a multiplication operation.
> > >
> > > Not always.
> > > If esize is compile time constant, then for esize as power of 2
> > > (4,8,16,...), it would be just one shift.
> > > For other constant values it could be a 'mul' or in many cases just
> > > 2 shifts plus 'add' (if compiler is smart enough).
> > > I.E. let say for 24B elem is would be either num * 6 or (num << 2) +
> > > (num << 1).
> > With num * 15 it has to be (num << 3) + (num << 2) + (num << 1) + num
> > Not sure if the compiler will do this.
> 
> For 15, it can be just (num << 4) - num
> 
> >
> > > I suppose for non-power of 2 elems it might be ok to get such small perf hit.
> > Agree, should be ok not to focus on right now.
> >
> > >
> > > >I have tried
> > > > to avoid the multiplication operation and try to use shift and
> > > >mask
> > > operations (just like how the rest of the ring code does).
> > > >
> > > > >
> > > > >         for (i = 0; i < (sz & ~7); i += 8)
> > > > >                 memcpy(du32 + i, su32 + i, 8 *
> > > > > sizeof(uint32_t));
> > > > I had used memcpy to start with (for the entire copy operation),
> > > > performance is not the same for 64b elements when compared with
> > > > the
> > > existing ring APIs (some cases more and some cases less).
> > >
> > > I remember that from one of your previous mails, that's why here I
> > > suggest to use in a loop memcpy() with fixed size.
> > > That way for each iteration complier will replace memcpy() with
> > > instructions to copy 32B in a way he thinks is optimal (same as for original
> macro, I think).
> > I tried this. On x86 (Xeon(R) Gold 6132 CPU @ 2.60GHz), the results are as
> follows. The numbers in brackets are with the code on master.
> > gcc (Ubuntu 7.4.0-1ubuntu1~18.04.1) 7.4.0
> >
> > RTE>>ring_perf_elem_autotest
> > ### Testing single element and burst enq/deq ### SP/SC single
> > enq/dequeue: 5 MP/MC single enq/dequeue: 40 (35) SP/SC burst
> > enq/dequeue (size: 8): 2 MP/MC burst enq/dequeue (size: 8): 6 SP/SC
> > burst enq/dequeue (size: 32): 1 (2) MP/MC burst enq/dequeue (size:
> > 32): 2
> >
> > ### Testing empty dequeue ###
> > SC empty dequeue: 2.11
> > MC empty dequeue: 1.41 (2.11)
> >
> > ### Testing using a single lcore ###
> > SP/SC bulk enq/dequeue (size: 8): 2.15 (2.86) MP/MC bulk enq/dequeue
> > (size: 8): 6.35 (6.91) SP/SC bulk enq/dequeue (size: 32): 1.35 (2.06)
> > MP/MC bulk enq/dequeue (size: 32): 2.38 (2.95)
> >
> > ### Testing using two physical cores ### SP/SC bulk enq/dequeue (size:
> > 8): 73.81 (15.33) MP/MC bulk enq/dequeue (size: 8): 75.10 (71.27)
> > SP/SC bulk enq/dequeue (size: 32): 21.14 (9.58) MP/MC bulk enq/dequeue
> > (size: 32): 25.74 (20.91)
> >
> > ### Testing using two NUMA nodes ###
> > SP/SC bulk enq/dequeue (size: 8): 164.32 (50.66) MP/MC bulk
> > enq/dequeue (size: 8): 176.02 (173.43) SP/SC bulk enq/dequeue (size:
> > 32): 50.78 (23) MP/MC bulk enq/dequeue (size: 32): 63.17 (46.74)
> >
> > On one of the Arm platform
> > MP/MC bulk enq/dequeue (size: 32): 0.37 (0.33) (~12% hit, the rest are
> > ok)
> 
> So it shows better numbers for one core, but worse on 2, right?
> 
> 
> > On another Arm platform, all numbers are same or slightly better.
> >
> > I can post the patch with this change if you want to run some benchmarks on
> your platform.
> 
> Sure, please do.
> I'll try to run on my boxes.
Sent v5, please check. Other platform owners should run this as well.

> 
> > I have not used the same code you have suggested, instead I have used the
> same logic in a single macro with memcpy.
> >
David Christensen Oct. 17, 2019, 11:17 p.m. | #8
>>> I tried this. On x86 (Xeon(R) Gold 6132 CPU @ 2.60GHz), the results are as
>> follows. The numbers in brackets are with the code on master.
>>> gcc (Ubuntu 7.4.0-1ubuntu1~18.04.1) 7.4.0
>>>
>>> RTE>>ring_perf_elem_autotest
>>> ### Testing single element and burst enq/deq ### SP/SC single
>>> enq/dequeue: 5 MP/MC single enq/dequeue: 40 (35) SP/SC burst
>>> enq/dequeue (size: 8): 2 MP/MC burst enq/dequeue (size: 8): 6 SP/SC
>>> burst enq/dequeue (size: 32): 1 (2) MP/MC burst enq/dequeue (size:
>>> 32): 2
>>>
>>> ### Testing empty dequeue ###
>>> SC empty dequeue: 2.11
>>> MC empty dequeue: 1.41 (2.11)
>>>
>>> ### Testing using a single lcore ###
>>> SP/SC bulk enq/dequeue (size: 8): 2.15 (2.86) MP/MC bulk enq/dequeue
>>> (size: 8): 6.35 (6.91) SP/SC bulk enq/dequeue (size: 32): 1.35 (2.06)
>>> MP/MC bulk enq/dequeue (size: 32): 2.38 (2.95)
>>>
>>> ### Testing using two physical cores ### SP/SC bulk enq/dequeue (size:
>>> 8): 73.81 (15.33) MP/MC bulk enq/dequeue (size: 8): 75.10 (71.27)
>>> SP/SC bulk enq/dequeue (size: 32): 21.14 (9.58) MP/MC bulk enq/dequeue
>>> (size: 32): 25.74 (20.91)
>>>
>>> ### Testing using two NUMA nodes ###
>>> SP/SC bulk enq/dequeue (size: 8): 164.32 (50.66) MP/MC bulk
>>> enq/dequeue (size: 8): 176.02 (173.43) SP/SC bulk enq/dequeue (size:
>>> 32): 50.78 (23) MP/MC bulk enq/dequeue (size: 32): 63.17 (46.74)
>>>
>>> On one of the Arm platform
>>> MP/MC bulk enq/dequeue (size: 32): 0.37 (0.33) (~12% hit, the rest are
>>> ok)

Tried this on a Power9 platform (3.6GHz), with two numa nodes and 16 
cores/node (SMT=4).  Applied all 3 patches in v5, test results are as 
follows:

RTE>>ring_perf_elem_autotest
### Testing single element and burst enq/deq ###
SP/SC single enq/dequeue: 42
MP/MC single enq/dequeue: 59
SP/SC burst enq/dequeue (size: 8): 5
MP/MC burst enq/dequeue (size: 8): 7
SP/SC burst enq/dequeue (size: 32): 2
MP/MC burst enq/dequeue (size: 32): 2

### Testing empty dequeue ###
SC empty dequeue: 7.81
MC empty dequeue: 7.81

### Testing using a single lcore ###
SP/SC bulk enq/dequeue (size: 8): 5.76
MP/MC bulk enq/dequeue (size: 8): 7.66
SP/SC bulk enq/dequeue (size: 32): 2.10
MP/MC bulk enq/dequeue (size: 32): 2.57

### Testing using two hyperthreads ###
SP/SC bulk enq/dequeue (size: 8): 13.13
MP/MC bulk enq/dequeue (size: 8): 13.98
SP/SC bulk enq/dequeue (size: 32): 3.41
MP/MC bulk enq/dequeue (size: 32): 4.45

### Testing using two physical cores ###
SP/SC bulk enq/dequeue (size: 8): 11.00
MP/MC bulk enq/dequeue (size: 8): 10.95
SP/SC bulk enq/dequeue (size: 32): 3.08
MP/MC bulk enq/dequeue (size: 32): 3.40

### Testing using two NUMA nodes ###
SP/SC bulk enq/dequeue (size: 8): 63.41
MP/MC bulk enq/dequeue (size: 8): 62.70
SP/SC bulk enq/dequeue (size: 32): 15.39
MP/MC bulk enq/dequeue (size: 32): 22.96

Dave
Honnappa Nagarahalli Oct. 18, 2019, 3:18 a.m. | #9
<snip>

> Subject: Re: [PATCH v4 1/2] lib/ring: apis to support configurable element
> size
> 
> >>> I tried this. On x86 (Xeon(R) Gold 6132 CPU @ 2.60GHz), the results
> >>> are as
> >> follows. The numbers in brackets are with the code on master.
> >>> gcc (Ubuntu 7.4.0-1ubuntu1~18.04.1) 7.4.0
> >>>
> >>> RTE>>ring_perf_elem_autotest
> >>> ### Testing single element and burst enq/deq ### SP/SC single
> >>> enq/dequeue: 5 MP/MC single enq/dequeue: 40 (35) SP/SC burst
> >>> enq/dequeue (size: 8): 2 MP/MC burst enq/dequeue (size: 8): 6 SP/SC
> >>> burst enq/dequeue (size: 32): 1 (2) MP/MC burst enq/dequeue (size:
> >>> 32): 2
> >>>
> >>> ### Testing empty dequeue ###
> >>> SC empty dequeue: 2.11
> >>> MC empty dequeue: 1.41 (2.11)
> >>>
> >>> ### Testing using a single lcore ### SP/SC bulk enq/dequeue (size:
> >>> 8): 2.15 (2.86) MP/MC bulk enq/dequeue
> >>> (size: 8): 6.35 (6.91) SP/SC bulk enq/dequeue (size: 32): 1.35
> >>> (2.06) MP/MC bulk enq/dequeue (size: 32): 2.38 (2.95)
> >>>
> >>> ### Testing using two physical cores ### SP/SC bulk enq/dequeue (size:
> >>> 8): 73.81 (15.33) MP/MC bulk enq/dequeue (size: 8): 75.10 (71.27)
> >>> SP/SC bulk enq/dequeue (size: 32): 21.14 (9.58) MP/MC bulk
> >>> enq/dequeue
> >>> (size: 32): 25.74 (20.91)
> >>>
> >>> ### Testing using two NUMA nodes ### SP/SC bulk enq/dequeue (size:
> >>> 8): 164.32 (50.66) MP/MC bulk enq/dequeue (size: 8): 176.02 (173.43)
> >>> SP/SC bulk enq/dequeue (size:
> >>> 32): 50.78 (23) MP/MC bulk enq/dequeue (size: 32): 63.17 (46.74)
> >>>
> >>> On one of the Arm platform
> >>> MP/MC bulk enq/dequeue (size: 32): 0.37 (0.33) (~12% hit, the rest
> >>> are
> >>> ok)
> 
> Tried this on a Power9 platform (3.6GHz), with two numa nodes and 16
> cores/node (SMT=4).  Applied all 3 patches in v5, test results are as
> follows:
> 
> RTE>>ring_perf_elem_autotest
> ### Testing single element and burst enq/deq ### SP/SC single enq/dequeue:
> 42 MP/MC single enq/dequeue: 59 SP/SC burst enq/dequeue (size: 8): 5
> MP/MC burst enq/dequeue (size: 8): 7 SP/SC burst enq/dequeue (size: 32): 2
> MP/MC burst enq/dequeue (size: 32): 2
> 
> ### Testing empty dequeue ###
> SC empty dequeue: 7.81
> MC empty dequeue: 7.81
> 
> ### Testing using a single lcore ###
> SP/SC bulk enq/dequeue (size: 8): 5.76
> MP/MC bulk enq/dequeue (size: 8): 7.66
> SP/SC bulk enq/dequeue (size: 32): 2.10
> MP/MC bulk enq/dequeue (size: 32): 2.57
> 
> ### Testing using two hyperthreads ###
> SP/SC bulk enq/dequeue (size: 8): 13.13
> MP/MC bulk enq/dequeue (size: 8): 13.98
> SP/SC bulk enq/dequeue (size: 32): 3.41
> MP/MC bulk enq/dequeue (size: 32): 4.45
> 
> ### Testing using two physical cores ### SP/SC bulk enq/dequeue (size: 8):
> 11.00 MP/MC bulk enq/dequeue (size: 8): 10.95 SP/SC bulk enq/dequeue
> (size: 32): 3.08 MP/MC bulk enq/dequeue (size: 32): 3.40
> 
> ### Testing using two NUMA nodes ###
> SP/SC bulk enq/dequeue (size: 8): 63.41
> MP/MC bulk enq/dequeue (size: 8): 62.70
> SP/SC bulk enq/dequeue (size: 32): 15.39 MP/MC bulk enq/dequeue (size:
> 32): 22.96
> 
Thanks for running this. There is another test 'ring_perf_autotest' which provides the numbers with the original implementation. The goal is to make sure the numbers with the original implementation are the same as these. Can you please run that as well?

> Dave

Patch

diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile
index 21a36770d..515a967bb 100644
--- a/lib/librte_ring/Makefile
+++ b/lib/librte_ring/Makefile
@@ -6,7 +6,7 @@  include $(RTE_SDK)/mk/rte.vars.mk
 # library name
 LIB = librte_ring.a
 
-CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3
+CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3 -DALLOW_EXPERIMENTAL_API
 LDLIBS += -lrte_eal
 
 EXPORT_MAP := rte_ring_version.map
@@ -18,6 +18,7 @@  SRCS-$(CONFIG_RTE_LIBRTE_RING) := rte_ring.c
 
 # install includes
 SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h \
+					rte_ring_elem.h \
 					rte_ring_generic.h \
 					rte_ring_c11_mem.h
 
diff --git a/lib/librte_ring/meson.build b/lib/librte_ring/meson.build
index ab8b0b469..74219840a 100644
--- a/lib/librte_ring/meson.build
+++ b/lib/librte_ring/meson.build
@@ -6,3 +6,6 @@  sources = files('rte_ring.c')
 headers = files('rte_ring.h',
 		'rte_ring_c11_mem.h',
 		'rte_ring_generic.h')
+
+# rte_ring_create_elem and rte_ring_get_memsize_elem are experimental
+allow_experimental_apis = true
diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c
index d9b308036..6fed3648b 100644
--- a/lib/librte_ring/rte_ring.c
+++ b/lib/librte_ring/rte_ring.c
@@ -33,6 +33,7 @@ 
 #include <rte_tailq.h>
 
 #include "rte_ring.h"
+#include "rte_ring_elem.h"
 
 TAILQ_HEAD(rte_ring_list, rte_tailq_entry);
 
@@ -46,23 +47,42 @@  EAL_REGISTER_TAILQ(rte_ring_tailq)
 
 /* return the size of memory occupied by a ring */
 ssize_t
-rte_ring_get_memsize(unsigned count)
+rte_ring_get_memsize_elem(unsigned count, unsigned esize)
 {
 	ssize_t sz;
 
+	/* Supported esize values are 4/8/16.
+	 * Others can be added on need basis.
+	 */
+	if ((esize != 4) && (esize != 8) && (esize != 16)) {
+		RTE_LOG(ERR, RING,
+			"Unsupported esize value. Supported values are 4, 8 and 16\n");
+
+		return -EINVAL;
+	}
+
 	/* count must be a power of 2 */
 	if ((!POWEROF2(count)) || (count > RTE_RING_SZ_MASK )) {
 		RTE_LOG(ERR, RING,
-			"Requested size is invalid, must be power of 2, and "
-			"do not exceed the size limit %u\n", RTE_RING_SZ_MASK);
+			"Requested number of elements is invalid, must be "
+			"power of 2, and do not exceed the limit %u\n",
+			RTE_RING_SZ_MASK);
+
 		return -EINVAL;
 	}
 
-	sz = sizeof(struct rte_ring) + count * sizeof(void *);
+	sz = sizeof(struct rte_ring) + count * esize;
 	sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
 	return sz;
 }
 
+/* return the size of memory occupied by a ring */
+ssize_t
+rte_ring_get_memsize(unsigned count)
+{
+	return rte_ring_get_memsize_elem(count, sizeof(void *));
+}
+
 void
 rte_ring_reset(struct rte_ring *r)
 {
@@ -114,10 +134,10 @@  rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
 	return 0;
 }
 
-/* create the ring */
+/* create the ring for a given element size */
 struct rte_ring *
-rte_ring_create(const char *name, unsigned count, int socket_id,
-		unsigned flags)
+rte_ring_create_elem(const char *name, unsigned count, unsigned esize,
+		int socket_id, unsigned flags)
 {
 	char mz_name[RTE_MEMZONE_NAMESIZE];
 	struct rte_ring *r;
@@ -135,7 +155,7 @@  rte_ring_create(const char *name, unsigned count, int socket_id,
 	if (flags & RING_F_EXACT_SZ)
 		count = rte_align32pow2(count + 1);
 
-	ring_size = rte_ring_get_memsize(count);
+	ring_size = rte_ring_get_memsize_elem(count, esize);
 	if (ring_size < 0) {
 		rte_errno = ring_size;
 		return NULL;
@@ -182,6 +202,15 @@  rte_ring_create(const char *name, unsigned count, int socket_id,
 	return r;
 }
 
+/* create the ring */
+struct rte_ring *
+rte_ring_create(const char *name, unsigned count, int socket_id,
+		unsigned flags)
+{
+	return rte_ring_create_elem(name, count, sizeof(void *), socket_id,
+		flags);
+}
+
 /* free the ring */
 void
 rte_ring_free(struct rte_ring *r)
diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
index 2a9f768a1..18fc5d845 100644
--- a/lib/librte_ring/rte_ring.h
+++ b/lib/librte_ring/rte_ring.h
@@ -216,6 +216,7 @@  int rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
  */
 struct rte_ring *rte_ring_create(const char *name, unsigned count,
 				 int socket_id, unsigned flags);
+
 /**
  * De-allocate all memory used by the ring.
  *
diff --git a/lib/librte_ring/rte_ring_elem.h b/lib/librte_ring/rte_ring_elem.h
new file mode 100644
index 000000000..860f059ad
--- /dev/null
+++ b/lib/librte_ring/rte_ring_elem.h
@@ -0,0 +1,946 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ *
+ * Copyright (c) 2019 Arm Limited
+ * Copyright (c) 2010-2017 Intel Corporation
+ * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
+ * All rights reserved.
+ * Derived from FreeBSD's bufring.h
+ * Used as BSD-3 Licensed with permission from Kip Macy.
+ */
+
+#ifndef _RTE_RING_ELEM_H_
+#define _RTE_RING_ELEM_H_
+
+/**
+ * @file
+ * RTE Ring with flexible element size
+ */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <stdio.h>
+#include <stdint.h>
+#include <sys/queue.h>
+#include <errno.h>
+#include <rte_common.h>
+#include <rte_config.h>
+#include <rte_memory.h>
+#include <rte_lcore.h>
+#include <rte_atomic.h>
+#include <rte_branch_prediction.h>
+#include <rte_memzone.h>
+#include <rte_pause.h>
+
+#include "rte_ring.h"
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Calculate the memory size needed for a ring with given element size
+ *
+ * This function returns the number of bytes needed for a ring, given
+ * the number of elements in it and the size of the element. This value
+ * is the sum of the size of the structure rte_ring and the size of the
+ * memory needed for storing the elements. The value is aligned to a cache
+ * line size.
+ *
+ * @param count
+ *   The number of elements in the ring (must be a power of 2).
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   Currently, sizes 4, 8 and 16 are supported.
+ * @return
+ *   - The memory size needed for the ring on success.
+ *   - -EINVAL if count is not a power of 2.
+ */
+__rte_experimental
+ssize_t rte_ring_get_memsize_elem(unsigned count, unsigned esize);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Create a new ring named *name* that stores elements with given size.
+ *
+ * This function uses ``memzone_reserve()`` to allocate memory. Then it
+ * calls rte_ring_init() to initialize an empty ring.
+ *
+ * The new ring size is set to *count*, which must be a power of
+ * two. Water marking is disabled by default. The real usable ring size
+ * is *count-1* instead of *count* to differentiate a free ring from an
+ * empty ring.
+ *
+ * The ring is added in RTE_TAILQ_RING list.
+ *
+ * @param name
+ *   The name of the ring.
+ * @param count
+ *   The number of elements in the ring (must be a power of 2).
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   Currently, sizes 4, 8 and 16 are supported.
+ * @param socket_id
+ *   The *socket_id* argument is the socket identifier in case of
+ *   NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA
+ *   constraint for the reserved zone.
+ * @param flags
+ *   An OR of the following:
+ *    - RING_F_SP_ENQ: If this flag is set, the default behavior when
+ *      using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
+ *      is "single-producer". Otherwise, it is "multi-producers".
+ *    - RING_F_SC_DEQ: If this flag is set, the default behavior when
+ *      using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
+ *      is "single-consumer". Otherwise, it is "multi-consumers".
+ * @return
+ *   On success, the pointer to the new allocated ring. NULL on error with
+ *    rte_errno set appropriately. Possible errno values include:
+ *    - E_RTE_NO_CONFIG - function could not get pointer to rte_config structure
+ *    - E_RTE_SECONDARY - function was called from a secondary process instance
+ *    - EINVAL - count provided is not a power of 2
+ *    - ENOSPC - the maximum number of memzones has already been allocated
+ *    - EEXIST - a memzone with the same name already exists
+ *    - ENOMEM - no appropriate memory area found in which to create memzone
+ */
+__rte_experimental
+struct rte_ring *rte_ring_create_elem(const char *name, unsigned count,
+				unsigned esize, int socket_id, unsigned flags);
+
+/* The actual enqueue of elements on the ring.
+ * Placed here since identical code is needed in both
+ * single and multi producer enqueue functions.
+ */
+#define ENQUEUE_PTRS_ELEM(r, ring_start, prod_head, obj_table, esize, n) do { \
+	if (esize == 4) \
+		ENQUEUE_PTRS_32(r, ring_start, prod_head, obj_table, n); \
+	else if (esize == 8) \
+		ENQUEUE_PTRS_64(r, ring_start, prod_head, obj_table, n); \
+	else if (esize == 16) \
+		ENQUEUE_PTRS_128(r, ring_start, prod_head, obj_table, n); \
+} while (0)
+
+#define ENQUEUE_PTRS_32(r, ring_start, prod_head, obj_table, n) do { \
+	unsigned int i; \
+	const uint32_t size = (r)->size; \
+	uint32_t idx = prod_head & (r)->mask; \
+	uint32_t *ring = (uint32_t *)ring_start; \
+	uint32_t *obj = (uint32_t *)obj_table; \
+	if (likely(idx + n < size)) { \
+		for (i = 0; i < (n & ((~(unsigned)0x7))); i += 8, idx += 8) { \
+			ring[idx] = obj[i]; \
+			ring[idx + 1] = obj[i + 1]; \
+			ring[idx + 2] = obj[i + 2]; \
+			ring[idx + 3] = obj[i + 3]; \
+			ring[idx + 4] = obj[i + 4]; \
+			ring[idx + 5] = obj[i + 5]; \
+			ring[idx + 6] = obj[i + 6]; \
+			ring[idx + 7] = obj[i + 7]; \
+		} \
+		switch (n & 0x7) { \
+		case 7: \
+			ring[idx++] = obj[i++]; /* fallthrough */ \
+		case 6: \
+			ring[idx++] = obj[i++]; /* fallthrough */ \
+		case 5: \
+			ring[idx++] = obj[i++]; /* fallthrough */ \
+		case 4: \
+			ring[idx++] = obj[i++]; /* fallthrough */ \
+		case 3: \
+			ring[idx++] = obj[i++]; /* fallthrough */ \
+		case 2: \
+			ring[idx++] = obj[i++]; /* fallthrough */ \
+		case 1: \
+			ring[idx++] = obj[i++]; /* fallthrough */ \
+		} \
+	} else { \
+		for (i = 0; idx < size; i++, idx++)\
+			ring[idx] = obj[i]; \
+		for (idx = 0; i < n; i++, idx++) \
+			ring[idx] = obj[i]; \
+	} \
+} while (0)
+
+#define ENQUEUE_PTRS_64(r, ring_start, prod_head, obj_table, n) do { \
+	unsigned int i; \
+	const uint32_t size = (r)->size; \
+	uint32_t idx = prod_head & (r)->mask; \
+	uint64_t *ring = (uint64_t *)ring_start; \
+	uint64_t *obj = (uint64_t *)obj_table; \
+	if (likely(idx + n < size)) { \
+		for (i = 0; i < (n & ((~(unsigned)0x3))); i += 4, idx += 4) { \
+			ring[idx] = obj[i]; \
+			ring[idx + 1] = obj[i + 1]; \
+			ring[idx + 2] = obj[i + 2]; \
+			ring[idx + 3] = obj[i + 3]; \
+		} \
+		switch (n & 0x3) { \
+		case 3: \
+			ring[idx++] = obj[i++]; /* fallthrough */ \
+		case 2: \
+			ring[idx++] = obj[i++]; /* fallthrough */ \
+		case 1: \
+			ring[idx++] = obj[i++]; \
+		} \
+	} else { \
+		for (i = 0; idx < size; i++, idx++)\
+			ring[idx] = obj[i]; \
+		for (idx = 0; i < n; i++, idx++) \
+			ring[idx] = obj[i]; \
+	} \
+} while (0)
+
+#define ENQUEUE_PTRS_128(r, ring_start, prod_head, obj_table, n) do { \
+	unsigned int i; \
+	const uint32_t size = (r)->size; \
+	uint32_t idx = prod_head & (r)->mask; \
+	__uint128_t *ring = (__uint128_t *)ring_start; \
+	__uint128_t *obj = (__uint128_t *)obj_table; \
+	if (likely(idx + n < size)) { \
+		for (i = 0; i < (n >> 1); i += 2, idx += 2) { \
+			ring[idx] = obj[i]; \
+			ring[idx + 1] = obj[i + 1]; \
+		} \
+		switch (n & 0x1) { \
+		case 1: \
+			ring[idx++] = obj[i++]; \
+		} \
+	} else { \
+		for (i = 0; idx < size; i++, idx++)\
+			ring[idx] = obj[i]; \
+		for (idx = 0; i < n; i++, idx++) \
+			ring[idx] = obj[i]; \
+	} \
+} while (0)
+
+/* The actual copy of elements from the ring to obj_table.
+ * Placed here since identical code is needed in both
+ * single and multi consumer dequeue functions.
+ */
+#define DEQUEUE_PTRS_ELEM(r, ring_start, cons_head, obj_table, esize, n) do { \
+	if (esize == 4) \
+		DEQUEUE_PTRS_32(r, ring_start, cons_head, obj_table, n); \
+	else if (esize == 8) \
+		DEQUEUE_PTRS_64(r, ring_start, cons_head, obj_table, n); \
+	else if (esize == 16) \
+		DEQUEUE_PTRS_128(r, ring_start, cons_head, obj_table, n); \
+} while (0)
+
+#define DEQUEUE_PTRS_32(r, ring_start, cons_head, obj_table, n) do { \
+	unsigned int i; \
+	uint32_t idx = cons_head & (r)->mask; \
+	const uint32_t size = (r)->size; \
+	uint32_t *ring = (uint32_t *)ring_start; \
+	uint32_t *obj = (uint32_t *)obj_table; \
+	if (likely(idx + n < size)) { \
+		for (i = 0; i < (n & (~(unsigned)0x7)); i += 8, idx += 8) {\
+			obj[i] = ring[idx]; \
+			obj[i + 1] = ring[idx + 1]; \
+			obj[i + 2] = ring[idx + 2]; \
+			obj[i + 3] = ring[idx + 3]; \
+			obj[i + 4] = ring[idx + 4]; \
+			obj[i + 5] = ring[idx + 5]; \
+			obj[i + 6] = ring[idx + 6]; \
+			obj[i + 7] = ring[idx + 7]; \
+		} \
+		switch (n & 0x7) { \
+		case 7: \
+			obj[i++] = ring[idx++]; /* fallthrough */ \
+		case 6: \
+			obj[i++] = ring[idx++]; /* fallthrough */ \
+		case 5: \
+			obj[i++] = ring[idx++]; /* fallthrough */ \
+		case 4: \
+			obj[i++] = ring[idx++]; /* fallthrough */ \
+		case 3: \
+			obj[i++] = ring[idx++]; /* fallthrough */ \
+		case 2: \
+			obj[i++] = ring[idx++]; /* fallthrough */ \
+		case 1: \
+			obj[i++] = ring[idx++]; /* fallthrough */ \
+		} \
+	} else { \
+		for (i = 0; idx < size; i++, idx++) \
+			obj[i] = ring[idx]; \
+		for (idx = 0; i < n; i++, idx++) \
+			obj[i] = ring[idx]; \
+	} \
+} while (0)
+
+#define DEQUEUE_PTRS_64(r, ring_start, cons_head, obj_table, n) do { \
+	unsigned int i; \
+	uint32_t idx = cons_head & (r)->mask; \
+	const uint32_t size = (r)->size; \
+	uint64_t *ring = (uint64_t *)ring_start; \
+	uint64_t *obj = (uint64_t *)obj_table; \
+	if (likely(idx + n < size)) { \
+		for (i = 0; i < (n & (~(unsigned)0x3)); i += 4, idx += 4) {\
+			obj[i] = ring[idx]; \
+			obj[i + 1] = ring[idx + 1]; \
+			obj[i + 2] = ring[idx + 2]; \
+			obj[i + 3] = ring[idx + 3]; \
+		} \
+		switch (n & 0x3) { \
+		case 3: \
+			obj[i++] = ring[idx++]; /* fallthrough */ \
+		case 2: \
+			obj[i++] = ring[idx++]; /* fallthrough */ \
+		case 1: \
+			obj[i++] = ring[idx++]; \
+		} \
+	} else { \
+		for (i = 0; idx < size; i++, idx++) \
+			obj[i] = ring[idx]; \
+		for (idx = 0; i < n; i++, idx++) \
+			obj[i] = ring[idx]; \
+	} \
+} while (0)
+
+#define DEQUEUE_PTRS_128(r, ring_start, cons_head, obj_table, n) do { \
+	unsigned int i; \
+	uint32_t idx = cons_head & (r)->mask; \
+	const uint32_t size = (r)->size; \
+	__uint128_t *ring = (__uint128_t *)ring_start; \
+	__uint128_t *obj = (__uint128_t *)obj_table; \
+	if (likely(idx + n < size)) { \
+		for (i = 0; i < (n >> 1); i += 2, idx += 2) { \
+			obj[i] = ring[idx]; \
+			obj[i + 1] = ring[idx + 1]; \
+		} \
+		switch (n & 0x1) { \
+		case 1: \
+			obj[i++] = ring[idx++]; \
+		} \
+	} else { \
+		for (i = 0; idx < size; i++, idx++) \
+			obj[i] = ring[idx]; \
+		for (idx = 0; i < n; i++, idx++) \
+			obj[i] = ring[idx]; \
+	} \
+} while (0)
+
+/* Between two loads, the CPU may reorder accesses on weak memory models
+ * (powerpc/arm).
+ * There are 2 choices for the users:
+ * 1. use rmb() memory barrier
+ * 2. use one-direction load_acquire/store_release barriers, selected by
+ * CONFIG_RTE_USE_C11_MEM_MODEL=y
+ * It depends on performance test results.
+ * By default, move common functions to rte_ring_generic.h
+ */
+#ifdef RTE_USE_C11_MEM_MODEL
+#include "rte_ring_c11_mem.h"
+#else
+#include "rte_ring_generic.h"
+#endif
+
+/**
+ * @internal Enqueue several objects on the ring
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   Currently, sizes 4, 8 and 16 are supported. This should be the same
+ *   as passed while creating the ring, otherwise the results are undefined.
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
+ *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
+ * @param is_sp
+ *   Indicates whether to use single producer or multi-producer head update
+ * @param free_space
+ *   returns the amount of space after the enqueue operation has finished
+ * @return
+ *   Actual number of objects enqueued.
+ *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_enqueue_elem(struct rte_ring *r, void * const obj_table,
+		unsigned int esize, unsigned int n,
+		enum rte_ring_queue_behavior behavior, unsigned int is_sp,
+		unsigned int *free_space)
+{
+	uint32_t prod_head, prod_next;
+	uint32_t free_entries;
+
+	n = __rte_ring_move_prod_head(r, is_sp, n, behavior,
+			&prod_head, &prod_next, &free_entries);
+	if (n == 0)
+		goto end;
+
+	ENQUEUE_PTRS_ELEM(r, &r[1], prod_head, obj_table, esize, n);
+
+	update_tail(&r->prod, prod_head, prod_next, is_sp, 1);
+end:
+	if (free_space != NULL)
+		*free_space = free_entries - n;
+	return n;
+}
+
+/**
+ * @internal Dequeue several objects from the ring
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   Currently, sizes 4, 8 and 16 are supported. This should be the same
+ *   as passed while creating the ring, otherwise the results are undefined.
+ * @param n
+ *   The number of objects to pull from the ring.
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
+ *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
+ * @param is_sc
+ *   Indicates whether to use single consumer or multi-consumer head update
+ * @param available
+ *   returns the number of remaining ring entries after the dequeue has finished
+ * @return
+ *   - Actual number of objects dequeued.
+ *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_dequeue_elem(struct rte_ring *r, void *obj_table,
+		unsigned int esize, unsigned int n,
+		enum rte_ring_queue_behavior behavior, unsigned int is_sc,
+		unsigned int *available)
+{
+	uint32_t cons_head, cons_next;
+	uint32_t entries;
+
+	n = __rte_ring_move_cons_head(r, (int)is_sc, n, behavior,
+			&cons_head, &cons_next, &entries);
+	if (n == 0)
+		goto end;
+
+	DEQUEUE_PTRS_ELEM(r, &r[1], cons_head, obj_table, esize, n);
+
+	update_tail(&r->cons, cons_head, cons_next, is_sc, 0);
+
+end:
+	if (available != NULL)
+		*available = entries - n;
+	return n;
+}
+
+/**
+ * Enqueue several objects on the ring (multi-producers safe).
+ *
+ * This function uses a "compare and set" instruction to move the
+ * producer index atomically.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   Currently, sizes 4, 8 and 16 are supported. This should be the same
+ *   as passed while creating the ring, otherwise the results are undefined.
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ *   if non-NULL, returns the amount of space in the ring after the
+ *   enqueue operation has finished.
+ * @return
+ *   The number of objects enqueued, either 0 or n
+ */
+static __rte_always_inline unsigned int
+rte_ring_mp_enqueue_bulk_elem(struct rte_ring *r, void * const obj_table,
+		unsigned int esize, unsigned int n, unsigned int *free_space)
+{
+	return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
+			RTE_RING_QUEUE_FIXED, __IS_MP, free_space);
+}
+
+/**
+ * Enqueue several objects on a ring (NOT multi-producers safe).
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   Currently, sizes 4, 8 and 16 are supported. This should be the same
+ *   as passed while creating the ring, otherwise the results are undefined.
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ *   if non-NULL, returns the amount of space in the ring after the
+ *   enqueue operation has finished.
+ * @return
+ *   The number of objects enqueued, either 0 or n
+ */
+static __rte_always_inline unsigned int
+rte_ring_sp_enqueue_bulk_elem(struct rte_ring *r, void * const obj_table,
+		unsigned int esize, unsigned int n, unsigned int *free_space)
+{
+	return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
+			RTE_RING_QUEUE_FIXED, __IS_SP, free_space);
+}
+
+/**
+ * Enqueue several objects on a ring.
+ *
+ * This function calls the multi-producer or the single-producer
+ * version depending on the default behavior that was specified at
+ * ring creation time (see flags).
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   Currently, sizes 4, 8 and 16 are supported. This should be the same
+ *   as passed while creating the ring, otherwise the results are undefined.
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ *   if non-NULL, returns the amount of space in the ring after the
+ *   enqueue operation has finished.
+ * @return
+ *   The number of objects enqueued, either 0 or n
+ */
+static __rte_always_inline unsigned int
+rte_ring_enqueue_bulk_elem(struct rte_ring *r, void * const obj_table,
+		unsigned int esize, unsigned int n, unsigned int *free_space)
+{
+	return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
+			RTE_RING_QUEUE_FIXED, r->prod.single, free_space);
+}
+
+/**
+ * Enqueue one object on a ring (multi-producers safe).
+ *
+ * This function uses a "compare and set" instruction to move the
+ * producer index atomically.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj
+ *   A pointer to the object to be added.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   Currently, sizes 4, 8 and 16 are supported. This should be the same
+ *   as passed while creating the ring, otherwise the results are undefined.
+ * @return
+ *   - 0: Success; objects enqueued.
+ *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
+ */
+static __rte_always_inline int
+rte_ring_mp_enqueue_elem(struct rte_ring *r, void *obj, unsigned int esize)
+{
+	return rte_ring_mp_enqueue_bulk_elem(r, obj, esize, 1, NULL) ? 0 :
+								-ENOBUFS;
+}
+
+/**
+ * Enqueue one object on a ring (NOT multi-producers safe).
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj
+ *   A pointer to the object to be added.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   Currently, sizes 4, 8 and 16 are supported. This should be the same
+ *   as passed while creating the ring, otherwise the results are undefined.
+ * @return
+ *   - 0: Success; objects enqueued.
+ *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
+ */
+static __rte_always_inline int
+rte_ring_sp_enqueue_elem(struct rte_ring *r, void *obj, unsigned int esize)
+{
+	return rte_ring_sp_enqueue_bulk_elem(r, obj, esize, 1, NULL) ? 0 :
+								-ENOBUFS;
+}
+
+/**
+ * Enqueue one object on a ring.
+ *
+ * This function calls the multi-producer or the single-producer
+ * version, depending on the default behaviour that was specified at
+ * ring creation time (see flags).
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj
+ *   A pointer to the object to be added.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   Currently, sizes 4, 8 and 16 are supported. This should be the same
+ *   as passed while creating the ring, otherwise the results are undefined.
+ * @return
+ *   - 0: Success; objects enqueued.
+ *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
+ */
+static __rte_always_inline int
+rte_ring_enqueue_elem(struct rte_ring *r, void *obj, unsigned int esize)
+{
+	return rte_ring_enqueue_bulk_elem(r, obj, esize, 1, NULL) ? 0 :
+								-ENOBUFS;
+}
+
+/**
+ * Dequeue several objects from a ring (multi-consumers safe).
+ *
+ * This function uses a "compare and set" instruction to move the
+ * consumer index atomically.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of objects that will be filled.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   Currently, sizes 4, 8 and 16 are supported. This should be the same
+ *   as passed while creating the ring, otherwise the results are undefined.
+ * @param n
+ *   The number of objects to dequeue from the ring to the obj_table.
+ * @param available
+ *   If non-NULL, returns the number of remaining ring entries after the
+ *   dequeue has finished.
+ * @return
+ *   The number of objects dequeued, either 0 or n
+ */
+static __rte_always_inline unsigned int
+rte_ring_mc_dequeue_bulk_elem(struct rte_ring *r, void *obj_table,
+		unsigned int esize, unsigned int n, unsigned int *available)
+{
+	return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
+				RTE_RING_QUEUE_FIXED, __IS_MC, available);
+}
+
+/**
+ * Dequeue several objects from a ring (NOT multi-consumers safe).
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of objects that will be filled.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   Currently, sizes 4, 8 and 16 are supported. This should be the same
+ *   as passed while creating the ring, otherwise the results are undefined.
+ * @param n
+ *   The number of objects to dequeue from the ring to the obj_table,
+ *   must be strictly positive.
+ * @param available
+ *   If non-NULL, returns the number of remaining ring entries after the
+ *   dequeue has finished.
+ * @return
+ *   The number of objects dequeued, either 0 or n
+ */
+static __rte_always_inline unsigned int
+rte_ring_sc_dequeue_bulk_elem(struct rte_ring *r, void *obj_table,
+		unsigned int esize, unsigned int n, unsigned int *available)
+{
+	return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
+			RTE_RING_QUEUE_FIXED, __IS_SC, available);
+}
+
+/**
+ * Dequeue several objects from a ring.
+ *
+ * This function calls the multi-consumers or the single-consumer
+ * version, depending on the default behaviour that was specified at
+ * ring creation time (see flags).
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of objects that will be filled.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   Currently, sizes 4, 8 and 16 are supported. This should be the same
+ *   as passed while creating the ring, otherwise the results are undefined.
+ * @param n
+ *   The number of objects to dequeue from the ring to the obj_table.
+ * @param available
+ *   If non-NULL, returns the number of remaining ring entries after the
+ *   dequeue has finished.
+ * @return
+ *   The number of objects dequeued, either 0 or n
+ */
+static __rte_always_inline unsigned int
+rte_ring_dequeue_bulk_elem(struct rte_ring *r, void *obj_table,
+		unsigned int esize, unsigned int n, unsigned int *available)
+{
+	return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
+			RTE_RING_QUEUE_FIXED, r->cons.single, available);
+}
+
+/**
+ * Dequeue one object from a ring (multi-consumers safe).
+ *
+ * This function uses a "compare and set" instruction to move the
+ * consumer index atomically.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_p
+ *   A pointer to the memory where the dequeued object will be stored.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   Currently, sizes 4, 8 and 16 are supported. This should be the same
+ *   as passed while creating the ring, otherwise the results are undefined.
+ * @return
+ *   - 0: Success; objects dequeued.
+ *   - -ENOENT: Not enough entries in the ring to dequeue; no object is
+ *     dequeued.
+ */
+static __rte_always_inline int
+rte_ring_mc_dequeue_elem(struct rte_ring *r, void *obj_p,
+				unsigned int esize)
+{
+	return rte_ring_mc_dequeue_bulk_elem(r, obj_p, esize, 1, NULL)  ? 0 :
+								-ENOENT;
+}
+
+/**
+ * Dequeue one object from a ring (NOT multi-consumers safe).
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_p
+ *   A pointer to a void * pointer (object) that will be filled.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   Currently, sizes 4, 8 and 16 are supported. This should be the same
+ *   as passed while creating the ring, otherwise the results are undefined.
+ * @return
+ *   - 0: Success; objects dequeued.
+ *   - -ENOENT: Not enough entries in the ring to dequeue, no object is
+ *     dequeued.
+ */
+static __rte_always_inline int
+rte_ring_sc_dequeue_elem(struct rte_ring *r, void *obj_p,
+				unsigned int esize)
+{
+	return rte_ring_sc_dequeue_bulk_elem(r, obj_p, esize, 1, NULL) ? 0 :
+								-ENOENT;
+}
+
+/**
+ * Dequeue one object from a ring.
+ *
+ * This function calls the multi-consumers or the single-consumer
+ * version depending on the default behaviour that was specified at
+ * ring creation time (see flags).
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_p
+ *   A pointer to the memory where the dequeued object will be stored.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   Currently, sizes 4, 8 and 16 are supported. This should be the same
+ *   as passed while creating the ring, otherwise the results are undefined.
+ * @return
+ *   - 0: Success, objects dequeued.
+ *   - -ENOENT: Not enough entries in the ring to dequeue, no object is
+ *     dequeued.
+ */
+static __rte_always_inline int
+rte_ring_dequeue_elem(struct rte_ring *r, void *obj_p, unsigned int esize)
+{
+	return rte_ring_dequeue_bulk_elem(r, obj_p, esize, 1, NULL) ? 0 :
+								-ENOENT;
+}
+
+/**
+ * Enqueue several objects on the ring (multi-producers safe).
+ *
+ * This function uses a "compare and set" instruction to move the
+ * producer index atomically.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of objects to be enqueued.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   Currently, sizes 4, 8 and 16 are supported. This should be the same
+ *   as passed while creating the ring, otherwise the results are undefined.
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ *   If non-NULL, returns the amount of space in the ring after the
+ *   enqueue operation has finished.
+ * @return
+ *   - n: Actual number of objects enqueued.
+ */
+static __rte_always_inline unsigned
+rte_ring_mp_enqueue_burst_elem(struct rte_ring *r, void * const obj_table,
+		unsigned int esize, unsigned int n, unsigned int *free_space)
+{
+	return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
+			RTE_RING_QUEUE_VARIABLE, __IS_MP, free_space);
+}
+
+/**
+ * Enqueue several objects on a ring (NOT multi-producers safe).
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of objects to be enqueued.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   Currently, sizes 4, 8 and 16 are supported. This should be the same
+ *   as passed while creating the ring, otherwise the results are undefined.
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ *   If non-NULL, returns the amount of space in the ring after the
+ *   enqueue operation has finished.
+ * @return
+ *   - n: Actual number of objects enqueued.
+ */
+static __rte_always_inline unsigned
+rte_ring_sp_enqueue_burst_elem(struct rte_ring *r, void * const obj_table,
+		unsigned int esize, unsigned int n, unsigned int *free_space)
+{
+	return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
+			RTE_RING_QUEUE_VARIABLE, __IS_SP, free_space);
+}
+
+/**
+ * Enqueue several objects on a ring.
+ *
+ * This function calls the multi-producer or the single-producer
+ * version depending on the default behavior that was specified at
+ * ring creation time (see flags).
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of objects to be enqueued.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   Currently, sizes 4, 8 and 16 are supported. This should be the same
+ *   as passed while creating the ring, otherwise the results are undefined.
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ *   If non-NULL, returns the amount of space in the ring after the
+ *   enqueue operation has finished.
+ * @return
+ *   - n: Actual number of objects enqueued.
+ */
+static __rte_always_inline unsigned
+rte_ring_enqueue_burst_elem(struct rte_ring *r, void * const obj_table,
+		unsigned int esize, unsigned int n, unsigned int *free_space)
+{
+	return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
+			RTE_RING_QUEUE_VARIABLE, r->prod.single, free_space);
+}
+
+/**
+ * Dequeue several objects from a ring (multi-consumers safe). When the
+ * requested number of objects exceeds the number of available objects,
+ * only the actual number of available objects is dequeued.
+ *
+ * This function uses a "compare and set" instruction to move the
+ * consumer index atomically.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of objects that will be filled.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   Currently, sizes 4, 8 and 16 are supported. This should be the same
+ *   as passed while creating the ring, otherwise the results are undefined.
+ * @param n
+ *   The number of objects to dequeue from the ring to the obj_table.
+ * @param available
+ *   If non-NULL, returns the number of remaining ring entries after the
+ *   dequeue has finished.
+ * @return
+ *   - n: Actual number of objects dequeued, 0 if ring is empty
+ */
+static __rte_always_inline unsigned
+rte_ring_mc_dequeue_burst_elem(struct rte_ring *r, void *obj_table,
+		unsigned int esize, unsigned int n, unsigned int *available)
+{
+	return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
+			RTE_RING_QUEUE_VARIABLE, __IS_MC, available);
+}
+
+/**
+ * Dequeue several objects from a ring (NOT multi-consumers safe). When the
+ * requested number of objects exceeds the number of available objects,
+ * only the actual number of available objects is dequeued.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of objects that will be filled.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   Currently, sizes 4, 8 and 16 are supported. This should be the same
+ *   as passed while creating the ring, otherwise the results are undefined.
+ * @param n
+ *   The number of objects to dequeue from the ring to the obj_table.
+ * @param available
+ *   If non-NULL, returns the number of remaining ring entries after the
+ *   dequeue has finished.
+ * @return
+ *   - n: Actual number of objects dequeued, 0 if ring is empty
+ */
+static __rte_always_inline unsigned
+rte_ring_sc_dequeue_burst_elem(struct rte_ring *r, void *obj_table,
+		unsigned int esize, unsigned int n, unsigned int *available)
+{
+	return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
+			RTE_RING_QUEUE_VARIABLE, __IS_SC, available);
+}
+
+/**
+ * Dequeue multiple objects from a ring up to a maximum number.
+ *
+ * This function calls the multi-consumers or the single-consumer
+ * version, depending on the default behaviour that was specified at
+ * ring creation time (see flags).
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of objects that will be filled.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   Currently, sizes 4, 8 and 16 are supported. This should be the same
+ *   as passed while creating the ring, otherwise the results are undefined.
+ * @param n
+ *   The number of objects to dequeue from the ring to the obj_table.
+ * @param available
+ *   If non-NULL, returns the number of remaining ring entries after the
+ *   dequeue has finished.
+ * @return
+ *   - Number of objects dequeued
+ */
+static __rte_always_inline unsigned
+rte_ring_dequeue_burst_elem(struct rte_ring *r, void *obj_table,
+		unsigned int esize, unsigned int n, unsigned int *available)
+{
+	return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
+				RTE_RING_QUEUE_VARIABLE,
+				r->cons.single, available);
+}
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _RTE_RING_ELEM_H_ */
diff --git a/lib/librte_ring/rte_ring_version.map b/lib/librte_ring/rte_ring_version.map
index 510c1386e..e410a7503 100644
--- a/lib/librte_ring/rte_ring_version.map
+++ b/lib/librte_ring/rte_ring_version.map
@@ -21,6 +21,8 @@  DPDK_2.2 {
 EXPERIMENTAL {
 	global:
 
+	rte_ring_create_elem;
+	rte_ring_get_memsize_elem;
 	rte_ring_reset;
 
 };