[v7,02/17] lib/ring: apis to support configurable element size

Message ID 20191220044524.32910-3-honnappa.nagarahalli@arm.com (mailing list archive)
State Superseded, archived
Delegated to: David Marchand
Series lib/ring: APIs to support custom element size

Checks

Context Check Description
ci/checkpatch warning coding style issues
ci/Intel-compilation fail Compilation issues

Commit Message

Honnappa Nagarahalli Dec. 20, 2019, 4:45 a.m. UTC
  Current APIs assume ring elements to be pointers. However, in many
use cases, the elements can be of a different size. Add new APIs to
support configurable ring element sizes.
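For context, a minimal usage sketch of the new APIs (an illustration only; rte_ring_create_elem() and rte_ring_get_memsize_elem() are declared in this patch, while the per-element enqueue/dequeue names below are added elsewhere in this series and are assumptions here):

#include <rte_ring_elem.h>
#include <rte_errno.h>

struct my_elem {        /* 12B element; esize must be a multiple of 4 */
        uint32_t a, b, c;
};

static int
elem_ring_example(void)
{
        /* count must be a power of 2 */
        struct rte_ring *r = rte_ring_create_elem("elem_ring",
                        sizeof(struct my_elem), 1024, SOCKET_ID_ANY,
                        RING_F_SP_ENQ | RING_F_SC_DEQ);
        if (r == NULL)
                return -rte_errno;

        struct my_elem e = {1, 2, 3};
        /* per-element enqueue/dequeue, following the naming used in this series */
        rte_ring_enqueue_elem(r, &e, sizeof(e));
        rte_ring_dequeue_elem(r, &e, sizeof(e));
        rte_ring_free(r);
        return 0;
}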

Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
Reviewed-by: Dharmik Thakkar <dharmik.thakkar@arm.com>
Reviewed-by: Gavin Hu <gavin.hu@arm.com>
Reviewed-by: Ruifeng Wang <ruifeng.wang@arm.com>
---
 lib/librte_ring/Makefile             |    3 +-
 lib/librte_ring/meson.build          |    4 +
 lib/librte_ring/rte_ring.c           |   41 +-
 lib/librte_ring/rte_ring.h           |    1 +
 lib/librte_ring/rte_ring_elem.h      | 1002 ++++++++++++++++++++++++++
 lib/librte_ring/rte_ring_version.map |    2 +
 6 files changed, 1044 insertions(+), 9 deletions(-)
 create mode 100644 lib/librte_ring/rte_ring_elem.h
  

Comments

Ananyev, Konstantin Jan. 2, 2020, 4:42 p.m. UTC | #1
> diff --git a/lib/librte_ring/rte_ring_elem.h b/lib/librte_ring/rte_ring_elem.h
> new file mode 100644
> index 000000000..fc7fe127c
> --- /dev/null
> +++ b/lib/librte_ring/rte_ring_elem.h
> @@ -0,0 +1,1002 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + *
> + * Copyright (c) 2019 Arm Limited
> + * Copyright (c) 2010-2017 Intel Corporation
> + * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
> + * All rights reserved.
> + * Derived from FreeBSD's bufring.h
> + * Used as BSD-3 Licensed with permission from Kip Macy.
> + */
> +
> +#ifndef _RTE_RING_ELEM_H_
> +#define _RTE_RING_ELEM_H_
> +
> +/**
> + * @file
> + * RTE Ring with user defined element size
> + */
> +
> +#ifdef __cplusplus
> +extern "C" {
> +#endif
> +
> +#include <stdio.h>
> +#include <stdint.h>
> +#include <sys/queue.h>
> +#include <errno.h>
> +#include <rte_common.h>
> +#include <rte_config.h>
> +#include <rte_memory.h>
> +#include <rte_lcore.h>
> +#include <rte_atomic.h>
> +#include <rte_branch_prediction.h>
> +#include <rte_memzone.h>
> +#include <rte_pause.h>
> +
> +#include "rte_ring.h"
> +
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Calculate the memory size needed for a ring with given element size
> + *
> + * This function returns the number of bytes needed for a ring, given
> + * the number of elements in it and the size of the element. This value
> + * is the sum of the size of the structure rte_ring and the size of the
> + * memory needed for storing the elements. The value is aligned to a cache
> + * line size.
> + *
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + * @param count
> + *   The number of elements in the ring (must be a power of 2).
> + * @return
> + *   - The memory size needed for the ring on success.
> + *   - -EINVAL - esize is not a multiple of 4 or count provided is not a
> + *		 power of 2.
> + */
> +__rte_experimental
> +ssize_t rte_ring_get_memsize_elem(unsigned int esize, unsigned int count);
> +
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Create a new ring named *name* that stores elements with given size.
> + *
> + * This function uses ``memzone_reserve()`` to allocate memory. Then it
> + * calls rte_ring_init() to initialize an empty ring.
> + *
> + * The new ring size is set to *count*, which must be a power of
> + * two. Water marking is disabled by default. The real usable ring size
> + * is *count-1* instead of *count* to differentiate a free ring from an
> + * empty ring.
> + *
> + * The ring is added in RTE_TAILQ_RING list.
> + *
> + * @param name
> + *   The name of the ring.
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + * @param count
> + *   The number of elements in the ring (must be a power of 2).
> + * @param socket_id
> + *   The *socket_id* argument is the socket identifier in case of
> + *   NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA
> + *   constraint for the reserved zone.
> + * @param flags
> + *   An OR of the following:
> + *    - RING_F_SP_ENQ: If this flag is set, the default behavior when
> + *      using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
> + *      is "single-producer". Otherwise, it is "multi-producers".
> + *    - RING_F_SC_DEQ: If this flag is set, the default behavior when
> + *      using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
> + *      is "single-consumer". Otherwise, it is "multi-consumers".
> + * @return
> + *   On success, the pointer to the new allocated ring. NULL on error with
> + *    rte_errno set appropriately. Possible errno values include:
> + *    - E_RTE_NO_CONFIG - function could not get pointer to rte_config structure
> + *    - E_RTE_SECONDARY - function was called from a secondary process instance
> + *    - EINVAL - esize is not a multiple of 4 or count provided is not a
> + *		 power of 2.
> + *    - ENOSPC - the maximum number of memzones has already been allocated
> + *    - EEXIST - a memzone with the same name already exists
> + *    - ENOMEM - no appropriate memory area found in which to create memzone
> + */
> +__rte_experimental
> +struct rte_ring *rte_ring_create_elem(const char *name, unsigned int esize,
> +			unsigned int count, int socket_id, unsigned int flags);
> +
> +static __rte_always_inline void
> +enqueue_elems_32(struct rte_ring *r, uint32_t idx,
> +		const void *obj_table, uint32_t n)
> +{
> +	unsigned int i;
> +	const uint32_t size = r->size;
> +	uint32_t *ring = (uint32_t *)&r[1];
> +	const uint32_t *obj = (const uint32_t *)obj_table;
> +	if (likely(idx + n < size)) {
> +		for (i = 0; i < (n & ~0x7); i += 8, idx += 8) {
> +			ring[idx] = obj[i];
> +			ring[idx + 1] = obj[i + 1];
> +			ring[idx + 2] = obj[i + 2];
> +			ring[idx + 3] = obj[i + 3];
> +			ring[idx + 4] = obj[i + 4];
> +			ring[idx + 5] = obj[i + 5];
> +			ring[idx + 6] = obj[i + 6];
> +			ring[idx + 7] = obj[i + 7];
> +		}
> +		switch (n & 0x7) {
> +		case 7:
> +			ring[idx++] = obj[i++]; /* fallthrough */
> +		case 6:
> +			ring[idx++] = obj[i++]; /* fallthrough */
> +		case 5:
> +			ring[idx++] = obj[i++]; /* fallthrough */
> +		case 4:
> +			ring[idx++] = obj[i++]; /* fallthrough */
> +		case 3:
> +			ring[idx++] = obj[i++]; /* fallthrough */
> +		case 2:
> +			ring[idx++] = obj[i++]; /* fallthrough */
> +		case 1:
> +			ring[idx++] = obj[i++]; /* fallthrough */
> +		}
> +	} else {
> +		for (i = 0; idx < size; i++, idx++)
> +			ring[idx] = obj[i];
> +		/* Start at the beginning */
> +		for (idx = 0; i < n; i++, idx++)
> +			ring[idx] = obj[i];
> +	}
> +}
> +
> +static __rte_always_inline void
> +enqueue_elems_64(struct rte_ring *r, uint32_t prod_head,
> +		const void *obj_table, uint32_t n)
> +{
> +	unsigned int i;
> +	const uint32_t size = r->size;
> +	uint32_t idx = prod_head & r->mask;
> +	uint64_t *ring = (uint64_t *)&r[1];
> +	const uint64_t *obj = (const uint64_t *)obj_table;
> +	if (likely(idx + n < size)) {
> +		for (i = 0; i < (n & ~0x3); i += 4, idx += 4) {
> +			ring[idx] = obj[i];
> +			ring[idx + 1] = obj[i + 1];
> +			ring[idx + 2] = obj[i + 2];
> +			ring[idx + 3] = obj[i + 3];
> +		}
> +		switch (n & 0x3) {
> +		case 3:
> +			ring[idx++] = obj[i++]; /* fallthrough */
> +		case 2:
> +			ring[idx++] = obj[i++]; /* fallthrough */
> +		case 1:
> +			ring[idx++] = obj[i++];
> +		}
> +	} else {
> +		for (i = 0; idx < size; i++, idx++)
> +			ring[idx] = obj[i];
> +		/* Start at the beginning */
> +		for (idx = 0; i < n; i++, idx++)
> +			ring[idx] = obj[i];
> +	}
> +}
> +
> +static __rte_always_inline void
> +enqueue_elems_128(struct rte_ring *r, uint32_t prod_head,
> +		const void *obj_table, uint32_t n)
> +{
> +	unsigned int i;
> +	const uint32_t size = r->size;
> +	uint32_t idx = prod_head & r->mask;
> +	__uint128_t *ring = (__uint128_t *)&r[1];
> +	const __uint128_t *obj = (const __uint128_t *)obj_table;
> +	if (likely(idx + n < size)) {
> +		for (i = 0; i < (n & ~0x1); i += 2, idx += 2) {
> +			ring[idx] = obj[i];
> +			ring[idx + 1] = obj[i + 1];


AFAIK, that implies 16B aligned obj_table...
Would it always be the case?  

> +		}
> +		switch (n & 0x1) {
> +		case 1:
> +			ring[idx++] = obj[i++];
> +		}
> +	} else {
> +		for (i = 0; idx < size; i++, idx++)
> +			ring[idx] = obj[i];
> +		/* Start at the beginning */
> +		for (idx = 0; i < n; i++, idx++)
> +			ring[idx] = obj[i];
> +	}
> +}
> +
> +/* the actual enqueue of elements on the ring.
> + * Placed here since identical code needed in both
> + * single and multi producer enqueue functions.
> + */
> +static __rte_always_inline void
> +enqueue_elems(struct rte_ring *r, uint32_t prod_head, const void *obj_table,
> +		uint32_t esize, uint32_t num)
> +{
> +	uint32_t idx, nr_idx, nr_num;
> +
> +	/* 8B and 16B copies implemented individually to retain
> +	 * the current performance.
> +	 */
> +	if (esize == 8)
> +		enqueue_elems_64(r, prod_head, obj_table, num);
> +	else if (esize == 16)
> +		enqueue_elems_128(r, prod_head, obj_table, num);
> +	else {
> +		/* Normalize to uint32_t */
> +		uint32_t scale = esize / sizeof(uint32_t);
> +		nr_num = num * scale;
> +		idx = prod_head & r->mask;
> +		nr_idx = idx * scale;
> +		enqueue_elems_32(r, nr_idx, obj_table, nr_num);
> +	}
> +}
> +
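For reference, the normalization in enqueue_elems() works out as follows for a 24B element (a worked example, not part of the patch): scale = esize / sizeof(uint32_t) = 24 / 4 = 6; with num = 32 elements the copy length becomes nr_num = 32 * 6 = 192 uint32_t words, and an element index of idx = 100 becomes nr_idx = 100 * 6 = 600, i.e. enqueue_elems_32() copies 192 32-bit words starting at 32-bit slot 600.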
  
Honnappa Nagarahalli Jan. 7, 2020, 5:35 a.m. UTC | #2
<snip>

> > diff --git a/lib/librte_ring/rte_ring_elem.h
> > b/lib/librte_ring/rte_ring_elem.h new file mode 100644 index
> > 000000000..fc7fe127c
> > --- /dev/null
> > +++ b/lib/librte_ring/rte_ring_elem.h
> > @@ -0,0 +1,1002 @@

<snip>

> > +
> > +static __rte_always_inline void
> > +enqueue_elems_128(struct rte_ring *r, uint32_t prod_head,
> > +		const void *obj_table, uint32_t n)
> > +{
> > +	unsigned int i;
> > +	const uint32_t size = r->size;
> > +	uint32_t idx = prod_head & r->mask;
> > +	__uint128_t *ring = (__uint128_t *)&r[1];
> > +	const __uint128_t *obj = (const __uint128_t *)obj_table;
> > +	if (likely(idx + n < size)) {
> > +		for (i = 0; i < (n & ~0x1); i += 2, idx += 2) {
> > +			ring[idx] = obj[i];
> > +			ring[idx + 1] = obj[i + 1];
> 
> 
> AFAIK, that implies 16B aligned obj_table...
> Would it always be the case?
I am not sure from the compiler perspective.
At least on the Arm architecture, an unaligned access (where the accessed address is not aligned to the size of the data element being accessed) will result in faults or require additional cycles. So, aligning on 16B should be fine.

> 
> > +		}
> > +		switch (n & 0x1) {
> > +		case 1:
> > +			ring[idx++] = obj[i++];
> > +		}
> > +	} else {
> > +		for (i = 0; idx < size; i++, idx++)
> > +			ring[idx] = obj[i];
> > +		/* Start at the beginning */
> > +		for (idx = 0; i < n; i++, idx++)
> > +			ring[idx] = obj[i];
> > +	}
> > +}
> > +
> > +/* the actual enqueue of elements on the ring.
> > + * Placed here since identical code needed in both
> > + * single and multi producer enqueue functions.
> > + */
> > +static __rte_always_inline void
> > +enqueue_elems(struct rte_ring *r, uint32_t prod_head, const void
> *obj_table,
> > +		uint32_t esize, uint32_t num)
> > +{
> > +	uint32_t idx, nr_idx, nr_num;
> > +
> > +	/* 8B and 16B copies implemented individually to retain
> > +	 * the current performance.
> > +	 */
> > +	if (esize == 8)
> > +		enqueue_elems_64(r, prod_head, obj_table, num);
> > +	else if (esize == 16)
> > +		enqueue_elems_128(r, prod_head, obj_table, num);
> > +	else {
> > +		/* Normalize to uint32_t */
> > +		uint32_t scale = esize / sizeof(uint32_t);
> > +		nr_num = num * scale;
> > +		idx = prod_head & r->mask;
> > +		nr_idx = idx * scale;
> > +		enqueue_elems_32(r, nr_idx, obj_table, nr_num);
> > +	}
> > +}
> > +
  
Honnappa Nagarahalli Jan. 7, 2020, 6 a.m. UTC | #3
<snip>
> > > +
> > > +static __rte_always_inline void
> > > +enqueue_elems_128(struct rte_ring *r, uint32_t prod_head, const
> > > +void *obj_table, uint32_t n) { unsigned int i; const uint32_t size
> > > += r->size; uint32_t idx = prod_head & r->mask; __uint128_t *ring =
> > > +(__uint128_t *)&r[1]; const __uint128_t *obj = (const __uint128_t
> > > +*)obj_table; if (likely(idx + n < size)) { for (i = 0; i < (n &
> > > +~0x1); i += 2, idx += 2) { ring[idx] = obj[i]; ring[idx + 1] =
> > > +obj[i + 1];
> >
> >
> > AFAIK, that implies 16B aligned obj_table...
> > Would it always be the case?
> I am not sure from the compiler perspective.
> At least on Arm architecture, unaligned access (address that is accessed is not
> aligned to the size of the data element being accessed) will result in faults or
> require additional cycles. So, aligning on 16B should be fine.
Further, I would be changing this to use 'rte_int128_t' as '__uint128_t' is not defined on 32b systems.

> 
> >
> > > +}
> > > +switch (n & 0x1) {
> > > +case 1:
> > > +ring[idx++] = obj[i++];
> > > +}
> > > +} else {
> > > +for (i = 0; idx < size; i++, idx++) ring[idx] = obj[i];
> > > +/* Start at the beginning */
> > > +for (idx = 0; i < n; i++, idx++)
> > > +ring[idx] = obj[i];
> > > +}
> > > +}
> > > +
> > > +/* the actual enqueue of elements on the ring.
> > > + * Placed here since identical code needed in both
> > > + * single and multi producer enqueue functions.
> > > + */
> > > +static __rte_always_inline void
> > > +enqueue_elems(struct rte_ring *r, uint32_t prod_head, const void
> > *obj_table,
> > > +uint32_t esize, uint32_t num)
> > > +{
> > > +uint32_t idx, nr_idx, nr_num;
> > > +
> > > +/* 8B and 16B copies implemented individually to retain
> > > + * the current performance.
> > > + */
> > > +if (esize == 8)
> > > +enqueue_elems_64(r, prod_head, obj_table, num); else if (esize ==
> > > +16) enqueue_elems_128(r, prod_head, obj_table, num); else {
> > > +/* Normalize to uint32_t */
> > > +uint32_t scale = esize / sizeof(uint32_t); nr_num = num * scale;
> > > +idx = prod_head & r->mask; nr_idx = idx * scale;
> > > +enqueue_elems_32(r, nr_idx, obj_table, nr_num); } }
> > > +
  
Ananyev, Konstantin Jan. 7, 2020, 10:21 a.m. UTC | #4
> <snip>
> > > > +
> > > > +static __rte_always_inline void
> > > > +enqueue_elems_128(struct rte_ring *r, uint32_t prod_head, const
> > > > +void *obj_table, uint32_t n) { unsigned int i; const uint32_t size
> > > > += r->size; uint32_t idx = prod_head & r->mask; __uint128_t *ring =
> > > > +(__uint128_t *)&r[1]; const __uint128_t *obj = (const __uint128_t
> > > > +*)obj_table; if (likely(idx + n < size)) { for (i = 0; i < (n &
> > > > +~0x1); i += 2, idx += 2) { ring[idx] = obj[i]; ring[idx + 1] =
> > > > +obj[i + 1];
> > >
> > >
> > > AFAIK, that implies 16B aligned obj_table...
> > > Would it always be the case?
> > I am not sure from the compiler perspective.
> > At least on Arm architecture, unaligned access (address that is accessed is not
> > aligned to the size of the data element being accessed) will result in faults or
> > require additional cycles. So, aligning on 16B should be fine.
> Further, I would be changing this to use 'rte_int128_t' as '__uint128_t' is not defined on 32b systems.

What I am trying to say: with this code we imply a new requirement for
elems in the ring: when sizeof(elem)==16 its alignment also has to be at least 16,
which from my perspective is not ideal.
Note that for elem sizes > 16 (24, 32), there is no such constraint.

> 
> >
> > >
> > > > +}
> > > > +switch (n & 0x1) {
> > > > +case 1:
> > > > +ring[idx++] = obj[i++];
> > > > +}
> > > > +} else {
> > > > +for (i = 0; idx < size; i++, idx++) ring[idx] = obj[i];
> > > > +/* Start at the beginning */
> > > > +for (idx = 0; i < n; i++, idx++)
> > > > +ring[idx] = obj[i];
> > > > +}
> > > > +}
> > > > +
> > > > +/* the actual enqueue of elements on the ring.
> > > > + * Placed here since identical code needed in both
> > > > + * single and multi producer enqueue functions.
> > > > + */
> > > > +static __rte_always_inline void
> > > > +enqueue_elems(struct rte_ring *r, uint32_t prod_head, const void
> > > *obj_table,
> > > > +uint32_t esize, uint32_t num)
> > > > +{
> > > > +uint32_t idx, nr_idx, nr_num;
> > > > +
> > > > +/* 8B and 16B copies implemented individually to retain
> > > > + * the current performance.
> > > > + */
> > > > +if (esize == 8)
> > > > +enqueue_elems_64(r, prod_head, obj_table, num); else if (esize ==
> > > > +16) enqueue_elems_128(r, prod_head, obj_table, num); else {
> > > > +/* Normalize to uint32_t */
> > > > +uint32_t scale = esize / sizeof(uint32_t); nr_num = num * scale;
> > > > +idx = prod_head & r->mask; nr_idx = idx * scale;
> > > > +enqueue_elems_32(r, nr_idx, obj_table, nr_num); } }
> > > > +
  
Honnappa Nagarahalli Jan. 7, 2020, 3:21 p.m. UTC | #5
<snip>
> > > > > +
> > > > > +static __rte_always_inline void enqueue_elems_128(struct
> > > > > +rte_ring *r, uint32_t prod_head, const void *obj_table,
> > > > > +uint32_t n) { unsigned int i; const uint32_t size = r->size;
> > > > > +uint32_t idx = prod_head & r->mask; __uint128_t *ring =
> > > > > +(__uint128_t *)&r[1]; const __uint128_t *obj = (const
> > > > > +__uint128_t *)obj_table; if (likely(idx + n < size)) { for (i =
> > > > > +0; i < (n & ~0x1); i += 2, idx += 2) { ring[idx] = obj[i];
> > > > > +ring[idx + 1] = obj[i + 1];
> > > >
> > > >
> > > > AFAIK, that implies 16B aligned obj_table...
> > > > Would it always be the case?
> > > I am not sure from the compiler perspective.
> > > At least on Arm architecture, unaligned access (address that is
> > > accessed is not aligned to the size of the data element being
> > > accessed) will result in faults or require additional cycles. So, aligning on
> 16B should be fine.
> > Further, I would be changing this to use 'rte_int128_t' as '__uint128_t' is
> not defined on 32b systems.
> 
> What I am trying to say: with this code we imply new requirement for elems
The only existing use case in DPDK for 16B elements is the event ring. The event ring already does a similar kind of copy (using 'struct rte_event'). So, there is no change in expectations for the event ring.
For future code, I think this expectation should be fine since it allows for optimal code.

> in the ring: when sizeof(elem)==16 it's alignment also has to be at least 16.
> Which from my perspective is not ideal.
Any reasoning?

> Note that for elem sizes > 16 (24, 32), there is no such constraint.
The rest of them need to be aligned on a 4B boundary. However, this should not affect the existing code.
The code for 8B and 16B is kept as-is to ensure the performance of the existing code is not affected.

> 
> >
> > >
> > > >
> > > > > +}
> > > > > +switch (n & 0x1) {
> > > > > +case 1:
> > > > > +ring[idx++] = obj[i++];
> > > > > +}
> > > > > +} else {
> > > > > +for (i = 0; idx < size; i++, idx++) ring[idx] = obj[i];
> > > > > +/* Start at the beginning */
> > > > > +for (idx = 0; i < n; i++, idx++) ring[idx] = obj[i]; } }
> > > > > +
> > > > > +/* the actual enqueue of elements on the ring.
> > > > > + * Placed here since identical code needed in both
> > > > > + * single and multi producer enqueue functions.
> > > > > + */
> > > > > +static __rte_always_inline void enqueue_elems(struct rte_ring
> > > > > +*r, uint32_t prod_head, const void
> > > > *obj_table,
> > > > > +uint32_t esize, uint32_t num)
> > > > > +{
> > > > > +uint32_t idx, nr_idx, nr_num;
> > > > > +
> > > > > +/* 8B and 16B copies implemented individually to retain
> > > > > + * the current performance.
> > > > > + */
> > > > > +if (esize == 8)
> > > > > +enqueue_elems_64(r, prod_head, obj_table, num); else if (esize
> > > > > +==
> > > > > +16) enqueue_elems_128(r, prod_head, obj_table, num); else {
> > > > > +/* Normalize to uint32_t */
> > > > > +uint32_t scale = esize / sizeof(uint32_t); nr_num = num *
> > > > > +scale; idx = prod_head & r->mask; nr_idx = idx * scale;
> > > > > +enqueue_elems_32(r, nr_idx, obj_table, nr_num); } }
> > > > > +
  
Ananyev, Konstantin Jan. 7, 2020, 3:41 p.m. UTC | #6
> <snip>
> > > > > > +
> > > > > > +static __rte_always_inline void enqueue_elems_128(struct
> > > > > > +rte_ring *r, uint32_t prod_head, const void *obj_table,
> > > > > > +uint32_t n) { unsigned int i; const uint32_t size = r->size;
> > > > > > +uint32_t idx = prod_head & r->mask; __uint128_t *ring =
> > > > > > +(__uint128_t *)&r[1]; const __uint128_t *obj = (const
> > > > > > +__uint128_t *)obj_table; if (likely(idx + n < size)) { for (i =
> > > > > > +0; i < (n & ~0x1); i += 2, idx += 2) { ring[idx] = obj[i];
> > > > > > +ring[idx + 1] = obj[i + 1];
> > > > >
> > > > >
> > > > > AFAIK, that implies 16B aligned obj_table...
> > > > > Would it always be the case?
> > > > I am not sure from the compiler perspective.
> > > > At least on Arm architecture, unaligned access (address that is
> > > > accessed is not aligned to the size of the data element being
> > > > accessed) will result in faults or require additional cycles. So, aligning on
> > 16B should be fine.
> > > Further, I would be changing this to use 'rte_int128_t' as '__uint128_t' is
> > not defined on 32b systems.
> >
> > What I am trying to say: with this code we imply new requirement for elems
> The only existing use case in DPDK for 16B is the event ring. The event ring already does similar kind of copy (using 'struct rte_event').
> So, there is no change in expectations for event ring.
> For future code, I think this expectation should be fine since it allows for optimal code.
> 
> > in the ring: when sizeof(elem)==16 it's alignment also has to be at least 16.
> > Which from my perspective is not ideal.
> Any reasoning?

New implicit requirement and inconsistency.
Code like that:

struct ring_elem {uint64_t a, b;};
....
struct ring_elem elem; 
rte_ring_dequeue_elem(ring, &elem, sizeof(elem));
 
might cause a crash.
While exactly the same code with:

struct ring_elem {uint64_t a, b, c;}; OR struct ring_elem {uint64_t a, b, c, d;};

will work ok.
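To spell out the mismatch (a minimal sketch, assuming GCC/Clang with C11 on a 64-bit target):

#include <stdint.h>

struct ring_elem { uint64_t a, b; };    /* sizeof == 16, but _Alignof == 8 */

_Static_assert(_Alignof(struct ring_elem) == 8, "8B-aligned user type");
_Static_assert(_Alignof(__uint128_t) == 16, "16B-aligned copy type");

/*
 * enqueue_elems_128() casts obj_table to const __uint128_t *, so the
 * compiler is entitled to assume 16B alignment and may emit aligned 16B
 * loads/stores, while a stack variable of struct ring_elem is only
 * guaranteed 8B alignment. The 24B/32B cases go through the uint32_t
 * copy loop, which only assumes 4B alignment.
 */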

> 
> > Note that for elem sizes > 16 (24, 32), there is no such constraint.
> The rest of them need to be aligned on 4B boundary. However, this should not affect the existing code.
> The code for 8B and 16B is kept as is to ensure the performance is not affected for the existing code.
> 
> >
> > >
> > > >
> > > > >
> > > > > > +}
> > > > > > +switch (n & 0x1) {
> > > > > > +case 1:
> > > > > > +ring[idx++] = obj[i++];
> > > > > > +}
> > > > > > +} else {
> > > > > > +for (i = 0; idx < size; i++, idx++) ring[idx] = obj[i];
> > > > > > +/* Start at the beginning */
> > > > > > +for (idx = 0; i < n; i++, idx++) ring[idx] = obj[i]; } }
> > > > > > +
> > > > > > +/* the actual enqueue of elements on the ring.
> > > > > > + * Placed here since identical code needed in both
> > > > > > + * single and multi producer enqueue functions.
> > > > > > + */
> > > > > > +static __rte_always_inline void enqueue_elems(struct rte_ring
> > > > > > +*r, uint32_t prod_head, const void
> > > > > *obj_table,
> > > > > > +uint32_t esize, uint32_t num)
> > > > > > +{
> > > > > > +uint32_t idx, nr_idx, nr_num;
> > > > > > +
> > > > > > +/* 8B and 16B copies implemented individually to retain
> > > > > > + * the current performance.
> > > > > > + */
> > > > > > +if (esize == 8)
> > > > > > +enqueue_elems_64(r, prod_head, obj_table, num); else if (esize
> > > > > > +==
> > > > > > +16) enqueue_elems_128(r, prod_head, obj_table, num); else {
> > > > > > +/* Normalize to uint32_t */
> > > > > > +uint32_t scale = esize / sizeof(uint32_t); nr_num = num *
> > > > > > +scale; idx = prod_head & r->mask; nr_idx = idx * scale;
> > > > > > +enqueue_elems_32(r, nr_idx, obj_table, nr_num); } }
> > > > > > +
  
Honnappa Nagarahalli Jan. 8, 2020, 6:17 a.m. UTC | #7
<snip>
> > > > > > > +
> > > > > > > +static __rte_always_inline void enqueue_elems_128(struct
> > > > > > > +rte_ring *r, uint32_t prod_head, const void *obj_table,
> > > > > > > +uint32_t n) { unsigned int i; const uint32_t size =
> > > > > > > +r->size; uint32_t idx = prod_head & r->mask; __uint128_t
> > > > > > > +*ring = (__uint128_t *)&r[1]; const __uint128_t *obj =
> > > > > > > +(const __uint128_t *)obj_table; if (likely(idx + n < size))
> > > > > > > +{ for (i = 0; i < (n & ~0x1); i += 2, idx += 2) { ring[idx]
> > > > > > > += obj[i]; ring[idx + 1] = obj[i + 1];
> > > > > >
> > > > > >
> > > > > > AFAIK, that implies 16B aligned obj_table...
> > > > > > Would it always be the case?
> > > > > I am not sure from the compiler perspective.
> > > > > At least on Arm architecture, unaligned access (address that is
> > > > > accessed is not aligned to the size of the data element being
> > > > > accessed) will result in faults or require additional cycles.
> > > > > So, aligning on
> > > 16B should be fine.
> > > > Further, I would be changing this to use 'rte_int128_t' as
> > > > '__uint128_t' is
> > > not defined on 32b systems.
> > >
> > > What I am trying to say: with this code we imply new requirement for
> > > elems
> > The only existing use case in DPDK for 16B is the event ring. The event ring
> already does similar kind of copy (using 'struct rte_event').
> > So, there is no change in expectations for event ring.
> > For future code, I think this expectation should be fine since it allows for
> optimal code.
> >
> > > in the ring: when sizeof(elem)==16 it's alignment also has to be at least
> 16.
> > > Which from my perspective is not ideal.
> > Any reasoning?
> 
> New implicit requirement and inconsistency.
> Code like that:
> 
> struct ring_elem {uint64_t a, b;};
> ....
> struct ring_elem elem;
> rte_ring_dequeue_elem(ring, &elem, sizeof(elem));
> 
> might cause a crash.
The alignment here is 8B. Assuming that the generated instructions require 16B alignment, it will result in a crash, if configured to generate an exception.
But these instructions are not atomic instructions. At least on aarch64, an unaligned access will not result in an exception for non-atomic loads/stores. I believe it is the same behavior for x86 as well.

> While exactly the same code with:
> 
> struct ring_elem {uint64_t a, b, c;}; OR struct ring_elem {uint64_t a, b, c, d;};
> 
> will work ok.
The alignment for these structures is still 8B. Are you saying this will work because these will be copied using a pointer to uint32_t (whose alignment is 4B)?

> 
> >
> > > Note that for elem sizes > 16 (24, 32), there is no such constraint.
> > The rest of them need to be aligned on 4B boundary. However, this should
> not affect the existing code.
> > The code for 8B and 16B is kept as is to ensure the performance is not
> affected for the existing code.
<snip>
  
Ananyev, Konstantin Jan. 8, 2020, 10:05 a.m. UTC | #8
> <snip>
> > > > > > > > +
> > > > > > > > +static __rte_always_inline void enqueue_elems_128(struct
> > > > > > > > +rte_ring *r, uint32_t prod_head, const void *obj_table,
> > > > > > > > +uint32_t n) { unsigned int i; const uint32_t size =
> > > > > > > > +r->size; uint32_t idx = prod_head & r->mask; __uint128_t
> > > > > > > > +*ring = (__uint128_t *)&r[1]; const __uint128_t *obj =
> > > > > > > > +(const __uint128_t *)obj_table; if (likely(idx + n < size))
> > > > > > > > +{ for (i = 0; i < (n & ~0x1); i += 2, idx += 2) { ring[idx]
> > > > > > > > += obj[i]; ring[idx + 1] = obj[i + 1];
> > > > > > >
> > > > > > >
> > > > > > > AFAIK, that implies 16B aligned obj_table...
> > > > > > > Would it always be the case?
> > > > > > I am not sure from the compiler perspective.
> > > > > > At least on Arm architecture, unaligned access (address that is
> > > > > > accessed is not aligned to the size of the data element being
> > > > > > accessed) will result in faults or require additional cycles.
> > > > > > So, aligning on
> > > > 16B should be fine.
> > > > > Further, I would be changing this to use 'rte_int128_t' as
> > > > > '__uint128_t' is
> > > > not defined on 32b systems.
> > > >
> > > > What I am trying to say: with this code we imply new requirement for
> > > > elems
> > > The only existing use case in DPDK for 16B is the event ring. The event ring
> > already does similar kind of copy (using 'struct rte_event').
> > > So, there is no change in expectations for event ring.
> > > For future code, I think this expectation should be fine since it allows for
> > optimal code.
> > >
> > > > in the ring: when sizeof(elem)==16 it's alignment also has to be at least
> > 16.
> > > > Which from my perspective is not ideal.
> > > Any reasoning?
> >
> > New implicit requirement and inconsistency.
> > Code like that:
> >
> > struct ring_elem {uint64_t a, b;};
> > ....
> > struct ring_elem elem;
> > rte_ring_dequeue_elem(ring, &elem, sizeof(elem));
> >
> > might cause a crash.
> The alignment here is 8B. Assuming that instructions generated will require 16B alignment, it will result in a crash, if configured to generate
> exception.
> But, these instructions are not atomic instructions. At least on aarch64, unaligned access will not result in an exception for non-atomic
> loads/stores. I believe it is the same behavior for x86 as well.

On IA, there are 2 types of 16B load/store instructions: aligned and unaligned.
Aligned ones are a bit faster, but will cause an exception if used on a non-16B-aligned address.
As you are using uint128_t *, the compiler will assume that both src and dst are 16B aligned
and might generate code with aligned instructions.

> 
> > While exactly the same code with:
> >
> > struct ring_elem {uint64_t a, b, c;}; OR struct ring_elem {uint64_t a, b, c, d;};
> >
> > will work ok.
> The alignment for these structures is still 8B. Are you saying this will work because these will be copied using pointer to uint32_t (whose
> alignment is 4B)?

Yes, as we are doing uint32_t copies, the compiler can't assume the data will be 16B aligned
and will use unaligned instructions.

> 
> >
> > >
> > > > Note that for elem sizes > 16 (24, 32), there is no such constraint.
> > > The rest of them need to be aligned on 4B boundary. However, this should
> > not affect the existing code.
> > > The code for 8B and 16B is kept as is to ensure the performance is not
> > affected for the existing code.
> <snip>
  
Honnappa Nagarahalli Jan. 8, 2020, 11:40 p.m. UTC | #9
<snip>
> > > > > > > > > +
> > > > > > > > > +static __rte_always_inline void
> > > > > > > > > +enqueue_elems_128(struct rte_ring *r, uint32_t
> > > > > > > > > +prod_head, const void *obj_table, uint32_t n) {
> > > > > > > > > +unsigned int i; const uint32_t size =
> > > > > > > > > +r->size; uint32_t idx = prod_head & r->mask;
> > > > > > > > > +r->__uint128_t
> > > > > > > > > +*ring = (__uint128_t *)&r[1]; const __uint128_t *obj =
> > > > > > > > > +(const __uint128_t *)obj_table; if (likely(idx + n <
> > > > > > > > > +size)) { for (i = 0; i < (n & ~0x1); i += 2, idx += 2)
> > > > > > > > > +{ ring[idx] = obj[i]; ring[idx + 1] = obj[i + 1];
> > > > > > > >
> > > > > > > >
> > > > > > > > AFAIK, that implies 16B aligned obj_table...
> > > > > > > > Would it always be the case?
> > > > > > > I am not sure from the compiler perspective.
> > > > > > > At least on Arm architecture, unaligned access (address that
> > > > > > > is accessed is not aligned to the size of the data element
> > > > > > > being
> > > > > > > accessed) will result in faults or require additional cycles.
> > > > > > > So, aligning on
> > > > > 16B should be fine.
> > > > > > Further, I would be changing this to use 'rte_int128_t' as
> > > > > > '__uint128_t' is
> > > > > not defined on 32b systems.
> > > > >
> > > > > What I am trying to say: with this code we imply new requirement
> > > > > for elems
> > > > The only existing use case in DPDK for 16B is the event ring. The
> > > > event ring
> > > already does similar kind of copy (using 'struct rte_event').
> > > > So, there is no change in expectations for event ring.
> > > > For future code, I think this expectation should be fine since it
> > > > allows for
> > > optimal code.
> > > >
> > > > > in the ring: when sizeof(elem)==16 it's alignment also has to be
> > > > > at least
> > > 16.
> > > > > Which from my perspective is not ideal.
> > > > Any reasoning?
> > >
> > > New implicit requirement and inconsistency.
> > > Code like that:
> > >
> > > struct ring_elem {uint64_t a, b;};
> > > ....
> > > struct ring_elem elem;
> > > rte_ring_dequeue_elem(ring, &elem, sizeof(elem));
> > >
> > > might cause a crash.
> > The alignment here is 8B. Assuming that instructions generated will
> > require 16B alignment, it will result in a crash, if configured to generate
> exception.
> > But, these instructions are not atomic instructions. At least on
> > aarch64, unaligned access will not result in an exception for non-atomic
> loads/stores. I believe it is the same behavior for x86 as well.
> 
> On IA, there are 2 types of 16B load/store instructions: aligned and unaligned.
> Aligned are a bit faster, but will cause an exception if used on non 16B aligned
> address.
> As you using uint128_t * compiler will assume that both src and dst are 16B
> aligned and might generate code with aligned instructions.
Ok, looking at a few articles, I read that if the address is aligned, the unaligned instructions do not incur a penalty. Is this understanding correct?

I see 2 solutions here:
1) We can switch this copy to use a uint32_t pointer. It would still allow the compiler to generate (unaligned) instructions for up to 256b loads/stores. The 2 multiplications (to normalize the index and the size of the copy) can use shifts. This should make it safer (a rough sketch follows after this list). If one wants performance, they can align the obj table to 16B (the ring itself is already aligned on the cache line boundary).

2) Considering that performance is paramount, we could document that the obj table needs to be aligned on a 16B boundary. This would affect event dev (if we go ahead with replacing the event ring implementation) significantly.

Note that we have to do the same thing for 64b elements as well.
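A rough sketch of what option 1 could look like for the dispatch (an assumption based on this discussion, not the actual follow-up revision; it reuses enqueue_elems_32()/enqueue_elems_64() from the patch above):

static __rte_always_inline void
enqueue_elems(struct rte_ring *r, uint32_t prod_head, const void *obj_table,
                uint32_t esize, uint32_t num)
{
        /* 8B copies kept as-is; everything else (16B included) is copied
         * as uint32_t, so the compiler can only assume 4B alignment and
         * must use unaligned instructions if it vectorizes the loop.
         */
        if (esize == 8)
                enqueue_elems_64(r, prod_head, obj_table, num);
        else {
                /* scale = esize / sizeof(uint32_t); for esize == 16 the
                 * two normalizing multiplications become shifts by 2.
                 */
                uint32_t scale = esize >> 2;
                uint32_t nr_num = num * scale;
                uint32_t idx = prod_head & r->mask;
                uint32_t nr_idx = idx * scale;
                /* the size used for the wrap-around check inside
                 * enqueue_elems_32() would presumably need the same
                 * normalization (r->size * scale).
                 */
                enqueue_elems_32(r, nr_idx, obj_table, nr_num);
        }
}

Routing the 16B case through the 32-bit loop trades a possibly slightly slower copy for dropping the implicit 16B alignment requirement on obj_table.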

> 
> >
> > > While exactly the same code with:
> > >
> > > struct ring_elem {uint64_t a, b, c;}; OR struct ring_elem {uint64_t
> > > a, b, c, d;};
> > >
> > > will work ok.
> > The alignment for these structures is still 8B. Are you saying this
> > will work because these will be copied using pointer to uint32_t (whose
> alignment is 4B)?
> 
> Yes, as we doing uint32_t copies, compiler can't assume the data will be 16B
> aligned and will use unaligned instructions.
> 
> >
> > >
> > > >
> > > > > Note that for elem sizes > 16 (24, 32), there is no such constraint.
> > > > The rest of them need to be aligned on 4B boundary. However, this
> > > > should
> > > not affect the existing code.
> > > > The code for 8B and 16B is kept as is to ensure the performance is
> > > > not
> > > affected for the existing code.
> > <snip>
  
Ananyev, Konstantin Jan. 9, 2020, 12:48 a.m. UTC | #10
> <snip>
> > > > > > > > > > +
> > > > > > > > > > +static __rte_always_inline void
> > > > > > > > > > +enqueue_elems_128(struct rte_ring *r, uint32_t
> > > > > > > > > > +prod_head, const void *obj_table, uint32_t n) {
> > > > > > > > > > +unsigned int i; const uint32_t size =
> > > > > > > > > > +r->size; uint32_t idx = prod_head & r->mask;
> > > > > > > > > > +r->__uint128_t
> > > > > > > > > > +*ring = (__uint128_t *)&r[1]; const __uint128_t *obj =
> > > > > > > > > > +(const __uint128_t *)obj_table; if (likely(idx + n <
> > > > > > > > > > +size)) { for (i = 0; i < (n & ~0x1); i += 2, idx += 2)
> > > > > > > > > > +{ ring[idx] = obj[i]; ring[idx + 1] = obj[i + 1];
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > AFAIK, that implies 16B aligned obj_table...
> > > > > > > > > Would it always be the case?
> > > > > > > > I am not sure from the compiler perspective.
> > > > > > > > At least on Arm architecture, unaligned access (address that
> > > > > > > > is accessed is not aligned to the size of the data element
> > > > > > > > being
> > > > > > > > accessed) will result in faults or require additional cycles.
> > > > > > > > So, aligning on
> > > > > > 16B should be fine.
> > > > > > > Further, I would be changing this to use 'rte_int128_t' as
> > > > > > > '__uint128_t' is
> > > > > > not defined on 32b systems.
> > > > > >
> > > > > > What I am trying to say: with this code we imply new requirement
> > > > > > for elems
> > > > > The only existing use case in DPDK for 16B is the event ring. The
> > > > > event ring
> > > > already does similar kind of copy (using 'struct rte_event').
> > > > > So, there is no change in expectations for event ring.
> > > > > For future code, I think this expectation should be fine since it
> > > > > allows for
> > > > optimal code.
> > > > >
> > > > > > in the ring: when sizeof(elem)==16 it's alignment also has to be
> > > > > > at least
> > > > 16.
> > > > > > Which from my perspective is not ideal.
> > > > > Any reasoning?
> > > >
> > > > New implicit requirement and inconsistency.
> > > > Code like that:
> > > >
> > > > struct ring_elem {uint64_t a, b;};
> > > > ....
> > > > struct ring_elem elem;
> > > > rte_ring_dequeue_elem(ring, &elem, sizeof(elem));
> > > >
> > > > might cause a crash.
> > > The alignment here is 8B. Assuming that instructions generated will
> > > require 16B alignment, it will result in a crash, if configured to generate
> > exception.
> > > But, these instructions are not atomic instructions. At least on
> > > aarch64, unaligned access will not result in an exception for non-atomic
> > loads/stores. I believe it is the same behavior for x86 as well.
> >
> > On IA, there are 2 types of 16B load/store instructions: aligned and unaligned.
> > Aligned are a bit faster, but will cause an exception if used on non 16B aligned
> > address.
> > As you using uint128_t * compiler will assume that both src and dst are 16B
> > aligned and might generate code with aligned instructions.
> Ok, looking at few articles, I read that if the address is aligned, the unaligned instructions do not incur the penalty. Is this understanding
> correct?

Yes, from my experience the difference is negligible.

> 
> I see 2 solutions here:
> 1) We can switch this copy to use uint32_t pointer. It would still allow the compiler to generate (unaligned) instructions for up to 256b
> load/store. The 2 multiplications (to normalize the index and the size of copy) can use shifts. This should make it safer. If one wants
> performance, they can align the obj table to 16B (the ring itself is already aligned on the cache line boundary).

Sounds good to me.

> 
> 2) Considering that performance is paramount, we could document that the obj table needs to be aligned on 16B boundary. This would
> affect event dev (if we go ahead with replacing the event ring implementation) significantly.

I don't think the perf difference would be significant enough to justify such a constraint.
I am in favor of #1.
 
> Note that we have to do the same thing for 64b elements as well.

I don't mind having one unified copy procedure, which would always use 32-bit elems,
but AFAIK, on IA there is no such limitation for 64-bit load/stores.


> 
> >
> > >
> > > > While exactly the same code with:
> > > >
> > > > struct ring_elem {uint64_t a, b, c;}; OR struct ring_elem {uint64_t
> > > > a, b, c, d;};
> > > >
> > > > will work ok.
> > > The alignment for these structures is still 8B. Are you saying this
> > > will work because these will be copied using pointer to uint32_t (whose
> > alignment is 4B)?
> >
> > Yes, as we doing uint32_t copies, compiler can't assume the data will be 16B
> > aligned and will use unaligned instructions.
> >
> > >
> > > >
> > > > >
> > > > > > Note that for elem sizes > 16 (24, 32), there is no such constraint.
> > > > > The rest of them need to be aligned on 4B boundary. However, this
> > > > > should
> > > > not affect the existing code.
> > > > > The code for 8B and 16B is kept as is to ensure the performance is
> > > > > not
> > > > affected for the existing code.
> > > <snip>
  
Honnappa Nagarahalli Jan. 9, 2020, 4:06 p.m. UTC | #11
<snip>
> > > > > > > > > > > +
> > > > > > > > > > > +static __rte_always_inline void
> > > > > > > > > > > +enqueue_elems_128(struct rte_ring *r, uint32_t
> > > > > > > > > > > +prod_head, const void *obj_table, uint32_t n) {
> > > > > > > > > > > +unsigned int i; const uint32_t size =
> > > > > > > > > > > +r->size; uint32_t idx = prod_head & r->mask;
> > > > > > > > > > > +r->__uint128_t
> > > > > > > > > > > +*ring = (__uint128_t *)&r[1]; const __uint128_t
> > > > > > > > > > > +*obj = (const __uint128_t *)obj_table; if
> > > > > > > > > > > +(likely(idx + n <
> > > > > > > > > > > +size)) { for (i = 0; i < (n & ~0x1); i += 2, idx +=
> > > > > > > > > > > +2) { ring[idx] = obj[i]; ring[idx + 1] = obj[i +
> > > > > > > > > > > +1];
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > AFAIK, that implies 16B aligned obj_table...
> > > > > > > > > > Would it always be the case?
> > > > > > > > > I am not sure from the compiler perspective.
> > > > > > > > > At least on Arm architecture, unaligned access (address
> > > > > > > > > that is accessed is not aligned to the size of the data
> > > > > > > > > element being
> > > > > > > > > accessed) will result in faults or require additional cycles.
> > > > > > > > > So, aligning on
> > > > > > > 16B should be fine.
> > > > > > > > Further, I would be changing this to use 'rte_int128_t' as
> > > > > > > > '__uint128_t' is
> > > > > > > not defined on 32b systems.
> > > > > > >
> > > > > > > What I am trying to say: with this code we imply new
> > > > > > > requirement for elems
> > > > > > The only existing use case in DPDK for 16B is the event ring.
> > > > > > The event ring
> > > > > already does similar kind of copy (using 'struct rte_event').
> > > > > > So, there is no change in expectations for event ring.
> > > > > > For future code, I think this expectation should be fine since
> > > > > > it allows for
> > > > > optimal code.
> > > > > >
> > > > > > > in the ring: when sizeof(elem)==16 it's alignment also has
> > > > > > > to be at least
> > > > > 16.
> > > > > > > Which from my perspective is not ideal.
> > > > > > Any reasoning?
> > > > >
> > > > > New implicit requirement and inconsistency.
> > > > > Code like that:
> > > > >
> > > > > struct ring_elem {uint64_t a, b;}; ....
> > > > > struct ring_elem elem;
> > > > > rte_ring_dequeue_elem(ring, &elem, sizeof(elem));
> > > > >
> > > > > might cause a crash.
> > > > The alignment here is 8B. Assuming that instructions generated
> > > > will require 16B alignment, it will result in a crash, if
> > > > configured to generate
> > > exception.
> > > > But, these instructions are not atomic instructions. At least on
> > > > aarch64, unaligned access will not result in an exception for
> > > > non-atomic
> > > loads/stores. I believe it is the same behavior for x86 as well.
> > >
> > > On IA, there are 2 types of 16B load/store instructions: aligned and
> unaligned.
> > > Aligned are a bit faster, but will cause an exception if used on non
> > > 16B aligned address.
> > > As you using uint128_t * compiler will assume that both src and dst
> > > are 16B aligned and might generate code with aligned instructions.
> > Ok, looking at few articles, I read that if the address is aligned,
> > the unaligned instructions do not incur the penalty. Is this understanding
> correct?
> 
> Yes, from my experience the difference is negligible.
> 
> >
> > I see 2 solutions here:
> > 1) We can switch this copy to use uint32_t pointer. It would still
> > allow the compiler to generate (unaligned) instructions for up to 256b
> > load/store. The 2 multiplications (to normalize the index and the size of copy)
> can use shifts. This should make it safer. If one wants performance, they can
> align the obj table to 16B (the ring itself is already aligned on the cache line
> boundary).
> 
> Sounds good to me.
> 
> >
> > 2) Considering that performance is paramount, we could document that
> > the obj table needs to be aligned on 16B boundary. This would affect event
> dev (if we go ahead with replacing the event ring implementation) significantly.
> 
> I don't think perf difference would be that significant to justify such constraint.
> I am in favor of #1.
Ok, will go with this.
Is it ok if I squash the intermediate commits for test cases? I can keep one commit for functional tests and another for performance tests.

> 
> > Note that we have to do the same thing for 64b elements as well.
> 
> I don't mind to have one unified copy procedure, which would always use 32bit
> elems, but AFAIK, on IA there is no such limitation for 64bit load/stores.
> 
> 
> >
> > >
> > > >
> > > > > While exactly the same code with:
> > > > >
> > > > > struct ring_elem {uint64_t a, b, c;}; OR struct ring_elem
> > > > > {uint64_t a, b, c, d;};
> > > > >
> > > > > will work ok.
> > > > The alignment for these structures is still 8B. Are you saying
> > > > this will work because these will be copied using pointer to
> > > > uint32_t (whose
> > > alignment is 4B)?
> > >
> > > Yes, as we doing uint32_t copies, compiler can't assume the data
> > > will be 16B aligned and will use unaligned instructions.
> > >
> > > >
> > > > >
> > > > > >
> > > > > > > Note that for elem sizes > 16 (24, 32), there is no such constraint.
> > > > > > The rest of them need to be aligned on 4B boundary. However,
> > > > > > this should
> > > > > not affect the existing code.
> > > > > > The code for 8B and 16B is kept as is to ensure the
> > > > > > performance is not
> > > > > affected for the existing code.
> > > > <snip>
  
Ananyev, Konstantin Jan. 13, 2020, 11:53 a.m. UTC | #12
> <snip>
> > > > > > > > > > > > +
> > > > > > > > > > > > +static __rte_always_inline void
> > > > > > > > > > > > +enqueue_elems_128(struct rte_ring *r, uint32_t
> > > > > > > > > > > > +prod_head, const void *obj_table, uint32_t n) {
> > > > > > > > > > > > +unsigned int i; const uint32_t size =
> > > > > > > > > > > > +r->size; uint32_t idx = prod_head & r->mask;
> > > > > > > > > > > > +r->__uint128_t
> > > > > > > > > > > > +*ring = (__uint128_t *)&r[1]; const __uint128_t
> > > > > > > > > > > > +*obj = (const __uint128_t *)obj_table; if
> > > > > > > > > > > > +(likely(idx + n <
> > > > > > > > > > > > +size)) { for (i = 0; i < (n & ~0x1); i += 2, idx +=
> > > > > > > > > > > > +2) { ring[idx] = obj[i]; ring[idx + 1] = obj[i +
> > > > > > > > > > > > +1];
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > AFAIK, that implies 16B aligned obj_table...
> > > > > > > > > > > Would it always be the case?
> > > > > > > > > > I am not sure from the compiler perspective.
> > > > > > > > > > At least on Arm architecture, unaligned access (address
> > > > > > > > > > that is accessed is not aligned to the size of the data
> > > > > > > > > > element being
> > > > > > > > > > accessed) will result in faults or require additional cycles.
> > > > > > > > > > So, aligning on
> > > > > > > > 16B should be fine.
> > > > > > > > > Further, I would be changing this to use 'rte_int128_t' as
> > > > > > > > > '__uint128_t' is
> > > > > > > > not defined on 32b systems.
> > > > > > > >
> > > > > > > > What I am trying to say: with this code we imply new
> > > > > > > > requirement for elems
> > > > > > > The only existing use case in DPDK for 16B is the event ring.
> > > > > > > The event ring
> > > > > > already does similar kind of copy (using 'struct rte_event').
> > > > > > > So, there is no change in expectations for event ring.
> > > > > > > For future code, I think this expectation should be fine since
> > > > > > > it allows for
> > > > > > optimal code.
> > > > > > >
> > > > > > > > in the ring: when sizeof(elem)==16 it's alignment also has
> > > > > > > > to be at least
> > > > > > 16.
> > > > > > > > Which from my perspective is not ideal.
> > > > > > > Any reasoning?
> > > > > >
> > > > > > New implicit requirement and inconsistency.
> > > > > > Code like that:
> > > > > >
> > > > > > struct ring_elem {uint64_t a, b;}; ....
> > > > > > struct ring_elem elem;
> > > > > > rte_ring_dequeue_elem(ring, &elem, sizeof(elem));
> > > > > >
> > > > > > might cause a crash.
> > > > > The alignment here is 8B. Assuming that instructions generated
> > > > > will require 16B alignment, it will result in a crash, if
> > > > > configured to generate
> > > > exception.
> > > > > But, these instructions are not atomic instructions. At least on
> > > > > aarch64, unaligned access will not result in an exception for
> > > > > non-atomic
> > > > loads/stores. I believe it is the same behavior for x86 as well.
> > > >
> > > > On IA, there are 2 types of 16B load/store instructions: aligned and
> > unaligned.
> > > > Aligned are a bit faster, but will cause an exception if used on non
> > > > 16B aligned address.
> > > > As you using uint128_t * compiler will assume that both src and dst
> > > > are 16B aligned and might generate code with aligned instructions.
> > > Ok, looking at few articles, I read that if the address is aligned,
> > > the unaligned instructions do not incur the penalty. Is this understanding
> > correct?
> >
> > Yes, from my experience the difference is negligible.
> >
> > >
> > > I see 2 solutions here:
> > > 1) We can switch this copy to use uint32_t pointer. It would still
> > > allow the compiler to generate (unaligned) instructions for up to 256b
> > > load/store. The 2 multiplications (to normalize the index and the size of copy)
> > can use shifts. This should make it safer. If one wants performance, they can
> > align the obj table to 16B (the ring itself is already aligned on the cache line
> > boundary).
> >
> > Sounds good to me.
> >
> > >
> > > 2) Considering that performance is paramount, we could document that
> > > the obj table needs to be aligned on 16B boundary. This would affect event
> > dev (if we go ahead with replacing the event ring implementation) significantly.
> >
> > I don't think perf difference would be that significant to justify such constraint.
> > I am in favor of #1.
> Ok, will go with this.
> Is it ok if I squash the intermediate commits for test cases? I can keep one commit for functional tests and another for performance tests.

Yes, sounds like a good thing for me.
Konstantin 


> 
> >
> > > Note that we have to do the same thing for 64b elements as well.
> >
> > I don't mind to have one unified copy procedure, which would always use 32bit
> > elems, but AFAIK, on IA there is no such limitation for 64bit load/stores.
> >
> >
> > >
> > > >
> > > > >
> > > > > > While exactly the same code with:
> > > > > >
> > > > > > struct ring_elem {uint64_t a, b, c;}; OR struct ring_elem
> > > > > > {uint64_t a, b, c, d;};
> > > > > >
> > > > > > will work ok.
> > > > > The alignment for these structures is still 8B. Are you saying
> > > > > this will work because these will be copied using pointer to
> > > > > uint32_t (whose
> > > > alignment is 4B)?
> > > >
> > > > Yes, as we doing uint32_t copies, compiler can't assume the data
> > > > will be 16B aligned and will use unaligned instructions.
> > > >
> > > > >
> > > > > >
> > > > > > >
> > > > > > > > Note that for elem sizes > 16 (24, 32), there is no such constraint.
> > > > > > > The rest of them need to be aligned on 4B boundary. However,
> > > > > > > this should
> > > > > > not affect the existing code.
> > > > > > > The code for 8B and 16B is kept as is to ensure the
> > > > > > > performance is not
> > > > > > affected for the existing code.
> > > > > <snip>
  

Patch

diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile
index 22454b084..917c560ad 100644
--- a/lib/librte_ring/Makefile
+++ b/lib/librte_ring/Makefile
@@ -6,7 +6,7 @@  include $(RTE_SDK)/mk/rte.vars.mk
 # library name
 LIB = librte_ring.a
 
-CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3
+CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3 -DALLOW_EXPERIMENTAL_API
 LDLIBS += -lrte_eal
 
 EXPORT_MAP := rte_ring_version.map
@@ -16,6 +16,7 @@  SRCS-$(CONFIG_RTE_LIBRTE_RING) := rte_ring.c
 
 # install includes
 SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h \
+					rte_ring_elem.h \
 					rte_ring_generic.h \
 					rte_ring_c11_mem.h
 
diff --git a/lib/librte_ring/meson.build b/lib/librte_ring/meson.build
index ca8a435e9..f2f3ccc88 100644
--- a/lib/librte_ring/meson.build
+++ b/lib/librte_ring/meson.build
@@ -3,5 +3,9 @@ 
 
 sources = files('rte_ring.c')
 headers = files('rte_ring.h',
+		'rte_ring_elem.h',
 		'rte_ring_c11_mem.h',
 		'rte_ring_generic.h')
+
+# rte_ring_create_elem and rte_ring_get_memsize_elem are experimental
+allow_experimental_apis = true
diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c
index d9b308036..3e15dc398 100644
--- a/lib/librte_ring/rte_ring.c
+++ b/lib/librte_ring/rte_ring.c
@@ -33,6 +33,7 @@ 
 #include <rte_tailq.h>
 
 #include "rte_ring.h"
+#include "rte_ring_elem.h"
 
 TAILQ_HEAD(rte_ring_list, rte_tailq_entry);
 
@@ -46,23 +47,38 @@  EAL_REGISTER_TAILQ(rte_ring_tailq)
 
 /* return the size of memory occupied by a ring */
 ssize_t
-rte_ring_get_memsize(unsigned count)
+rte_ring_get_memsize_elem(unsigned int esize, unsigned int count)
 {
 	ssize_t sz;
 
+	/* Check if element size is a multiple of 4B */
+	if (esize % 4 != 0) {
+		RTE_LOG(ERR, RING, "element size is not a multiple of 4\n");
+
+		return -EINVAL;
+	}
+
 	/* count must be a power of 2 */
 	if ((!POWEROF2(count)) || (count > RTE_RING_SZ_MASK )) {
 		RTE_LOG(ERR, RING,
-			"Requested size is invalid, must be power of 2, and "
-			"do not exceed the size limit %u\n", RTE_RING_SZ_MASK);
+			"Requested number of elements is invalid, must be power of 2, and not exceed %u\n",
+			RTE_RING_SZ_MASK);
+
 		return -EINVAL;
 	}
 
-	sz = sizeof(struct rte_ring) + count * sizeof(void *);
+	sz = sizeof(struct rte_ring) + count * esize;
 	sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
 	return sz;
 }
 
+/* return the size of memory occupied by a ring */
+ssize_t
+rte_ring_get_memsize(unsigned count)
+{
+	return rte_ring_get_memsize_elem(sizeof(void *), count);
+}
+
 void
 rte_ring_reset(struct rte_ring *r)
 {
@@ -114,10 +130,10 @@  rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
 	return 0;
 }
 
-/* create the ring */
+/* create the ring for a given element size */
 struct rte_ring *
-rte_ring_create(const char *name, unsigned count, int socket_id,
-		unsigned flags)
+rte_ring_create_elem(const char *name, unsigned int esize, unsigned int count,
+		int socket_id, unsigned int flags)
 {
 	char mz_name[RTE_MEMZONE_NAMESIZE];
 	struct rte_ring *r;
@@ -135,7 +151,7 @@  rte_ring_create(const char *name, unsigned count, int socket_id,
 	if (flags & RING_F_EXACT_SZ)
 		count = rte_align32pow2(count + 1);
 
-	ring_size = rte_ring_get_memsize(count);
+	ring_size = rte_ring_get_memsize_elem(esize, count);
 	if (ring_size < 0) {
 		rte_errno = ring_size;
 		return NULL;
@@ -182,6 +198,15 @@  rte_ring_create(const char *name, unsigned count, int socket_id,
 	return r;
 }
 
+/* create the ring */
+struct rte_ring *
+rte_ring_create(const char *name, unsigned count, int socket_id,
+		unsigned flags)
+{
+	return rte_ring_create_elem(name, sizeof(void *), count, socket_id,
+		flags);
+}
+
 /* free the ring */
 void
 rte_ring_free(struct rte_ring *r)
diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
index 2a9f768a1..18fc5d845 100644
--- a/lib/librte_ring/rte_ring.h
+++ b/lib/librte_ring/rte_ring.h
@@ -216,6 +216,7 @@  int rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
  */
 struct rte_ring *rte_ring_create(const char *name, unsigned count,
 				 int socket_id, unsigned flags);
+
 /**
  * De-allocate all memory used by the ring.
  *
diff --git a/lib/librte_ring/rte_ring_elem.h b/lib/librte_ring/rte_ring_elem.h
new file mode 100644
index 000000000..fc7fe127c
--- /dev/null
+++ b/lib/librte_ring/rte_ring_elem.h
@@ -0,0 +1,1002 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ *
+ * Copyright (c) 2019 Arm Limited
+ * Copyright (c) 2010-2017 Intel Corporation
+ * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
+ * All rights reserved.
+ * Derived from FreeBSD's bufring.h
+ * Used as BSD-3 Licensed with permission from Kip Macy.
+ */
+
+#ifndef _RTE_RING_ELEM_H_
+#define _RTE_RING_ELEM_H_
+
+/**
+ * @file
+ * RTE Ring with user defined element size
+ */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <stdio.h>
+#include <stdint.h>
+#include <sys/queue.h>
+#include <errno.h>
+#include <rte_common.h>
+#include <rte_config.h>
+#include <rte_memory.h>
+#include <rte_lcore.h>
+#include <rte_atomic.h>
+#include <rte_branch_prediction.h>
+#include <rte_memzone.h>
+#include <rte_pause.h>
+
+#include "rte_ring.h"
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Calculate the memory size needed for a ring with given element size
+ *
+ * This function returns the number of bytes needed for a ring, given
+ * the number of elements in it and the size of the element. This value
+ * is the sum of the size of the structure rte_ring and the size of the
+ * memory needed for storing the elements. The value is aligned to a cache
+ * line size.
+ *
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ * @param count
+ *   The number of elements in the ring (must be a power of 2).
+ * @return
+ *   - The memory size needed for the ring on success.
+ *   - -EINVAL - esize is not a multiple of 4 or count provided is not a
+ *		 power of 2.
+ */
+__rte_experimental
+ssize_t rte_ring_get_memsize_elem(unsigned int esize, unsigned int count);
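A hedged sketch of how the size query might be used; the 16-byte element size and 1024-element count are arbitrary examples, not values taken from this patch:

#include <stdio.h>
#include <rte_ring_elem.h>

/* Query the footprint of a ring of 1024 elements of 16 bytes each. */
static int
print_ring_footprint(void)
{
	ssize_t sz = rte_ring_get_memsize_elem(16, 1024);

	if (sz < 0)	/* -EINVAL: bad esize or count */
		return (int)sz;
	printf("ring footprint: %zd bytes\n", sz);
	return 0;
}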
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Create a new ring named *name* that stores elements with given size.
+ *
+ * This function uses ``memzone_reserve()`` to allocate memory. Then it
+ * calls rte_ring_init() to initialize an empty ring.
+ *
+ * The new ring size is set to *count*, which must be a power of
+ * two. Water marking is disabled by default. The real usable ring size
+ * is *count-1* instead of *count* to differentiate a full ring from an
+ * empty ring.
+ *
+ * The ring is added to the RTE_TAILQ_RING list.
+ *
+ * @param name
+ *   The name of the ring.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ * @param count
+ *   The number of elements in the ring (must be a power of 2).
+ * @param socket_id
+ *   The *socket_id* argument is the socket identifier in case of
+ *   NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA
+ *   constraint for the reserved zone.
+ * @param flags
+ *   An OR of the following:
+ *    - RING_F_SP_ENQ: If this flag is set, the default behavior when
+ *      using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
+ *      is "single-producer". Otherwise, it is "multi-producers".
+ *    - RING_F_SC_DEQ: If this flag is set, the default behavior when
+ *      using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
+ *      is "single-consumer". Otherwise, it is "multi-consumers".
+ * @return
+ *   On success, the pointer to the new allocated ring. NULL on error with
+ *    rte_errno set appropriately. Possible errno values include:
+ *    - E_RTE_NO_CONFIG - function could not get pointer to rte_config structure
+ *    - E_RTE_SECONDARY - function was called from a secondary process instance
+ *    - EINVAL - esize is not a multiple of 4 or count provided is not a
+ *		 power of 2.
+ *    - ENOSPC - the maximum number of memzones has already been allocated
+ *    - EEXIST - a memzone with the same name already exists
+ *    - ENOMEM - no appropriate memory area found in which to create memzone
+ */
+__rte_experimental
+struct rte_ring *rte_ring_create_elem(const char *name, unsigned int esize,
+			unsigned int count, int socket_id, unsigned int flags);
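A minimal creation sketch, assuming rte_eal_init() has already succeeded and the application is built with ALLOW_EXPERIMENTAL_API; the element type, ring name and size below are illustrative only:

#include <rte_ring_elem.h>

struct event { uint64_t id; uint64_t timestamp; };	/* 16-byte element */

static struct rte_ring *
make_event_ring(void)
{
	/* MP/MC ring of 4096 16-byte events, on any NUMA socket */
	return rte_ring_create_elem("event_ring", sizeof(struct event),
				    4096, SOCKET_ID_ANY, 0);
}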
+
+static __rte_always_inline void
+enqueue_elems_32(struct rte_ring *r, uint32_t idx,
+		const void *obj_table, uint32_t n)
+{
+	unsigned int i;
+	const uint32_t size = r->size;
+	uint32_t *ring = (uint32_t *)&r[1];
+	const uint32_t *obj = (const uint32_t *)obj_table;
+	if (likely(idx + n < size)) {
+		for (i = 0; i < (n & ~0x7); i += 8, idx += 8) {
+			ring[idx] = obj[i];
+			ring[idx + 1] = obj[i + 1];
+			ring[idx + 2] = obj[i + 2];
+			ring[idx + 3] = obj[i + 3];
+			ring[idx + 4] = obj[i + 4];
+			ring[idx + 5] = obj[i + 5];
+			ring[idx + 6] = obj[i + 6];
+			ring[idx + 7] = obj[i + 7];
+		}
+		switch (n & 0x7) {
+		case 7:
+			ring[idx++] = obj[i++]; /* fallthrough */
+		case 6:
+			ring[idx++] = obj[i++]; /* fallthrough */
+		case 5:
+			ring[idx++] = obj[i++]; /* fallthrough */
+		case 4:
+			ring[idx++] = obj[i++]; /* fallthrough */
+		case 3:
+			ring[idx++] = obj[i++]; /* fallthrough */
+		case 2:
+			ring[idx++] = obj[i++]; /* fallthrough */
+		case 1:
+			ring[idx++] = obj[i++]; /* fallthrough */
+		}
+	} else {
+		for (i = 0; idx < size; i++, idx++)
+			ring[idx] = obj[i];
+		/* Start at the beginning */
+		for (idx = 0; i < n; i++, idx++)
+			ring[idx] = obj[i];
+	}
+}
+
+static __rte_always_inline void
+enqueue_elems_64(struct rte_ring *r, uint32_t prod_head,
+		const void *obj_table, uint32_t n)
+{
+	unsigned int i;
+	const uint32_t size = r->size;
+	uint32_t idx = cons_head & r->mask;
+	uint64_t *ring = (uint64_t *)&r[1];
+	const uint64_t *obj = (const uint64_t *)obj_table;
+	if (likely(idx + n < size)) {
+		for (i = 0; i < (n & ~0x3); i += 4, idx += 4) {
+			ring[idx] = obj[i];
+			ring[idx + 1] = obj[i + 1];
+			ring[idx + 2] = obj[i + 2];
+			ring[idx + 3] = obj[i + 3];
+		}
+		switch (n & 0x3) {
+		case 3:
+			ring[idx++] = obj[i++]; /* fallthrough */
+		case 2:
+			ring[idx++] = obj[i++]; /* fallthrough */
+		case 1:
+			ring[idx++] = obj[i++];
+		}
+	} else {
+		for (i = 0; idx < size; i++, idx++)
+			ring[idx] = obj[i];
+		/* Start at the beginning */
+		for (idx = 0; i < n; i++, idx++)
+			ring[idx] = obj[i];
+	}
+}
+
+static __rte_always_inline void
+enqueue_elems_128(struct rte_ring *r, uint32_t prod_head,
+		const void *obj_table, uint32_t n)
+{
+	unsigned int i;
+	const uint32_t size = r->size;
+	uint32_t idx = prod_head & r->mask;
+	__uint128_t *ring = (__uint128_t *)&r[1];
+	const __uint128_t *obj = (const __uint128_t *)obj_table;
+	if (likely(idx + n < size)) {
+		for (i = 0; i < (n & ~0x1); i += 2, idx += 2) {
+			ring[idx] = obj[i];
+			ring[idx + 1] = obj[i + 1];
+		}
+		switch (n & 0x1) {
+		case 1:
+			ring[idx++] = obj[i++];
+		}
+	} else {
+		for (i = 0; idx < size; i++, idx++)
+			ring[idx] = obj[i];
+		/* Start at the beginning */
+		for (idx = 0; i < n; i++, idx++)
+			ring[idx] = obj[i];
+	}
+}
+
+/* the actual enqueue of elements on the ring.
+ * Placed here since identical code needed in both
+ * single and multi producer enqueue functions.
+ */
+static __rte_always_inline void
+enqueue_elems(struct rte_ring *r, uint32_t prod_head, const void *obj_table,
+		uint32_t esize, uint32_t num)
+{
+	uint32_t idx, nr_idx, nr_num;
+
+	/* 8B and 16B copies implemented individually to retain
+	 * the current performance.
+	 */
+	if (esize == 8)
+		enqueue_elems_64(r, prod_head, obj_table, num);
+	else if (esize == 16)
+		enqueue_elems_128(r, prod_head, obj_table, num);
+	else {
+		/* Normalize to uint32_t */
+		uint32_t scale = esize / sizeof(uint32_t);
+		nr_num = num * scale;
+		idx = prod_head & r->mask;
+		nr_idx = idx * scale;
+		enqueue_elems_32(r, nr_idx, obj_table, nr_num);
+	}
+}
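For element sizes other than 8 and 16 bytes the copy falls back to 32-bit words; the stand-alone arithmetic below illustrates that normalization (the 12-byte element size, slot index and count are arbitrary examples):

#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

int
main(void)
{
	uint32_t esize = 12;				/* multiple of 4 */
	uint32_t scale = esize / sizeof(uint32_t);	/* 3 words per element */
	uint32_t idx = 5, num = 4;			/* ring slot and element count */

	/* enqueue_elems_32() would copy num * scale words starting at word idx * scale */
	printf("copy %" PRIu32 " words starting at word %" PRIu32 "\n",
		num * scale, idx * scale);
	return 0;
}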
+
+static __rte_always_inline void
+dequeue_elems_32(struct rte_ring *r, uint32_t idx,
+		void *obj_table, uint32_t n)
+{
+	unsigned int i;
+	const uint32_t size = r->size;
+	uint32_t *ring = (uint32_t *)&r[1];
+	uint32_t *obj = (uint32_t *)obj_table;
+	if (likely(idx + n < size)) {
+		for (i = 0; i < (n & ~0x7); i += 8, idx += 8) {
+			obj[i] = ring[idx];
+			obj[i + 1] = ring[idx + 1];
+			obj[i + 2] = ring[idx + 2];
+			obj[i + 3] = ring[idx + 3];
+			obj[i + 4] = ring[idx + 4];
+			obj[i + 5] = ring[idx + 5];
+			obj[i + 6] = ring[idx + 6];
+			obj[i + 7] = ring[idx + 7];
+		}
+		switch (n & 0x7) {
+		case 7:
+			obj[i++] = ring[idx++]; /* fallthrough */
+		case 6:
+			obj[i++] = ring[idx++]; /* fallthrough */
+		case 5:
+			obj[i++] = ring[idx++]; /* fallthrough */
+		case 4:
+			obj[i++] = ring[idx++]; /* fallthrough */
+		case 3:
+			obj[i++] = ring[idx++]; /* fallthrough */
+		case 2:
+			obj[i++] = ring[idx++]; /* fallthrough */
+		case 1:
+			obj[i++] = ring[idx++]; /* fallthrough */
+		}
+	} else {
+		for (i = 0; idx < size; i++, idx++)
+			obj[i] = ring[idx];
+		/* Start at the beginning */
+		for (idx = 0; i < n; i++, idx++)
+			obj[i] = ring[idx];
+	}
+}
+
+static __rte_always_inline void
+dequeue_elems_64(struct rte_ring *r, uint32_t prod_head,
+		void *obj_table, uint32_t n)
+{
+	unsigned int i;
+	const uint32_t size = r->size;
+	uint32_t idx = prod_head & r->mask;
+	uint64_t *ring = (uint64_t *)&r[1];
+	uint64_t *obj = (uint64_t *)obj_table;
+	if (likely(idx + n < size)) {
+		for (i = 0; i < (n & ~0x3); i += 4, idx += 4) {
+			obj[i] = ring[idx];
+			obj[i + 1] = ring[idx + 1];
+			obj[i + 2] = ring[idx + 2];
+			obj[i + 3] = ring[idx + 3];
+		}
+		switch (n & 0x3) {
+		case 3:
+			obj[i++] = ring[idx++]; /* fallthrough */
+		case 2:
+			obj[i++] = ring[idx++]; /* fallthrough */
+		case 1:
+			obj[i++] = ring[idx++]; /* fallthrough */
+		}
+	} else {
+		for (i = 0; idx < size; i++, idx++)
+			obj[i] = ring[idx];
+		/* Start at the beginning */
+		for (idx = 0; i < n; i++, idx++)
+			obj[i] = ring[idx];
+	}
+}
+
+static __rte_always_inline void
+dequeue_elems_128(struct rte_ring *r, uint32_t cons_head,
+		void *obj_table, uint32_t n)
+{
+	unsigned int i;
+	const uint32_t size = r->size;
+	uint32_t idx = cons_head & r->mask;
+	__uint128_t *ring = (__uint128_t *)&r[1];
+	__uint128_t *obj = (__uint128_t *)obj_table;
+	if (likely(idx + n < size)) {
+		for (i = 0; i < (n & ~0x1); i += 2, idx += 2) {
+			obj[i] = ring[idx];
+			obj[i + 1] = ring[idx + 1];
+		}
+		switch (n & 0x1) {
+		case 1:
+			obj[i++] = ring[idx++]; /* fallthrough */
+		}
+	} else {
+		for (i = 0; idx < size; i++, idx++)
+			obj[i] = ring[idx];
+		/* Start at the beginning */
+		for (idx = 0; i < n; i++, idx++)
+			obj[i] = ring[idx];
+	}
+}
+
+/* the actual dequeue of elements from the ring.
+ * Placed here since identical code needed in both
+ * single and multi consumer dequeue functions.
+ */
+static __rte_always_inline void
+dequeue_elems(struct rte_ring *r, uint32_t cons_head, void *obj_table,
+		uint32_t esize, uint32_t num)
+{
+	uint32_t idx, nr_idx, nr_num;
+
+	/* 8B and 16B copies implemented individually to retain
+	 * the current performance.
+	 */
+	if (esize == 8)
+		dequeue_elems_64(r, cons_head, obj_table, num);
+	else if (esize == 16)
+		dequeue_elems_128(r, cons_head, obj_table, num);
+	else {
+		/* Normalize to uint32_t */
+		uint32_t scale = esize / sizeof(uint32_t);
+		nr_num = num * scale;
+		idx = cons_head & r->mask;
+		nr_idx = idx * scale;
+		dequeue_elems_32(r, nr_idx, obj_table, nr_num);
+	}
+}
+
+/* Between loads, the CPU may reorder memory accesses on weakly ordered
+ * architectures (PowerPC/ARM).
+ * Users have two choices:
+ * 1. use an rmb() memory barrier
+ * 2. use one-direction load_acquire/store_release barriers, selected by
+ *    CONFIG_RTE_USE_C11_MEM_MODEL=y
+ * The choice depends on performance test results.
+ * By default, the common functions from rte_ring_generic.h are used.
+ */
+#ifdef RTE_USE_C11_MEM_MODEL
+#include "rte_ring_c11_mem.h"
+#else
+#include "rte_ring_generic.h"
+#endif
+
+/**
+ * @internal Enqueue several objects on the ring
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to a ring
+ *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
+ * @param is_sp
+ *   Indicates whether to use single producer or multi-producer head update
+ * @param free_space
+ *   returns the amount of space after the enqueue operation has finished
+ * @return
+ *   Actual number of objects enqueued.
+ *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_enqueue_elem(struct rte_ring *r, const void *obj_table,
+		unsigned int esize, unsigned int n,
+		enum rte_ring_queue_behavior behavior, unsigned int is_sp,
+		unsigned int *free_space)
+{
+	uint32_t prod_head, prod_next;
+	uint32_t free_entries;
+
+	n = __rte_ring_move_prod_head(r, is_sp, n, behavior,
+			&prod_head, &prod_next, &free_entries);
+	if (n == 0)
+		goto end;
+
+	enqueue_elems(r, prod_head, obj_table, esize, n);
+
+	update_tail(&r->prod, prod_head, prod_next, is_sp, 1);
+end:
+	if (free_space != NULL)
+		*free_space = free_entries - n;
+	return n;
+}
+
+/**
+ * @internal Dequeue several objects from the ring
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to pull from the ring.
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
+ *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring
+ * @param is_sc
+ *   Indicates whether to use single consumer or multi-consumer head update
+ * @param available
+ *   returns the number of remaining ring entries after the dequeue has finished
+ * @return
+ *   - Actual number of objects dequeued.
+ *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_dequeue_elem(struct rte_ring *r, void *obj_table,
+		unsigned int esize, unsigned int n,
+		enum rte_ring_queue_behavior behavior, unsigned int is_sc,
+		unsigned int *available)
+{
+	uint32_t cons_head, cons_next;
+	uint32_t entries;
+
+	n = __rte_ring_move_cons_head(r, (int)is_sc, n, behavior,
+			&cons_head, &cons_next, &entries);
+	if (n == 0)
+		goto end;
+
+	dequeue_elems(r, cons_head, obj_table, esize, n);
+
+	update_tail(&r->cons, cons_head, cons_next, is_sc, 0);
+
+end:
+	if (available != NULL)
+		*available = entries - n;
+	return n;
+}
+
+/**
+ * Enqueue several objects on the ring (multi-producers safe).
+ *
+ * This function uses a "compare and set" instruction to move the
+ * producer index atomically.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ *   if non-NULL, returns the amount of space in the ring after the
+ *   enqueue operation has finished.
+ * @return
+ *   The number of objects enqueued, either 0 or n
+ */
+static __rte_always_inline unsigned int
+rte_ring_mp_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table,
+		unsigned int esize, unsigned int n, unsigned int *free_space)
+{
+	return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
+			RTE_RING_QUEUE_FIXED, __IS_MP, free_space);
+}
+
+/**
+ * Enqueue several objects on a ring
+ *
+ * @warning This API is NOT multi-producers safe
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ *   if non-NULL, returns the amount of space in the ring after the
+ *   enqueue operation has finished.
+ * @return
+ *   The number of objects enqueued, either 0 or n
+ */
+static __rte_always_inline unsigned int
+rte_ring_sp_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table,
+		unsigned int esize, unsigned int n, unsigned int *free_space)
+{
+	return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
+			RTE_RING_QUEUE_FIXED, __IS_SP, free_space);
+}
+
+/**
+ * Enqueue several objects on a ring.
+ *
+ * This function calls the multi-producer or the single-producer
+ * version depending on the default behavior that was specified at
+ * ring creation time (see flags).
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ *   if non-NULL, returns the amount of space in the ring after the
+ *   enqueue operation has finished.
+ * @return
+ *   The number of objects enqueued, either 0 or n
+ */
+static __rte_always_inline unsigned int
+rte_ring_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table,
+		unsigned int esize, unsigned int n, unsigned int *free_space)
+{
+	return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
+			RTE_RING_QUEUE_FIXED, r->prod.single, free_space);
+}
+
+/**
+ * Enqueue one object on a ring (multi-producers safe).
+ *
+ * This function uses a "compare and set" instruction to move the
+ * producer index atomically.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj
+ *   A pointer to the object to be added.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @return
+ *   - 0: Success; objects enqueued.
+ *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
+ */
+static __rte_always_inline int
+rte_ring_mp_enqueue_elem(struct rte_ring *r, void *obj, unsigned int esize)
+{
+	return rte_ring_mp_enqueue_bulk_elem(r, obj, esize, 1, NULL) ? 0 :
+								-ENOBUFS;
+}
+
+/**
+ * Enqueue one object on a ring
+ *
+ * @warning This API is NOT multi-producers safe
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj
+ *   A pointer to the object to be added.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @return
+ *   - 0: Success; objects enqueued.
+ *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
+ */
+static __rte_always_inline int
+rte_ring_sp_enqueue_elem(struct rte_ring *r, void *obj, unsigned int esize)
+{
+	return rte_ring_sp_enqueue_bulk_elem(r, obj, esize, 1, NULL) ? 0 :
+								-ENOBUFS;
+}
+
+/**
+ * Enqueue one object on a ring.
+ *
+ * This function calls the multi-producer or the single-producer
+ * version, depending on the default behaviour that was specified at
+ * ring creation time (see flags).
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj
+ *   A pointer to the object to be added.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @return
+ *   - 0: Success; objects enqueued.
+ *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
+ */
+static __rte_always_inline int
+rte_ring_enqueue_elem(struct rte_ring *r, void *obj, unsigned int esize)
+{
+	return rte_ring_enqueue_bulk_elem(r, obj, esize, 1, NULL) ? 0 :
+								-ENOBUFS;
+}
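A hedged single-element producer sketch, with struct event as the illustrative 16-byte element and the ring assumed to have been created with that element size:

#include <rte_ring_elem.h>

struct event { uint64_t id; uint64_t timestamp; };	/* illustrative element */

static int
send_event(struct rte_ring *r, uint64_t id, uint64_t ts)
{
	struct event ev = { .id = id, .timestamp = ts };

	/* 0 on success, -ENOBUFS when the ring is full */
	return rte_ring_enqueue_elem(r, &ev, sizeof(ev));
}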
+
+/**
+ * Dequeue several objects from a ring (multi-consumers safe).
+ *
+ * This function uses a "compare and set" instruction to move the
+ * consumer index atomically.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects) that will be filled.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to dequeue from the ring to the obj_table.
+ * @param available
+ *   If non-NULL, returns the number of remaining ring entries after the
+ *   dequeue has finished.
+ * @return
+ *   The number of objects dequeued, either 0 or n
+ */
+static __rte_always_inline unsigned int
+rte_ring_mc_dequeue_bulk_elem(struct rte_ring *r, void *obj_table,
+		unsigned int esize, unsigned int n, unsigned int *available)
+{
+	return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
+				RTE_RING_QUEUE_FIXED, __IS_MC, available);
+}
+
+/**
+ * Dequeue several objects from a ring (NOT multi-consumers safe).
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects) that will be filled.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to dequeue from the ring to the obj_table,
+ *   must be strictly positive.
+ * @param available
+ *   If non-NULL, returns the number of remaining ring entries after the
+ *   dequeue has finished.
+ * @return
+ *   The number of objects dequeued, either 0 or n
+ */
+static __rte_always_inline unsigned int
+rte_ring_sc_dequeue_bulk_elem(struct rte_ring *r, void *obj_table,
+		unsigned int esize, unsigned int n, unsigned int *available)
+{
+	return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
+			RTE_RING_QUEUE_FIXED, __IS_SC, available);
+}
+
+/**
+ * Dequeue several objects from a ring.
+ *
+ * This function calls the multi-consumers or the single-consumer
+ * version, depending on the default behaviour that was specified at
+ * ring creation time (see flags).
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects) that will be filled.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to dequeue from the ring to the obj_table.
+ * @param available
+ *   If non-NULL, returns the number of remaining ring entries after the
+ *   dequeue has finished.
+ * @return
+ *   The number of objects dequeued, either 0 or n
+ */
+static __rte_always_inline unsigned int
+rte_ring_dequeue_bulk_elem(struct rte_ring *r, void *obj_table,
+		unsigned int esize, unsigned int n, unsigned int *available)
+{
+	return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
+			RTE_RING_QUEUE_FIXED, r->cons.single, available);
+}
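A hedged all-or-nothing consumer sketch using the bulk variant; the batch size and element type are illustrative:

#include <rte_ring_elem.h>

struct event { uint64_t id; uint64_t timestamp; };	/* illustrative element */

static int
drain_batch(struct rte_ring *r)
{
	struct event batch[32];
	unsigned int left;

	/* bulk semantics: either all 32 elements are dequeued or none */
	if (rte_ring_dequeue_bulk_elem(r, batch, sizeof(batch[0]), 32, &left) == 0)
		return -1;	/* fewer than 32 entries were available */

	/* ... consume batch[0..31]; 'left' holds the remaining entries ... */
	return 0;
}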
+
+/**
+ * Dequeue one object from a ring (multi-consumers safe).
+ *
+ * This function uses a "compare and set" instruction to move the
+ * consumer index atomically.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_p
+ *   A pointer to a void * pointer (object) that will be filled.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @return
+ *   - 0: Success; objects dequeued.
+ *   - -ENOENT: Not enough entries in the ring to dequeue; no object is
+ *     dequeued.
+ */
+static __rte_always_inline int
+rte_ring_mc_dequeue_elem(struct rte_ring *r, void *obj_p,
+				unsigned int esize)
+{
+	return rte_ring_mc_dequeue_bulk_elem(r, obj_p, esize, 1, NULL) ? 0 :
+								-ENOENT;
+}
+
+/**
+ * Dequeue one object from a ring (NOT multi-consumers safe).
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_p
+ *   A pointer to a void * pointer (object) that will be filled.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @return
+ *   - 0: Success; objects dequeued.
+ *   - -ENOENT: Not enough entries in the ring to dequeue, no object is
+ *     dequeued.
+ */
+static __rte_always_inline int
+rte_ring_sc_dequeue_elem(struct rte_ring *r, void *obj_p,
+				unsigned int esize)
+{
+	return rte_ring_sc_dequeue_bulk_elem(r, obj_p, esize, 1, NULL) ? 0 :
+								-ENOENT;
+}
+
+/**
+ * Dequeue one object from a ring.
+ *
+ * This function calls the multi-consumers or the single-consumer
+ * version depending on the default behaviour that was specified at
+ * ring creation time (see flags).
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_p
+ *   A pointer to a void * pointer (object) that will be filled.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @return
+ *   - 0: Success, objects dequeued.
+ *   - -ENOENT: Not enough entries in the ring to dequeue, no object is
+ *     dequeued.
+ */
+static __rte_always_inline int
+rte_ring_dequeue_elem(struct rte_ring *r, void *obj_p, unsigned int esize)
+{
+	return rte_ring_dequeue_bulk_elem(r, obj_p, esize, 1, NULL) ? 0 :
+								-ENOENT;
+}
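The single-element consumer counterpart, again as a hedged sketch with an illustrative element type:

#include <rte_ring_elem.h>

struct event { uint64_t id; uint64_t timestamp; };	/* illustrative element */

static int
recv_event(struct rte_ring *r, struct event *ev)
{
	/* 0 on success, -ENOENT when the ring is empty */
	return rte_ring_dequeue_elem(r, ev, sizeof(*ev));
}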
+
+/**
+ * Enqueue several objects on the ring (multi-producers safe).
+ *
+ * This function uses a "compare and set" instruction to move the
+ * producer index atomically.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ *   if non-NULL, returns the amount of space in the ring after the
+ *   enqueue operation has finished.
+ * @return
+ *   - n: Actual number of objects enqueued.
+ */
+static __rte_always_inline unsigned int
+rte_ring_mp_enqueue_burst_elem(struct rte_ring *r, const void *obj_table,
+		unsigned int esize, unsigned int n, unsigned int *free_space)
+{
+	return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
+			RTE_RING_QUEUE_VARIABLE, __IS_MP, free_space);
+}
+
+/**
+ * Enqueue several objects on a ring
+ *
+ * @warning This API is NOT multi-producers safe
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ *   if non-NULL, returns the amount of space in the ring after the
+ *   enqueue operation has finished.
+ * @return
+ *   - n: Actual number of objects enqueued.
+ */
+static __rte_always_inline unsigned int
+rte_ring_sp_enqueue_burst_elem(struct rte_ring *r, const void *obj_table,
+		unsigned int esize, unsigned int n, unsigned int *free_space)
+{
+	return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
+			RTE_RING_QUEUE_VARIABLE, __IS_SP, free_space);
+}
+
+/**
+ * Enqueue several objects on a ring.
+ *
+ * This function calls the multi-producer or the single-producer
+ * version depending on the default behavior that was specified at
+ * ring creation time (see flags).
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ *   if non-NULL, returns the amount of space in the ring after the
+ *   enqueue operation has finished.
+ * @return
+ *   - n: Actual number of objects enqueued.
+ */
+static __rte_always_inline unsigned int
+rte_ring_enqueue_burst_elem(struct rte_ring *r, const void *obj_table,
+		unsigned int esize, unsigned int n, unsigned int *free_space)
+{
+	return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
+			RTE_RING_QUEUE_VARIABLE, r->prod.single, free_space);
+}
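A hedged producer sketch using burst semantics, where a partial enqueue is acceptable; all names are illustrative:

#include <rte_ring_elem.h>

struct event { uint64_t id; uint64_t timestamp; };	/* illustrative element */

static unsigned int
push_burst(struct rte_ring *r, const struct event *evs, unsigned int n)
{
	/* may enqueue fewer than n when the ring is nearly full;
	 * the caller decides whether to retry with the remainder.
	 */
	return rte_ring_enqueue_burst_elem(r, evs, sizeof(*evs), n, NULL);
}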
+
+/**
+ * Dequeue several objects from a ring (multi-consumers safe). When the
+ * requested number of objects exceeds what is available in the ring, only
+ * the available objects are dequeued.
+ *
+ * This function uses a "compare and set" instruction to move the
+ * consumer index atomically.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects) that will be filled.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to dequeue from the ring to the obj_table.
+ * @param available
+ *   If non-NULL, returns the number of remaining ring entries after the
+ *   dequeue has finished.
+ * @return
+ *   - n: Actual number of objects dequeued, 0 if ring is empty
+ */
+static __rte_always_inline unsigned int
+rte_ring_mc_dequeue_burst_elem(struct rte_ring *r, void *obj_table,
+		unsigned int esize, unsigned int n, unsigned int *available)
+{
+	return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
+			RTE_RING_QUEUE_VARIABLE, __IS_MC, available);
+}
+
+/**
+ * Dequeue several objects from a ring (NOT multi-consumers safe). When the
+ * requested number of objects exceeds what is available in the ring, only
+ * the available objects are dequeued.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects) that will be filled.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to dequeue from the ring to the obj_table.
+ * @param available
+ *   If non-NULL, returns the number of remaining ring entries after the
+ *   dequeue has finished.
+ * @return
+ *   - n: Actual number of objects dequeued, 0 if ring is empty
+ */
+static __rte_always_inline unsigned int
+rte_ring_sc_dequeue_burst_elem(struct rte_ring *r, void *obj_table,
+		unsigned int esize, unsigned int n, unsigned int *available)
+{
+	return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
+			RTE_RING_QUEUE_VARIABLE, __IS_SC, available);
+}
+
+/**
+ * Dequeue multiple objects from a ring up to a maximum number.
+ *
+ * This function calls the multi-consumers or the single-consumer
+ * version, depending on the default behaviour that was specified at
+ * ring creation time (see flags).
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects) that will be filled.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to dequeue from the ring to the obj_table.
+ * @param available
+ *   If non-NULL, returns the number of remaining ring entries after the
+ *   dequeue has finished.
+ * @return
+ *   - Number of objects dequeued
+ */
+static __rte_always_inline unsigned int
+rte_ring_dequeue_burst_elem(struct rte_ring *r, void *obj_table,
+		unsigned int esize, unsigned int n, unsigned int *available)
+{
+	return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
+				RTE_RING_QUEUE_VARIABLE,
+				r->cons.single, available);
+}
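A hedged consumer-loop sketch built on the burst dequeue; the batch size and element type are illustrative:

#include <rte_ring_elem.h>

struct event { uint64_t id; uint64_t timestamp; };	/* illustrative element */

static void
consume_all(struct rte_ring *r)
{
	struct event evs[64];
	unsigned int i, n;

	do {
		/* returns up to 64 elements; 0 once the ring is empty */
		n = rte_ring_dequeue_burst_elem(r, evs, sizeof(evs[0]), 64, NULL);
		for (i = 0; i < n; i++) {
			/* handle evs[i] here */
		}
	} while (n > 0);
}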
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _RTE_RING_ELEM_H_ */
diff --git a/lib/librte_ring/rte_ring_version.map b/lib/librte_ring/rte_ring_version.map
index 89d84bcf4..7a5328dd5 100644
--- a/lib/librte_ring/rte_ring_version.map
+++ b/lib/librte_ring/rte_ring_version.map
@@ -15,6 +15,8 @@  DPDK_20.0 {
 EXPERIMENTAL {
 	global:
 
+	rte_ring_create_elem;
+	rte_ring_get_memsize_elem;
 	rte_ring_reset;
 
 };