[RFC,1/1] lib/ring: add scatter gather and serial dequeue APIs
diff mbox series

Message ID 20200224203931.21256-2-honnappa.nagarahalli@arm.com
State New
Delegated to: Thomas Monjalon
Headers show
Series
  • lib/ring: add scatter gather and serial dequeue APIs
Related show

Checks

Context Check Description
ci/Intel-compilation success Compilation OK
ci/checkpatch success coding style OK

Commit Message

Honnappa Nagarahalli Feb. 24, 2020, 8:39 p.m. UTC
Add scatter gather APIs to avoid intermediate memcpy. Serial
dequeue APIs are added to support access to ring elements
before actual dequeue.

Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
Reviewed-by: Gavin Hu <gavin.hu@arm.com>
Reviewed-by: Ola Liljedahl <ola.liljedahl@arm.com>
---
 lib/librte_ring/Makefile           |   1 +
 lib/librte_ring/meson.build        |   1 +
 lib/librte_ring/rte_ring_c11_mem.h |  98 +++++++
 lib/librte_ring/rte_ring_elem_sg.h | 417 +++++++++++++++++++++++++++++
 lib/librte_ring/rte_ring_generic.h |  93 +++++++
 5 files changed, 610 insertions(+)
 create mode 100644 lib/librte_ring/rte_ring_elem_sg.h

Comments

Konstantin Ananyev Feb. 26, 2020, 8:38 p.m. UTC | #1
Hi Honnappa,

> Add scatter gather APIs to avoid intermediate memcpy. Serial
> dequeue APIs are added to support access to ring elements
> before actual dequeue.
> 
> Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
> Reviewed-by: Gavin Hu <gavin.hu@arm.com>
> Reviewed-by: Ola Liljedahl <ola.liljedahl@arm.com>
> ---
>  lib/librte_ring/Makefile           |   1 +
>  lib/librte_ring/meson.build        |   1 +
>  lib/librte_ring/rte_ring_c11_mem.h |  98 +++++++
>  lib/librte_ring/rte_ring_elem_sg.h | 417 +++++++++++++++++++++++++++++
>  lib/librte_ring/rte_ring_generic.h |  93 +++++++
>  5 files changed, 610 insertions(+)

As was already noticed by you this patch overlaps quite a bit with another one:
http://patches.dpdk.org/patch/66006/

Though it seems there are few significant differences in
our approaches (both API and implementation).
So we probably need to come-up with some common
view first, before moving forward with some unified version.
To start a discussion, I produced some comments, pls see below. 

I don't see changes in rte_ring.h itself, but I suppose
that's just because it is an RFC and it would be added in later versions?
Another similar question there seems only _bulk_ (RTE_RING_QUEUE_FIXED) mode,
I suppose _burst_ will also be added in later versions?

> diff --git a/lib/librte_ring/rte_ring_elem_sg.h b/lib/librte_ring/rte_ring_elem_sg.h
> new file mode 100644
> index 000000000..a73f4fbfe
> --- /dev/null
> +++ b/lib/librte_ring/rte_ring_elem_sg.h
> @@ -0,0 +1,417 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + *
> + * Copyright (c) 2020 Arm Limited
> + * Copyright (c) 2010-2017 Intel Corporation
> + * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
> + * All rights reserved.
> + * Derived from FreeBSD's bufring.h
> + * Used as BSD-3 Licensed with permission from Kip Macy.
> + */
> +
> +#ifndef _RTE_RING_ELEM_SG_H_
> +#define _RTE_RING_ELEM_SG_H_
> +
> +/**
> + * @file
> + * RTE Ring with
> + * 1) user defined element size
> + * 2) scatter gather feature to copy objects to/from the ring
> + * 3) ability to reserve, consume/discard elements in the ring
> + */
> +
> +#ifdef __cplusplus
> +extern "C" {
> +#endif
> +
> +#include <stdio.h>
> +#include <stdint.h>
> +#include <string.h>
> +#include <sys/queue.h>
> +#include <errno.h>
> +#include <rte_common.h>
> +#include <rte_config.h>
> +#include <rte_memory.h>
> +#include <rte_lcore.h>
> +#include <rte_atomic.h>
> +#include <rte_branch_prediction.h>
> +#include <rte_memzone.h>
> +#include <rte_pause.h>
> +
> +#include "rte_ring.h"
> +#include "rte_ring_elem.h"
> +
> +/* Between load and load. there might be cpu reorder in weak model
> + * (powerpc/arm).
> + * There are 2 choices for the users
> + * 1.use rmb() memory barrier
> + * 2.use one-direction load_acquire/store_release barrier,defined by
> + * CONFIG_RTE_USE_C11_MEM_MODEL=y
> + * It depends on performance test results.
> + * By default, move common functions to rte_ring_generic.h
> + */
> +#ifdef RTE_USE_C11_MEM_MODEL
> +#include "rte_ring_c11_mem.h"
> +#else
> +#include "rte_ring_generic.h"
> +#endif
> +
> +static __rte_always_inline void
> +__rte_ring_get_elem_addr_64(struct rte_ring *r, uint32_t head,
> +	uint32_t num, void **dst1, uint32_t *n1, void **dst2)
> +{
> +	uint32_t idx = head & r->mask;
> +	uint64_t *ring = (uint64_t *)&r[1];
> +
> +	*dst1 = ring + idx;
> +	*n1 = num;
> +
> +	if (idx + num > r->size) {
> +		*n1 = num - (r->size - idx - 1);
> +		*dst2 = ring;
> +	}
> +}
> +
> +static __rte_always_inline void
> +__rte_ring_get_elem_addr_128(struct rte_ring *r, uint32_t head,
> +	uint32_t num, void **dst1, uint32_t *n1, void **dst2)
> +{
> +	uint32_t idx = head & r->mask;
> +	rte_int128_t *ring = (rte_int128_t *)&r[1];
> +
> +	*dst1 = ring + idx;
> +	*n1 = num;
> +
> +	if (idx + num > r->size) {
> +		*n1 = num - (r->size - idx - 1);
> +		*dst2 = ring;
> +	}
> +}
> +
> +static __rte_always_inline void
> +__rte_ring_get_elem_addr(struct rte_ring *r, uint32_t head,
> +	uint32_t esize, uint32_t num, void **dst1, uint32_t *n1, void **dst2)
> +{
> +	if (esize == 8)
> +		return __rte_ring_get_elem_addr_64(r, head,
> +						num, dst1, n1, dst2);
> +	else if (esize == 16)
> +		return __rte_ring_get_elem_addr_128(r, head,
> +						num, dst1, n1, dst2);
> +	else {
> +		uint32_t idx, scale, nr_idx;
> +		uint32_t *ring = (uint32_t *)&r[1];
> +
> +		/* Normalize to uint32_t */
> +		scale = esize / sizeof(uint32_t);
> +		idx = head & r->mask;
> +		nr_idx = idx * scale;
> +
> +		*dst1 = ring + nr_idx;
> +		*n1 = num;
> +
> +		if (idx + num > r->size) {
> +			*n1 = num - (r->size - idx - 1);
> +			*dst2 = ring;
> +		}
> +	}
> +}
> +
> +/**
> + * @internal Reserve ring elements to enqueue several objects on the ring
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + *   This must be the same value used while creating the ring. Otherwise
> + *   the results are undefined.
> + * @param n
> + *   The number of elements to reserve in the ring.
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Reserve a fixed number of elements from a ring
> + *   RTE_RING_QUEUE_VARIABLE: Reserve as many elements as possible from ring
> + * @param is_sp
> + *   Indicates whether to use single producer or multi-producer reserve
> + * @param old_head
> + *   Producer's head index before reservation.
> + * @param new_head
> + *   Producer's head index after reservation.
> + * @param free_space
> + *   returns the amount of space after the reserve operation has finished.
> + *   It is not updated if the number of reserved elements is zero.
> + * @param dst1
> + *   Pointer to location in the ring to copy the data.
> + * @param n1
> + *   Number of elements to copy at dst1
> + * @param dst2
> + *   In case of ring wrap around, this pointer provides the location to
> + *   copy the remaining elements. The number of elements to copy at this
> + *   location is equal to (number of elements reserved - n1)
> + * @return
> + *   Actual number of elements reserved.
> + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_do_enqueue_elem_reserve(struct rte_ring *r, unsigned int esize,


I do understand the purpose of reserve, then either commit/abort for serial sync mode,
but what is the purpose of non-serial version of reserve/commit?
In serial  MP/MC case, after _reserve_(n) you always have to do
_commit_(n) - you can't reduce number of elements, or do _abort_.
Again you cannot avoid memcpy(n) here anyhow.
So what is the point of these functions for non-serial case? 

BTW, I think it would be good to have serial version of _enqueue_ too.

> +		unsigned int n, enum rte_ring_queue_behavior behavior,
> +		unsigned int is_sp, unsigned int *old_head,
> +		unsigned int *new_head, unsigned int *free_space,
> +		void **dst1, unsigned int *n1, void **dst2)

I do understand the intention to avoid memcpy(), but proposed API
seems overcomplicated, error prone, and not very convenient for the user.
I don't think that avoiding memcpy() will save us that many cycles here,
so probably better to keep API model a bit more regular:

n = rte_ring_mp_serial_enqueue_bulk_reserve(ring, num, &free_space);
...
/* performs actual memcpy(), m<=n */ 
rte_ring_mp_serial_enqueue_bulk_commit(ring, obj,  m);

/* performs actual memcpy for num elems */ 
n = rte_ring_mp_serial_dequeue_bulk_reserve(ring, obj, num, &free_space);
....
/* m<=n */
rte_ring_mp_serial_dequeue_bulk_commit(ring, obj,  m);

Plus, we can have usual enqueue/dequeue API for serial sync mode:
rte_ring_serial_(enqueue/dequeue)_(bulk/burst)

> +{
> +	uint32_t free_entries;
> +
> +	n = __rte_ring_move_prod_head(r, is_sp, n, behavior,
> +			old_head, new_head, &free_entries);
> +
> +	if (n == 0)
> +		goto end;

Here and in other similar places, why not just 'return 0;'?

> +
> +	__rte_ring_get_elem_addr(r, *old_head, esize, n, dst1, n1, dst2);
> +
> +	if (free_space != NULL)
> +		*free_space = free_entries - n;
> +
> +end:
> +	return n;
> +}
> +
> +/**
> + * @internal Consume previously reserved ring elements (for enqueue)
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param old_head
> + *   Producer's head index before reservation.
> + * @param new_head
> + *   Producer's head index after reservation.
> + * @param is_sp
> + *   Indicates whether to use single producer or multi-producer head update
> + */
> +static __rte_always_inline void
> +__rte_ring_do_enqueue_elem_commit(struct rte_ring *r,
> +		unsigned int old_head, unsigned int new_head,
> +		unsigned int is_sp)
> +{
> +	update_tail(&r->prod, old_head, new_head, is_sp, 1);
> +}
> +
> +/**
> + * Reserve one element for enqueuing one object on a ring
> + * (multi-producers safe). Application must call
> + * 'rte_ring_mp_enqueue_elem_commit' to complete the enqueue operation.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + *   This must be the same value used while creating the ring. Otherwise
> + *   the results are undefined.
> + * @param old_head
> + *   Producer's head index before reservation. The same should be passed to
> + *   'rte_ring_mp_enqueue_elem_commit' function.
> + * @param new_head
> + *   Producer's head index after reservation. The same should be passed to
> + *   'rte_ring_mp_enqueue_elem_commit' function.
> + * @param free_space
> + *   Returns the amount of space after the reservation operation has finished.
> + *   It is not updated if the number of reserved elements is zero.
> + * @param dst
> + *   Pointer to location in the ring to copy the data.
> + * @return
> + *   - 0: Success; objects enqueued.
> + *   - -ENOBUFS: Not enough room in the ring to reserve; no element is reserved.
> + */
> +static __rte_always_inline int
> +rte_ring_mp_enqueue_elem_reserve(struct rte_ring *r, unsigned int esize,
> +		unsigned int *old_head, unsigned int *new_head,
> +		unsigned int *free_space, void **dst)
> +{
> +	unsigned int n;
> +
> +	return __rte_ring_do_enqueue_elem_reserve(r, esize, 1,
> +			RTE_RING_QUEUE_FIXED, 0, old_head, new_head,
> +			free_space, dst, &n, NULL) ? 0 : -ENOBUFS;
> +}
> +
> +/**
> + * Consume previously reserved elements (for enqueue) in a ring
> + * (multi-producers safe). This API completes the enqueue operation.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param old_head
> + *   Producer's head index before reservation. This value was returned
> + *   when the API 'rte_ring_mp_enqueue_elem_reserve' was called.
> + * @param new_head
> + *   Producer's head index after reservation. This value was returned
> + *   when the API 'rte_ring_mp_enqueue_elem_reserve' was called.
> + */
> +static __rte_always_inline void
> +rte_ring_mp_enqueue_elem_commit(struct rte_ring *r, unsigned int old_head,
> +		unsigned int new_head)
> +{
> +	__rte_ring_do_enqueue_elem_commit(r, old_head, new_head, 0);
> +}
> +
> +/**
> + * @internal Reserve elements to dequeue several objects on the ring.
> + * This function blocks if there are elements reserved already.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + *   This must be the same value used while creating the ring. Otherwise
> + *   the results are undefined.
> + * @param n
> + *   The number of objects to reserve in the ring
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Reserve fixed number of elements in a ring
> + *   RTE_RING_QUEUE_VARIABLE: Reserve as many elements as possible in a ring
> + * @param is_sc
> + *   Indicates whether to use single consumer or multi-consumer head update
> + * @param old_head
> + *   Consumer's head index before reservation.
> + * @param new_head
> + *   Consumer's head index after reservation.
> + * @param available
> + *   returns the number of remaining ring elements after the reservation
> + *   It is not updated if the number of reserved elements is zero.
> + * @param src1
> + *   Pointer to location in the ring to copy the data from.
> + * @param n1
> + *   Number of elements to copy from src1
> + * @param src2
> + *   In case of wrap around in the ring, this pointer provides the location
> + *   to copy the remaining elements from. The number of elements to copy from
> + *   this pointer is equal to (number of elements reserved - n1)
> + * @return
> + *   Actual number of elements reserved.
> + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_do_dequeue_elem_reserve_serial(struct rte_ring *r,
> +		unsigned int esize, unsigned int n,
> +		enum rte_ring_queue_behavior behavior, unsigned int is_sc,
> +		unsigned int *old_head, unsigned int *new_head,
> +		unsigned int *available, void **src1, unsigned int *n1,
> +		void **src2)
> +{
> +	uint32_t entries;
> +
> +	n = __rte_ring_move_cons_head_serial(r, is_sc, n, behavior,
> +			old_head, new_head, &entries);
> +
> +	if (n == 0)
> +		goto end;
> +
> +	__rte_ring_get_elem_addr(r, *old_head, esize, n, src1, n1, src2);
> +
> +	if (available != NULL)
> +		*available = entries - n;
> +
> +end:
> +	return n;
> +}
> +
> +/**
> + * @internal Consume previously reserved ring elements (for dequeue)
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param old_head
> + *   Consumer's head index before reservation.
> + * @param new_head
> + *   Consumer's head index after reservation.
> + * @param is_sc
> + *   Indicates whether to use single consumer or multi-consumer head update
> + */
> +static __rte_always_inline void
> +__rte_ring_do_dequeue_elem_commit(struct rte_ring *r,
> +		unsigned int old_head, unsigned int new_head,
> +		unsigned int is_sc)
> +{

I think it is a bit dangerous and error-prone approach to let user
specify old_head/new_head manually.
Seems better just _commit(ring, num) - see above.
That way the user doesn't have to calculate the new head manually,
plus we can have a check that ring.tail - ring.head >= num.    

> +	update_tail(&r->cons, old_head, new_head, is_sc, 1);

I think update_tail() is not enough here.
As in some cases we need to update  ring.head also:
let's say the user reserved 2 elems, but then decided to commit only one.  
So I think we need a special new function instead of update_tail() here.
BTW, in HTS I use atomic 64-bit read/write to get/set both head and tail in one go.
This is not really required - two 32bit ops would work too, I think.
As usual, both ways have some pros and cons:
using 64bit ops might be faster on 64-bit target, plus it is less error prone
(no need to think about head/tail read/write ordering, etc.),
though for 32-bit target it would mean some extra overhead. 

> +}
> +
> +/**
> + * Reserve one element on a ring for dequeue. This function blocks if there
> + * are elements reserved already. Application must call
> + * 'rte_ring_do_dequeue_elem_commit' or
> + * `rte_ring_do_dequeue_elem_revert_serial' to complete the dequeue operation.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + *   This must be the same value used while creating the ring. Otherwise
> + *   the results are undefined.
> + * @param old_head
> + *   Consumer's head index before reservation. The same should be passed to
> + *   'rte_ring_dequeue_elem_commit' function.
> + * @param new_head
> + *   Consumer's head index after reservation. The same should be passed to
> + *   'rte_ring_dequeue_elem_commit' function.
> + * @param available
> + *   returns the number of remaining ring elements after the reservation
> + *   It is not updated if the number of reserved elements is zero.
> + * @param src
> + *   Pointer to location in the ring to copy the data from.
> + * @return
> + *   - 0: Success; elements reserved
> + *   - -ENOBUFS: Not enough room in the ring; no element is reserved.
> + */
> +static __rte_always_inline int
> +rte_ring_dequeue_elem_reserve_serial(struct rte_ring *r, unsigned int esize,
> +		unsigned int *old_head, unsigned int *new_head,
> +		unsigned int *available, void **src)
> +{
> +	unsigned int n;
> +
> +	return __rte_ring_do_dequeue_elem_reserve_serial(r, esize, 1,
> +			RTE_RING_QUEUE_FIXED, r->cons.single, old_head,
> +			new_head, available, src, &n, NULL) ? 0 : -ENOBUFS;
> +}
> +
> +/**
> + * Consume previously reserved elements (for dequeue) in a ring
> + * (multi-consumer safe).
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param old_head
> + *   Consumer's head index before reservation. This value was returned
> + *   when the API 'rte_ring_dequeue_elem_reserve_xxx' was called.
> + * @param new_head
> + *   Consumer's head index after reservation. This value was returned
> + *   when the API 'rte_ring_dequeue_elem_reserve_xxx' was called.
> + */
> +static __rte_always_inline void
> +rte_ring_dequeue_elem_commit(struct rte_ring *r, unsigned int old_head,
> +		unsigned int new_head)
> +{
> +	__rte_ring_do_dequeue_elem_commit(r, old_head, new_head,
> +						r->cons.single);
> +}
> +
> +/**
> + * Discard previously reserved elements (for dequeue) in a ring.
> + *
> + * @warning
> + * This API can be called only if the ring elements were reserved
> + * using 'rte_ring_dequeue_xxx_elem_reserve_serial' APIs.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + */
> +static __rte_always_inline void
> +rte_ring_dequeue_elem_revert_serial(struct rte_ring *r)
> +{
> +	__rte_ring_revert_head(&r->cons);
> +}
> +
> +#ifdef __cplusplus
> +}
> +#endif
> +
> +#endif /* _RTE_RING_ELEM_SG_H_ */
> diff --git a/lib/librte_ring/rte_ring_generic.h b/lib/librte_ring/rte_ring_generic.h
> index 953cdbbd5..8d7a7ffcc 100644
> --- a/lib/librte_ring/rte_ring_generic.h
> +++ b/lib/librte_ring/rte_ring_generic.h
> @@ -170,4 +170,97 @@ __rte_ring_move_cons_head(struct rte_ring *r, unsigned int is_sc,
>  	return n;
>  }
> 
> +/**
> + * @internal This function updates the consumer head if there are no
> + * prior reserved elements on the ring.
> + *
> + * @param r
> + *   A pointer to the ring structure
> + * @param is_sc
> + *   Indicates whether multi-consumer path is needed or not
> + * @param n
> + *   The number of elements we will want to dequeue, i.e. how far should the
> + *   head be moved
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
> + *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
> + * @param old_head
> + *   Returns head value as it was before the move, i.e. where dequeue starts
> + * @param new_head
> + *   Returns the current/new head value i.e. where dequeue finishes
> + * @param entries
> + *   Returns the number of entries in the ring BEFORE head was moved
> + * @return
> + *   - Actual number of objects dequeued.
> + *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_move_cons_head_serial(struct rte_ring *r, unsigned int is_sc,
> +		unsigned int n, enum rte_ring_queue_behavior behavior,
> +		uint32_t *old_head, uint32_t *new_head,
> +		uint32_t *entries)
> +{
> +	unsigned int max = n;
> +	int success;
> +
> +	/* move cons.head atomically */
> +	do {
> +		/* Restore n as it may change every loop */
> +		n = max;
> +
> +		*old_head = r->cons.head;
> +
> +		/* add rmb barrier to avoid load/load reorder in weak
> +		 * memory model. It is noop on x86
> +		 */
> +		rte_smp_rmb();
> +
> +		/* Ensure that cons.tail and cons.head are the same */
> +		if (*old_head != r->cons.tail) {
> +			rte_pause();
> +
> +			success = 0;
> +			continue;
> +		}
> +
> +		/* The subtraction is done between two unsigned 32bits value
> +		 * (the result is always modulo 32 bits even if we have
> +		 * cons_head > prod_tail). So 'entries' is always between 0
> +		 * and size(ring)-1.
> +		 */
> +		*entries = (r->prod.tail - *old_head);
> +
> +		/* Set the actual entries for dequeue */
> +		if (n > *entries)
> +			n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
> +
> +		if (unlikely(n == 0))
> +			return 0;
> +
> +		*new_head = *old_head + n;
> +		if (is_sc) {
> +			r->cons.head = *new_head;
> +			rte_smp_rmb();
> +			success = 1;

I don't think we need to worry about SC case in this function.
For sc(/sp)_serial (if we need such mode) - we probably can use normal move_(cons/prod)_head().

> +		} else {
> +			success = rte_atomic32_cmpset(&r->cons.head, *old_head,
> +					*new_head);
> +		}
> +	} while (unlikely(success == 0));
> +	return n;
> +}
> +
> +/**
> + * @internal This function updates the head to match the tail
> + *
> + * @param ht
> + *   A pointer to the ring's head-tail structure
> + */
> +static __rte_always_inline void
> +__rte_ring_revert_head(struct rte_ring_headtail *ht)
> +{
> +	/* Discard the reserved ring elements. */
> +	ht->head = ht->tail;
> +}
> +
>  #endif /* _RTE_RING_GENERIC_H_ */
> --
> 2.17.1
Konstantin Ananyev Feb. 26, 2020, 11:21 p.m. UTC | #2
> > +/**
> > + * @internal Reserve ring elements to enqueue several objects on the ring
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param esize
> > + *   The size of ring element, in bytes. It must be a multiple of 4.
> > + *   This must be the same value used while creating the ring. Otherwise
> > + *   the results are undefined.
> > + * @param n
> > + *   The number of elements to reserve in the ring.
> > + * @param behavior
> > + *   RTE_RING_QUEUE_FIXED:    Reserve a fixed number of elements from a ring
> > + *   RTE_RING_QUEUE_VARIABLE: Reserve as many elements as possible from ring
> > + * @param is_sp
> > + *   Indicates whether to use single producer or multi-producer reserve
> > + * @param old_head
> > + *   Producer's head index before reservation.
> > + * @param new_head
> > + *   Producer's head index after reservation.
> > + * @param free_space
> > + *   returns the amount of space after the reserve operation has finished.
> > + *   It is not updated if the number of reserved elements is zero.
> > + * @param dst1
> > + *   Pointer to location in the ring to copy the data.
> > + * @param n1
> > + *   Number of elements to copy at dst1
> > + * @param dst2
> > + *   In case of ring wrap around, this pointer provides the location to
> > + *   copy the remaining elements. The number of elements to copy at this
> > + *   location is equal to (number of elements reserved - n1)
> > + * @return
> > + *   Actual number of elements reserved.
> > + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> > + */
> > +static __rte_always_inline unsigned int
> > +__rte_ring_do_enqueue_elem_reserve(struct rte_ring *r, unsigned int esize,
> 
> 
> I do understand the purpose of reserve, then either commit/abort for serial sync mode,
> but what is the purpose of non-serial version of reserve/commit?
> In serial  MP/MC case, after _reserve_(n) you always have to do

Typo, meant 'in non-serial MP/MC case' of course. 

> _commit_(n) - you can't reduce number of elements, or do _abort_.
> Again you cannot avoid memcpy(n) here anyhow.
> So what is the point of these functions for non-serial case?
> 
> BTW, I think it would be good to have serial version of _enqueue_ too.
Honnappa Nagarahalli Feb. 28, 2020, 12:18 a.m. UTC | #3
<snip>
> 
> 
> Hi Honnappa,
Thanks Konstantin for the comments.
> 
> > Add scatter gather APIs to avoid intermediate memcpy. Serial dequeue
> > APIs are added to support access to ring elements before actual
> > dequeue.
> >
> > Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
> > Reviewed-by: Gavin Hu <gavin.hu@arm.com>
> > Reviewed-by: Ola Liljedahl <ola.liljedahl@arm.com>
> > ---
> >  lib/librte_ring/Makefile           |   1 +
> >  lib/librte_ring/meson.build        |   1 +
> >  lib/librte_ring/rte_ring_c11_mem.h |  98 +++++++
> > lib/librte_ring/rte_ring_elem_sg.h | 417 +++++++++++++++++++++++++++++
> > lib/librte_ring/rte_ring_generic.h |  93 +++++++
> >  5 files changed, 610 insertions(+)
> 
> As was already noticed by you this patch overlaps quite a bit with another
> one:
> http://patches.dpdk.org/patch/66006/
I took a cursory look at this. I need to take a detailed look, plan to do so soon.

> 
> Though it seems there are few significant differences in our approaches (both
> API and implementation).
> So we probably need to come-up with some common view first, before
> moving forward with some unified version.
> To start a discussion, I produced some comments, pls see below.
> 
> I don't see changes in rte_ring.h itself, but I suppose that's just because it is an
> RFC and it would be added in later versions?
I did not plan to add them. IMO, we should not add new APIs to that list. We should encourage using the rte_ring_xxx_elem APIs going forward. They are interoperable (I mean, the application can call a mix of APIs)

> Another similar question there seems only _bulk_ (RTE_RING_QUEUE_FIXED)
> mode, I suppose _burst_ will also be added in later versions?
Here, I was trying to avoid providing APIs without a clear need (_bulk_ is enough for RCU for now). If you see the need, I can add them.

> 
> > diff --git a/lib/librte_ring/rte_ring_elem_sg.h
> > b/lib/librte_ring/rte_ring_elem_sg.h
> > new file mode 100644
> > index 000000000..a73f4fbfe
> > --- /dev/null
> > +++ b/lib/librte_ring/rte_ring_elem_sg.h
> > @@ -0,0 +1,417 @@
> > +/* SPDX-License-Identifier: BSD-3-Clause
> > + *
> > + * Copyright (c) 2020 Arm Limited
> > + * Copyright (c) 2010-2017 Intel Corporation
> > + * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
> > + * All rights reserved.
> > + * Derived from FreeBSD's bufring.h
> > + * Used as BSD-3 Licensed with permission from Kip Macy.
> > + */
> > +
> > +#ifndef _RTE_RING_ELEM_SG_H_
> > +#define _RTE_RING_ELEM_SG_H_
> > +
> > +/**
> > + * @file
> > + * RTE Ring with
> > + * 1) user defined element size
> > + * 2) scatter gather feature to copy objects to/from the ring
> > + * 3) ability to reserve, consume/discard elements in the ring  */
> > +
> > +#ifdef __cplusplus
> > +extern "C" {
> > +#endif
> > +
> > +#include <stdio.h>
> > +#include <stdint.h>
> > +#include <string.h>
> > +#include <sys/queue.h>
> > +#include <errno.h>
> > +#include <rte_common.h>
> > +#include <rte_config.h>
> > +#include <rte_memory.h>
> > +#include <rte_lcore.h>
> > +#include <rte_atomic.h>
> > +#include <rte_branch_prediction.h>
> > +#include <rte_memzone.h>
> > +#include <rte_pause.h>
> > +
> > +#include "rte_ring.h"
> > +#include "rte_ring_elem.h"
> > +
> > +/* Between load and load. there might be cpu reorder in weak model
> > + * (powerpc/arm).
> > + * There are 2 choices for the users
> > + * 1.use rmb() memory barrier
> > + * 2.use one-direction load_acquire/store_release barrier,defined by
> > + * CONFIG_RTE_USE_C11_MEM_MODEL=y
> > + * It depends on performance test results.
> > + * By default, move common functions to rte_ring_generic.h  */ #ifdef
> > +RTE_USE_C11_MEM_MODEL #include "rte_ring_c11_mem.h"
> > +#else
> > +#include "rte_ring_generic.h"
> > +#endif
> > +
> > +static __rte_always_inline void
> > +__rte_ring_get_elem_addr_64(struct rte_ring *r, uint32_t head,
> > +	uint32_t num, void **dst1, uint32_t *n1, void **dst2) {
> > +	uint32_t idx = head & r->mask;
> > +	uint64_t *ring = (uint64_t *)&r[1];
> > +
> > +	*dst1 = ring + idx;
> > +	*n1 = num;
> > +
> > +	if (idx + num > r->size) {
> > +		*n1 = num - (r->size - idx - 1);
> > +		*dst2 = ring;
> > +	}
> > +}
> > +
> > +static __rte_always_inline void
> > +__rte_ring_get_elem_addr_128(struct rte_ring *r, uint32_t head,
> > +	uint32_t num, void **dst1, uint32_t *n1, void **dst2) {
> > +	uint32_t idx = head & r->mask;
> > +	rte_int128_t *ring = (rte_int128_t *)&r[1];
> > +
> > +	*dst1 = ring + idx;
> > +	*n1 = num;
> > +
> > +	if (idx + num > r->size) {
> > +		*n1 = num - (r->size - idx - 1);
> > +		*dst2 = ring;
> > +	}
> > +}
> > +
> > +static __rte_always_inline void
> > +__rte_ring_get_elem_addr(struct rte_ring *r, uint32_t head,
> > +	uint32_t esize, uint32_t num, void **dst1, uint32_t *n1, void
> > +**dst2) {
> > +	if (esize == 8)
> > +		return __rte_ring_get_elem_addr_64(r, head,
> > +						num, dst1, n1, dst2);
> > +	else if (esize == 16)
> > +		return __rte_ring_get_elem_addr_128(r, head,
> > +						num, dst1, n1, dst2);
> > +	else {
> > +		uint32_t idx, scale, nr_idx;
> > +		uint32_t *ring = (uint32_t *)&r[1];
> > +
> > +		/* Normalize to uint32_t */
> > +		scale = esize / sizeof(uint32_t);
> > +		idx = head & r->mask;
> > +		nr_idx = idx * scale;
> > +
> > +		*dst1 = ring + nr_idx;
> > +		*n1 = num;
> > +
> > +		if (idx + num > r->size) {
> > +			*n1 = num - (r->size - idx - 1);
> > +			*dst2 = ring;
> > +		}
> > +	}
> > +}
> > +
> > +/**
> > + * @internal Reserve ring elements to enqueue several objects on the
> > +ring
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param esize
> > + *   The size of ring element, in bytes. It must be a multiple of 4.
> > + *   This must be the same value used while creating the ring. Otherwise
> > + *   the results are undefined.
> > + * @param n
> > + *   The number of elements to reserve in the ring.
> > + * @param behavior
> > + *   RTE_RING_QUEUE_FIXED:    Reserve a fixed number of elements from a
> ring
> > + *   RTE_RING_QUEUE_VARIABLE: Reserve as many elements as possible
> from ring
> > + * @param is_sp
> > + *   Indicates whether to use single producer or multi-producer reserve
> > + * @param old_head
> > + *   Producer's head index before reservation.
> > + * @param new_head
> > + *   Producer's head index after reservation.
> > + * @param free_space
> > + *   returns the amount of space after the reserve operation has finished.
> > + *   It is not updated if the number of reserved elements is zero.
> > + * @param dst1
> > + *   Pointer to location in the ring to copy the data.
> > + * @param n1
> > + *   Number of elements to copy at dst1
> > + * @param dst2
> > + *   In case of ring wrap around, this pointer provides the location to
> > + *   copy the remaining elements. The number of elements to copy at this
> > + *   location is equal to (number of elements reserved - n1)
> > + * @return
> > + *   Actual number of elements reserved.
> > + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> > + */
> > +static __rte_always_inline unsigned int
> > +__rte_ring_do_enqueue_elem_reserve(struct rte_ring *r, unsigned int
> > +esize,
> 
> 
> I do understand the purpose of reserve, then either commit/abort for serial
> sync mode, but what is the purpose of non-serial version of reserve/commit?
In RCU, I have the need for scatter-gather feature. i.e. the data in the ring element is coming from multiple sources ('token' is generated by the RCU library and the application provides additional data). If I do not provide the reserve/commit, I need to introduce an intermediate memcpy to get these two data contiguously to copy to the ring element. The sequence is 'reserve(1), memcpy1, mempcy2, commit(1)'. Hence, you do not see the abort API for the enqueue.
 
> In serial  MP/MC case, after _reserve_(n) you always have to do
> _commit_(n) - you can't reduce number of elements, or do _abort_.
Agree, the intention here is to provide the scatter/gather feature.

> Again you cannot avoid memcpy(n) here anyhow.
> So what is the point of these functions for non-serial case?
It avoids an intermediate memcpy when the data is coming from multiple sources.

> 
> BTW, I think it would be good to have serial version of _enqueue_ too.
If there is a good use case, they should be provided. I did not come across a good use case.

> 
> > +		unsigned int n, enum rte_ring_queue_behavior behavior,
> > +		unsigned int is_sp, unsigned int *old_head,
> > +		unsigned int *new_head, unsigned int *free_space,
> > +		void **dst1, unsigned int *n1, void **dst2)
> 
> I do understand the intention to avoid memcpy(), but proposed API seems
> overcomplicated, error prone, and not very convenient for the user.
The issue is the need to handle the wrap around in ring storage array. i.e. when the space is reserved for more than 1 ring element, the wrap around might happen.

> I don't think that avoiding memcpy() will save us that many cycles here, so
This depends on the amount of data being copied.

> probably better to keep API model a bit more regular:
> 
> n = rte_ring_mp_serial_enqueue_bulk_reserve(ring, num, &free_space); ...
> /* performs actual memcpy(), m<=n */
> rte_ring_mp_serial_enqueue_bulk_commit(ring, obj,  m);
Either these do not take care of the wrap-around case, or I am not able to understand your comment.

> 
> /* performs actual memcpy for num elems */ n =
> rte_ring_mp_serial_dequeue_bulk_reserve(ring, obj, num, &free_space); ....
> /* m<=n */
> rte_ring_mp_serial_dequeue_bulk_commit(ring, obj,  m);
> 
> Plus, we can have usual enqueue/dequeue API for serial sync mode:
> rte_ring_serial_(enqueue/dequeue)_(bulk/burst)
It would be good to understand the use cases. IMO, if we do not have use cases, we should not add for now. We can add them as and when the use cases are understood.

> 
> > +{
> > +	uint32_t free_entries;
> > +
> > +	n = __rte_ring_move_prod_head(r, is_sp, n, behavior,
> > +			old_head, new_head, &free_entries);
> > +
> > +	if (n == 0)
> > +		goto end;
> 
> Here and in other similar places, why not just 'return 0;'?
Yes, should be possible.

> 
> > +
> > +	__rte_ring_get_elem_addr(r, *old_head, esize, n, dst1, n1, dst2);
> > +
> > +	if (free_space != NULL)
> > +		*free_space = free_entries - n;
> > +
> > +end:
> > +	return n;
> > +}
> > +
> > +/**
> > + * @internal Consume previously reserved ring elements (for enqueue)
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param old_head
> > + *   Producer's head index before reservation.
> > + * @param new_head
> > + *   Producer's head index after reservation.
> > + * @param is_sp
> > + *   Indicates whether to use single producer or multi-producer head
> update
> > + */
> > +static __rte_always_inline void
> > +__rte_ring_do_enqueue_elem_commit(struct rte_ring *r,
> > +		unsigned int old_head, unsigned int new_head,
> > +		unsigned int is_sp)
> > +{
> > +	update_tail(&r->prod, old_head, new_head, is_sp, 1); }
> > +
> > +/**
> > + * Reserve one element for enqueuing one object on a ring
> > + * (multi-producers safe). Application must call
> > + * 'rte_ring_mp_enqueue_elem_commit' to complete the enqueue
> operation.
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param esize
> > + *   The size of ring element, in bytes. It must be a multiple of 4.
> > + *   This must be the same value used while creating the ring. Otherwise
> > + *   the results are undefined.
> > + * @param old_head
> > + *   Producer's head index before reservation. The same should be passed
> to
> > + *   'rte_ring_mp_enqueue_elem_commit' function.
> > + * @param new_head
> > + *   Producer's head index after reservation. The same should be passed to
> > + *   'rte_ring_mp_enqueue_elem_commit' function.
> > + * @param free_space
> > + *   Returns the amount of space after the reservation operation has
> finished.
> > + *   It is not updated if the number of reserved elements is zero.
> > + * @param dst
> > + *   Pointer to location in the ring to copy the data.
> > + * @return
> > + *   - 0: Success; objects enqueued.
> > + *   - -ENOBUFS: Not enough room in the ring to reserve; no element is
> reserved.
> > + */
> > +static __rte_always_inline int
> > +rte_ring_mp_enqueue_elem_reserve(struct rte_ring *r, unsigned int esize,
> > +		unsigned int *old_head, unsigned int *new_head,
> > +		unsigned int *free_space, void **dst) {
> > +	unsigned int n;
> > +
> > +	return __rte_ring_do_enqueue_elem_reserve(r, esize, 1,
> > +			RTE_RING_QUEUE_FIXED, 0, old_head, new_head,
> > +			free_space, dst, &n, NULL) ? 0 : -ENOBUFS; }
> > +
> > +/**
> > + * Consume previously reserved elements (for enqueue) in a ring
> > + * (multi-producers safe). This API completes the enqueue operation.
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param old_head
> > + *   Producer's head index before reservation. This value was returned
> > + *   when the API 'rte_ring_mp_enqueue_elem_reserve' was called.
> > + * @param new_head
> > + *   Producer's head index after reservation. This value was returned
> > + *   when the API 'rte_ring_mp_enqueue_elem_reserve' was called.
> > + */
> > +static __rte_always_inline void
> > +rte_ring_mp_enqueue_elem_commit(struct rte_ring *r, unsigned int
> old_head,
> > +		unsigned int new_head)
> > +{
> > +	__rte_ring_do_enqueue_elem_commit(r, old_head, new_head, 0); }
> > +
> > +/**
> > + * @internal Reserve elements to dequeue several objects on the ring.
> > + * This function blocks if there are elements reserved already.
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param esize
> > + *   The size of ring element, in bytes. It must be a multiple of 4.
> > + *   This must be the same value used while creating the ring. Otherwise
> > + *   the results are undefined.
> > + * @param n
> > + *   The number of objects to reserve in the ring
> > + * @param behavior
> > + *   RTE_RING_QUEUE_FIXED:    Reserve fixed number of elements in a ring
> > + *   RTE_RING_QUEUE_VARIABLE: Reserve as many elements as possible in
> a ring
> > + * @param is_sc
> > + *   Indicates whether to use single consumer or multi-consumer head
> update
> > + * @param old_head
> > + *   Consumer's head index before reservation.
> > + * @param new_head
> > + *   Consumer's head index after reservation.
> > + * @param available
> > + *   returns the number of remaining ring elements after the reservation
> > + *   It is not updated if the number of reserved elements is zero.
> > + * @param src1
> > + *   Pointer to location in the ring to copy the data from.
> > + * @param n1
> > + *   Number of elements to copy from src1
> > + * @param src2
> > + *   In case of wrap around in the ring, this pointer provides the location
> > + *   to copy the remaining elements from. The number of elements to copy
> from
> > + *   this pointer is equal to (number of elements reserved - n1)
> > + * @return
> > + *   Actual number of elements reserved.
> > + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> > + */
> > +static __rte_always_inline unsigned int
> > +__rte_ring_do_dequeue_elem_reserve_serial(struct rte_ring *r,
> > +		unsigned int esize, unsigned int n,
> > +		enum rte_ring_queue_behavior behavior, unsigned int is_sc,
> > +		unsigned int *old_head, unsigned int *new_head,
> > +		unsigned int *available, void **src1, unsigned int *n1,
> > +		void **src2)
> > +{
> > +	uint32_t entries;
> > +
> > +	n = __rte_ring_move_cons_head_serial(r, is_sc, n, behavior,
> > +			old_head, new_head, &entries);
> > +
> > +	if (n == 0)
> > +		goto end;
> > +
> > +	__rte_ring_get_elem_addr(r, *old_head, esize, n, src1, n1, src2);
> > +
> > +	if (available != NULL)
> > +		*available = entries - n;
> > +
> > +end:
> > +	return n;
> > +}
> > +
> > +/**
> > + * @internal Consume previously reserved ring elements (for dequeue)
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param old_head
> > + *   Consumer's head index before reservation.
> > + * @param new_head
> > + *   Consumer's head index after reservation.
> > + * @param is_sc
> > + *   Indicates whether to use single consumer or multi-consumer head
> update
> > + */
> > +static __rte_always_inline void
> > +__rte_ring_do_dequeue_elem_commit(struct rte_ring *r,
> > +		unsigned int old_head, unsigned int new_head,
> > +		unsigned int is_sc)
> > +{
> 
> I think it is a bit dangerous and error-prone approach to let user specify
> old_head/new_head manually.
old_head and new_head are local to the thread in the non-serial MP/MC case. Hence, they need to be returned back to the caller.

> Seems better just _commit(ring, num) - see above.
This would not work for non-serial cases.

> That way suer don't have to calculate new head mannualy,
I do not understand the 'calculate' part. The user has to provide the same values that were returned.

> plus we can have a check that ring.tail - ring.head >= num.
> 
> > +	update_tail(&r->cons, old_head, new_head, is_sc, 1);
> 
> I think update_tail() is not enough here.
> As in some cases we need to update  ring.head also:
> let say user reserved 2 elems, but then decided to commit only one.
This patch does not address that use case. Do you see use cases for this?

> So I think we need a special new function instead of update_tail() here.
> BTW, in HTS I use atomic 64-bit read/write to get/set both head and tail in
> one go.
> This is not really required - two 32bit ops would work too, I think.
> As usual, both ways have some pros and cons:
> using 64bit ops might be faster on 64-bit target, plus it is less error prone (no
> need to think about head/tail read/write ordering, etc.), though for 32-bit
> target it would mean some extra overhead.
> 
> > +}
> > +
> > +/**
> > + * Reserve one element on a ring for dequeue. This function blocks if
> > +there
> > + * are elements reserved already. Application must call
> > + * 'rte_ring_do_dequeue_elem_commit' or
> > + * `rte_ring_do_dequeue_elem_revert_serial' to complete the dequeue
> operation.
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param esize
> > + *   The size of ring element, in bytes. It must be a multiple of 4.
> > + *   This must be the same value used while creating the ring. Otherwise
> > + *   the results are undefined.
> > + * @param old_head
> > + *   Consumer's head index before reservation. The same should be passed
> to
> > + *   'rte_ring_dequeue_elem_commit' function.
> > + * @param new_head
> > + *   Consumer's head index after reservation. The same should be passed to
> > + *   'rte_ring_dequeue_elem_commit' function.
> > + * @param available
> > + *   returns the number of remaining ring elements after the reservation
> > + *   It is not updated if the number of reserved elements is zero.
> > + * @param src
> > + *   Pointer to location in the ring to copy the data from.
> > + * @return
> > + *   - 0: Success; elements reserved
> > + *   - -ENOBUFS: Not enough room in the ring; no element is reserved.
> > + */
> > +static __rte_always_inline int
> > +rte_ring_dequeue_elem_reserve_serial(struct rte_ring *r, unsigned int
> esize,
> > +		unsigned int *old_head, unsigned int *new_head,
> > +		unsigned int *available, void **src) {
> > +	unsigned int n;
> > +
> > +	return __rte_ring_do_dequeue_elem_reserve_serial(r, esize, 1,
> > +			RTE_RING_QUEUE_FIXED, r->cons.single, old_head,
> > +			new_head, available, src, &n, NULL) ? 0 : -ENOBUFS; }
> > +
> > +/**
> > + * Consume previously reserved elements (for dequeue) in a ring
> > + * (multi-consumer safe).
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param old_head
> > + *   Consumer's head index before reservation. This value was returned
> > + *   when the API 'rte_ring_dequeue_elem_reserve_xxx' was called.
> > + * @param new_head
> > + *   Consumer's head index after reservation. This value was returned
> > + *   when the API 'rte_ring_dequeue_elem_reserve_xxx' was called.
> > + */
> > +static __rte_always_inline void
> > +rte_ring_dequeue_elem_commit(struct rte_ring *r, unsigned int old_head,
> > +		unsigned int new_head)
> > +{
> > +	__rte_ring_do_dequeue_elem_commit(r, old_head, new_head,
> > +						r->cons.single);
> > +}
> > +
> > +/**
> > + * Discard previously reserved elements (for dequeue) in a ring.
> > + *
> > + * @warning
> > + * This API can be called only if the ring elements were reserved
> > + * using 'rte_ring_dequeue_xxx_elem_reserve_serial' APIs.
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + */
> > +static __rte_always_inline void
> > +rte_ring_dequeue_elem_revert_serial(struct rte_ring *r) {
> > +	__rte_ring_revert_head(&r->cons);
> > +}
> > +
> > +#ifdef __cplusplus
> > +}
> > +#endif
> > +
> > +#endif /* _RTE_RING_ELEM_SG_H_ */
> > diff --git a/lib/librte_ring/rte_ring_generic.h
> > b/lib/librte_ring/rte_ring_generic.h
> > index 953cdbbd5..8d7a7ffcc 100644
> > --- a/lib/librte_ring/rte_ring_generic.h
> > +++ b/lib/librte_ring/rte_ring_generic.h
> > @@ -170,4 +170,97 @@ __rte_ring_move_cons_head(struct rte_ring *r,
> unsigned int is_sc,
> >  	return n;
> >  }
> >
> > +/**
> > + * @internal This function updates the consumer head if there are no
> > + * prior reserved elements on the ring.
> > + *
> > + * @param r
> > + *   A pointer to the ring structure
> > + * @param is_sc
> > + *   Indicates whether multi-consumer path is needed or not
> > + * @param n
> > + *   The number of elements we will want to dequeue, i.e. how far should
> the
> > + *   head be moved
> > + * @param behavior
> > + *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a
> ring
> > + *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from
> ring
> > + * @param old_head
> > + *   Returns head value as it was before the move, i.e. where dequeue
> starts
> > + * @param new_head
> > + *   Returns the current/new head value i.e. where dequeue finishes
> > + * @param entries
> > + *   Returns the number of entries in the ring BEFORE head was moved
> > + * @return
> > + *   - Actual number of objects dequeued.
> > + *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> > + */
> > +static __rte_always_inline unsigned int
> > +__rte_ring_move_cons_head_serial(struct rte_ring *r, unsigned int is_sc,
> > +		unsigned int n, enum rte_ring_queue_behavior behavior,
> > +		uint32_t *old_head, uint32_t *new_head,
> > +		uint32_t *entries)
> > +{
> > +	unsigned int max = n;
> > +	int success;
> > +
> > +	/* move cons.head atomically */
> > +	do {
> > +		/* Restore n as it may change every loop */
> > +		n = max;
> > +
> > +		*old_head = r->cons.head;
> > +
> > +		/* add rmb barrier to avoid load/load reorder in weak
> > +		 * memory model. It is noop on x86
> > +		 */
> > +		rte_smp_rmb();
> > +
> > +		/* Ensure that cons.tail and cons.head are the same */
> > +		if (*old_head != r->cons.tail) {
> > +			rte_pause();
> > +
> > +			success = 0;
> > +			continue;
> > +		}
> > +
> > +		/* The subtraction is done between two unsigned 32bits value
> > +		 * (the result is always modulo 32 bits even if we have
> > +		 * cons_head > prod_tail). So 'entries' is always between 0
> > +		 * and size(ring)-1.
> > +		 */
> > +		*entries = (r->prod.tail - *old_head);
> > +
> > +		/* Set the actual entries for dequeue */
> > +		if (n > *entries)
> > +			n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 :
> *entries;
> > +
> > +		if (unlikely(n == 0))
> > +			return 0;
> > +
> > +		*new_head = *old_head + n;
> > +		if (is_sc) {
> > +			r->cons.head = *new_head;
> > +			rte_smp_rmb();
> > +			success = 1;
> 
> I don't think we need to worry about SC case in this function.
> For sc(/sp)_serial (if we need such mode) - we probably can use normal
> move_(cons/prod)_head().
Agree

> 
> > +		} else {
> > +			success = rte_atomic32_cmpset(&r->cons.head,
> *old_head,
> > +					*new_head);
> > +		}
> > +	} while (unlikely(success == 0));
> > +	return n;
> > +}
> > +
> > +/**
> > + * @internal This function updates the head to match the tail
> > + *
> > + * @param ht
> > + *   A pointer to the ring's head-tail structure
> > + */
> > +static __rte_always_inline void
> > +__rte_ring_revert_head(struct rte_ring_headtail *ht) {
> > +	/* Discard the reserved ring elements. */
> > +	ht->head = ht->tail;
> > +}
> > +
> >  #endif /* _RTE_RING_GENERIC_H_ */
> > --
> > 2.17.1
Konstantin Ananyev March 2, 2020, 6:20 p.m. UTC | #4
> > > +/**
> > > + * @internal Reserve ring elements to enqueue several objects on the
> > > +ring
> > > + *
> > > + * @param r
> > > + *   A pointer to the ring structure.
> > > + * @param esize
> > > + *   The size of ring element, in bytes. It must be a multiple of 4.
> > > + *   This must be the same value used while creating the ring. Otherwise
> > > + *   the results are undefined.
> > > + * @param n
> > > + *   The number of elements to reserve in the ring.
> > > + * @param behavior
> > > + *   RTE_RING_QUEUE_FIXED:    Reserve a fixed number of elements from a
> > ring
> > > + *   RTE_RING_QUEUE_VARIABLE: Reserve as many elements as possible
> > from ring
> > > + * @param is_sp
> > > + *   Indicates whether to use single producer or multi-producer reserve
> > > + * @param old_head
> > > + *   Producer's head index before reservation.
> > > + * @param new_head
> > > + *   Producer's head index after reservation.
> > > + * @param free_space
> > > + *   returns the amount of space after the reserve operation has finished.
> > > + *   It is not updated if the number of reserved elements is zero.
> > > + * @param dst1
> > > + *   Pointer to location in the ring to copy the data.
> > > + * @param n1
> > > + *   Number of elements to copy at dst1
> > > + * @param dst2
> > > + *   In case of ring wrap around, this pointer provides the location to
> > > + *   copy the remaining elements. The number of elements to copy at this
> > > + *   location is equal to (number of elements reserved - n1)
> > > + * @return
> > > + *   Actual number of elements reserved.
> > > + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> > > + */
> > > +static __rte_always_inline unsigned int
> > > +__rte_ring_do_enqueue_elem_reserve(struct rte_ring *r, unsigned int
> > > +esize,
> >
> >
> > I do understand the purpose of reserve, then either commit/abort for serial
> > sync mode, but what is the purpose of non-serial version of reserve/commit?
> In RCU, I have the need for scatter-gather feature. i.e. the data in the ring element is coming from multiple sources ('token' is generated by
> the RCU library and the application provides additional data). If I do not provide the reserve/commit, I need to introduce an intermediate
> memcpy to get these two data contiguously to copy to the ring element. The sequence is 'reserve(1), memcpy1, mempcy2, commit(1)'.
> Hence, you do not see the abort API for the enqueue.
> 
> > In serial  MP/MC case, after _reserve_(n) you always have to do
> > _commit_(n) - you can't reduce number of elements, or do _abort_.
> Agree, the intention here is to provide the scatter/gather feature.
> 
> > Again you cannot avoid memcpy(n) here anyhow.
> > So what is the point of these functions for non-serial case?
> It avoids an intermediate memcpy when the data is coming from multiple sources.

Ok, I think I understand what was my confusion:
Your intention:
 1) reserve/commit for both serial and non-serial mode -
     to allow user get/set contents of the ring manually and avoid
     intermediate load/stores.
2) abort only for serial mode.  

My intention:
1) commit/reserve/abort only for serial case
    (as that's the only mode where we can commit less
     then was reserved or do abort).
2) get/set of ring contents are done as part of either
    reserve(for dequeue) or commit(for enqueue) API calls
    (no scatter-gather ability).

I still think that this new API you suggest creates too
big exposure of ring internals, and makes it less 'safe-to-use':
- it provides direct access to contents of the ring.
- user has to specify head/tail values directly.

So in case of some programmatic error in related user code, 
there are less chances it could be catch-up by API,
and we can easily end-up with silent memory corruption
and other nasty things that would be hard to catch/reproduce.

That makes me wonder how critical is this scatter-gather ability
in terms of overall RCU performance? 
Is the gain provided really that significant, especially if you'll update the
ring by one element at a time? 
  
> 
> >
> > BTW, I think it would be good to have serial version of _enqueue_ too.
> If there is a good use case, they should be provided. I did not come across a good use case.
> 
> >
> > > +		unsigned int n, enum rte_ring_queue_behavior behavior,
> > > +		unsigned int is_sp, unsigned int *old_head,
> > > +		unsigned int *new_head, unsigned int *free_space,
> > > +		void **dst1, unsigned int *n1, void **dst2)
> >
> > I do understand the intention to avoid memcpy(), but proposed API seems
> > overcomplicated, error prone, and not very convenient for the user.
> The issue is the need to handle the wrap around in ring storage array. i.e. when the space is reserved for more than 1 ring element, the wrap
> around might happen.
> 
> > I don't think that avoiding memcpy() will save us that many cycles here, so
> This depends on the amount of data being copied.
> 
> > probably better to keep API model a bit more regular:
> >
> > n = rte_ring_mp_serial_enqueue_bulk_reserve(ring, num, &free_space); ...
> > /* performs actual memcpy(), m<=n */
> > rte_ring_mp_serial_enqueue_bulk_commit(ring, obj,  m);
> These do not take care of the wrap-around case or I am not able to understand your comment.

I meant that serial_enqueue_commit() will do both:
actual copy of elements to the ring and tail update (no Scatter-Gather), see above.
Honnappa Nagarahalli March 4, 2020, 11:21 p.m. UTC | #5
<snip>

> 
> > > > +/**
> > > > + * @internal Reserve ring elements to enqueue several objects on
> > > > +the ring
> > > > + *
> > > > + * @param r
> > > > + *   A pointer to the ring structure.
> > > > + * @param esize
> > > > + *   The size of ring element, in bytes. It must be a multiple of 4.
> > > > + *   This must be the same value used while creating the ring.
> Otherwise
> > > > + *   the results are undefined.
> > > > + * @param n
> > > > + *   The number of elements to reserve in the ring.
> > > > + * @param behavior
> > > > + *   RTE_RING_QUEUE_FIXED:    Reserve a fixed number of elements
> from a
> > > ring
> > > > + *   RTE_RING_QUEUE_VARIABLE: Reserve as many elements as
> possible
> > > from ring
> > > > + * @param is_sp
> > > > + *   Indicates whether to use single producer or multi-producer reserve
> > > > + * @param old_head
> > > > + *   Producer's head index before reservation.
> > > > + * @param new_head
> > > > + *   Producer's head index after reservation.
> > > > + * @param free_space
> > > > + *   returns the amount of space after the reserve operation has
> finished.
> > > > + *   It is not updated if the number of reserved elements is zero.
> > > > + * @param dst1
> > > > + *   Pointer to location in the ring to copy the data.
> > > > + * @param n1
> > > > + *   Number of elements to copy at dst1
> > > > + * @param dst2
> > > > + *   In case of ring wrap around, this pointer provides the location to
> > > > + *   copy the remaining elements. The number of elements to copy at
> this
> > > > + *   location is equal to (number of elements reserved - n1)
> > > > + * @return
> > > > + *   Actual number of elements reserved.
> > > > + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> > > > + */
> > > > +static __rte_always_inline unsigned int
> > > > +__rte_ring_do_enqueue_elem_reserve(struct rte_ring *r, unsigned
> > > > +int esize,
> > >
> > >
> > > I do understand the purpose of reserve, then either commit/abort for
> > > serial sync mode, but what is the purpose of non-serial version of
> reserve/commit?
> > In RCU, I have the need for scatter-gather feature. i.e. the data in
> > the ring element is coming from multiple sources ('token' is generated
> > by the RCU library and the application provides additional data). If I do not
> provide the reserve/commit, I need to introduce an intermediate memcpy to
> get these two data contiguously to copy to the ring element. The sequence is
> 'reserve(1), memcpy1, mempcy2, commit(1)'.
> > Hence, you do not see the abort API for the enqueue.
> >
> > > In serial  MP/MC case, after _reserve_(n) you always have to do
> > > _commit_(n) - you can't reduce number of elements, or do _abort_.
> > Agree, the intention here is to provide the scatter/gather feature.
> >
> > > Again you cannot avoid memcpy(n) here anyhow.
> > > So what is the point of these functions for non-serial case?
> > It avoids an intermediate memcpy when the data is coming from multiple
> sources.
> 
> Ok, I think I understand what was my confusion:
Yes, the following understanding is correct.

> Your intention:
>  1) reserve/commit for both serial and non-serial mode -
>      to allow user get/set contents of the ring manually and avoid
>      intermediate load/stores.
> 2) abort only for serial mode.
> 
> My intention:
> 1) commit/reserve/abort only for serial case
>     (as that's the only mode where we can commit less
>      then was reserved or do abort).
I do not know if there is a requirement on committing less than reserved. I think, if the size of the commit is not known during reservation, maybe the reservation can be delayed till it is known.
If there is no requirement to commit less than reserved, then I do not see a need for serial APIs for enqueue operation.

> 2) get/set of ring contents are done as part of either
>     reserve(for dequeue) or commit(for enqueue) API calls
>     (no scatter-gather ability).
> 
> I still think that this new API you suggest creates too big exposure of ring
> internals, and makes it less 'safe-to-use':
> - it provides direct access to contents of the ring.
> - user has to specify head/tail values directly.
It is somewhat complex. But, with the support of user-defined element size, I think it becomes necessary to support the scatter-gather feature (since it is not a single pointer that will be stored).

> 
> So in case of some programmatic error in related user code, there are less
> chances it could be catch-up by API, and we can easily end-up with silent
> memory corruption and other nasty things that would be hard to
> catch/reproduce.
> 
> That makes me wonder how critical is this scatter-gather ability in terms of
> overall RCU performance?
> Is the gain provided really that significant, especially if you'll update the ring
> by one element at a time?
For RCU, it is a 64-bit token and the size of the user data. Not sure how much difference it will make.
I can drop the scatter-gather requirement for now.

> 
> >
> > >
> > > BTW, I think it would be good to have serial version of _enqueue_ too.
> > If there is a good use case, they should be provided. I did not come across a
> good use case.
> >
> > >
> > > > +		unsigned int n, enum rte_ring_queue_behavior behavior,
> > > > +		unsigned int is_sp, unsigned int *old_head,
> > > > +		unsigned int *new_head, unsigned int *free_space,
> > > > +		void **dst1, unsigned int *n1, void **dst2)
> > >
> > > I do understand the intention to avoid memcpy(), but proposed API
> > > seems overcomplicated, error prone, and not very convenient for the user.
> > The issue is the need to handle the wrap around in ring storage array.
> > i.e. when the space is reserved for more than 1 ring element, the wrap
> around might happen.
> >
> > > I don't think that avoiding memcpy() will save us that many cycles
> > > here, so
> > This depends on the amount of data being copied.
> >
> > > probably better to keep API model a bit more regular:
> > >
> > > n = rte_ring_mp_serial_enqueue_bulk_reserve(ring, num, &free_space); ...
> > > /* performs actual memcpy(), m<=n */
> > > rte_ring_mp_serial_enqueue_bulk_commit(ring, obj,  m);
> > These do not take care of the wrap-around case or I am not able to
> understand your comment.
> 
> I meant that serial_enqueue_commit() will do both:
> actual copy of elements to the ring and tail update (no Scatter-Gather), see
> above.
RCU does not require the serial enqueue APIs; do you have any use case for them?
Konstantin Ananyev March 5, 2020, 6:26 p.m. UTC | #6
> >
> > > > > +/**
> > > > > + * @internal Reserve ring elements to enqueue several objects on
> > > > > +the ring
> > > > > + *
> > > > > + * @param r
> > > > > + *   A pointer to the ring structure.
> > > > > + * @param esize
> > > > > + *   The size of ring element, in bytes. It must be a multiple of 4.
> > > > > + *   This must be the same value used while creating the ring.
> > Otherwise
> > > > > + *   the results are undefined.
> > > > > + * @param n
> > > > > + *   The number of elements to reserve in the ring.
> > > > > + * @param behavior
> > > > > + *   RTE_RING_QUEUE_FIXED:    Reserve a fixed number of elements
> > from a
> > > > ring
> > > > > + *   RTE_RING_QUEUE_VARIABLE: Reserve as many elements as
> > possible
> > > > from ring
> > > > > + * @param is_sp
> > > > > + *   Indicates whether to use single producer or multi-producer reserve
> > > > > + * @param old_head
> > > > > + *   Producer's head index before reservation.
> > > > > + * @param new_head
> > > > > + *   Producer's head index after reservation.
> > > > > + * @param free_space
> > > > > + *   returns the amount of space after the reserve operation has
> > finished.
> > > > > + *   It is not updated if the number of reserved elements is zero.
> > > > > + * @param dst1
> > > > > + *   Pointer to location in the ring to copy the data.
> > > > > + * @param n1
> > > > > + *   Number of elements to copy at dst1
> > > > > + * @param dst2
> > > > > + *   In case of ring wrap around, this pointer provides the location to
> > > > > + *   copy the remaining elements. The number of elements to copy at
> > this
> > > > > + *   location is equal to (number of elements reserved - n1)
> > > > > + * @return
> > > > > + *   Actual number of elements reserved.
> > > > > + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> > > > > + */
> > > > > +static __rte_always_inline unsigned int
> > > > > +__rte_ring_do_enqueue_elem_reserve(struct rte_ring *r, unsigned
> > > > > +int esize,
> > > >
> > > >
> > > > I do understand the purpose of reserve, then either commit/abort for
> > > > serial sync mode, but what is the purpose of non-serial version of
> > reserve/commit?
> > > In RCU, I have the need for scatter-gather feature. i.e. the data in
> > > the ring element is coming from multiple sources ('token' is generated
> > > by the RCU library and the application provides additional data). If I do not
> > provide the reserve/commit, I need to introduce an intermediate memcpy to
> > get these two data contiguously to copy to the ring element. The sequence is
> > 'reserve(1), memcpy1, mempcy2, commit(1)'.
> > > Hence, you do not see the abort API for the enqueue.
> > >
> > > > In serial  MP/MC case, after _reserve_(n) you always have to do
> > > > _commit_(n) - you can't reduce number of elements, or do _abort_.
> > > Agree, the intention here is to provide the scatter/gather feature.
> > >
> > > > Again you cannot avoid memcpy(n) here anyhow.
> > > > So what is the point of these functions for non-serial case?
> > > It avoids an intermediate memcpy when the data is coming from multiple
> > sources.
> >
> > Ok, I think I understand what was my confusion:
> Yes, the following understanding is correct.
> 
> > Your intention:
> >  1) reserve/commit for both serial and non-serial mode -
> >      to allow user get/set contents of the ring manually and avoid
> >      intermediate load/stores.
> > 2) abort only for serial mode.
> >
> > My intention:
> > 1) commit/reserve/abort only for serial case
> >     (as that's the only mode where we can commit less
> >      than was reserved or do abort).
> I do not know if there is a requirement on committing less than reserved.

From my perspective, that's a necessary part of peek functionality.
revert/abort function you introduced below is just one special case of it.
Having just abort is enough when you processing elements in the ring one by one,
but not sufficient if someone would try to operate in bulks.
Let say you read (reserved) N objects from the ring, inspected them
and found that first M (<N) are ok to be removed from the ring,
others should remain.

> I think, if the size of commit is not known during reservation,
> may be the reservation can be delayed till it is known.

In some cases, you do know how much you'd like to commit,
but you can't guarantee that you can commit that much,
till you inspect contents of reserved elems.  

> If there is no requirement to commit less than reserved, then I do not see a need for serial APIs for enqueue operation.

> 
> > 2) get/set of ring contents are done as part of either
> >     reserve(for dequeue) or commit(for enqueue) API calls
> >     (no scatter-gather ability).
> >
> > I still think that this new API you suggest creates too big exposure of ring
> > internals, and makes it less 'safe-to-use':
> > - it provides direct access to contents of the ring.
> > - user has to specify head/tail values directly.
> It is somewhat complex. But, with the support of user-defined element size, I think it becomes necessary to support scatter gather
> feature (since it is not a single pointer that will be stored).

I suppose to see the real benefit from scatter-gather, we need a scenario
where there are relatively big elems in the ring (32B+ or so),
plus enqueue/dequeue done in bulks.
If you really  envision such use case - I am ok to consider scatter-gather API too,
but I think it shouldn't be the only available API for serial mode.
Might be we can have 'normal' enqueue/dequeue API for serial mode
(actual copy done internally in ring functions, head/tail values are not exposed directly),
plus SG API as addon for some special cases.  

> >
> > So in case of some programmatic error in related user code, there are less
> > chances it could be catch-up by API, and we can easily end-up with silent
> > memory corruption and other nasty things that would be hard to
> > catch/reproduce.
> >
> > That makes me wonder how critical is this scatter-gather ability in terms of
> > overall RCU performance?
> > Is the gain provided really that significant, especially if you'll update the ring
> > by one element at a time?
> For RCU, it is 64b token and the size of the user data. Not sure how much difference it will make.
> I can drop the scatter gather requirement for now.
> 
> >
> > >
> > > >
> > > > BTW, I think it would be good to have serial version of _enqueue_ too.
> > > If there is a good use case, they should be provided. I did not come across a
> > good use case.
> > >
> > > >
> > > > > +		unsigned int n, enum rte_ring_queue_behavior behavior,
> > > > > +		unsigned int is_sp, unsigned int *old_head,
> > > > > +		unsigned int *new_head, unsigned int *free_space,
> > > > > +		void **dst1, unsigned int *n1, void **dst2)
> > > >
> > > > I do understand the intention to avoid memcpy(), but proposed API
> > > > seems overcomplicated, error prone, and not very convenient for the user.
> > > The issue is the need to handle the wrap around in ring storage array.
> > > i.e. when the space is reserved for more than 1 ring element, the wrap
> > around might happen.
> > >
> > > > I don't think that avoiding memcpy() will save us that many cycles
> > > > here, so
> > > This depends on the amount of data being copied.
> > >
> > > > probably better to keep API model a bit more regular:
> > > >
> > > > n = rte_ring_mp_serial_enqueue_bulk_reserve(ring, num, &free_space); ...
> > > > /* performs actual memcpy(), m<=n */
> > > > rte_ring_mp_serial_enqueue_bulk_commit(ring, obj,  m);
> > > These do not take care of the wrap-around case or I am not able to
> > understand your comment.
> >
> > I meant that serial_enqueue_commit() will do both:
> > actual copy of elements to the ring and tail update (no Scatter-Gather), see
> > above.
> RCU does not require the serial enqueue APIs, do you have any use case?

I agree that serial dequeue seems to have more use cases than enqueue.
Though I still can name at least two cases for enqueue, from top of my head:
1. serial mode (both enqueue/dequeue) helps to mitigate ring slowdown 
overcommitted scenarios, see RFC I submitted:
http://patches.dpdk.org/cover/66001/
2. any intermediate node when you have pop/push from/to some external queue,
and enqueue/dequeue to/from the ring, would like to avoid any elem
drops in between, and by some reason don't want your own intermediate bufferization.
Let say:
dequeue_from_ring -> tx_burst/cryptodev_enqueue
rx_burst/cryptodev_dequeue -> enqueue_to_ring

Plus as enqueue/dequeue are sort of mirror, I think it is good to have both identical.
Honnappa Nagarahalli March 25, 2020, 8:43 p.m. UTC | #7
<snip>

> 
> > >
> > > > > > +/**
> > > > > > + * @internal Reserve ring elements to enqueue several objects
> > > > > > +on the ring
> > > > > > + *
> > > > > > + * @param r
> > > > > > + *   A pointer to the ring structure.
> > > > > > + * @param esize
> > > > > > + *   The size of ring element, in bytes. It must be a multiple of 4.
> > > > > > + *   This must be the same value used while creating the ring.
> > > Otherwise
> > > > > > + *   the results are undefined.
> > > > > > + * @param n
> > > > > > + *   The number of elements to reserve in the ring.
> > > > > > + * @param behavior
> > > > > > + *   RTE_RING_QUEUE_FIXED:    Reserve a fixed number of
> elements
> > > from a
> > > > > ring
> > > > > > + *   RTE_RING_QUEUE_VARIABLE: Reserve as many elements as
> > > possible
> > > > > from ring
> > > > > > + * @param is_sp
> > > > > > + *   Indicates whether to use single producer or multi-producer
> reserve
> > > > > > + * @param old_head
> > > > > > + *   Producer's head index before reservation.
> > > > > > + * @param new_head
> > > > > > + *   Producer's head index after reservation.
> > > > > > + * @param free_space
> > > > > > + *   returns the amount of space after the reserve operation has
> > > finished.
> > > > > > + *   It is not updated if the number of reserved elements is zero.
> > > > > > + * @param dst1
> > > > > > + *   Pointer to location in the ring to copy the data.
> > > > > > + * @param n1
> > > > > > + *   Number of elements to copy at dst1
> > > > > > + * @param dst2
> > > > > > + *   In case of ring wrap around, this pointer provides the location
> to
> > > > > > + *   copy the remaining elements. The number of elements to copy
> at
> > > this
> > > > > > + *   location is equal to (number of elements reserved - n1)
> > > > > > + * @return
> > > > > > + *   Actual number of elements reserved.
> > > > > > + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> > > > > > + */
> > > > > > +static __rte_always_inline unsigned int
> > > > > > +__rte_ring_do_enqueue_elem_reserve(struct rte_ring *r,
> > > > > > +unsigned int esize,
> > > > >
> > > > >
> > > > > I do understand the purpose of reserve, then either commit/abort
> > > > > for serial sync mode, but what is the purpose of non-serial
> > > > > version of
> > > reserve/commit?
> > > > In RCU, I have the need for scatter-gather feature. i.e. the data
> > > > in the ring element is coming from multiple sources ('token' is
> > > > generated by the RCU library and the application provides
> > > > additional data). If I do not
> > > provide the reserve/commit, I need to introduce an intermediate
> > > memcpy to get these two data contiguously to copy to the ring
> > > element. The sequence is 'reserve(1), memcpy1, mempcy2, commit(1)'.
> > > > Hence, you do not see the abort API for the enqueue.
> > > >
> > > > > In serial  MP/MC case, after _reserve_(n) you always have to do
> > > > > _commit_(n) - you can't reduce number of elements, or do _abort_.
> > > > Agree, the intention here is to provide the scatter/gather feature.
> > > >
> > > > > Again you cannot avoid memcpy(n) here anyhow.
> > > > > So what is the point of these functions for non-serial case?
> > > > It avoids an intermediate memcpy when the data is coming from
> > > > multiple
> > > sources.
> > >
> > > Ok, I think I understand what was my confusion:
> > Yes, the following understanding is correct.
> >
> > > Your intention:
> > >  1) reserve/commit for both serial and non-serial mode -
> > >      to allow user get/set contents of the ring manually and avoid
> > >      intermediate load/stores.
> > > 2) abort only for serial mode.
> > >
> > > My intention:
> > > 1) commit/reserve/abort only for serial case
> > >     (as that's the only mode where we can commit less
> > >      then was reserved or do abort).
> > I do not know if there is a requirement on committing less than reserved.
> 
> From my perspective, that's a necessary part of peek functionality.
> revert/abort function you introduced below is just one special case of it.
> Having just abort is enough when you processing elements in the ring one by
> one, but not sufficient if someone would try to operate in bulks.
> Let say you read (reserved) N objects from the ring, inspected them and
> found that first M (<N) are ok to be removed from the ring, others should
> remain.
Agree, it makes sense from a dequeue perspective. Does it make sense from enqueue perspective?

> 
> > I think, if the size of commit is not known during reservation, may be
> > the reservation can be delayed till it is known.
> 
> In some cases, you do know how much you'd like to commit, but you can't
> guarantee that you can commit that much, till you inspect contents of
> reserved elems.
The above comment was from enqueue perspective.

> 
> > If there is no requirement to commit less than reserved, then I do not see a
> need for serial APIs for enqueue operation.
> 
> >
> > > 2) get/set of ring contents are done as part of either
> > >     reserve(for dequeue) or commit(for enqueue) API calls
> > >     (no scatter-gather ability).
> > >
> > > I still think that this new API you suggest creates too big exposure
> > > of ring internals, and makes it less 'safe-to-use':
> > > - it provides direct access to contents of the ring.
> > > - user has to specify head/tail values directly.
> > It is some what complex. But, with the support of user defined element
> > size, I think it becomes necessary to support scatter gather feature (since it
> is not a single pointer that will be stored).
> 
> I suppose to see the real benefit from scatter-gather, we need a scenario
> where there are relatively big elems in the ring (32B+ or so), plus
> enqueue/dequeue done in bulks.
> If you really  envision such use case - I am ok to consider scatter-gather API
> too, but I think it shouldn't be the only available API for serial mode.
> Might be we can have 'normal' enqueue/dequeue API for serial mode (actual
> copy done internally in ring functions, head/tail values are not exposed
> directly), plus SG API as addon for some special cases.
I will try to run some benchmarks and take a decision on if SG makes an impact on RCU defer APIs.

> 
> > >
> > > So in case of some programmatic error in related user code, there
> > > are less chances it could be catch-up by API, and we can easily
> > > end-up with silent memory corruption and other nasty things that
> > > would be hard to catch/reproduce.
> > >
> > > That makes me wonder how critical is this scatter-gather ability in
> > > terms of overall RCU performance?
> > > Is the gain provided really that significant, especially if you'll
> > > update the ring by one element at a time?
> > For RCU, it is 64b token and the size of the user data. Not sure how much
> difference it will make.
> > I can drop the scatter gather requirement for now.
> >
> > >
> > > >
> > > > >
> > > > > BTW, I think it would be good to have serial version of _enqueue_ too.
> > > > If there is a good use case, they should be provided. I did not
> > > > come across a
> > > good use case.
> > > >
> > > > >
> > > > > > +		unsigned int n, enum rte_ring_queue_behavior
> behavior,
> > > > > > +		unsigned int is_sp, unsigned int *old_head,
> > > > > > +		unsigned int *new_head, unsigned int *free_space,
> > > > > > +		void **dst1, unsigned int *n1, void **dst2)
> > > > >
> > > > > I do understand the intention to avoid memcpy(), but proposed
> > > > > API seems overcomplicated, error prone, and not very convenient for
> the user.
> > > > The issue is the need to handle the wrap around in ring storage array.
> > > > i.e. when the space is reserved for more than 1 ring element, the
> > > > wrap
> > > around might happen.
> > > >
> > > > > I don't think that avoiding memcpy() will save us that many
> > > > > cycles here, so
> > > > This depends on the amount of data being copied.
> > > >
> > > > > probably better to keep API model a bit more regular:
> > > > >
> > > > > n = rte_ring_mp_serial_enqueue_bulk_reserve(ring, num,
> &free_space); ...
> > > > > /* performs actual memcpy(), m<=n */
> > > > > rte_ring_mp_serial_enqueue_bulk_commit(ring, obj,  m);
> > > > These do not take care of the wrap-around case or I am not able to
> > > understand your comment.
> > >
> > > I meant that serial_enqueue_commit() will do both:
> > > actual copy of elements to the ring and tail update (no
> > > Scatter-Gather), see above.
> > RCU does not require the serial enqueue APIs, do you have any use case?
> 
> I agree that serial dequeue seems to have more usages then enqueue.
> Though I still can name at least two cases for enqueue, from top of my head:
> 1. serial mode (both enqueue/dequeue) helps to mitigate ring slowdown
> overcommitted scenarios, see RFC I submitted:
> http://patches.dpdk.org/cover/66001/
> 2. any intermediate node when you have pop/push from/to some external
> queue, and enqueue/dequeue to/from the ring, would like to avoid any
> elem drops in between, and by some reason don't want your own
> intermediate bufferization.
> Let say:
> dequeue_from_ring -> tx_burst/cryptodev_enqueue
> rx_burst/cryptodev_dequeue -> enqueue_to_ring
> 
> Plus as enqueue/dequeue are sort of mirror, I think it is good to have both
> identical.
Ok, agreed. I think we need to allow for combination of APIs to be used. i.e. MP enqueue and serialization on dequeue.

> 
>

Patch
diff mbox series

diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile
index 917c560ad..824e4a9bb 100644
--- a/lib/librte_ring/Makefile
+++ b/lib/librte_ring/Makefile
@@ -17,6 +17,7 @@  SRCS-$(CONFIG_RTE_LIBRTE_RING) := rte_ring.c
 # install includes
 SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h \
 					rte_ring_elem.h \
+					rte_ring_elem_sg.h \
 					rte_ring_generic.h \
 					rte_ring_c11_mem.h
 
diff --git a/lib/librte_ring/meson.build b/lib/librte_ring/meson.build
index f2f3ccc88..30115ad7c 100644
--- a/lib/librte_ring/meson.build
+++ b/lib/librte_ring/meson.build
@@ -4,6 +4,7 @@ 
 sources = files('rte_ring.c')
 headers = files('rte_ring.h',
 		'rte_ring_elem.h',
+		'rte_ring_elem_sg.h',
 		'rte_ring_c11_mem.h',
 		'rte_ring_generic.h')
 
diff --git a/lib/librte_ring/rte_ring_c11_mem.h b/lib/librte_ring/rte_ring_c11_mem.h
index 0fb73a337..dcae8bcc0 100644
--- a/lib/librte_ring/rte_ring_c11_mem.h
+++ b/lib/librte_ring/rte_ring_c11_mem.h
@@ -178,4 +178,102 @@  __rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
 	return n;
 }
 
+/**
+ * @internal This function updates the consumer head if there are no
+ * prior reserved elements on the ring.
+ *
+ * @param r
+ *   A pointer to the ring structure
+ * @param is_sc
+ *   Indicates whether multi-consumer path is needed or not
+ * @param n
+ *   The number of elements to dequeue, i.e. how far should the head be moved
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
+ *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
+ * @param old_head
+ *   Returns head value as it was before the move, i.e. where dequeue starts
+ * @param new_head
+ *   Returns the current/new head value i.e. where dequeue finishes
+ * @param entries
+ *   Returns the number of entries in the ring BEFORE head was moved
+ * @return
+ *   - Actual number of objects dequeued.
+ *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_move_cons_head_serial(struct rte_ring *r, int is_sc,
+		unsigned int n, enum rte_ring_queue_behavior behavior,
+		uint32_t *old_head, uint32_t *new_head,
+		uint32_t *entries)
+{
+	unsigned int max = n;
+	uint32_t prod_tail;
+	uint32_t cons_tail;
+	int success;
+
+	/* Serial semantics: the head is advanced only when no prior
+	 * reservation is outstanding, i.e. when cons.head == cons.tail.
+	 * Otherwise this function busy-waits until the earlier
+	 * reservation is committed or reverted.
+	 */
+	/* move cons.head atomically */
+	*old_head = __atomic_load_n(&r->cons.head, __ATOMIC_RELAXED);
+	do {
+		/* Restore n as it may change every loop */
+		n = max;
+
+		/* Load the cons.tail and ensure that it is the
+		 * same as cons.head. load-acquire synchronizes
+		 * with the store-release in update_tail.
+		 */
+		cons_tail = __atomic_load_n(&r->cons.tail, __ATOMIC_ACQUIRE);
+		if (*old_head != cons_tail) {
+			/* A reservation is in flight - wait and retry. */
+			rte_pause();
+			*old_head = __atomic_load_n(&r->cons.head,
+							__ATOMIC_RELAXED);
+			success = 0;
+			continue;
+		}
+
+		/* this load-acquire synchronizes with store-release of ht->tail
+		 * in update_tail.
+		 */
+		prod_tail = __atomic_load_n(&r->prod.tail,
+					__ATOMIC_ACQUIRE);
+
+		/* The subtraction is done between two unsigned 32bits value
+		 * (the result is always modulo 32 bits even if we have
+		 * cons_head > prod_tail). So 'entries' is always between 0
+		 * and size(ring)-1.
+		 */
+		*entries = (prod_tail - *old_head);
+
+		/* Set the actual entries for dequeue */
+		if (n > *entries)
+			n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
+
+		if (unlikely(n == 0))
+			return 0;
+
+		*new_head = *old_head + n;
+		if (is_sc)
+			/* Single consumer: plain store is sufficient. */
+			r->cons.head = *new_head, success = 1;
+		else
+			/* on failure, *old_head will be updated */
+			success = __atomic_compare_exchange_n(&r->cons.head,
+							old_head, *new_head,
+							0, __ATOMIC_RELAXED,
+							__ATOMIC_RELAXED);
+	} while (unlikely(success == 0));
+	return n;
+}
+
+/**
+ * @internal Discard reserved ring elements
+ *
+ * @param ht
+ *   A pointer to the ring's head-tail structure
+ */
+static __rte_always_inline void
+__rte_ring_revert_head(struct rte_ring_headtail *ht)
+{
+	/* Roll head back to tail, discarding every element that was
+	 * reserved but not yet committed.
+	 * NOTE(review): a relaxed store is sufficient only because the
+	 * serial reserve guarantees no other thread holds a concurrent
+	 * reservation on this head/tail pair - confirm that callers use
+	 * this exclusively after the *_reserve_serial() APIs.
+	 */
+	__atomic_store_n(&ht->head, ht->tail, __ATOMIC_RELAXED);
+}
+
 #endif /* _RTE_RING_C11_MEM_H_ */
diff --git a/lib/librte_ring/rte_ring_elem_sg.h b/lib/librte_ring/rte_ring_elem_sg.h
new file mode 100644
index 000000000..a73f4fbfe
--- /dev/null
+++ b/lib/librte_ring/rte_ring_elem_sg.h
@@ -0,0 +1,417 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ *
+ * Copyright (c) 2020 Arm Limited
+ * Copyright (c) 2010-2017 Intel Corporation
+ * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
+ * All rights reserved.
+ * Derived from FreeBSD's bufring.h
+ * Used as BSD-3 Licensed with permission from Kip Macy.
+ */
+
+#ifndef _RTE_RING_ELEM_SG_H_
+#define _RTE_RING_ELEM_SG_H_
+
+/**
+ * @file
+ * RTE Ring with
+ * 1) user defined element size
+ * 2) scatter gather feature to copy objects to/from the ring
+ * 3) ability to reserve, consume/discard elements in the ring
+ */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <stdio.h>
+#include <stdint.h>
+#include <string.h>
+#include <sys/queue.h>
+#include <errno.h>
+#include <rte_common.h>
+#include <rte_config.h>
+#include <rte_memory.h>
+#include <rte_lcore.h>
+#include <rte_atomic.h>
+#include <rte_branch_prediction.h>
+#include <rte_memzone.h>
+#include <rte_pause.h>
+
+#include "rte_ring.h"
+#include "rte_ring_elem.h"
+
+/* Between load and load. there might be cpu reorder in weak model
+ * (powerpc/arm).
+ * There are 2 choices for the users
+ * 1.use rmb() memory barrier
+ * 2.use one-direction load_acquire/store_release barrier,defined by
+ * CONFIG_RTE_USE_C11_MEM_MODEL=y
+ * It depends on performance test results.
+ * By default, move common functions to rte_ring_generic.h
+ */
+#ifdef RTE_USE_C11_MEM_MODEL
+#include "rte_ring_c11_mem.h"
+#else
+#include "rte_ring_generic.h"
+#endif
+
+/*
+ * Return the address(es) of 'num' 8B ring slots starting at 'head'.
+ * On wrap-around, *dst1/*n1 describe the slots up to the end of the
+ * ring storage and *dst2 points at the start of the storage for the
+ * remaining (num - *n1) slots; *dst2 is left untouched when no wrap
+ * occurs.
+ */
+static __rte_always_inline void
+__rte_ring_get_elem_addr_64(struct rte_ring *r, uint32_t head,
+	uint32_t num, void **dst1, uint32_t *n1, void **dst2)
+{
+	uint32_t idx = head & r->mask;
+	uint64_t *ring = (uint64_t *)&r[1];
+
+	*dst1 = ring + idx;
+	*n1 = num;
+
+	if (idx + num > r->size) {
+		/* Bug fix: the first segment holds exactly (size - idx)
+		 * elements. The previous formula, num - (size - idx - 1),
+		 * was off by one and let copies at *dst1 overrun the end
+		 * of the ring storage.
+		 */
+		*n1 = r->size - idx;
+		*dst2 = ring;
+	}
+}
+
+/*
+ * Return the address(es) of 'num' 16B ring slots starting at 'head'.
+ * Segment semantics are identical to __rte_ring_get_elem_addr_64():
+ * *n1 elements at *dst1, the remaining (num - *n1) at *dst2 on wrap.
+ */
+static __rte_always_inline void
+__rte_ring_get_elem_addr_128(struct rte_ring *r, uint32_t head,
+	uint32_t num, void **dst1, uint32_t *n1, void **dst2)
+{
+	uint32_t idx = head & r->mask;
+	rte_int128_t *ring = (rte_int128_t *)&r[1];
+
+	*dst1 = ring + idx;
+	*n1 = num;
+
+	if (idx + num > r->size) {
+		/* Bug fix: first segment is (size - idx) elements, not
+		 * num - (size - idx - 1), which overran the storage.
+		 */
+		*n1 = r->size - idx;
+		*dst2 = ring;
+	}
+}
+
+/*
+ * Dispatch address computation by element size: 8B and 16B elements
+ * use the specialized helpers; any other (multiple-of-4) size is
+ * normalized to uint32_t granularity. Also avoids the non-standard
+ * 'return <void expression>' construct (a constraint violation in
+ * ISO C for void functions).
+ */
+static __rte_always_inline void
+__rte_ring_get_elem_addr(struct rte_ring *r, uint32_t head,
+	uint32_t esize, uint32_t num, void **dst1, uint32_t *n1, void **dst2)
+{
+	if (esize == 8) {
+		__rte_ring_get_elem_addr_64(r, head,
+						num, dst1, n1, dst2);
+	} else if (esize == 16) {
+		__rte_ring_get_elem_addr_128(r, head,
+						num, dst1, n1, dst2);
+	} else {
+		uint32_t idx, scale, nr_idx;
+		uint32_t *ring = (uint32_t *)&r[1];
+
+		/* Normalize to uint32_t */
+		scale = esize / sizeof(uint32_t);
+		idx = head & r->mask;
+		nr_idx = idx * scale;
+
+		*dst1 = ring + nr_idx;
+		*n1 = num;
+
+		if (idx + num > r->size) {
+			/* Bug fix: the first segment holds (size - idx)
+			 * elements; the previous off-by-one formula
+			 * overran the ring storage at *dst1.
+			 */
+			*n1 = r->size - idx;
+			*dst2 = ring;
+		}
+	}
+}
+
+/**
+ * @internal Reserve ring elements to enqueue several objects on the ring
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of elements to reserve in the ring.
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Reserve a fixed number of elements from a ring
+ *   RTE_RING_QUEUE_VARIABLE: Reserve as many elements as possible from ring
+ * @param is_sp
+ *   Indicates whether to use single producer or multi-producer reserve
+ * @param old_head
+ *   Producer's head index before reservation.
+ * @param new_head
+ *   Producer's head index after reservation.
+ * @param free_space
+ *   returns the amount of space after the reserve operation has finished.
+ *   It is not updated if the number of reserved elements is zero.
+ * @param dst1
+ *   Pointer to location in the ring to copy the data.
+ * @param n1
+ *   Number of elements to copy at dst1
+ * @param dst2
+ *   In case of ring wrap around, this pointer provides the location to
+ *   copy the remaining elements. The number of elements to copy at this
+ *   location is equal to (number of elements reserved - n1)
+ * @return
+ *   Actual number of elements reserved.
+ *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_enqueue_elem_reserve(struct rte_ring *r, unsigned int esize,
+		unsigned int n, enum rte_ring_queue_behavior behavior,
+		unsigned int is_sp, unsigned int *old_head,
+		unsigned int *new_head, unsigned int *free_space,
+		void **dst1, unsigned int *n1, void **dst2)
+{
+	uint32_t entries_free;
+
+	/* Move the producer head; 'n' becomes the count actually
+	 * reserved (0 or n for FIXED behavior).
+	 */
+	n = __rte_ring_move_prod_head(r, is_sp, n, behavior,
+			old_head, new_head, &entries_free);
+	if (n == 0)
+		return 0;
+
+	/* Resolve the reserved range into one or two flat segments
+	 * the caller can write into directly.
+	 */
+	__rte_ring_get_elem_addr(r, *old_head, esize, n, dst1, n1, dst2);
+
+	if (free_space != NULL)
+		*free_space = entries_free - n;
+
+	return n;
+}
+
+/**
+ * @internal Consume previously reserved ring elements (for enqueue)
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param old_head
+ *   Producer's head index before reservation.
+ * @param new_head
+ *   Producer's head index after reservation.
+ * @param is_sp
+ *   Indicates whether to use single producer or multi-producer head update
+ */
+static __rte_always_inline void
+__rte_ring_do_enqueue_elem_commit(struct rte_ring *r,
+		unsigned int old_head, unsigned int new_head,
+		unsigned int is_sp)
+{
+	/* Publish the reserved elements by moving prod.tail from
+	 * old_head to new_head; update_tail's store-release makes the
+	 * element contents written since the reserve visible to
+	 * consumers.
+	 */
+	update_tail(&r->prod, old_head, new_head, is_sp, 1);
+}
+
+/**
+ * Reserve one element for enqueuing one object on a ring
+ * (multi-producers safe). Application must call
+ * 'rte_ring_mp_enqueue_elem_commit' to complete the enqueue operation.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @param old_head
+ *   Producer's head index before reservation. The same should be passed to
+ *   'rte_ring_mp_enqueue_elem_commit' function.
+ * @param new_head
+ *   Producer's head index after reservation. The same should be passed to
+ *   'rte_ring_mp_enqueue_elem_commit' function.
+ * @param free_space
+ *   Returns the amount of space after the reservation operation has finished.
+ *   It is not updated if the number of reserved elements is zero.
+ * @param dst
+ *   Pointer to location in the ring to copy the data.
+ * @return
+ *   - 0: Success; objects enqueued.
+ *   - -ENOBUFS: Not enough room in the ring to reserve; no element is reserved.
+ */
+static __rte_always_inline int
+rte_ring_mp_enqueue_elem_reserve(struct rte_ring *r, unsigned int esize,
+		unsigned int *old_head, unsigned int *new_head,
+		unsigned int *free_space, void **dst)
+{
+	unsigned int n1;
+	unsigned int reserved;
+
+	/* A single element can never wrap around the end of the ring
+	 * storage, so no second destination pointer is needed.
+	 */
+	reserved = __rte_ring_do_enqueue_elem_reserve(r, esize, 1,
+			RTE_RING_QUEUE_FIXED, 0, old_head, new_head,
+			free_space, dst, &n1, NULL);
+
+	return reserved != 0 ? 0 : -ENOBUFS;
+}
+
+/**
+ * Consume previously reserved elements (for enqueue) in a ring
+ * (multi-producers safe). This API completes the enqueue operation.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param old_head
+ *   Producer's head index before reservation. This value was returned
+ *   when the API 'rte_ring_mp_enqueue_elem_reserve' was called.
+ * @param new_head
+ *   Producer's head index after reservation. This value was returned
+ *   when the API 'rte_ring_mp_enqueue_elem_reserve' was called.
+ */
+static __rte_always_inline void
+rte_ring_mp_enqueue_elem_commit(struct rte_ring *r, unsigned int old_head,
+		unsigned int new_head)
+{
+	/* Multi-producer commit: is_sp == 0, so update_tail waits for
+	 * earlier producers to commit before publishing this range.
+	 */
+	__rte_ring_do_enqueue_elem_commit(r, old_head, new_head, 0);
+}
+
+/**
+ * @internal Reserve elements to dequeue several objects on the ring.
+ * This function blocks if there are elements reserved already.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to reserve in the ring
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Reserve fixed number of elements in a ring
+ *   RTE_RING_QUEUE_VARIABLE: Reserve as many elements as possible in a ring
+ * @param is_sc
+ *   Indicates whether to use single consumer or multi-consumer head update
+ * @param old_head
+ *   Consumer's head index before reservation.
+ * @param new_head
+ *   Consumer's head index after reservation.
+ * @param available
+ *   returns the number of remaining ring elements after the reservation
+ *   It is not updated if the number of reserved elements is zero.
+ * @param src1
+ *   Pointer to location in the ring to copy the data from.
+ * @param n1
+ *   Number of elements to copy from src1
+ * @param src2
+ *   In case of wrap around in the ring, this pointer provides the location
+ *   to copy the remaining elements from. The number of elements to copy from
+ *   this pointer is equal to (number of elements reserved - n1)
+ * @return
+ *   Actual number of elements reserved.
+ *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_dequeue_elem_reserve_serial(struct rte_ring *r,
+		unsigned int esize, unsigned int n,
+		enum rte_ring_queue_behavior behavior, unsigned int is_sc,
+		unsigned int *old_head, unsigned int *new_head,
+		unsigned int *available, void **src1, unsigned int *n1,
+		void **src2)
+{
+	uint32_t nr_entries;
+
+	/* Serially advance cons.head; this blocks while a previous
+	 * reservation is still outstanding (head != tail).
+	 */
+	n = __rte_ring_move_cons_head_serial(r, is_sc, n, behavior,
+			old_head, new_head, &nr_entries);
+	if (n == 0)
+		return 0;
+
+	/* Resolve the reserved range into one or two flat segments
+	 * the caller can read from directly.
+	 */
+	__rte_ring_get_elem_addr(r, *old_head, esize, n, src1, n1, src2);
+
+	if (available != NULL)
+		*available = nr_entries - n;
+
+	return n;
+}
+
+/**
+ * @internal Consume previously reserved ring elements (for dequeue)
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param old_head
+ *   Consumer's head index before reservation.
+ * @param new_head
+ *   Consumer's head index after reservation.
+ * @param is_sc
+ *   Indicates whether to use single consumer or multi-consumer head update
+ */
+static __rte_always_inline void
+__rte_ring_do_dequeue_elem_commit(struct rte_ring *r,
+		unsigned int old_head, unsigned int new_head,
+		unsigned int is_sc)
+{
+	/* Release the consumed slots back to producers by moving
+	 * cons.tail from old_head to new_head (store-release in
+	 * update_tail).
+	 */
+	update_tail(&r->cons, old_head, new_head, is_sc, 1);
+}
+
+/**
+ * Reserve one element on a ring for dequeue. This function blocks if there
+ * are elements reserved already. Application must call
+ * 'rte_ring_do_dequeue_elem_commit' or
+ * `rte_ring_do_dequeue_elem_revert_serial' to complete the dequeue operation.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @param old_head
+ *   Consumer's head index before reservation. The same should be passed to
+ *   'rte_ring_dequeue_elem_commit' function.
+ * @param new_head
+ *   Consumer's head index after reservation. The same should be passed to
+ *   'rte_ring_dequeue_elem_commit' function.
+ * @param available
+ *   returns the number of remaining ring elements after the reservation
+ *   It is not updated if the number of reserved elements is zero.
+ * @param src
+ *   Pointer to location in the ring to copy the data from.
+ * @return
+ *   - 0: Success; elements reserved
+ *   - -ENOBUFS: Not enough room in the ring; no element is reserved.
+ */
+static __rte_always_inline int
+rte_ring_dequeue_elem_reserve_serial(struct rte_ring *r, unsigned int esize,
+		unsigned int *old_head, unsigned int *new_head,
+		unsigned int *available, void **src)
+{
+	unsigned int n1;
+	unsigned int reserved;
+
+	/* One element cannot wrap around the ring boundary, hence
+	 * passing NULL for the second source pointer is safe.
+	 */
+	reserved = __rte_ring_do_dequeue_elem_reserve_serial(r, esize, 1,
+			RTE_RING_QUEUE_FIXED, r->cons.single, old_head,
+			new_head, available, src, &n1, NULL);
+
+	return reserved != 0 ? 0 : -ENOBUFS;
+}
+
+/**
+ * Consume previously reserved elements (for dequeue) in a ring
+ * (multi-consumer safe).
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param old_head
+ *   Consumer's head index before reservation. This value was returned
+ *   when the API 'rte_ring_dequeue_elem_reserve_xxx' was called.
+ * @param new_head
+ *   Consumer's head index after reservation. This value was returned
+ *   when the API 'rte_ring_dequeue_elem_reserve_xxx' was called.
+ */
+static __rte_always_inline void
+rte_ring_dequeue_elem_commit(struct rte_ring *r, unsigned int old_head,
+		unsigned int new_head)
+{
+	/* Complete the dequeue reservation using the ring's configured
+	 * consumer mode (r->cons.single).
+	 */
+	__rte_ring_do_dequeue_elem_commit(r, old_head, new_head,
+						r->cons.single);
+}
+
+/**
+ * Discard previously reserved elements (for dequeue) in a ring.
+ *
+ * @warning
+ * This API can be called only if the ring elements were reserved
+ * using 'rte_ring_dequeue_xxx_elem_reserve_serial' APIs.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ */
+static __rte_always_inline void
+rte_ring_dequeue_elem_revert_serial(struct rte_ring *r)
+{
+	/* Discard the outstanding serial reservation by rolling
+	 * cons.head back to cons.tail; no elements are consumed.
+	 */
+	__rte_ring_revert_head(&r->cons);
+}
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _RTE_RING_ELEM_SG_H_ */
diff --git a/lib/librte_ring/rte_ring_generic.h b/lib/librte_ring/rte_ring_generic.h
index 953cdbbd5..8d7a7ffcc 100644
--- a/lib/librte_ring/rte_ring_generic.h
+++ b/lib/librte_ring/rte_ring_generic.h
@@ -170,4 +170,97 @@  __rte_ring_move_cons_head(struct rte_ring *r, unsigned int is_sc,
 	return n;
 }
 
+/**
+ * @internal This function updates the consumer head if there are no
+ * prior reserved elements on the ring, i.e. the head is moved only
+ * when it equals the consumer tail ("serial" reservation semantics).
+ *
+ * @param r
+ *   A pointer to the ring structure
+ * @param is_sc
+ *   Non-zero selects the single-consumer path (plain store, no CAS);
+ *   zero selects the multi-consumer path
+ * @param n
+ *   The number of elements we will want to dequeue, i.e. how far should the
+ *   head be moved
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
+ *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
+ * @param old_head
+ *   Returns head value as it was before the move, i.e. where dequeue starts
+ * @param new_head
+ *   Returns the current/new head value i.e. where dequeue finishes
+ * @param entries
+ *   Returns the number of entries in the ring BEFORE head was moved
+ * @return
+ *   - Actual number of objects reserved for a later dequeue (nothing
+ *     is copied out here; only the head index is advanced).
+ *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_move_cons_head_serial(struct rte_ring *r, unsigned int is_sc,
+		unsigned int n, enum rte_ring_queue_behavior behavior,
+		uint32_t *old_head, uint32_t *new_head,
+		uint32_t *entries)
+{
+	unsigned int max = n;
+	int success;
+
+	/* move cons.head atomically */
+	do {
+		/* Restore n as it may change every loop */
+		n = max;
+
+		*old_head = r->cons.head;
+
+		/* add rmb barrier to avoid load/load reorder in weak
+		 * memory model. It is noop on x86
+		 */
+		rte_smp_rmb();
+
+		/* Ensure that cons.tail and cons.head are the same,
+		 * i.e. no consumer holds an uncommitted reservation;
+		 * otherwise wait for it to be committed or reverted.
+		 * NOTE(review): on a single-consumer ring this spins
+		 * forever if the caller reserves again without first
+		 * committing/reverting — confirm that usage is
+		 * documented as forbidden.
+		 */
+		if (*old_head != r->cons.tail) {
+			rte_pause();
+
+			success = 0;
+			continue;
+		}
+
+		/* The subtraction is done between two unsigned 32bits value
+		 * (the result is always modulo 32 bits even if we have
+		 * cons_head > prod_tail). So 'entries' is always between 0
+		 * and size(ring)-1.
+		 */
+		*entries = (r->prod.tail - *old_head);
+
+		/* Set the actual entries for dequeue */
+		if (n > *entries)
+			n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
+
+		if (unlikely(n == 0))
+			return 0;
+
+		*new_head = *old_head + n;
+		if (is_sc) {
+			/* Single consumer: plain store plus rmb, the same
+			 * pattern __rte_ring_move_cons_head uses above.
+			 */
+			r->cons.head = *new_head;
+			rte_smp_rmb();
+			success = 1;
+		} else {
+			/* CAS fails (and we retry) if another consumer
+			 * moved the head between our snapshot and here.
+			 */
+			success = rte_atomic32_cmpset(&r->cons.head, *old_head,
+					*new_head);
+		}
+	} while (unlikely(success == 0));
+	return n;
+}
+
+/**
+ * @internal This function updates the head to match the tail
+ *
+ * Used to cancel an in-flight reservation: any head advance that has
+ * not been published through the tail is undone.
+ *
+ * NOTE(review): this plain store is safe only when no other thread
+ * can concurrently move 'head' — i.e. under the serial reservation
+ * scheme, where the head is moved only when it equals the tail.
+ *
+ * @param ht
+ *   A pointer to the ring's head-tail structure
+ */
+static __rte_always_inline void
+__rte_ring_revert_head(struct rte_ring_headtail *ht)
+{
+	/* Discard the reserved ring elements. */
+	ht->head = ht->tail;
+}
+
 #endif /* _RTE_RING_GENERIC_H_ */