[5/7] stack: add non-blocking stack implementation

Message ID 20190222160655.3346-6-gage.eads@intel.com
State Superseded, archived
Delegated to: Thomas Monjalon
Series
  • Subject: [PATCH ...] Add stack library and new mempool handler

Checks

Context Check Description
ci/Intel-compilation fail Compilation issues
ci/checkpatch success coding style OK

Commit Message

Gage Eads Feb. 22, 2019, 4:06 p.m.
This commit adds support for a non-blocking (linked list based) stack to
the stack API. This behavior is selected through a new rte_stack_create()
flag, STACK_F_NB.

The stack consists of a linked list of elements, each containing a data
pointer and a next pointer, and an atomic stack depth counter.

The non-blocking push operation enqueues a linked list of pointers by
pointing the tail of the list to the current stack head, and using a CAS to
swing the stack head pointer to the head of the list. The operation retries
if it is unsuccessful (i.e. the list changed between reading the head and
modifying it), else it adjusts the stack length and returns.

The non-blocking pop operation first reserves num elements by adjusting the
stack length, to ensure the dequeue operation will succeed without
blocking. It then dequeues pointers by walking the list -- starting from
the head -- then swinging the head pointer (using a CAS as well). While
walking the list, the data pointers are recorded in an object table.
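
The push/pop mechanics above can be sketched in plain C. This is a rough, single-threaded model -- names like model_push are illustrative, not the patch's API, and a plain helper stands in for the CAS so the retry-loop structure is visible:

```c
#include <stddef.h>
#include <stdint.h>

/* Illustrative single-threaded model of the non-blocking push/pop. */
struct elem {
	void *data;
	struct elem *next;
};

struct lifo_model {
	struct elem *top;
	uint64_t len;
};

/* Stand-in for the CAS: succeeds only if *loc still holds the expected value. */
static int cas_ptr(struct elem **loc, struct elem *expected, struct elem *desired)
{
	if (*loc != expected)
		return 0;
	*loc = desired;
	return 1;
}

static void model_push(struct lifo_model *l, struct elem *first,
		       struct elem *last, unsigned int num)
{
	struct elem *old_top;

	do {
		old_top = l->top;
		last->next = old_top;   /* point the list tail at the current head */
	} while (!cas_ptr(&l->top, old_top, first)); /* swing head to the new first */

	l->len += num;                  /* length adjusted after the successful CAS */
}

static int model_pop(struct lifo_model *l, unsigned int num, void **obj_table)
{
	struct elem *old_top, *tmp;
	unsigned int i;

	/* Reserve num elements first (the real code does this with a CAS loop
	 * on len) so the walk below cannot run past the end of the list.
	 */
	if (l->len < num)
		return -1;
	l->len -= num;

	do {
		old_top = l->top;
		tmp = old_top;
		for (i = 0; i < num; i++) {  /* walk, recording data pointers */
			obj_table[i] = tmp->data;
			tmp = tmp->next;
		}
	} while (!cas_ptr(&l->top, old_top, tmp)); /* swing head past them */

	return 0;
}
```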

This algorithm uses a 128-bit compare-and-swap instruction, which
atomically updates the stack top pointer and a modification counter, to
protect against the ABA problem.
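
The versioned-head idea can be illustrated with a small model. The layout below is assumed for illustration; the patch itself performs a 128-bit CAS on the {top, cnt} pair, which is modeled here field-by-field in a single-threaded setting:

```c
#include <stdint.h>

/* Assumed layout of the 16-byte versioned head: pointer plus counter. */
struct head128 {
	void *top;    /* stack top pointer */
	uint64_t cnt; /* bumped on every successful CAS */
};

/* Model of a 128-bit CAS: both fields must match for the swap to succeed,
 * and (like the real operation) the expected value is refreshed on failure.
 */
static int cas128_model(struct head128 *loc, struct head128 *expected,
			const struct head128 *desired)
{
	if (loc->top != expected->top || loc->cnt != expected->cnt) {
		*expected = *loc;
		return 0;
	}
	*loc = *desired;
	return 1;
}
```

Even if another thread swings the head away and back to the same pointer (the ABA scenario), the counter will have advanced, so the CAS fails and the operation retries with fresh state.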

The linked list elements themselves are maintained in a non-blocking LIFO,
and are allocated before stack pushes and freed after stack pops. Since the
stack has a fixed maximum depth, these elements do not need to be
dynamically created.

Signed-off-by: Gage Eads <gage.eads@intel.com>
---
 doc/guides/prog_guide/stack_lib.rst    |  46 ++++++++-
 doc/guides/rel_notes/release_19_05.rst |   3 +
 lib/librte_stack/Makefile              |   4 +-
 lib/librte_stack/meson.build           |   4 +-
 lib/librte_stack/rte_stack.c           |  42 ++++++--
 lib/librte_stack/rte_stack.h           | 139 +++++++++++++++++++++++++-
 lib/librte_stack/rte_stack_c11_mem.h   | 173 +++++++++++++++++++++++++++++++++
 lib/librte_stack/rte_stack_generic.h   | 157 ++++++++++++++++++++++++++++++
 8 files changed, 550 insertions(+), 18 deletions(-)
 create mode 100644 lib/librte_stack/rte_stack_c11_mem.h
 create mode 100644 lib/librte_stack/rte_stack_generic.h

Comments

Olivier Matz Feb. 25, 2019, 11:28 a.m. | #1
On Fri, Feb 22, 2019 at 10:06:53AM -0600, Gage Eads wrote:
> This commit adds support for a non-blocking (linked list based) stack to
> the stack API. This behavior is selected through a new rte_stack_create()
> flag, STACK_F_NB.
> 
> The stack consists of a linked list of elements, each containing a data
> pointer and a next pointer, and an atomic stack depth counter.
> 
> The non-blocking push operation enqueues a linked list of pointers by
> pointing the tail of the list to the current stack head, and using a CAS to
> swing the stack head pointer to the head of the list. The operation retries
> if it is unsuccessful (i.e. the list changed between reading the head and
> modifying it), else it adjusts the stack length and returns.
> 
> The non-blocking pop operation first reserves num elements by adjusting the
> stack length, to ensure the dequeue operation will succeed without
> blocking. It then dequeues pointers by walking the list -- starting from
> the head -- then swinging the head pointer (using a CAS as well). While
> walking the list, the data pointers are recorded in an object table.
> 
> This algorithm stack uses a 128-bit compare-and-swap instruction, which
> atomically updates the stack top pointer and a modification counter, to
> protect against the ABA problem.
> 
> The linked list elements themselves are maintained in a non-blocking LIFO,
> and are allocated before stack pushes and freed after stack pops. Since the
> stack has a fixed maximum depth, these elements do not need to be
> dynamically created.
> 
> Signed-off-by: Gage Eads <gage.eads@intel.com>

[...]

> diff --git a/doc/guides/prog_guide/stack_lib.rst b/doc/guides/prog_guide/stack_lib.rst
> index 51689cfe1..86fdc0a9b 100644
> --- a/doc/guides/prog_guide/stack_lib.rst
> +++ b/doc/guides/prog_guide/stack_lib.rst
> @@ -9,7 +9,7 @@ pointers.
>  
>  The stack library provides the following basic operations:
>  
> -*  Create a uniquely named stack of a user-specified size and using a user-specified socket.
> +*  Create a uniquely named stack of a user-specified size and using a user-specified socket, with either lock-based or non-blocking behavior.
>  
>  *  Push and pop a burst of one or more stack objects (pointers). These function are multi-threading safe.
>  

Same comment about 80-cols as in the first patch.

[...]

> --- a/lib/librte_stack/rte_stack.c
> +++ b/lib/librte_stack/rte_stack.c
> @@ -26,27 +26,46 @@ static struct rte_tailq_elem rte_stack_tailq = {
>  EAL_REGISTER_TAILQ(rte_stack_tailq)
>  
>  static void
> +nb_lifo_init(struct rte_stack *s, unsigned int count)
> +{
> +	struct rte_nb_lifo_elem *elems;
> +	unsigned int i;
> +
> +	elems = (struct rte_nb_lifo_elem *)&s[1];
> +	for (i = 0; i < count; i++)
> +		__nb_lifo_push(&s->nb_lifo.free, &elems[i], &elems[i], 1);
> +}

Would it be possible to add:

	 struct rte_nb_lifo {
	 	/** LIFO list of elements */
	 	struct rte_nb_lifo_list used __rte_cache_aligned;
	 	/** LIFO list of free elements */
	 	struct rte_nb_lifo_list free __rte_cache_aligned;
	+	struct rte_nb_lifo_elem elems[];
	 };

I think it is more consistent with the non-blocking structure.

[...]

> --- a/lib/librte_stack/rte_stack.h
> +++ b/lib/librte_stack/rte_stack.h
> @@ -29,6 +29,33 @@ extern "C" {
>  #define RTE_STACK_NAMESIZE (RTE_MEMZONE_NAMESIZE - \
>  			   sizeof(RTE_STACK_MZ_PREFIX) + 1)
>  
> +struct rte_nb_lifo_elem {
> +	void *data;			/**< Data pointer */
> +	struct rte_nb_lifo_elem *next;	/**< Next pointer */
> +};
> +
> +struct rte_nb_lifo_head {
> +	struct rte_nb_lifo_elem *top; /**< Stack top */
> +	uint64_t cnt; /**< Modification counter for avoiding ABA problem */
> +};
> +
> +struct rte_nb_lifo_list {
> +	/** List head */
> +	struct rte_nb_lifo_head head __rte_aligned(16);
> +	/** List len */
> +	rte_atomic64_t len;
> +};
> +
> +/* Structure containing two non-blocking LIFO lists: the stack itself and a
> + * list of free linked-list elements.
> + */
> +struct rte_nb_lifo {
> +	/** LIFO list of elements */
> +	struct rte_nb_lifo_list used __rte_cache_aligned;
> +	/** LIFO list of free elements */
> +	struct rte_nb_lifo_list free __rte_cache_aligned;
> +};
> +

The names "rte_nb_lifo*" bothers me a bit. I think a more usual name
format is "rte_<module_name>_<struct_name>".

What would you think about names like this?
  rte_nb_lifo ->  rte_stack_nb
  rte_nb_lifo_elem -> rte_stack_nb_elem
  rte_nb_lifo_head -> rte_stack_nb_head
  rte_nb_lifo_list -> rte_stack_nb_list
  rte_lifo -> rte_stack_std

I even wonder if "nonblock", "noblk", or "lockless" shouldn't be used
in place of "nb" (which is also a common abbreviation for number). This
also applies to the STACK_F_NB flag name.

[...]

>  /* Structure containing the LIFO, its current length, and a lock for mutual
>   * exclusion.
>   */
> @@ -48,10 +75,69 @@ struct rte_stack {
>  	const struct rte_memzone *memzone;
>  	uint32_t capacity; /**< Usable size of the stack */
>  	uint32_t flags; /**< Flags supplied at creation */
> -	struct rte_lifo lifo; /**< LIFO structure */
> +	RTE_STD_C11
> +	union {
> +		struct rte_nb_lifo nb_lifo; /**< Non-blocking LIFO structure */
> +		struct rte_lifo lifo;	    /**< LIFO structure */
> +	};
>  } __rte_cache_aligned;
>  
>  /**
> + * The stack uses non-blocking push and pop functions. This flag is only
> + * supported on x86_64 platforms, currently.
> + */
> +#define STACK_F_NB 0x0001

What about adding the RTE_ prefix?

> +static __rte_always_inline unsigned int __rte_experimental
> +rte_nb_lifo_push(struct rte_stack *s, void * const *obj_table, unsigned int n)
> +{
> +	struct rte_nb_lifo_elem *tmp, *first, *last = NULL;
> +	unsigned int i;
> +
> +	if (unlikely(n == 0))
> +		return 0;
> +
> +	/* Pop n free elements */
> +	first = __nb_lifo_pop(&s->nb_lifo.free, n, NULL, NULL);
> +	if (unlikely(first == NULL))
> +		return 0;
> +
> +	/* Construct the list elements */
> +	tmp = first;
> +	for (i = 0; i < n; i++) {
> +		tmp->data = obj_table[n - i - 1];
> +		last = tmp;
> +		tmp = tmp->next;
> +	}
> +
> +	/* Push them to the used list */
> +	__nb_lifo_push(&s->nb_lifo.used, first, last, n);
> +
> +	return n;
> +}

Here, I didn't get why "last" is not retrieved through __nb_lifo_pop(),
like it's done in rte_nb_lifo_pop(). Is there a reason for that?

[...]

> --- /dev/null
> +++ b/lib/librte_stack/rte_stack_c11_mem.h

For the c11 memory model, please consider having an additional reviewer ;)

[...]

> --- /dev/null
> +++ b/lib/librte_stack/rte_stack_generic.h
> @@ -0,0 +1,157 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + * Copyright(c) 2019 Intel Corporation
> + */
> +
> +#ifndef _NB_LIFO_GENERIC_H_
> +#define _NB_LIFO_GENERIC_H_
> +
> +#include <rte_branch_prediction.h>
> +#include <rte_prefetch.h>
> +
> +static __rte_always_inline unsigned int
> +rte_nb_lifo_len(struct rte_stack *s)
> +{
> +	/* nb_lifo_push() and nb_lifo_pop() do not update the list's contents
> +	 * and nb_lifo->len atomically, which can cause the list to appear
> +	 * shorter than it actually is if this function is called while other
> +	 * threads are modifying the list.
> +	 *
> +	 * However, given the inherently approximate nature of the get_count
> +	 * callback -- even if the list and its size were updated atomically,
> +	 * the size could change between when get_count executes and when the
> +	 * value is returned to the caller -- this is acceptable.
> +	 *
> +	 * The nb_lifo->len updates are placed such that the list may appear to
> +	 * have fewer elements than it does, but will never appear to have more
> +	 * elements. If the mempool is near-empty to the point that this is a
> +	 * concern, the user should consider increasing the mempool size.
> +	 */
> +	return (unsigned int)rte_atomic64_read(&s->nb_lifo.used.len);
> +}
> +
> +static __rte_always_inline void
> +__nb_lifo_push(struct rte_nb_lifo_list *lifo,
> +	       struct rte_nb_lifo_elem *first,
> +	       struct rte_nb_lifo_elem *last,
> +	       unsigned int num)
> +{
> +#ifndef RTE_ARCH_X86_64
> +	RTE_SET_USED(first);
> +	RTE_SET_USED(last);
> +	RTE_SET_USED(lifo);
> +	RTE_SET_USED(num);
> +#else
> +	struct rte_nb_lifo_head old_head;
> +	int success;
> +
> +	old_head = lifo->head;
> +
> +	do {
> +		struct rte_nb_lifo_head new_head;
> +
> +		/* Swing the top pointer to the first element in the list and
> +		 * make the last element point to the old top.
> +		 */
> +		new_head.top = first;
> +		new_head.cnt = old_head.cnt + 1;
> +
> +		last->next = old_head.top;
> +
> +		/* Ensure the list entry writes are visible before pushing them
> +		 * to the stack.
> +		 */
> +		rte_wmb();
> +
> +		/* old_head is updated on failure */
> +		success = rte_atomic128_cmpxchg((rte_int128_t *)&lifo->head,
> +					       (rte_int128_t *)&old_head,
> +					       (rte_int128_t *)&new_head,
> +					       1, __ATOMIC_RELEASE,
> +					       __ATOMIC_RELAXED);
> +	} while (success == 0);
> +
> +	rte_atomic64_add(&lifo->len, num);
> +#endif
> +}
> +
> +static __rte_always_inline struct rte_nb_lifo_elem *
> +__nb_lifo_pop(struct rte_nb_lifo_list *lifo,
> +	      unsigned int num,
> +	      void **obj_table,
> +	      struct rte_nb_lifo_elem **last)
> +{
> +#ifndef RTE_ARCH_X86_64
> +	RTE_SET_USED(obj_table);
> +	RTE_SET_USED(last);
> +	RTE_SET_USED(lifo);
> +	RTE_SET_USED(num);
> +
> +	return NULL;
> +#else
> +	struct rte_nb_lifo_head old_head;
> +	int success;
> +
> +	/* Reserve num elements, if available */
> +	while (1) {
> +		uint64_t len = rte_atomic64_read(&lifo->len);
> +
> +		/* Does the list contain enough elements? */
> +		if (unlikely(len < num))
> +			return NULL;
> +
> +		if (rte_atomic64_cmpset((volatile uint64_t *)&lifo->len,
> +					len, len - num))
> +			break;
> +	}
> +

Here, accessing the length with a compare and set costs probably more
than a standard atomic sub function. I understand that was done for the
reason described above:

	The nb_lifo->len updates are placed such that the list may
	appear to have fewer elements than it does, but will never
	appear to have more elements.

Another strategy could be to use a rte_atomic64_sub() after the actual
pop and change rte_nb_lifo_len() to bound the result to [0:size].
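
A sketch of that alternative (hypothetical names): decrement the length with a plain atomic subtract after the pop completes, and clamp the value on the reader side so a transiently negative or oversized count is never reported. Modeled single-threaded with a signed counter:

```c
#include <stdint.h>

/* May dip below 0 between the pop and the deferred subtract. */
static int64_t lifo_len_raw;

/* Reader-side length bounded to [0:capacity], per the suggestion above. */
static unsigned int lifo_len_clamped(uint64_t capacity)
{
	int64_t len = lifo_len_raw;

	if (len < 0)
		return 0;
	if ((uint64_t)len > capacity)
		return (unsigned int)capacity;
	return (unsigned int)len;
}
```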
Gage Eads March 1, 2019, 8:53 p.m. | #2
> -----Original Message-----
> From: Olivier Matz [mailto:olivier.matz@6wind.com]
> Sent: Monday, February 25, 2019 5:28 AM
> To: Eads, Gage <gage.eads@intel.com>
> Cc: dev@dpdk.org; arybchenko@solarflare.com; Richardson, Bruce
> <bruce.richardson@intel.com>; Ananyev, Konstantin
> <konstantin.ananyev@intel.com>; gavin.hu@arm.com;
> Honnappa.Nagarahalli@arm.com; nd@arm.com; thomas@monjalon.net
> Subject: Re: [PATCH 5/7] stack: add non-blocking stack implementation
>
> On Fri, Feb 22, 2019 at 10:06:53AM -0600, Gage Eads wrote:
> > This commit adds support for a non-blocking (linked list based)
> > stack to the stack API. This behavior is selected through a new
> > rte_stack_create() flag, STACK_F_NB.
> >
> > The stack consists of a linked list of elements, each containing a
> > data pointer and a next pointer, and an atomic stack depth counter.
> >
> > The non-blocking push operation enqueues a linked list of pointers
> > by pointing the tail of the list to the current stack head, and
> > using a CAS to swing the stack head pointer to the head of the list.
> > The operation retries if it is unsuccessful (i.e. the list changed
> > between reading the head and modifying it), else it adjusts the stack length
> > and returns.
> >
> > The non-blocking pop operation first reserves num elements by
> > adjusting the stack length, to ensure the dequeue operation will
> > succeed without blocking. It then dequeues pointers by walking the
> > list -- starting from the head -- then swinging the head pointer
> > (using a CAS as well). While walking the list, the data pointers are
> > recorded in
> > an object table.
> >
> > This algorithm stack uses a 128-bit compare-and-swap instruction,
> > which atomically updates the stack top pointer and a modification
> > counter, to protect against the ABA problem.
> >
> > The linked list elements themselves are maintained in a non-blocking
> > LIFO, and are allocated before stack pushes and freed after stack
> > pops. Since the stack has a fixed maximum depth, these elements do
> > not need to be dynamically created.
> >
> > Signed-off-by: Gage Eads <gage.eads@intel.com>
>
> [...]
>
> > diff --git a/doc/guides/prog_guide/stack_lib.rst
> > b/doc/guides/prog_guide/stack_lib.rst
> > index 51689cfe1..86fdc0a9b 100644
> > --- a/doc/guides/prog_guide/stack_lib.rst
> > +++ b/doc/guides/prog_guide/stack_lib.rst
> > @@ -9,7 +9,7 @@ pointers.
> >
> >  The stack library provides the following basic operations:
> >
> > -*  Create a uniquely named stack of a user-specified size and using
> > a user-
> > specified socket.
> > +*  Create a uniquely named stack of a user-specified size and using
> > +a user-
> > specified socket, with either lock-based or non-blocking behavior.
> >
> >  *  Push and pop a burst of one or more stack objects (pointers).
> > These
> > function are multi-threading safe.
> >
>
> Same comment about 80-cols than in the first patch.
>
> [...]
>
> > --- a/lib/librte_stack/rte_stack.c
> > +++ b/lib/librte_stack/rte_stack.c
> > @@ -26,27 +26,46 @@ static struct rte_tailq_elem rte_stack_tailq = {
> >  EAL_REGISTER_TAILQ(rte_stack_tailq)
> >
> >  static void
> > +nb_lifo_init(struct rte_stack *s, unsigned int count)
> > +{
> > +	struct rte_nb_lifo_elem *elems;
> > +	unsigned int i;
> > +
> > +	elems = (struct rte_nb_lifo_elem *)&s[1];
> > +	for (i = 0; i < count; i++)
> > +		__nb_lifo_push(&s->nb_lifo.free, &elems[i], &elems[i], 1);
> > +}
>
> Would it be possible to add:
>
> 	 struct rte_nb_lifo {
> 	 	/** LIFO list of elements */
> 	 	struct rte_nb_lifo_list used __rte_cache_aligned;
> 	 	/** LIFO list of free elements */
> 	 	struct rte_nb_lifo_list free __rte_cache_aligned;
> 	+	struct rte_nb_lifo_elem elems[];
> 	 };
>
> I think it is more consistent with the non-blocking structure.
>

Will do.

> [...]
>
> > --- a/lib/librte_stack/rte_stack.h
> > +++ b/lib/librte_stack/rte_stack.h
> > @@ -29,6 +29,33 @@ extern "C" {
> >  #define RTE_STACK_NAMESIZE (RTE_MEMZONE_NAMESIZE - \
> >  			   sizeof(RTE_STACK_MZ_PREFIX) + 1)
> >
> > +struct rte_nb_lifo_elem {
> > +	void *data;			/**< Data pointer */
> > +	struct rte_nb_lifo_elem *next;	/**< Next pointer */
> > +};
> > +
> > +struct rte_nb_lifo_head {
> > +	struct rte_nb_lifo_elem *top; /**< Stack top */
> > +	uint64_t cnt; /**< Modification counter for avoiding ABA problem
> > +*/
> > +};
> > +
> > +struct rte_nb_lifo_list {
> > +	/** List head */
> > +	struct rte_nb_lifo_head head __rte_aligned(16);
> > +	/** List len */
> > +	rte_atomic64_t len;
> > +};
> > +
> > +/* Structure containing two non-blocking LIFO lists: the stack
> > +itself and a
> > + * list of free linked-list elements.
> > + */
> > +struct rte_nb_lifo {
> > +	/** LIFO list of elements */
> > +	struct rte_nb_lifo_list used __rte_cache_aligned;
> > +	/** LIFO list of free elements */
> > +	struct rte_nb_lifo_list free __rte_cache_aligned;
> > +};
> > +
>
> The names "rte_nb_lifo*" bothers me a bit. I think a more usual name
> format is "rte_<module_name>_<struct_name>".
>
> What would you think about names like this?
>   rte_nb_lifo ->  rte_stack_nb
>   rte_nb_lifo_elem -> rte_stack_nb_elem
>   rte_nb_lifo_head -> rte_stack_nb_head
>   rte_nb_lifo_list -> rte_stack_nb_list
>   rte_lifo -> rte_stack_std
>
> I even wonder if "nonblock", "noblk", or "lockless" shouldn't be used
> in place of "nb" (which is also a common abbreviation for number).
> This can also applies to the STACK_F_NB flag name.
>

How about std and lf (lock-free)?

> [...]
>
> >  /* Structure containing the LIFO, its current length, and a lock for mutual
> >   * exclusion.
> >   */
> > @@ -48,10 +75,69 @@ struct rte_stack {
> >  	const struct rte_memzone *memzone;
> >  	uint32_t capacity; /**< Usable size of the stack */
> >  	uint32_t flags; /**< Flags supplied at creation */
> > -	struct rte_lifo lifo; /**< LIFO structure */
> > +	RTE_STD_C11
> > +	union {
> > +		struct rte_nb_lifo nb_lifo; /**< Non-blocking LIFO structure */
> > +		struct rte_lifo lifo;	    /**< LIFO structure */
> > +	};
> >  } __rte_cache_aligned;
> >
> >  /**
> > + * The stack uses non-blocking push and pop functions. This flag is
> > +only
> > + * supported on x86_64 platforms, currently.
> > + */
> > +#define STACK_F_NB 0x0001
>
> What about adding the RTE_ prefix?

I'm fine with either, but there's precedent for flag macros named
<module_name>_*. E.g. RING_F_*, MEMPOOL_F_*, ETH_*, and SERVICE_F_*.

>
> > +static __rte_always_inline unsigned int __rte_experimental
> > +rte_nb_lifo_push(struct rte_stack *s, void * const *obj_table,
> > +unsigned int n)
> > +{
> > +	struct rte_nb_lifo_elem *tmp, *first, *last = NULL;
> > +	unsigned int i;
> > +
> > +	if (unlikely(n == 0))
> > +		return 0;
> > +
> > +	/* Pop n free elements */
> > +	first = __nb_lifo_pop(&s->nb_lifo.free, n, NULL, NULL);
> > +	if (unlikely(first == NULL))
> > +		return 0;
> > +
> > +	/* Construct the list elements */
> > +	tmp = first;
> > +	for (i = 0; i < n; i++) {
> > +		tmp->data = obj_table[n - i - 1];
> > +		last = tmp;
> > +		tmp = tmp->next;
> > +	}
> > +
> > +	/* Push them to the used list */
> > +	__nb_lifo_push(&s->nb_lifo.used, first, last, n);
> > +
> > +	return n;
> > +}
>
> Here, I didn't get why "last" is not retrieved through
> __nb_lifo_pop(), like it's done in rte_nb_lifo_pop(). Is there a reason for that?
>

Just a simple oversight -- that works, and I'll change it for v2.

> [...]
>
> > --- /dev/null
> > +++ b/lib/librte_stack/rte_stack_c11_mem.h
>
> For the c11 memory model, please consider having an additional
> reviewer ;)

No problem, and I'll break out the C11 implementation into a separate patch in
case that makes reviewing it easier.

>
> [...]
>
> > --- /dev/null
> > +++ b/lib/librte_stack/rte_stack_generic.h
> > @@ -0,0 +1,157 @@
> > +/* SPDX-License-Identifier: BSD-3-Clause
> > + * Copyright(c) 2019 Intel Corporation
> > + */
> > +
> > +#ifndef _NB_LIFO_GENERIC_H_
> > +#define _NB_LIFO_GENERIC_H_
> > +
> > +#include <rte_branch_prediction.h>
> > +#include <rte_prefetch.h>
> > +
> > +static __rte_always_inline unsigned int rte_nb_lifo_len(struct
> > +rte_stack *s)
> > +{
> > +	/* nb_lifo_push() and nb_lifo_pop() do not update the list's contents
> > +	 * and nb_lifo->len atomically, which can cause the list to appear
> > +	 * shorter than it actually is if this function is called while other
> > +	 * threads are modifying the list.
> > +	 *
> > +	 * However, given the inherently approximate nature of the get_count
> > +	 * callback -- even if the list and its size were updated atomically,
> > +	 * the size could change between when get_count executes and when
> > the
> > +	 * value is returned to the caller -- this is acceptable.
> > +	 *
> > +	 * The nb_lifo->len updates are placed such that the list may appear to
> > +	 * have fewer elements than it does, but will never appear to have more
> > +	 * elements. If the mempool is near-empty to the point that this is a
> > +	 * concern, the user should consider increasing the mempool size.
> > +	 */
> > +	return (unsigned int)rte_atomic64_read(&s->nb_lifo.used.len);
> > +}
> > +
> > +static __rte_always_inline void
> > +__nb_lifo_push(struct rte_nb_lifo_list *lifo,
> > +	       struct rte_nb_lifo_elem *first,
> > +	       struct rte_nb_lifo_elem *last,
> > +	       unsigned int num)
> > +{
> > +#ifndef RTE_ARCH_X86_64
> > +	RTE_SET_USED(first);
> > +	RTE_SET_USED(last);
> > +	RTE_SET_USED(lifo);
> > +	RTE_SET_USED(num);
> > +#else
> > +	struct rte_nb_lifo_head old_head;
> > +	int success;
> > +
> > +	old_head = lifo->head;
> > +
> > +	do {
> > +		struct rte_nb_lifo_head new_head;
> > +
> > +		/* Swing the top pointer to the first element in the list and
> > +		 * make the last element point to the old top.
> > +		 */
> > +		new_head.top = first;
> > +		new_head.cnt = old_head.cnt + 1;
> > +
> > +		last->next = old_head.top;
> > +
> > +		/* Ensure the list entry writes are visible before pushing them
> > +		 * to the stack.
> > +		 */
> > +		rte_wmb();
> > +
> > +		/* old_head is updated on failure */
> > +		success = rte_atomic128_cmpxchg((rte_int128_t *)&lifo->head,
> > +					       (rte_int128_t *)&old_head,
> > +					       (rte_int128_t *)&new_head,
> > +					       1, __ATOMIC_RELEASE,
> > +					       __ATOMIC_RELAXED);
> > +	} while (success == 0);
> > +
> > +	rte_atomic64_add(&lifo->len, num);
> > +#endif
> > +}
> > +
> > +static __rte_always_inline struct rte_nb_lifo_elem *
> > +__nb_lifo_pop(struct rte_nb_lifo_list *lifo,
> > +	      unsigned int num,
> > +	      void **obj_table,
> > +	      struct rte_nb_lifo_elem **last)
> > +{
> > +#ifndef RTE_ARCH_X86_64
> > +	RTE_SET_USED(obj_table);
> > +	RTE_SET_USED(last);
> > +	RTE_SET_USED(lifo);
> > +	RTE_SET_USED(num);
> > +
> > +	return NULL;
> > +#else
> > +	struct rte_nb_lifo_head old_head;
> > +	int success;
> > +
> > +	/* Reserve num elements, if available */
> > +	while (1) {
> > +		uint64_t len = rte_atomic64_read(&lifo->len);
> > +
> > +		/* Does the list contain enough elements? */
> > +		if (unlikely(len < num))
> > +			return NULL;
> > +
> > +		if (rte_atomic64_cmpset((volatile uint64_t *)&lifo->len,
> > +					len, len - num))
> > +			break;
> > +	}
> > +
>
> Here, accessing the length with a compare and set costs probably more
> than a standard atomic sub fonction. I understand that was done for
> the reason described above:
>
> 	The nb_lifo->len updates are placed such that the list may
> 	appear to have fewer elements than it does, but will never
> 	appear to have more elements.
>
> Another strategy could be to use a rte_atomic64_sub() after the
> effective pop and change rte_nb_lifo_len() to bound the result to [0:size].

It serves a second purpose: if the CAS succeeds, the subsequent do-while loop is
guaranteed to (eventually) succeed because we've effectively reserved num
elements. Otherwise there's the chance that the list runs empty after popping
fewer than num elements.

If folks are interested in this patchset, please also consider reviewing the 128-bit
CAS patch here: http://mails.dpdk.org/archives/dev/2019-February/125059.html

Thanks!
Gage
Thomas Monjalon March 1, 2019, 9:12 p.m. | #3
01/03/2019 21:53, Eads, Gage:
> From: Olivier Matz [mailto:olivier.matz@6wind.com]
> > On Fri, Feb 22, 2019 at 10:06:53AM -0600, Gage Eads wrote:
> > > +#define STACK_F_NB 0x0001
> >
> > What about adding the RTE_ prefix?
> 
> I'm fine with either, but there's precedent for flag macros named
> <module_name>_*. E.g. RING_F_*, MEMPOOL_F_*, ETH_*, and SERVICE_F_*.

They should be fixed.
Every public symbol should be prefixed to avoid namespace conflicts.
At first, we should rename them and keep the old name as an alias.
Later, non-prefixed names should be removed after a deprecation notice.
Gage Eads March 1, 2019, 9:29 p.m. | #4
> -----Original Message-----
> From: Thomas Monjalon [mailto:thomas@monjalon.net]
> Sent: Friday, March 1, 2019 3:13 PM
> To: Eads, Gage <gage.eads@intel.com>
> Cc: Olivier Matz <olivier.matz@6wind.com>; dev@dpdk.org;
> arybchenko@solarflare.com; Richardson, Bruce <bruce.richardson@intel.com>;
> Ananyev, Konstantin <konstantin.ananyev@intel.com>; gavin.hu@arm.com;
> Honnappa.Nagarahalli@arm.com; nd@arm.com
> Subject: Re: FW: [PATCH 5/7] stack: add non-blocking stack implementation
> 
> 01/03/2019 21:53, Eads, Gage:
> > From: Olivier Matz [mailto:olivier.matz@6wind.com]
> > > On Fri, Feb 22, 2019 at 10:06:53AM -0600, Gage Eads wrote:
> > > > +#define STACK_F_NB 0x0001
> > >
> > > What about adding the RTE_ prefix?
> >
> > I'm fine with either, but there's precedent for flag macros named
> > <module_name>_*. E.g. RING_F_*, MEMPOOL_F_*, ETH_*, and
> SERVICE_F_*.
> 
> They should be fixed.
> Every public symbols should be prefixed to avoid namespace conflict.
> At first, we should rename them and keep the old name as an alias.
> Later, non-prefixed names should be removed after a deprecation notice.
> 

Ok, will fix.

Thanks,
Gage

Patch

diff --git a/doc/guides/prog_guide/stack_lib.rst b/doc/guides/prog_guide/stack_lib.rst
index 51689cfe1..86fdc0a9b 100644
--- a/doc/guides/prog_guide/stack_lib.rst
+++ b/doc/guides/prog_guide/stack_lib.rst
@@ -9,7 +9,7 @@  pointers.
 
 The stack library provides the following basic operations:
 
-*  Create a uniquely named stack of a user-specified size and using a user-specified socket.
+*  Create a uniquely named stack of a user-specified size and using a user-specified socket, with either lock-based or non-blocking behavior.
 
 *  Push and pop a burst of one or more stack objects (pointers). These function are multi-threading safe.
 
@@ -22,5 +22,45 @@  The stack library provides the following basic operations:
 Implementation
 ~~~~~~~~~~~~~~
 
-The stack consists of a contiguous array of pointers, a current index, and a
-spinlock. Accesses to the stack are made multi-thread safe by the spinlock.
+The library supports two types of stacks: lock-based and non-blocking.
+Both types use the same set of interfaces, but their implementations differ.
+
+Lock-based Stack
+----------------
+
+The lock-based stack consists of a contiguous array of pointers, a current index, and a spinlock.
+Accesses to the stack are made multi-thread safe by the spinlock.
+
+
+Non-blocking Stack
+------------------
+
+The non-blocking stack consists of a linked list of elements, each containing a data pointer and a next pointer, and an atomic stack depth counter.
+The non-blocking property means that multiple threads can push and pop simultaneously, and one thread being preempted/delayed in a push or pop operation will not impede the forward progress of any other thread.
+
+The non-blocking push operation enqueues a linked list of pointers by pointing the list's tail to the current stack head, and using a CAS to swing the stack head pointer to the head of the list.
+The operation retries if it is unsuccessful (i.e. the list changed between reading the head and modifying it), else it adjusts the stack length and returns.
+
+The non-blocking pop operation first reserves one or more list elements by adjusting the stack length, to ensure the dequeue operation will succeed without blocking.
+It then dequeues pointers by walking the list -- starting from the head -- then swinging the head pointer (using a CAS as well).
+While walking the list, the data pointers are recorded in an object table.
+
+The linked list elements themselves are maintained in a non-blocking LIFO, and are allocated before stack pushes and freed after stack pops.
+Since the stack has a fixed maximum depth, these elements do not need to be dynamically created.
+
+The non-blocking behavior is selected by passing the *STACK_F_NB* flag to rte_stack_create().
+
+Preventing the ABA Problem
+^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+To prevent the ABA problem, the algorithm uses a 128-bit compare-and-swap instruction to atomically update both the stack top pointer and a modification counter. The ABA problem can occur without a modification counter if, for example:
+
+1. Thread A reads head pointer X and stores the pointed-to list element.
+2. Other threads modify the list such that the head pointer is once again X, but its pointed-to data is different than what thread A read.
+3. Thread A changes the head pointer with a compare-and-swap and succeeds.
+
+In this case thread A would not detect that the list had changed, and would both pop stale data and incorrectly change the head pointer.
+By adding a modification counter that is updated on every push and pop as part of the compare-and-swap, the algorithm can detect when the list changes even if the head pointer remains the same.
diff --git a/doc/guides/rel_notes/release_19_05.rst b/doc/guides/rel_notes/release_19_05.rst
index 04394f8cf..52c5ba78e 100644
--- a/doc/guides/rel_notes/release_19_05.rst
+++ b/doc/guides/rel_notes/release_19_05.rst
@@ -71,6 +71,9 @@  New Features
   pointers. The API provides MT-safe push and pop operations that can operate
   on one or more pointers per operation.
 
+  The library supports two stack implementations: lock-based and non-blocking.
+  The non-blocking implementation is currently limited to x86-64 platforms.
+
 Removed Items
 -------------
 
diff --git a/lib/librte_stack/Makefile b/lib/librte_stack/Makefile
index e956b6535..94a7c1476 100644
--- a/lib/librte_stack/Makefile
+++ b/lib/librte_stack/Makefile
@@ -18,6 +18,8 @@  LIBABIVER := 1
 SRCS-$(CONFIG_RTE_LIBRTE_STACK) := rte_stack.c
 
 # install includes
-SYMLINK-$(CONFIG_RTE_LIBRTE_STACK)-include := rte_stack.h
+SYMLINK-$(CONFIG_RTE_LIBRTE_STACK)-include := rte_stack.h \
+					      rte_stack_generic.h \
+					      rte_stack_c11_mem.h
 
 include $(RTE_SDK)/mk/rte.lib.mk
diff --git a/lib/librte_stack/meson.build b/lib/librte_stack/meson.build
index 99f43710e..dec527966 100644
--- a/lib/librte_stack/meson.build
+++ b/lib/librte_stack/meson.build
@@ -5,4 +5,6 @@  allow_experimental_apis = true
 
 version = 1
 sources = files('rte_stack.c')
-headers = files('rte_stack.h')
+headers = files('rte_stack.h',
+		'rte_stack_c11_mem.h',
+		'rte_stack_generic.h')
diff --git a/lib/librte_stack/rte_stack.c b/lib/librte_stack/rte_stack.c
index a43ebb68f..f1c0b5bba 100644
--- a/lib/librte_stack/rte_stack.c
+++ b/lib/librte_stack/rte_stack.c
@@ -26,27 +26,46 @@  static struct rte_tailq_elem rte_stack_tailq = {
 EAL_REGISTER_TAILQ(rte_stack_tailq)
 
 static void
+nb_lifo_init(struct rte_stack *s, unsigned int count)
+{
+	struct rte_nb_lifo_elem *elems;
+	unsigned int i;
+
+	elems = (struct rte_nb_lifo_elem *)&s[1];
+	for (i = 0; i < count; i++)
+		__nb_lifo_push(&s->nb_lifo.free, &elems[i], &elems[i], 1);
+}
+
+static void
 lifo_init(struct rte_stack *s)
 {
 	rte_spinlock_init(&s->lifo.lock);
 }
 
 static void
-rte_stack_init(struct rte_stack *s)
+rte_stack_init(struct rte_stack *s, unsigned int count, uint32_t flags)
 {
 	memset(s, 0, sizeof(*s));
 
-	lifo_init(s);
+	if (flags & STACK_F_NB)
+		nb_lifo_init(s, count);
+	else
+		lifo_init(s);
 }
 
 static ssize_t
-rte_stack_get_memsize(unsigned int count)
+rte_stack_get_memsize(unsigned int count, uint32_t flags)
 {
 	ssize_t sz = sizeof(struct rte_stack);
 
+	if (flags & STACK_F_NB)
+		sz += RTE_CACHE_LINE_ROUNDUP(count *
+					     sizeof(struct rte_nb_lifo_elem));
+	else
+		sz += RTE_CACHE_LINE_ROUNDUP(count * sizeof(void *));
+
 	/* Add padding to avoid false sharing conflicts */
-	sz += RTE_CACHE_LINE_ROUNDUP(count * sizeof(void *)) +
-		2 * RTE_CACHE_LINE_SIZE;
+	sz += 2 * RTE_CACHE_LINE_SIZE;
 
 	return sz;
 }
@@ -63,9 +82,16 @@  rte_stack_create(const char *name, unsigned int count, int socket_id,
 	unsigned int sz;
 	int ret;
 
-	RTE_SET_USED(flags);
+#ifdef RTE_ARCH_X86_64
+	RTE_BUILD_BUG_ON(sizeof(struct rte_nb_lifo_head) != 16);
+#else
+	if (flags & STACK_F_NB) {
+		STACK_LOG_ERR("Non-blocking stack is not supported on your platform\n");
+		return NULL;
+	}
+#endif
 
-	sz = rte_stack_get_memsize(count);
+	sz = rte_stack_get_memsize(count, flags);
 
 	ret = snprintf(mz_name, sizeof(mz_name), "%s%s",
 		       RTE_STACK_MZ_PREFIX, name);
@@ -94,7 +120,7 @@  rte_stack_create(const char *name, unsigned int count, int socket_id,
 
 	s = mz->addr;
 
-	rte_stack_init(s);
+	rte_stack_init(s, count, flags);
 
 	/* Store the name for later lookups */
 	ret = snprintf(s->name, sizeof(s->name), "%s", name);
diff --git a/lib/librte_stack/rte_stack.h b/lib/librte_stack/rte_stack.h
index da0210550..6ca175a8c 100644
--- a/lib/librte_stack/rte_stack.h
+++ b/lib/librte_stack/rte_stack.h
@@ -29,6 +29,33 @@  extern "C" {
 #define RTE_STACK_NAMESIZE (RTE_MEMZONE_NAMESIZE - \
 			   sizeof(RTE_STACK_MZ_PREFIX) + 1)
 
+struct rte_nb_lifo_elem {
+	void *data;			/**< Data pointer */
+	struct rte_nb_lifo_elem *next;	/**< Next pointer */
+};
+
+struct rte_nb_lifo_head {
+	struct rte_nb_lifo_elem *top; /**< Stack top */
+	uint64_t cnt; /**< Modification counter for avoiding ABA problem */
+};
+
+struct rte_nb_lifo_list {
+	/** List head */
+	struct rte_nb_lifo_head head __rte_aligned(16);
+	/** List length */
+	rte_atomic64_t len;
+};
+
+/* Structure containing two non-blocking LIFO lists: the stack itself and a
+ * list of free linked-list elements.
+ */
+struct rte_nb_lifo {
+	/** LIFO list of elements */
+	struct rte_nb_lifo_list used __rte_cache_aligned;
+	/** LIFO list of free elements */
+	struct rte_nb_lifo_list free __rte_cache_aligned;
+};
+
 /* Structure containing the LIFO, its current length, and a lock for mutual
  * exclusion.
  */
@@ -48,10 +75,69 @@  struct rte_stack {
 	const struct rte_memzone *memzone;
 	uint32_t capacity; /**< Usable size of the stack */
 	uint32_t flags; /**< Flags supplied at creation */
-	struct rte_lifo lifo; /**< LIFO structure */
+	RTE_STD_C11
+	union {
+		struct rte_nb_lifo nb_lifo; /**< Non-blocking LIFO structure */
+		struct rte_lifo lifo;	    /**< LIFO structure */
+	};
 } __rte_cache_aligned;
 
 /**
+ * The stack uses non-blocking push and pop functions. This flag is
+ * currently supported only on x86_64 platforms.
+ */
+#define STACK_F_NB 0x0001
+
+#ifdef RTE_USE_C11_MEM_MODEL
+#include "rte_stack_c11_mem.h"
+#else
+#include "rte_stack_generic.h"
+#endif
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * @internal Push several objects on the non-blocking stack (MT-safe)
+ *
+ * @param s
+ *   A pointer to the stack structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to push on the stack from the obj_table.
+ * @return
+ *   Actual number of objects enqueued.
+ */
+static __rte_always_inline unsigned int __rte_experimental
+rte_nb_lifo_push(struct rte_stack *s, void * const *obj_table, unsigned int n)
+{
+	struct rte_nb_lifo_elem *tmp, *first, *last = NULL;
+	unsigned int i;
+
+	if (unlikely(n == 0))
+		return 0;
+
+	/* Pop n free elements */
+	first = __nb_lifo_pop(&s->nb_lifo.free, n, NULL, NULL);
+	if (unlikely(first == NULL))
+		return 0;
+
+	/* Construct the list elements */
+	tmp = first;
+	for (i = 0; i < n; i++) {
+		tmp->data = obj_table[n - i - 1];
+		last = tmp;
+		tmp = tmp->next;
+	}
+
+	/* Push them to the used list */
+	__nb_lifo_push(&s->nb_lifo.used, first, last, n);
+
+	return n;
+}
+
+/**
  * @warning
  * @b EXPERIMENTAL: this API may change without prior notice
  *
@@ -109,7 +195,41 @@  rte_lifo_push(struct rte_stack *s, void * const *obj_table, unsigned int n)
 static __rte_always_inline unsigned int __rte_experimental
 rte_stack_push(struct rte_stack *s, void * const *obj_table, unsigned int n)
 {
-	return rte_lifo_push(s, obj_table, n);
+	if (s->flags & STACK_F_NB)
+		return rte_nb_lifo_push(s, obj_table, n);
+	else
+		return rte_lifo_push(s, obj_table, n);
+}
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * @internal Pop several objects from the non-blocking stack (MT-safe)
+ *
+ * @param s
+ *   A pointer to the stack structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to pull from the stack.
+ * @return
+ *   Actual number of objects popped.
+ */
+static __rte_always_inline unsigned int __rte_experimental
+rte_nb_lifo_pop(struct rte_stack *s, void **obj_table, unsigned int n)
+{
+	struct rte_nb_lifo_elem *first, *last = NULL;
+
+	/* Pop n used elements */
+	first = __nb_lifo_pop(&s->nb_lifo.used, n, obj_table, &last);
+	if (unlikely(first == NULL))
+		return 0;
+
+	/* Push the list elements to the free list */
+	__nb_lifo_push(&s->nb_lifo.free, first, last, n);
+
+	return n;
 }
 
 /**
@@ -173,7 +293,10 @@  rte_stack_pop(struct rte_stack *s, void **obj_table, unsigned int n)
 	if (unlikely(n == 0 || obj_table == NULL))
 		return 0;
 
-	return rte_lifo_pop(s, obj_table, n);
+	if (s->flags & STACK_F_NB)
+		return rte_nb_lifo_pop(s, obj_table, n);
+	else
+		return rte_lifo_pop(s, obj_table, n);
 }
 
 /**
@@ -190,7 +313,10 @@  rte_stack_pop(struct rte_stack *s, void **obj_table, unsigned int n)
 static __rte_always_inline unsigned int __rte_experimental
 rte_stack_count(struct rte_stack *s)
 {
-	return (unsigned int)s->lifo.len;
+	if (s->flags & STACK_F_NB)
+		return rte_nb_lifo_len(s);
+	else
+		return (unsigned int)s->lifo.len;
 }
 
 /**
@@ -228,7 +354,10 @@  rte_stack_free_count(struct rte_stack *s)
  *   NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA
  *   constraint for the reserved zone.
  * @param flags
- *   Reserved for future use
+ *   An OR of the following:
+ *    - STACK_F_NB: If this flag is set, the stack uses non-blocking variants
+ *      of the push and pop functions. Otherwise, it achieves thread-safety
+ *      using a lock.
  * @return
  *   On success, the pointer to the new allocated stack. NULL on error with
  *    rte_errno set appropriately. Possible errno values include:
diff --git a/lib/librte_stack/rte_stack_c11_mem.h b/lib/librte_stack/rte_stack_c11_mem.h
new file mode 100644
index 000000000..c8276c530
--- /dev/null
+++ b/lib/librte_stack/rte_stack_c11_mem.h
@@ -0,0 +1,173 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2019 Intel Corporation
+ */
+
+#ifndef _NB_LIFO_C11_MEM_H_
+#define _NB_LIFO_C11_MEM_H_
+
+#include <rte_branch_prediction.h>
+#include <rte_prefetch.h>
+
+static __rte_always_inline unsigned int
+rte_nb_lifo_len(struct rte_stack *s)
+{
+	/* nb_lifo_push() and nb_lifo_pop() do not update the list's contents
+	 * and lifo->len atomically, which can cause the list to appear shorter
+	 * than it actually is if this function is called while other threads
+	 * are modifying the list.
+	 *
+	 * However, given the inherently approximate nature of the get_count
+	 * callback -- even if the list and its size were updated atomically,
+	 * the size could change between when get_count executes and when the
+	 * value is returned to the caller -- this is acceptable.
+	 *
+	 * The lifo->len updates are placed such that the list may appear to
+	 * have fewer elements than it does, but will never appear to have more
+	 * elements. If the mempool is near-empty to the point that this is a
+	 * concern, the user should consider increasing the mempool size.
+	 */
+	return (unsigned int)__atomic_load_n(&s->nb_lifo.used.len.cnt,
+					     __ATOMIC_RELAXED);
+}
+
+static __rte_always_inline void
+__nb_lifo_push(struct rte_nb_lifo_list *lifo,
+	       struct rte_nb_lifo_elem *first,
+	       struct rte_nb_lifo_elem *last,
+	       unsigned int num)
+{
+#ifndef RTE_ARCH_X86_64
+	RTE_SET_USED(first);
+	RTE_SET_USED(last);
+	RTE_SET_USED(lifo);
+	RTE_SET_USED(num);
+#else
+	struct rte_nb_lifo_head old_head;
+	int success;
+
+	old_head = lifo->head;
+
+	do {
+		struct rte_nb_lifo_head new_head;
+
+		/* Swing the top pointer to the first element in the list and
+		 * make the last element point to the old top.
+		 */
+		new_head.top = first;
+		new_head.cnt = old_head.cnt + 1;
+
+		last->next = old_head.top;
+
+		/* Use the release memmodel to ensure the writes to the NB LIFO
+		 * elements are visible before the head pointer write.
+		 */
+		success = rte_atomic128_cmpxchg((rte_int128_t *)&lifo->head,
+						(rte_int128_t *)&old_head,
+						(rte_int128_t *)&new_head,
+						1, __ATOMIC_RELEASE,
+						__ATOMIC_RELAXED);
+	} while (success == 0);
+
+	/* Ensure the stack modifications are not reordered with respect
+	 * to the LIFO len update.
+	 */
+	__atomic_add_fetch(&lifo->len.cnt, num, __ATOMIC_RELEASE);
+#endif
+}
+
+static __rte_always_inline struct rte_nb_lifo_elem *
+__nb_lifo_pop(struct rte_nb_lifo_list *lifo,
+	      unsigned int num,
+	      void **obj_table,
+	      struct rte_nb_lifo_elem **last)
+{
+#ifndef RTE_ARCH_X86_64
+	RTE_SET_USED(obj_table);
+	RTE_SET_USED(last);
+	RTE_SET_USED(lifo);
+	RTE_SET_USED(num);
+
+	return NULL;
+#else
+	struct rte_nb_lifo_head old_head;
+	int success;
+
+	/* Reserve num elements, if available */
+	while (1) {
+		uint64_t len = __atomic_load_n(&lifo->len.cnt,
+					       __ATOMIC_ACQUIRE);
+
+		/* Does the list contain enough elements? */
+		if (unlikely(len < num))
+			return NULL;
+
+		if (__atomic_compare_exchange_n(&lifo->len.cnt,
+						&len, len - num,
+						0, __ATOMIC_RELAXED,
+						__ATOMIC_RELAXED))
+			break;
+	}
+
+#ifndef RTE_ARCH_X86_64
+	/* Use the acquire memmodel to ensure the reads to the NB LIFO elements
+	 * are properly ordered with respect to the head pointer read.
+	 *
+	 * Note that for aarch64, GCC's implementation of __atomic_load_16 in
+	 * libatomic uses locks, and so this function should be replaced by
+	 * a new function (e.g. "rte_atomic128_load()").
+	 */
+	__atomic_load((volatile __int128 *)&lifo->head,
+		      &old_head,
+		      __ATOMIC_ACQUIRE);
+#else
+	/* x86-64 does not require an atomic load here; if a torn read occurs,
+	 * the CAS will fail and set old_head to the correct/latest value.
+	 */
+	old_head = lifo->head;
+#endif
+
+	/* Pop num elements */
+	do {
+		struct rte_nb_lifo_head new_head;
+		struct rte_nb_lifo_elem *tmp;
+		unsigned int i;
+
+		rte_prefetch0(old_head.top);
+
+		tmp = old_head.top;
+
+		/* Traverse the list to find the new head. A next pointer will
+		 * either point to another element or NULL; if a thread
+		 * encounters a pointer that has already been popped, the CAS
+		 * will fail.
+		 */
+		for (i = 0; i < num && tmp != NULL; i++) {
+			rte_prefetch0(tmp->next);
+			if (obj_table)
+				obj_table[i] = tmp->data;
+			if (last)
+				*last = tmp;
+			tmp = tmp->next;
+		}
+
+		/* If NULL was encountered, the list was modified while
+		 * traversing it. Retry.
+		 */
+		if (i != num)
+			continue;
+
+		new_head.top = tmp;
+		new_head.cnt = old_head.cnt + 1;
+
+		success = rte_atomic128_cmpxchg((rte_int128_t *)&lifo->head,
+						(rte_int128_t *)&old_head,
+						(rte_int128_t *)&new_head,
+						1, __ATOMIC_ACQUIRE,
+						__ATOMIC_ACQUIRE);
+	} while (success == 0);
+
+	return old_head.top;
+#endif
+}
+
+#endif /* _NB_LIFO_C11_MEM_H_ */
diff --git a/lib/librte_stack/rte_stack_generic.h b/lib/librte_stack/rte_stack_generic.h
new file mode 100644
index 000000000..7d8570b34
--- /dev/null
+++ b/lib/librte_stack/rte_stack_generic.h
@@ -0,0 +1,157 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2019 Intel Corporation
+ */
+
+#ifndef _NB_LIFO_GENERIC_H_
+#define _NB_LIFO_GENERIC_H_
+
+#include <rte_branch_prediction.h>
+#include <rte_prefetch.h>
+
+static __rte_always_inline unsigned int
+rte_nb_lifo_len(struct rte_stack *s)
+{
+	/* nb_lifo_push() and nb_lifo_pop() do not update the list's contents
+	 * and nb_lifo->len atomically, which can cause the list to appear
+	 * shorter than it actually is if this function is called while other
+	 * threads are modifying the list.
+	 *
+	 * However, given the inherently approximate nature of the get_count
+	 * callback -- even if the list and its size were updated atomically,
+	 * the size could change between when get_count executes and when the
+	 * value is returned to the caller -- this is acceptable.
+	 *
+	 * The nb_lifo->len updates are placed such that the list may appear to
+	 * have fewer elements than it does, but will never appear to have more
+	 * elements. If the mempool is near-empty to the point that this is a
+	 * concern, the user should consider increasing the mempool size.
+	 */
+	return (unsigned int)rte_atomic64_read(&s->nb_lifo.used.len);
+}
+
+static __rte_always_inline void
+__nb_lifo_push(struct rte_nb_lifo_list *lifo,
+	       struct rte_nb_lifo_elem *first,
+	       struct rte_nb_lifo_elem *last,
+	       unsigned int num)
+{
+#ifndef RTE_ARCH_X86_64
+	RTE_SET_USED(first);
+	RTE_SET_USED(last);
+	RTE_SET_USED(lifo);
+	RTE_SET_USED(num);
+#else
+	struct rte_nb_lifo_head old_head;
+	int success;
+
+	old_head = lifo->head;
+
+	do {
+		struct rte_nb_lifo_head new_head;
+
+		/* Swing the top pointer to the first element in the list and
+		 * make the last element point to the old top.
+		 */
+		new_head.top = first;
+		new_head.cnt = old_head.cnt + 1;
+
+		last->next = old_head.top;
+
+		/* Ensure the list entry writes are visible before pushing them
+		 * to the stack.
+		 */
+		rte_wmb();
+
+		/* old_head is updated on failure */
+		success = rte_atomic128_cmpxchg((rte_int128_t *)&lifo->head,
+					       (rte_int128_t *)&old_head,
+					       (rte_int128_t *)&new_head,
+					       1, __ATOMIC_RELEASE,
+					       __ATOMIC_RELAXED);
+	} while (success == 0);
+
+	rte_atomic64_add(&lifo->len, num);
+#endif
+}
+
+static __rte_always_inline struct rte_nb_lifo_elem *
+__nb_lifo_pop(struct rte_nb_lifo_list *lifo,
+	      unsigned int num,
+	      void **obj_table,
+	      struct rte_nb_lifo_elem **last)
+{
+#ifndef RTE_ARCH_X86_64
+	RTE_SET_USED(obj_table);
+	RTE_SET_USED(last);
+	RTE_SET_USED(lifo);
+	RTE_SET_USED(num);
+
+	return NULL;
+#else
+	struct rte_nb_lifo_head old_head;
+	int success;
+
+	/* Reserve num elements, if available */
+	while (1) {
+		uint64_t len = rte_atomic64_read(&lifo->len);
+
+		/* Does the list contain enough elements? */
+		if (unlikely(len < num))
+			return NULL;
+
+		if (rte_atomic64_cmpset((volatile uint64_t *)&lifo->len,
+					len, len - num))
+			break;
+	}
+
+	old_head = lifo->head;
+
+	/* Pop num elements */
+	do {
+		struct rte_nb_lifo_head new_head;
+		struct rte_nb_lifo_elem *tmp;
+		unsigned int i;
+
+		rte_prefetch0(old_head.top);
+
+		tmp = old_head.top;
+
+		/* Traverse the list to find the new head. A next pointer will
+		 * either point to another element or NULL; if a thread
+		 * encounters a pointer that has already been popped, the CAS
+		 * will fail.
+		 */
+		for (i = 0; i < num && tmp != NULL; i++) {
+			rte_prefetch0(tmp->next);
+			if (obj_table)
+				obj_table[i] = tmp->data;
+			if (last)
+				*last = tmp;
+			tmp = tmp->next;
+		}
+
+		/* If NULL was encountered, the list was modified while
+		 * traversing it. Retry.
+		 */
+		if (i != num)
+			continue;
+
+		/* Ensure the list reads occur before popping the list */
+		rte_rmb();
+
+		new_head.top = tmp;
+		new_head.cnt = old_head.cnt + 1;
+
+		/* old_head is updated on failure */
+		success = rte_atomic128_cmpxchg((rte_int128_t *)&lifo->head,
+						(rte_int128_t *)&old_head,
+						(rte_int128_t *)&new_head,
+						1, __ATOMIC_ACQUIRE,
+						__ATOMIC_ACQUIRE);
+	} while (success == 0);
+
+	return old_head.top;
+#endif
+}
+
+#endif /* _NB_LIFO_GENERIC_H_ */