[2/3] mempool/nb_stack: add non-blocking stack mempool

Message ID 20190110205538.24435-3-gage.eads@intel.com
State Superseded
Delegated to: Thomas Monjalon
Headers show
Series
  • Add non-blocking stack mempool handler
Related show

Checks

Context Check Description
ci/Intel-compilation fail Compilation issues
ci/checkpatch success coding style OK

Commit Message

Eads, Gage Jan. 10, 2019, 8:55 p.m.
This commit adds support for non-blocking (linked list based) stack mempool
handler. The stack uses a 128-bit compare-and-swap instruction, and thus is
limited to x86_64. The 128-bit CAS atomically updates the stack top pointer
and a modification counter, which protects against the ABA problem.

In mempool_perf_autotest the lock-based stack outperforms the non-blocking
handler*, however:
- For applications with preemptible pthreads, a lock-based stack's
  worst-case performance (i.e. one thread being preempted while
  holding the spinlock) is much worse than the non-blocking stack's.
- Using per-thread mempool caches will largely mitigate the performance
  difference.

*Test setup: x86_64 build with default config, dual-socket Xeon E5-2699 v4,
running on isolcpus cores with a tickless scheduler. The lock-based stack's
rate_persec was 1x-3.5x the non-blocking stack's.

Signed-off-by: Gage Eads <gage.eads@intel.com>
---
 MAINTAINERS                                        |   4 +
 config/common_base                                 |   1 +
 drivers/mempool/Makefile                           |   1 +
 drivers/mempool/nb_stack/Makefile                  |  30 +++++
 drivers/mempool/nb_stack/meson.build               |   4 +
 drivers/mempool/nb_stack/nb_lifo.h                 | 132 +++++++++++++++++++++
 drivers/mempool/nb_stack/rte_mempool_nb_stack.c    | 125 +++++++++++++++++++
 .../nb_stack/rte_mempool_nb_stack_version.map      |   4 +
 mk/rte.app.mk                                      |   1 +
 9 files changed, 302 insertions(+)
 create mode 100644 drivers/mempool/nb_stack/Makefile
 create mode 100644 drivers/mempool/nb_stack/meson.build
 create mode 100644 drivers/mempool/nb_stack/nb_lifo.h
 create mode 100644 drivers/mempool/nb_stack/rte_mempool_nb_stack.c
 create mode 100644 drivers/mempool/nb_stack/rte_mempool_nb_stack_version.map

Comments

Andrew Rybchenko Jan. 13, 2019, 1:31 p.m. | #1
Hi Gage,

In general looks very good.

Have you considered to make nb_lifo.h a library to be reusable outside
of the mempool driver?

There are few notes below.

Thanks,
Andrew.

On 1/10/19 11:55 PM, Gage Eads wrote:
> This commit adds support for non-blocking (linked list based) stack mempool
> handler. The stack uses a 128-bit compare-and-swap instruction, and thus is
> limited to x86_64. The 128-bit CAS atomically updates the stack top pointer
> and a modification counter, which protects against the ABA problem.
>
> In mempool_perf_autotest the lock-based stack outperforms the non-blocking
> handler*, however:
> - For applications with preemptible pthreads, a lock-based stack's
>    worst-case performance (i.e. one thread being preempted while
>    holding the spinlock) is much worse than the non-blocking stack's.
> - Using per-thread mempool caches will largely mitigate the performance
>    difference.
>
> *Test setup: x86_64 build with default config, dual-socket Xeon E5-2699 v4,
> running on isolcpus cores with a tickless scheduler. The lock-based stack's
> rate_persec was 1x-3.5x the non-blocking stack's.
>
> Signed-off-by: Gage Eads <gage.eads@intel.com>
> ---
>   MAINTAINERS                                        |   4 +
>   config/common_base                                 |   1 +
>   drivers/mempool/Makefile                           |   1 +
>   drivers/mempool/nb_stack/Makefile                  |  30 +++++
>   drivers/mempool/nb_stack/meson.build               |   4 +
>   drivers/mempool/nb_stack/nb_lifo.h                 | 132 +++++++++++++++++++++
>   drivers/mempool/nb_stack/rte_mempool_nb_stack.c    | 125 +++++++++++++++++++
>   .../nb_stack/rte_mempool_nb_stack_version.map      |   4 +
>   mk/rte.app.mk                                      |   1 +
>   9 files changed, 302 insertions(+)
>   create mode 100644 drivers/mempool/nb_stack/Makefile
>   create mode 100644 drivers/mempool/nb_stack/meson.build
>   create mode 100644 drivers/mempool/nb_stack/nb_lifo.h
>   create mode 100644 drivers/mempool/nb_stack/rte_mempool_nb_stack.c
>   create mode 100644 drivers/mempool/nb_stack/rte_mempool_nb_stack_version.map
>
> diff --git a/MAINTAINERS b/MAINTAINERS
> index 470f36b9c..5519d3323 100644
> --- a/MAINTAINERS
> +++ b/MAINTAINERS
> @@ -416,6 +416,10 @@ M: Artem V. Andreev <artem.andreev@oktetlabs.ru>
>   M: Andrew Rybchenko <arybchenko@solarflare.com>
>   F: drivers/mempool/bucket/
>   
> +Non-blocking stack memory pool
> +M: Gage Eads <gage.eads@intel.com>
> +F: drivers/mempool/nb_stack/
> +
>   
>   Bus Drivers
>   -----------
> diff --git a/config/common_base b/config/common_base
> index 964a6956e..40ce47312 100644
> --- a/config/common_base
> +++ b/config/common_base
> @@ -728,6 +728,7 @@ CONFIG_RTE_DRIVER_MEMPOOL_BUCKET=y
>   CONFIG_RTE_DRIVER_MEMPOOL_BUCKET_SIZE_KB=64
>   CONFIG_RTE_DRIVER_MEMPOOL_RING=y
>   CONFIG_RTE_DRIVER_MEMPOOL_STACK=y
> +CONFIG_RTE_DRIVER_MEMPOOL_NB_STACK=y

Typically it is alphabetically sorted.

>   #
>   # Compile PMD for octeontx fpa mempool device
> diff --git a/drivers/mempool/Makefile b/drivers/mempool/Makefile
> index 28c2e8360..aeae3ac12 100644
> --- a/drivers/mempool/Makefile
> +++ b/drivers/mempool/Makefile
> @@ -13,5 +13,6 @@ endif
>   DIRS-$(CONFIG_RTE_DRIVER_MEMPOOL_RING) += ring
>   DIRS-$(CONFIG_RTE_DRIVER_MEMPOOL_STACK) += stack
>   DIRS-$(CONFIG_RTE_LIBRTE_OCTEONTX_MEMPOOL) += octeontx
> +DIRS-$(CONFIG_RTE_DRIVER_MEMPOOL_NB_STACK) += nb_stack

Typically it is alphabetically sorted. Yes, already broken, but, please, 
put it before ring.

>   
>   include $(RTE_SDK)/mk/rte.subdir.mk
> diff --git a/drivers/mempool/nb_stack/Makefile b/drivers/mempool/nb_stack/Makefile
> new file mode 100644
> index 000000000..38b45f4f5
> --- /dev/null
> +++ b/drivers/mempool/nb_stack/Makefile
> @@ -0,0 +1,30 @@
> +# SPDX-License-Identifier: BSD-3-Clause
> +# Copyright(c) 2019 Intel Corporation
> +
> +include $(RTE_SDK)/mk/rte.vars.mk
> +
> +# The non-blocking stack uses a 128-bit compare-and-swap instruction, and thus
> +# is limited to x86_64.
> +ifeq ($(CONFIG_RTE_ARCH_X86_64),y)
> +
> +#
> +# library name
> +#
> +LIB = librte_mempool_nb_stack.a
> +
> +CFLAGS += -O3
> +CFLAGS += $(WERROR_FLAGS)
> +
> +# Headers
> +CFLAGS += -I$(RTE_SDK)/lib/librte_mempool

I guess it is derived from stack. Is it really required? There is no 
such line
in ring, bucket, octeontx and dpaa2.

> +LDLIBS += -lrte_eal -lrte_mempool
> +
> +EXPORT_MAP := rte_mempool_nb_stack_version.map
> +
> +LIBABIVER := 1
> +
> +SRCS-$(CONFIG_RTE_DRIVER_MEMPOOL_NB_STACK) += rte_mempool_nb_stack.c
> +
> +endif
> +
> +include $(RTE_SDK)/mk/rte.lib.mk
> diff --git a/drivers/mempool/nb_stack/meson.build b/drivers/mempool/nb_stack/meson.build
> new file mode 100644
> index 000000000..66d64a9ba
> --- /dev/null
> +++ b/drivers/mempool/nb_stack/meson.build
> @@ -0,0 +1,4 @@
> +# SPDX-License-Identifier: BSD-3-Clause
> +# Copyright(c) 2019 Intel Corporation
> +
> +sources = files('rte_mempool_nb_stack.c')

Have no tested meson build for non-x86_64 target?
I guess it should be fixed to skip it on non-x86_64 builds.

> diff --git a/drivers/mempool/nb_stack/nb_lifo.h b/drivers/mempool/nb_stack/nb_lifo.h
> new file mode 100644
> index 000000000..701d75e37
> --- /dev/null
> +++ b/drivers/mempool/nb_stack/nb_lifo.h
> @@ -0,0 +1,132 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + * Copyright(c) 2019 Intel Corporation
> + */
> +
> +#ifndef _NB_LIFO_H_
> +#define _NB_LIFO_H_
> +
> +struct nb_lifo_elem {
> +	void *data;
> +	struct nb_lifo_elem *next;
> +};
> +
> +struct nb_lifo_head {
> +	struct nb_lifo_elem *top; /**< Stack top */
> +	uint64_t cnt; /**< Modification counter */
> +};
> +
> +struct nb_lifo {
> +	volatile struct nb_lifo_head head __rte_aligned(16);
> +	rte_atomic64_t len;
> +} __rte_cache_aligned;
> +
> +static __rte_always_inline void
> +nb_lifo_init(struct nb_lifo *lifo)
> +{
> +	memset(lifo, 0, sizeof(*lifo));
> +	rte_atomic64_set(&lifo->len, 0);
> +}
> +
> +static __rte_always_inline unsigned int
> +nb_lifo_len(struct nb_lifo *lifo)
> +{
> +	return (unsigned int) rte_atomic64_read(&lifo->len);
> +}
> +
> +static __rte_always_inline void
> +nb_lifo_push(struct nb_lifo *lifo,
> +	     struct nb_lifo_elem *first,
> +	     struct nb_lifo_elem *last,
> +	     unsigned int num)
> +{
> +	while (1) {
> +		struct nb_lifo_head old_head, new_head;
> +
> +		old_head = lifo->head;
> +
> +		/* Swing the top pointer to the first element in the list and
> +		 * make the last element point to the old top.
> +		 */
> +		new_head.top = first;
> +		new_head.cnt = old_head.cnt + 1;
> +
> +		last->next = old_head.top;
> +
> +		if (rte_atomic128_cmpset((volatile uint64_t *) &lifo->head,
> +					 (uint64_t *)&old_head,
> +					 (uint64_t *)&new_head))
> +			break;
> +	}
> +
> +	rte_atomic64_add(&lifo->len, num);

I'd like to understand why it is not a problem that change of the list and
increase its length are not atomic. So, we can get wrong length of the stack
in the middle. It would be good to explain it in the comment.

> +}
> +
> +static __rte_always_inline void
> +nb_lifo_push_single(struct nb_lifo *lifo, struct nb_lifo_elem *elem)
> +{
> +	nb_lifo_push(lifo, elem, elem, 1);
> +}
> +
> +static __rte_always_inline struct nb_lifo_elem *
> +nb_lifo_pop(struct nb_lifo *lifo,
> +	    unsigned int num,
> +	    void **obj_table,
> +	    struct nb_lifo_elem **last)
> +{
> +	struct nb_lifo_head old_head;
> +
> +	/* Reserve num elements, if available */
> +	while (1) {
> +		uint64_t len = rte_atomic64_read(&lifo->len);
> +
> +		/* Does the list contain enough elements? */
> +		if (len < num)
> +			return NULL;
> +
> +		if (rte_atomic64_cmpset((volatile uint64_t *)&lifo->len,
> +					len, len - num))
> +			break;
> +	}
> +
> +	/* Pop num elements */
> +	while (1) {
> +		struct nb_lifo_head new_head;
> +		struct nb_lifo_elem *tmp;
> +		unsigned int i;
> +
> +		old_head = lifo->head;
> +
> +		tmp = old_head.top;
> +
> +		/* Traverse the list to find the new head. A next pointer will
> +		 * either point to another element or NULL; if a thread
> +		 * encounters a pointer that has already been popped, the CAS
> +		 * will fail.
> +		 */
> +		for (i = 0; i < num && tmp != NULL; i++) {
> +			if (obj_table)
> +				obj_table[i] = tmp->data;
> +			if (last)
> +				*last = tmp;

Isn't it better to do obj_table and last assignment later when
we successfully reserved elements? If there is not retries,
current solution is optimal, but I guess solution with the second
traversal to fill in obj_table will show more stable performance
results under high load when many retries are done.

> +			tmp = tmp->next;
> +		}
> +
> +		/* If NULL was encountered, the list was modified while
> +		 * traversing it. Retry.
> +		 */
> +		if (i != num)
> +			continue;
> +
> +		new_head.top = tmp;
> +		new_head.cnt = old_head.cnt + 1;
> +
> +		if (rte_atomic128_cmpset((volatile uint64_t *) &lifo->head,
> +					 (uint64_t *)&old_head,
> +					 (uint64_t *)&new_head))
> +			break;
> +	}
> +
> +	return old_head.top;
> +}
> +
> +#endif /* _NB_LIFO_H_ */
> diff --git a/drivers/mempool/nb_stack/rte_mempool_nb_stack.c b/drivers/mempool/nb_stack/rte_mempool_nb_stack.c
> new file mode 100644
> index 000000000..1b30775f7
> --- /dev/null
> +++ b/drivers/mempool/nb_stack/rte_mempool_nb_stack.c
> @@ -0,0 +1,125 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + * Copyright(c) 2019 Intel Corporation
> + */
> +
> +#include <stdio.h>
> +#include <rte_mempool.h>
> +#include <rte_malloc.h>
> +
> +#include "nb_lifo.h"
> +
> +struct rte_mempool_nb_stack {
> +	uint64_t size;
> +	struct nb_lifo used_lifo; /**< LIFO containing mempool pointers  */
> +	struct nb_lifo free_lifo; /**< LIFO containing unused LIFO elements */
> +};
> +
> +static int
> +nb_stack_alloc(struct rte_mempool *mp)
> +{
> +	struct rte_mempool_nb_stack *s;
> +	struct nb_lifo_elem *elems;
> +	unsigned int n = mp->size;
> +	unsigned int size, i;
> +
> +	size = sizeof(*s) + n * sizeof(struct nb_lifo_elem);
> +
> +	/* Allocate our local memory structure */
> +	s = rte_zmalloc_socket("mempool-nb_stack",
> +			       size,
> +			       RTE_CACHE_LINE_SIZE,
> +			       mp->socket_id);
> +	if (s == NULL) {
> +		RTE_LOG(ERR, MEMPOOL, "Cannot allocate nb_stack!\n");
> +		return -ENOMEM;
> +	}
> +
> +	s->size = n;
> +
> +	nb_lifo_init(&s->used_lifo);
> +	nb_lifo_init(&s->free_lifo);
> +
> +	elems = (struct nb_lifo_elem *) &s[1];

Does checkpatch.sh generate warning here because of space after type cast?
There are few similar cases in the patch.

> +	for (i = 0; i < n; i++)
> +		nb_lifo_push_single(&s->free_lifo, &elems[i]);
> +
> +	mp->pool_data = s;
> +
> +	return 0;
> +}
> +
> +static int
> +nb_stack_enqueue(struct rte_mempool *mp, void * const *obj_table,
> +		 unsigned int n)
> +{
> +	struct rte_mempool_nb_stack *s = mp->pool_data;
> +	struct nb_lifo_elem *first, *last, *tmp;
> +	unsigned int i;
> +
> +	if (unlikely(n == 0))
> +		return 0;
> +
> +	/* Pop n free elements */
> +	first = nb_lifo_pop(&s->free_lifo, n, NULL, NULL);
> +	if (unlikely(!first))

Just a nit, but as far as I know comparison with NULL is typically used 
in DPDK.
(few cases in the patch)

> +		return -ENOBUFS;
> +
> +	/* Prepare the list elements */
> +	tmp = first;
> +	for (i = 0; i < n; i++) {
> +		tmp->data = obj_table[i];
> +		last = tmp;
> +		tmp = tmp->next;
> +	}
> +
> +	/* Enqueue them to the used list */
> +	nb_lifo_push(&s->used_lifo, first, last, n);
> +
> +	return 0;
> +}
> +
> +static int
> +nb_stack_dequeue(struct rte_mempool *mp, void **obj_table,
> +		 unsigned int n)
> +{
> +	struct rte_mempool_nb_stack *s = mp->pool_data;
> +	struct nb_lifo_elem *first, *last;
> +
> +	if (unlikely(n == 0))
> +		return 0;
> +
> +	/* Pop n used elements */
> +	first = nb_lifo_pop(&s->used_lifo, n, obj_table, &last);
> +	if (unlikely(!first))
> +		return -ENOENT;
> +
> +	/* Enqueue the list elements to the free list */
> +	nb_lifo_push(&s->free_lifo, first, last, n);
> +
> +	return 0;
> +}
> +
> +static unsigned
> +nb_stack_get_count(const struct rte_mempool *mp)
> +{
> +	struct rte_mempool_nb_stack *s = mp->pool_data;
> +
> +	return nb_lifo_len(&s->used_lifo);
> +}
> +
> +static void
> +nb_stack_free(struct rte_mempool *mp)
> +{
> +	rte_free((void *)(mp->pool_data));

I think type cast is not required.

> +}
> +
> +static struct rte_mempool_ops ops_nb_stack = {
> +	.name = "nb_stack",
> +	.alloc = nb_stack_alloc,
> +	.free = nb_stack_free,
> +	.enqueue = nb_stack_enqueue,
> +	.dequeue = nb_stack_dequeue,
> +	.get_count = nb_stack_get_count
> +};
> +
> +MEMPOOL_REGISTER_OPS(ops_nb_stack);
> diff --git a/drivers/mempool/nb_stack/rte_mempool_nb_stack_version.map b/drivers/mempool/nb_stack/rte_mempool_nb_stack_version.map
> new file mode 100644
> index 000000000..fc8c95e91
> --- /dev/null
> +++ b/drivers/mempool/nb_stack/rte_mempool_nb_stack_version.map
> @@ -0,0 +1,4 @@
> +DPDK_19.05 {
> +
> +	local: *;
> +};
> diff --git a/mk/rte.app.mk b/mk/rte.app.mk
> index 02e8b6f05..0b11d9417 100644
> --- a/mk/rte.app.mk
> +++ b/mk/rte.app.mk
> @@ -133,6 +133,7 @@ ifeq ($(CONFIG_RTE_BUILD_SHARED_LIB),n)
>   
>   _LDLIBS-$(CONFIG_RTE_DRIVER_MEMPOOL_BUCKET) += -lrte_mempool_bucket
>   _LDLIBS-$(CONFIG_RTE_DRIVER_MEMPOOL_STACK)  += -lrte_mempool_stack
> +_LDLIBS-$(CONFIG_RTE_DRIVER_MEMPOOL_NB_STACK)  += -lrte_mempool_nb_stack

It is better to sort it alphabetically.

>   ifeq ($(CONFIG_RTE_LIBRTE_DPAA_BUS),y)
>   _LDLIBS-$(CONFIG_RTE_LIBRTE_DPAA_MEMPOOL)   += -lrte_mempool_dpaa
>   endif
Eads, Gage Jan. 14, 2019, 4:22 p.m. | #2
> -----Original Message-----
> From: Andrew Rybchenko [mailto:arybchenko@solarflare.com]
> Sent: Sunday, January 13, 2019 7:31 AM
> To: Eads, Gage <gage.eads@intel.com>; dev@dpdk.org
> Cc: olivier.matz@6wind.com; Richardson, Bruce <bruce.richardson@intel.com>;
> Ananyev, Konstantin <konstantin.ananyev@intel.com>
> Subject: Re: [PATCH 2/3] mempool/nb_stack: add non-blocking stack mempool
> 
> Hi Gage,
> 
> In general looks very good.
> 
> Have you considered to make nb_lifo.h a library to be reusable outside of the
> mempool driver?

I'm certainly open to it, if the community can benefit. Due to the difficulty of adding a new lib/ directory (I believe this requires Tech Board approval), I'll defer that work to a separate patchset.

> 
> There are few notes below.
> 
> Thanks,
> Andrew.
> 
> On 1/10/19 11:55 PM, Gage Eads wrote:
> 
> 
> 	This commit adds support for non-blocking (linked list based) stack
> mempool
> 	handler. The stack uses a 128-bit compare-and-swap instruction, and
> thus is
> 	limited to x86_64. The 128-bit CAS atomically updates the stack top
> pointer
> 	and a modification counter, which protects against the ABA problem.
> 
> 	In mempool_perf_autotest the lock-based stack outperforms the non-
> blocking
> 	handler*, however:
> 	- For applications with preemptible pthreads, a lock-based stack's
> 	  worst-case performance (i.e. one thread being preempted while
> 	  holding the spinlock) is much worse than the non-blocking stack's.
> 	- Using per-thread mempool caches will largely mitigate the
> performance
> 	  difference.
> 
> 	*Test setup: x86_64 build with default config, dual-socket Xeon E5-2699
> v4,
> 	running on isolcpus cores with a tickless scheduler. The lock-based
> stack's
> 	rate_persec was 1x-3.5x the non-blocking stack's.
> 
> 	Signed-off-by: Gage Eads <gage.eads@intel.com>
> <mailto:gage.eads@intel.com>
> 	---
> 	 MAINTAINERS                                        |   4 +
> 	 config/common_base                                 |   1 +
> 	 drivers/mempool/Makefile                           |   1 +
> 	 drivers/mempool/nb_stack/Makefile                  |  30 +++++
> 	 drivers/mempool/nb_stack/meson.build               |   4 +
> 	 drivers/mempool/nb_stack/nb_lifo.h                 | 132
> +++++++++++++++++++++
> 	 drivers/mempool/nb_stack/rte_mempool_nb_stack.c    | 125
> +++++++++++++++++++
> 	 .../nb_stack/rte_mempool_nb_stack_version.map      |   4 +
> 	 mk/rte.app.mk                                      |   1 +
> 	 9 files changed, 302 insertions(+)
> 	 create mode 100644 drivers/mempool/nb_stack/Makefile
> 	 create mode 100644 drivers/mempool/nb_stack/meson.build
> 	 create mode 100644 drivers/mempool/nb_stack/nb_lifo.h
> 	 create mode 100644
> drivers/mempool/nb_stack/rte_mempool_nb_stack.c
> 	 create mode 100644
> drivers/mempool/nb_stack/rte_mempool_nb_stack_version.map
> 
> 	diff --git a/MAINTAINERS b/MAINTAINERS
> 	index 470f36b9c..5519d3323 100644
> 	--- a/MAINTAINERS
> 	+++ b/MAINTAINERS
> 	@@ -416,6 +416,10 @@ M: Artem V. Andreev
> <artem.andreev@oktetlabs.ru> <mailto:artem.andreev@oktetlabs.ru>
> 	 M: Andrew Rybchenko <arybchenko@solarflare.com>
> <mailto:arybchenko@solarflare.com>
> 	 F: drivers/mempool/bucket/
> 
> 	+Non-blocking stack memory pool
> 	+M: Gage Eads <gage.eads@intel.com> <mailto:gage.eads@intel.com>
> 	+F: drivers/mempool/nb_stack/
> 	+
> 
> 	 Bus Drivers
> 	 -----------
> 	diff --git a/config/common_base b/config/common_base
> 	index 964a6956e..40ce47312 100644
> 	--- a/config/common_base
> 	+++ b/config/common_base
> 	@@ -728,6 +728,7 @@ CONFIG_RTE_DRIVER_MEMPOOL_BUCKET=y
> 	 CONFIG_RTE_DRIVER_MEMPOOL_BUCKET_SIZE_KB=64
> 	 CONFIG_RTE_DRIVER_MEMPOOL_RING=y
> 	 CONFIG_RTE_DRIVER_MEMPOOL_STACK=y
> 	+CONFIG_RTE_DRIVER_MEMPOOL_NB_STACK=y
> 
> 
> Typically it is alphabetically sorted.

Will fix.

> 
> 
> 
> 	 #
> 	 # Compile PMD for octeontx fpa mempool device
> 	diff --git a/drivers/mempool/Makefile b/drivers/mempool/Makefile
> 	index 28c2e8360..aeae3ac12 100644
> 	--- a/drivers/mempool/Makefile
> 	+++ b/drivers/mempool/Makefile
> 	@@ -13,5 +13,6 @@ endif
> 	 DIRS-$(CONFIG_RTE_DRIVER_MEMPOOL_RING) += ring
> 	 DIRS-$(CONFIG_RTE_DRIVER_MEMPOOL_STACK) += stack
> 	 DIRS-$(CONFIG_RTE_LIBRTE_OCTEONTX_MEMPOOL) += octeontx
> 	+DIRS-$(CONFIG_RTE_DRIVER_MEMPOOL_NB_STACK) += nb_stack
> 
> 
> Typically it is alphabetically sorted. Yes, already broken, but, please, put it
> before ring.
> 

Sure, will do.

> 
> 
> 
> 	 include $(RTE_SDK)/mk/rte.subdir.mk
> 	diff --git a/drivers/mempool/nb_stack/Makefile
> b/drivers/mempool/nb_stack/Makefile
> 	new file mode 100644
> 	index 000000000..38b45f4f5
> 	--- /dev/null
> 	+++ b/drivers/mempool/nb_stack/Makefile
> 	@@ -0,0 +1,30 @@
> 	+# SPDX-License-Identifier: BSD-3-Clause
> 	+# Copyright(c) 2019 Intel Corporation
> 	+
> 	+include $(RTE_SDK)/mk/rte.vars.mk
> 	+
> 	+# The non-blocking stack uses a 128-bit compare-and-swap instruction,
> and thus
> 	+# is limited to x86_64.
> 	+ifeq ($(CONFIG_RTE_ARCH_X86_64),y)
> 	+
> 	+#
> 	+# library name
> 	+#
> 	+LIB = librte_mempool_nb_stack.a
> 	+
> 	+CFLAGS += -O3
> 	+CFLAGS += $(WERROR_FLAGS)
> 	+
> 	+# Headers
> 	+CFLAGS += -I$(RTE_SDK)/lib/librte_mempool
> 
> 
> I guess it is derived from stack. Is it really required? There is no such line in ring,
> bucket, octeontx and dpaa2.
> 
> 

Good guess :). No, I don't believe so -- I'll remove this.

> 
> 	+LDLIBS += -lrte_eal -lrte_mempool
> 	+
> 	+EXPORT_MAP := rte_mempool_nb_stack_version.map
> 	+
> 	+LIBABIVER := 1
> 	+
> 	+SRCS-$(CONFIG_RTE_DRIVER_MEMPOOL_NB_STACK) +=
> rte_mempool_nb_stack.c
> 	+
> 	+endif
> 	+
> 	+include $(RTE_SDK)/mk/rte.lib.mk
> 	diff --git a/drivers/mempool/nb_stack/meson.build
> b/drivers/mempool/nb_stack/meson.build
> 	new file mode 100644
> 	index 000000000..66d64a9ba
> 	--- /dev/null
> 	+++ b/drivers/mempool/nb_stack/meson.build
> 	@@ -0,0 +1,4 @@
> 	+# SPDX-License-Identifier: BSD-3-Clause
> 	+# Copyright(c) 2019 Intel Corporation
> 	+
> 	+sources = files('rte_mempool_nb_stack.c')
> 
> 
> Have no tested meson build for non-x86_64 target?
> I guess it should be fixed to skip it on non-x86_64 builds.
> 

I did not -- my mistake. I'll correct this.

> 
> 
> 
> 	diff --git a/drivers/mempool/nb_stack/nb_lifo.h
> b/drivers/mempool/nb_stack/nb_lifo.h
> 	new file mode 100644
> 	index 000000000..701d75e37
> 	--- /dev/null
> 	+++ b/drivers/mempool/nb_stack/nb_lifo.h
> 	@@ -0,0 +1,132 @@
> 	+/* SPDX-License-Identifier: BSD-3-Clause
> 	+ * Copyright(c) 2019 Intel Corporation
> 	+ */
> 	+
> 	+#ifndef _NB_LIFO_H_
> 	+#define _NB_LIFO_H_
> 	+
> 	+struct nb_lifo_elem {
> 	+	void *data;
> 	+	struct nb_lifo_elem *next;
> 	+};
> 	+
> 	+struct nb_lifo_head {
> 	+	struct nb_lifo_elem *top; /**< Stack top */
> 	+	uint64_t cnt; /**< Modification counter */
> 	+};
> 	+
> 	+struct nb_lifo {
> 	+	volatile struct nb_lifo_head head __rte_aligned(16);
> 	+	rte_atomic64_t len;
> 	+} __rte_cache_aligned;
> 	+
> 	+static __rte_always_inline void
> 	+nb_lifo_init(struct nb_lifo *lifo)
> 	+{
> 	+	memset(lifo, 0, sizeof(*lifo));
> 	+	rte_atomic64_set(&lifo->len, 0);
> 	+}
> 	+
> 	+static __rte_always_inline unsigned int
> 	+nb_lifo_len(struct nb_lifo *lifo)
> 	+{
> 	+	return (unsigned int) rte_atomic64_read(&lifo->len);
> 	+}
> 	+
> 	+static __rte_always_inline void
> 	+nb_lifo_push(struct nb_lifo *lifo,
> 	+	     struct nb_lifo_elem *first,
> 	+	     struct nb_lifo_elem *last,
> 	+	     unsigned int num)
> 	+{
> 	+	while (1) {
> 	+		struct nb_lifo_head old_head, new_head;
> 	+
> 	+		old_head = lifo->head;
> 	+
> 	+		/* Swing the top pointer to the first element in the list
> and
> 	+		 * make the last element point to the old top.
> 	+		 */
> 	+		new_head.top = first;
> 	+		new_head.cnt = old_head.cnt + 1;
> 	+
> 	+		last->next = old_head.top;
> 	+
> 	+		if (rte_atomic128_cmpset((volatile uint64_t *) &lifo-
> >head,
> 	+					 (uint64_t *)&old_head,
> 	+					 (uint64_t *)&new_head))
> 	+			break;
> 	+	}
> 	+
> 	+	rte_atomic64_add(&lifo->len, num);
> 
> 
> I'd like to understand why it is not a problem that change of the list and increase
> its length are not atomic. So, we can get wrong length of the stack in the
> middle. It would be good to explain it in the comment.
> 

Indeed, there is a window in which the list appears shorter than it is. I don't believe this a problem becaues the get_count callback is inherently racy/approximate (if it is called while the list being accessed). That is, even if the list and its size were updated atomically, the size could change between when get_count reads the size and when that value is returned to the calling thread.

I placed the lifo->len updates such that the list may appear to have fewer elements than it does, but will never appear to have more elements. If the mempool is near-empty to the point that this is a concern, I think the bigger problem is the mempool size.

If that seems reasonable, I'll add that as a comment in the code.

> 
> 
> 	+}
> 	+
> 	+static __rte_always_inline void
> 	+nb_lifo_push_single(struct nb_lifo *lifo, struct nb_lifo_elem *elem)
> 	+{
> 	+	nb_lifo_push(lifo, elem, elem, 1);
> 	+}
> 	+
> 	+static __rte_always_inline struct nb_lifo_elem *
> 	+nb_lifo_pop(struct nb_lifo *lifo,
> 	+	    unsigned int num,
> 	+	    void **obj_table,
> 	+	    struct nb_lifo_elem **last)
> 	+{
> 	+	struct nb_lifo_head old_head;
> 	+
> 	+	/* Reserve num elements, if available */
> 	+	while (1) {
> 	+		uint64_t len = rte_atomic64_read(&lifo->len);
> 	+
> 	+		/* Does the list contain enough elements? */
> 	+		if (len < num)
> 	+			return NULL;
> 	+
> 	+		if (rte_atomic64_cmpset((volatile uint64_t *)&lifo->len,
> 	+					len, len - num))
> 	+			break;
> 	+	}
> 	+
> 	+	/* Pop num elements */
> 	+	while (1) {
> 	+		struct nb_lifo_head new_head;
> 	+		struct nb_lifo_elem *tmp;
> 	+		unsigned int i;
> 	+
> 	+		old_head = lifo->head;
> 	+
> 	+		tmp = old_head.top;
> 	+
> 	+		/* Traverse the list to find the new head. A next pointer
> will
> 	+		 * either point to another element or NULL; if a thread
> 	+		 * encounters a pointer that has already been popped,
> the CAS
> 	+		 * will fail.
> 	+		 */
> 	+		for (i = 0; i < num && tmp != NULL; i++) {
> 	+			if (obj_table)
> 	+				obj_table[i] = tmp->data;
> 	+			if (last)
> 	+				*last = tmp;
> 
> 
> Isn't it better to do obj_table and last assignment later when we successfully
> reserved elements? If there is not retries, current solution is optimal, but I guess
> solution with the second traversal to fill in obj_table will show more stable
> performance results under high load when many retries are done.
> 

I suspect that the latency of the writes to obj_table and last would largely be hidden by the pointer chasing, since obj_table (aided by the next-line prefetcher) and last should be cached but chances are tmp->next won't be.

Admittedly this is just a theory, I haven't experimentally confirmed anything; if you prefer, I'll investigate this further.

> 
> 
> 	+			tmp = tmp->next;
> 	+		}
> 	+
> 	+		/* If NULL was encountered, the list was modified while
> 	+		 * traversing it. Retry.
> 	+		 */
> 	+		if (i != num)
> 	+			continue;
> 	+
> 	+		new_head.top = tmp;
> 	+		new_head.cnt = old_head.cnt + 1;
> 	+
> 	+		if (rte_atomic128_cmpset((volatile uint64_t *) &lifo-
> >head,
> 	+					 (uint64_t *)&old_head,
> 	+					 (uint64_t *)&new_head))
> 	+			break;
> 	+	}
> 	+
> 	+	return old_head.top;
> 	+}
> 	+
> 	+#endif /* _NB_LIFO_H_ */
> 	diff --git a/drivers/mempool/nb_stack/rte_mempool_nb_stack.c
> b/drivers/mempool/nb_stack/rte_mempool_nb_stack.c
> 	new file mode 100644
> 	index 000000000..1b30775f7
> 	--- /dev/null
> 	+++ b/drivers/mempool/nb_stack/rte_mempool_nb_stack.c
> 	@@ -0,0 +1,125 @@
> 	+/* SPDX-License-Identifier: BSD-3-Clause
> 	+ * Copyright(c) 2019 Intel Corporation
> 	+ */
> 	+
> 	+#include <stdio.h>
> 	+#include <rte_mempool.h>
> 	+#include <rte_malloc.h>
> 	+
> 	+#include "nb_lifo.h"
> 	+
> 	+struct rte_mempool_nb_stack {
> 	+	uint64_t size;
> 	+	struct nb_lifo used_lifo; /**< LIFO containing mempool pointers
> */
> 	+	struct nb_lifo free_lifo; /**< LIFO containing unused LIFO
> elements */
> 	+};
> 	+
> 	+static int
> 	+nb_stack_alloc(struct rte_mempool *mp)
> 	+{
> 	+	struct rte_mempool_nb_stack *s;
> 	+	struct nb_lifo_elem *elems;
> 	+	unsigned int n = mp->size;
> 	+	unsigned int size, i;
> 	+
> 	+	size = sizeof(*s) + n * sizeof(struct nb_lifo_elem);
> 	+
> 	+	/* Allocate our local memory structure */
> 	+	s = rte_zmalloc_socket("mempool-nb_stack",
> 	+			       size,
> 	+			       RTE_CACHE_LINE_SIZE,
> 	+			       mp->socket_id);
> 	+	if (s == NULL) {
> 	+		RTE_LOG(ERR, MEMPOOL, "Cannot allocate
> nb_stack!\n");
> 	+		return -ENOMEM;
> 	+	}
> 	+
> 	+	s->size = n;
> 	+
> 	+	nb_lifo_init(&s->used_lifo);
> 	+	nb_lifo_init(&s->free_lifo);
> 	+
> 	+	elems = (struct nb_lifo_elem *) &s[1];
> 
> 
> Does checkpatch.sh generate warning here because of space after type cast?
> There are few similar cases in the patch.
> 

No, because that's a --strict option. Regardless, I'm starting to prefer that style -- I'll fix these instances in v2.

> 
> 
> 	+	for (i = 0; i < n; i++)
> 	+		nb_lifo_push_single(&s->free_lifo, &elems[i]);
> 	+
> 	+	mp->pool_data = s;
> 	+
> 	+	return 0;
> 	+}
> 	+
> 	+static int
> 	+nb_stack_enqueue(struct rte_mempool *mp, void * const *obj_table,
> 	+		 unsigned int n)
> 	+{
> 	+	struct rte_mempool_nb_stack *s = mp->pool_data;
> 	+	struct nb_lifo_elem *first, *last, *tmp;
> 	+	unsigned int i;
> 	+
> 	+	if (unlikely(n == 0))
> 	+		return 0;
> 	+
> 	+	/* Pop n free elements */
> 	+	first = nb_lifo_pop(&s->free_lifo, n, NULL, NULL);
> 	+	if (unlikely(!first))
> 
> 
> Just a nit, but as far as I know comparison with NULL is typically used in DPDK.
> (few cases in the patch)
> 

Looks like this is explicitly called out in the style guide: https://doc.dpdk.org/guides/contributing/coding_style.html#null-pointers

Will fix in v2.

> 
> 
> 	+		return -ENOBUFS;
> 	+
> 	+	/* Prepare the list elements */
> 	+	tmp = first;
> 	+	for (i = 0; i < n; i++) {
> 	+		tmp->data = obj_table[i];
> 	+		last = tmp;
> 	+		tmp = tmp->next;
> 	+	}
> 	+
> 	+	/* Enqueue them to the used list */
> 	+	nb_lifo_push(&s->used_lifo, first, last, n);
> 	+
> 	+	return 0;
> 	+}
> 	+
> 	+static int
> 	+nb_stack_dequeue(struct rte_mempool *mp, void **obj_table,
> 	+		 unsigned int n)
> 	+{
> 	+	struct rte_mempool_nb_stack *s = mp->pool_data;
> 	+	struct nb_lifo_elem *first, *last;
> 	+
> 	+	if (unlikely(n == 0))
> 	+		return 0;
> 	+
> 	+	/* Pop n used elements */
> 	+	first = nb_lifo_pop(&s->used_lifo, n, obj_table, &last);
> 	+	if (unlikely(!first))
> 	+		return -ENOENT;
> 	+
> 	+	/* Enqueue the list elements to the free list */
> 	+	nb_lifo_push(&s->free_lifo, first, last, n);
> 	+
> 	+	return 0;
> 	+}
> 	+
> 	+static unsigned
> 	+nb_stack_get_count(const struct rte_mempool *mp)
> 	+{
> 	+	struct rte_mempool_nb_stack *s = mp->pool_data;
> 	+
> 	+	return nb_lifo_len(&s->used_lifo);
> 	+}
> 	+
> 	+static void
> 	+nb_stack_free(struct rte_mempool *mp)
> 	+{
> 	+	rte_free((void *)(mp->pool_data));
> 
> 
> I think type cast is not required.
> 
> 

Will fix.

> 
> 
> 	+}
> 	+
> 	+static struct rte_mempool_ops ops_nb_stack = {
> 	+	.name = "nb_stack",
> 	+	.alloc = nb_stack_alloc,
> 	+	.free = nb_stack_free,
> 	+	.enqueue = nb_stack_enqueue,
> 	+	.dequeue = nb_stack_dequeue,
> 	+	.get_count = nb_stack_get_count
> 	+};
> 	+
> 	+MEMPOOL_REGISTER_OPS(ops_nb_stack);
> 	diff --git
> a/drivers/mempool/nb_stack/rte_mempool_nb_stack_version.map
> b/drivers/mempool/nb_stack/rte_mempool_nb_stack_version.map
> 	new file mode 100644
> 	index 000000000..fc8c95e91
> 	--- /dev/null
> 	+++ b/drivers/mempool/nb_stack/rte_mempool_nb_stack_version.map
> 	@@ -0,0 +1,4 @@
> 	+DPDK_19.05 {
> 	+
> 	+	local: *;
> 	+};
> 	diff --git a/mk/rte.app.mk b/mk/rte.app.mk
> 	index 02e8b6f05..0b11d9417 100644
> 	--- a/mk/rte.app.mk
> 	+++ b/mk/rte.app.mk
> 	@@ -133,6 +133,7 @@ ifeq ($(CONFIG_RTE_BUILD_SHARED_LIB),n)
> 
> 	 _LDLIBS-$(CONFIG_RTE_DRIVER_MEMPOOL_BUCKET) += -
> lrte_mempool_bucket
> 	 _LDLIBS-$(CONFIG_RTE_DRIVER_MEMPOOL_STACK)  += -
> lrte_mempool_stack
> 	+_LDLIBS-$(CONFIG_RTE_DRIVER_MEMPOOL_NB_STACK)  += -
> lrte_mempool_nb_stack
> 
> 
> It is better to sort it alphabetically.
> 

Will fix.

Appreciate the detailed review!

Thanks,
Gage

Patch

diff --git a/MAINTAINERS b/MAINTAINERS
index 470f36b9c..5519d3323 100644
--- a/MAINTAINERS
+++ b/MAINTAINERS
@@ -416,6 +416,10 @@  M: Artem V. Andreev <artem.andreev@oktetlabs.ru>
 M: Andrew Rybchenko <arybchenko@solarflare.com>
 F: drivers/mempool/bucket/
 
+Non-blocking stack memory pool
+M: Gage Eads <gage.eads@intel.com>
+F: drivers/mempool/nb_stack/
+
 
 Bus Drivers
 -----------
diff --git a/config/common_base b/config/common_base
index 964a6956e..40ce47312 100644
--- a/config/common_base
+++ b/config/common_base
@@ -728,6 +728,7 @@  CONFIG_RTE_DRIVER_MEMPOOL_BUCKET=y
 CONFIG_RTE_DRIVER_MEMPOOL_BUCKET_SIZE_KB=64
 CONFIG_RTE_DRIVER_MEMPOOL_RING=y
 CONFIG_RTE_DRIVER_MEMPOOL_STACK=y
+CONFIG_RTE_DRIVER_MEMPOOL_NB_STACK=y
 
 #
 # Compile PMD for octeontx fpa mempool device
diff --git a/drivers/mempool/Makefile b/drivers/mempool/Makefile
index 28c2e8360..aeae3ac12 100644
--- a/drivers/mempool/Makefile
+++ b/drivers/mempool/Makefile
@@ -13,5 +13,6 @@  endif
 DIRS-$(CONFIG_RTE_DRIVER_MEMPOOL_RING) += ring
 DIRS-$(CONFIG_RTE_DRIVER_MEMPOOL_STACK) += stack
 DIRS-$(CONFIG_RTE_LIBRTE_OCTEONTX_MEMPOOL) += octeontx
+DIRS-$(CONFIG_RTE_DRIVER_MEMPOOL_NB_STACK) += nb_stack
 
 include $(RTE_SDK)/mk/rte.subdir.mk
diff --git a/drivers/mempool/nb_stack/Makefile b/drivers/mempool/nb_stack/Makefile
new file mode 100644
index 000000000..38b45f4f5
--- /dev/null
+++ b/drivers/mempool/nb_stack/Makefile
@@ -0,0 +1,30 @@ 
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2019 Intel Corporation
+
+include $(RTE_SDK)/mk/rte.vars.mk
+
+# The non-blocking stack uses a 128-bit compare-and-swap instruction, and thus
+# is limited to x86_64.
+ifeq ($(CONFIG_RTE_ARCH_X86_64),y)
+
+#
+# library name
+#
+LIB = librte_mempool_nb_stack.a
+
+CFLAGS += -O3
+CFLAGS += $(WERROR_FLAGS)
+
+# Headers
+CFLAGS += -I$(RTE_SDK)/lib/librte_mempool
+LDLIBS += -lrte_eal -lrte_mempool
+
+EXPORT_MAP := rte_mempool_nb_stack_version.map
+
+LIBABIVER := 1
+
+SRCS-$(CONFIG_RTE_DRIVER_MEMPOOL_NB_STACK) += rte_mempool_nb_stack.c
+
+endif
+
+include $(RTE_SDK)/mk/rte.lib.mk
diff --git a/drivers/mempool/nb_stack/meson.build b/drivers/mempool/nb_stack/meson.build
new file mode 100644
index 000000000..66d64a9ba
--- /dev/null
+++ b/drivers/mempool/nb_stack/meson.build
@@ -0,0 +1,4 @@ 
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2019 Intel Corporation
+
+sources = files('rte_mempool_nb_stack.c')
diff --git a/drivers/mempool/nb_stack/nb_lifo.h b/drivers/mempool/nb_stack/nb_lifo.h
new file mode 100644
index 000000000..701d75e37
--- /dev/null
+++ b/drivers/mempool/nb_stack/nb_lifo.h
@@ -0,0 +1,132 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2019 Intel Corporation
+ */
+
+#ifndef _NB_LIFO_H_
+#define _NB_LIFO_H_
+
+struct nb_lifo_elem {
+	void *data;
+	struct nb_lifo_elem *next;
+};
+
+struct nb_lifo_head {
+	struct nb_lifo_elem *top; /**< Stack top */
+	uint64_t cnt; /**< Modification counter */
+};
+
+struct nb_lifo {
+	volatile struct nb_lifo_head head __rte_aligned(16);
+	rte_atomic64_t len;
+} __rte_cache_aligned;
+
+static __rte_always_inline void
+nb_lifo_init(struct nb_lifo *lifo)
+{
+	memset(lifo, 0, sizeof(*lifo));
+	rte_atomic64_set(&lifo->len, 0);
+}
+
+static __rte_always_inline unsigned int
+nb_lifo_len(struct nb_lifo *lifo)
+{
+	return (unsigned int) rte_atomic64_read(&lifo->len);
+}
+
+static __rte_always_inline void
+nb_lifo_push(struct nb_lifo *lifo,
+	     struct nb_lifo_elem *first,
+	     struct nb_lifo_elem *last,
+	     unsigned int num)
+{
+	while (1) {
+		struct nb_lifo_head old_head, new_head;
+
+		old_head = lifo->head;
+
+		/* Swing the top pointer to the first element in the list and
+		 * make the last element point to the old top.
+		 */
+		new_head.top = first;
+		new_head.cnt = old_head.cnt + 1;
+
+		last->next = old_head.top;
+
+		if (rte_atomic128_cmpset((volatile uint64_t *) &lifo->head,
+					 (uint64_t *)&old_head,
+					 (uint64_t *)&new_head))
+			break;
+	}
+
+	rte_atomic64_add(&lifo->len, num);
+}
+
+static __rte_always_inline void
+nb_lifo_push_single(struct nb_lifo *lifo, struct nb_lifo_elem *elem)
+{
+	nb_lifo_push(lifo, elem, elem, 1);
+}
+
+static __rte_always_inline struct nb_lifo_elem *
+nb_lifo_pop(struct nb_lifo *lifo,
+	    unsigned int num,
+	    void **obj_table,
+	    struct nb_lifo_elem **last)
+{
+	struct nb_lifo_head old_head;
+
+	/* Reserve num elements, if available */
+	while (1) {
+		uint64_t len = rte_atomic64_read(&lifo->len);
+
+		/* Does the list contain enough elements? */
+		if (len < num)
+			return NULL;
+
+		if (rte_atomic64_cmpset((volatile uint64_t *)&lifo->len,
+					len, len - num))
+			break;
+	}
+
+	/* Pop num elements */
+	while (1) {
+		struct nb_lifo_head new_head;
+		struct nb_lifo_elem *tmp;
+		unsigned int i;
+
+		old_head = lifo->head;
+
+		tmp = old_head.top;
+
+		/* Traverse the list to find the new head. A next pointer will
+		 * either point to another element or NULL; if a thread
+		 * encounters a pointer that has already been popped, the CAS
+		 * will fail.
+		 */
+		for (i = 0; i < num && tmp != NULL; i++) {
+			if (obj_table)
+				obj_table[i] = tmp->data;
+			if (last)
+				*last = tmp;
+			tmp = tmp->next;
+		}
+
+		/* If NULL was encountered, the list was modified while
+		 * traversing it. Retry.
+		 */
+		if (i != num)
+			continue;
+
+		new_head.top = tmp;
+		new_head.cnt = old_head.cnt + 1;
+
+		if (rte_atomic128_cmpset((volatile uint64_t *) &lifo->head,
+					 (uint64_t *)&old_head,
+					 (uint64_t *)&new_head))
+			break;
+	}
+
+	return old_head.top;
+}
+
+#endif /* _NB_LIFO_H_ */
diff --git a/drivers/mempool/nb_stack/rte_mempool_nb_stack.c b/drivers/mempool/nb_stack/rte_mempool_nb_stack.c
new file mode 100644
index 000000000..1b30775f7
--- /dev/null
+++ b/drivers/mempool/nb_stack/rte_mempool_nb_stack.c
@@ -0,0 +1,125 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2019 Intel Corporation
+ */
+
+#include <stdio.h>
+#include <rte_mempool.h>
+#include <rte_malloc.h>
+
+#include "nb_lifo.h"
+
+struct rte_mempool_nb_stack {
+	uint64_t size;
+	struct nb_lifo used_lifo; /**< LIFO containing mempool pointers  */
+	struct nb_lifo free_lifo; /**< LIFO containing unused LIFO elements */
+};
+
+static int
+nb_stack_alloc(struct rte_mempool *mp)
+{
+	struct rte_mempool_nb_stack *s;
+	struct nb_lifo_elem *elems;
+	unsigned int n = mp->size;
+	unsigned int size, i;
+
+	size = sizeof(*s) + n * sizeof(struct nb_lifo_elem);
+
+	/* Allocate our local memory structure */
+	s = rte_zmalloc_socket("mempool-nb_stack",
+			       size,
+			       RTE_CACHE_LINE_SIZE,
+			       mp->socket_id);
+	if (s == NULL) {
+		RTE_LOG(ERR, MEMPOOL, "Cannot allocate nb_stack!\n");
+		return -ENOMEM;
+	}
+
+	s->size = n;
+
+	nb_lifo_init(&s->used_lifo);
+	nb_lifo_init(&s->free_lifo);
+
+	elems = (struct nb_lifo_elem *) &s[1];
+	for (i = 0; i < n; i++)
+		nb_lifo_push_single(&s->free_lifo, &elems[i]);
+
+	mp->pool_data = s;
+
+	return 0;
+}
+
+static int
+nb_stack_enqueue(struct rte_mempool *mp, void * const *obj_table,
+		 unsigned int n)
+{
+	struct rte_mempool_nb_stack *s = mp->pool_data;
+	struct nb_lifo_elem *first, *last, *tmp;
+	unsigned int i;
+
+	if (unlikely(n == 0))
+		return 0;
+
+	/* Pop n free elements */
+	first = nb_lifo_pop(&s->free_lifo, n, NULL, NULL);
+	if (unlikely(!first))
+		return -ENOBUFS;
+
+	/* Prepare the list elements */
+	tmp = first;
+	for (i = 0; i < n; i++) {
+		tmp->data = obj_table[i];
+		last = tmp;
+		tmp = tmp->next;
+	}
+
+	/* Enqueue them to the used list */
+	nb_lifo_push(&s->used_lifo, first, last, n);
+
+	return 0;
+}
+
+static int
+nb_stack_dequeue(struct rte_mempool *mp, void **obj_table,
+		 unsigned int n)
+{
+	struct rte_mempool_nb_stack *s = mp->pool_data;
+	struct nb_lifo_elem *first, *last;
+
+	if (unlikely(n == 0))
+		return 0;
+
+	/* Pop n used elements */
+	first = nb_lifo_pop(&s->used_lifo, n, obj_table, &last);
+	if (unlikely(!first))
+		return -ENOENT;
+
+	/* Enqueue the list elements to the free list */
+	nb_lifo_push(&s->free_lifo, first, last, n);
+
+	return 0;
+}
+
+static unsigned
+nb_stack_get_count(const struct rte_mempool *mp)
+{
+	struct rte_mempool_nb_stack *s = mp->pool_data;
+
+	return nb_lifo_len(&s->used_lifo);
+}
+
+static void
+nb_stack_free(struct rte_mempool *mp)
+{
+	rte_free((void *)(mp->pool_data));
+}
+
+static struct rte_mempool_ops ops_nb_stack = {
+	.name = "nb_stack",
+	.alloc = nb_stack_alloc,
+	.free = nb_stack_free,
+	.enqueue = nb_stack_enqueue,
+	.dequeue = nb_stack_dequeue,
+	.get_count = nb_stack_get_count
+};
+
+MEMPOOL_REGISTER_OPS(ops_nb_stack);
diff --git a/drivers/mempool/nb_stack/rte_mempool_nb_stack_version.map b/drivers/mempool/nb_stack/rte_mempool_nb_stack_version.map
new file mode 100644
index 000000000..fc8c95e91
--- /dev/null
+++ b/drivers/mempool/nb_stack/rte_mempool_nb_stack_version.map
@@ -0,0 +1,4 @@ 
+DPDK_19.05 {
+
+	local: *;
+};
diff --git a/mk/rte.app.mk b/mk/rte.app.mk
index 02e8b6f05..0b11d9417 100644
--- a/mk/rte.app.mk
+++ b/mk/rte.app.mk
@@ -133,6 +133,7 @@  ifeq ($(CONFIG_RTE_BUILD_SHARED_LIB),n)
 
 _LDLIBS-$(CONFIG_RTE_DRIVER_MEMPOOL_BUCKET) += -lrte_mempool_bucket
 _LDLIBS-$(CONFIG_RTE_DRIVER_MEMPOOL_STACK)  += -lrte_mempool_stack
+_LDLIBS-$(CONFIG_RTE_DRIVER_MEMPOOL_NB_STACK)  += -lrte_mempool_nb_stack
 ifeq ($(CONFIG_RTE_LIBRTE_DPAA_BUS),y)
 _LDLIBS-$(CONFIG_RTE_LIBRTE_DPAA_MEMPOOL)   += -lrte_mempool_dpaa
 endif