[v2,1/2] deque: add multi-thread unsafe double ended queue

Message ID 20240424134233.1336370-2-aditya.ambadipudi@arm.com (mailing list archive)
State Superseded
Delegated to: Thomas Monjalon
Headers
Series deque: add multithread unsafe deque library |

Checks

Context Check Description
ci/checkpatch warning coding style issues

Commit Message

Aditya Ambadipudi April 24, 2024, 1:42 p.m. UTC
  From: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>

Add a multi-thread unsafe double ended queue data structure. This
library provides a simple and efficient alternative to multi-thread
safe ring when multi-thread safety is not required.

Signed-off-by: Aditya Ambadipudi <aditya.ambadipudi@arm.com>
Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
Change-Id: I6f66fa2ebf750adb22ac75f8cb3c2fe8bdb5fa9e
---
v2:
  * Addressed the spell check warning issue with the word "Deque"
  * Tried to rename all objects that are named deque to avoid collision with
    std::deque
  * Added the deque library to msvc section in meson.build
  * Renamed api functions to explicitly state if the function inserts at head or
    tail.

 .mailmap                   |   1 +
 devtools/build-dict.sh     |   1 +
 lib/deque/meson.build      |  11 +
 lib/deque/rte_deque.c      | 193 +++++++++++++
 lib/deque/rte_deque.h      | 533 ++++++++++++++++++++++++++++++++++++
 lib/deque/rte_deque_core.h |  81 ++++++
 lib/deque/rte_deque_pvt.h  | 538 +++++++++++++++++++++++++++++++++++++
 lib/deque/rte_deque_zc.h   | 430 +++++++++++++++++++++++++++++
 lib/deque/version.map      |  14 +
 lib/meson.build            |   2 +
 10 files changed, 1804 insertions(+)
 create mode 100644 lib/deque/meson.build
 create mode 100644 lib/deque/rte_deque.c
 create mode 100644 lib/deque/rte_deque.h
 create mode 100644 lib/deque/rte_deque_core.h
 create mode 100644 lib/deque/rte_deque_pvt.h
 create mode 100644 lib/deque/rte_deque_zc.h
 create mode 100644 lib/deque/version.map
  

Comments

Morten Brørup April 24, 2024, 3:16 p.m. UTC | #1
[...]

> +
> +/* mask of all valid flag values to deque_create() */
> +#define __RTE_DEQUE_F_MASK (RTE_DEQUE_F_EXACT_SZ)
> +ssize_t
> +rte_deque_get_memsize_elem(unsigned int esize, unsigned int count)
> +{
> +	ssize_t sz;
> +
> +	/* Check if element size is a multiple of 4B */
> +	if (esize % 4 != 0) {
> +		rte_log(RTE_LOG_ERR, rte_deque_log_type,
> +			"%s(): element size is not a multiple of 4\n",
> +			__func__);

Double indent when continuing on the next line:

+		rte_log(RTE_LOG_ERR, rte_deque_log_type,
+				"%s(): element size is not a multiple of 4\n",
+				__func__);

Not just here, but multiple locations in the code.

> +
> +		return -EINVAL;
> +	}
> +
> +	/* count must be a power of 2 */
> +	if ((!RTE_IS_POWER_OF_2(count)) || (count > RTE_DEQUE_SZ_MASK)) {
> +		rte_log(RTE_LOG_ERR, rte_deque_log_type,
> +			"%s(): Requested number of elements is invalid,"
> +			"must be power of 2, and not exceed %u\n",
> +			__func__, RTE_DEQUE_SZ_MASK);

Please use shorter error messages, so they can fit on one line in the source code.

Note: DPDK coding style allows 100 chars source code line length, not just 80.

[...]

> +/* create the deque for a given element size */
> +struct rte_deque *
> +rte_deque_create(const char *name, unsigned int esize, unsigned int count,
> +		int socket_id, unsigned int flags)
> +{
> +	char mz_name[RTE_MEMZONE_NAMESIZE];
> +	struct rte_deque *d;
> +	const struct rte_memzone *mz;
> +	ssize_t deque_size;
> +	int mz_flags = 0;
> +	const unsigned int requested_count = count;
> +	int ret;
> +
> +	/* for an exact size deque, round up from count to a power of two */
> +	if (flags & RTE_DEQUE_F_EXACT_SZ)
> +		count = rte_align32pow2(count + 1);
> +
> +	deque_size = rte_deque_get_memsize_elem(esize, count);
> +	if (deque_size < 0) {
> +		rte_errno = -deque_size;
> +		return NULL;
> +	}
> +
> +	ret = snprintf(mz_name, sizeof(mz_name), "%s%s",
> +		RTE_DEQUE_MZ_PREFIX, name);
> +	if (ret < 0 || ret >= (int)sizeof(mz_name)) {
> +		rte_errno = ENAMETOOLONG;
> +		return NULL;
> +	}
> +
> +	/* reserve a memory zone for this deque. If we can't get rte_config or
> +	 * we are secondary process, the memzone_reserve function will set
> +	 * rte_errno for us appropriately - hence no check in this function
> +	 */
> +	mz = rte_memzone_reserve_aligned(mz_name, deque_size, socket_id,
> +					 mz_flags, alignof(struct rte_deque));
> +	if (mz != NULL) {
> +		d = mz->addr;
> +		/* no need to check return value here, we already checked the
> +		 * arguments above
> +		 */
> +		rte_deque_init(d, name, requested_count, flags);

rte_deque_init() error handling is missing here.

> +		d->memzone = mz;
> +	} else {
> +		d = NULL;
> +		rte_log(RTE_LOG_ERR, rte_deque_log_type,
> +			"%s(): Cannot reserve memory\n", __func__);
> +	}
> +	return d;
> +}

[...]

> +#define RTE_DEQUE_MZ_PREFIX "DEQUE_"
> +/** The maximum length of a deque name. */
> +#define RTE_DEQUE_NAMESIZE (RTE_MEMZONE_NAMESIZE - \
> +			   sizeof(RTE_DEQUE_MZ_PREFIX) + 1)
> +
> +/**
> + * Double ended queue (deque) structure.
> + *
> + * The producer and the consumer have a head and a tail index. These indices
> + * are not between 0 and size(deque)-1. These indices are between 0 and
> + * 2^32 -1. Their value is masked while accessing the objects in deque.
> + * These indices are unsigned 32bits. Hence the result of the subtraction is
> + * always a modulo of 2^32 and it is between 0 and capacity.
> + */
> +struct rte_deque {
> +	alignas(RTE_CACHE_LINE_SIZE) char name[RTE_DEQUE_NAMESIZE];

Suggest alternative:
+struct __rte_cache_aligned rte_deque {
+	char name[RTE_DEQUE_NAMESIZE];

> +	/**< Name of the deque */
> +	int flags;
> +	/**< Flags supplied at creation. */
> +	const struct rte_memzone *memzone;
> +	/**< Memzone, if any, containing the rte_deque */
> +
> +	alignas(RTE_CACHE_LINE_SIZE) char pad0; /**< empty cache line */

Why the cache alignment here?

If required, omit the pad0 field and cache align the size field instead.

Alternatively, use RTE_CACHE_GUARD, if that is what you are trying to achieve.

> +
> +	uint32_t size;           /**< Size of deque. */
> +	uint32_t mask;           /**< Mask (size-1) of deque. */
> +	uint32_t capacity;       /**< Usable size of deque */
> +	/** Ring head and tail pointers. */
> +	volatile uint32_t head;
> +	volatile uint32_t tail;
> +};

[...]

> +static __rte_always_inline void
> +__rte_deque_enqueue_elems_head_128(struct rte_deque *d,
> +				const void *obj_table,
> +				unsigned int n)
> +{
> +	unsigned int i;
> +	const uint32_t size = d->size;
> +	uint32_t idx = (d->head & d->mask);
> +	rte_int128_t *deque = (rte_int128_t *)&d[1];
> +	const rte_int128_t *obj = (const rte_int128_t *)obj_table;
> +	if (likely(idx + n <= size)) {
> +		for (i = 0; i < (n & ~0x1); i += 2, idx += 2)
> +			memcpy((void *)(deque + idx),
> +				(const void *)(obj + i), 32);

With 100 chars source code line length, this memcpy() fits on one line.
Not just here, but in all the functions.

> +		switch (n & 0x1) {
> +		case 1:
> +			memcpy((void *)(deque + idx),
> +				(const void *)(obj + i), 16);
> +		}
> +	} else {
> +		for (i = 0; idx < size; i++, idx++)
> +			memcpy((void *)(deque + idx),
> +				(const void *)(obj + i), 16);
> +		/* Start at the beginning */
> +		for (idx = 0; i < n; i++, idx++)
> +			memcpy((void *)(deque + idx),
> +				(const void *)(obj + i), 16);
> +	}
> +}
  
Patrick Robb April 24, 2024, 5:21 p.m. UTC | #2
Hi Ali,

Wathsala reached out asking how the checkpatch CI check can be updated so
that this series passes checkpatch.

If building the dictionary is a one-time operation for you, you may have to
apply this patch and re-run devtools/build-dict.sh so that the new
dictionary is in place for a V3 of this series.

It looks like these dictionary exceptions are submitted quite rarely. But,
if it becomes more common in the future you could look at adding a step to
your automation which produces a new dictionary every time you run
checkpatch, based on any additions to the exception list which came with
the patch. But it's probably not worth the effort with the low volume of
word exception additions.

Thanks.
  
Mattias Rönnblom April 24, 2024, 11:28 p.m. UTC | #3
On 2024-04-24 15:42, Aditya Ambadipudi wrote:
> From: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
> 
> Add a multi-thread unsafe double ended queue data structure. This
> library provides a simple and efficient alternative to multi-thread
> safe ring when multi-thread safety is not required.
> 
> Signed-off-by: Aditya Ambadipudi <aditya.ambadipudi@arm.com>
> Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
> Change-Id: I6f66fa2ebf750adb22ac75f8cb3c2fe8bdb5fa9e
> ---
> v2:
>    * Addressed the spell check warning issue with the word "Deque"
>    * Tried to rename all objects that are named deque to avoid collision with
>      std::deque
>    * Added the deque library to msvc section in meson.build
>    * Renamed api functions to explicitly state if the function inserts at head org
>      tail.
> 
>   .mailmap                   |   1 +
>   devtools/build-dict.sh     |   1 +
>   lib/deque/meson.build      |  11 +
>   lib/deque/rte_deque.c      | 193 +++++++++++++
>   lib/deque/rte_deque.h      | 533 ++++++++++++++++++++++++++++++++++++
>   lib/deque/rte_deque_core.h |  81 ++++++
>   lib/deque/rte_deque_pvt.h  | 538 +++++++++++++++++++++++++++++++++++++
>   lib/deque/rte_deque_zc.h   | 430 +++++++++++++++++++++++++++++
>   lib/deque/version.map      |  14 +
>   lib/meson.build            |   2 +
>   10 files changed, 1804 insertions(+)
>   create mode 100644 lib/deque/meson.build
>   create mode 100644 lib/deque/rte_deque.c
>   create mode 100644 lib/deque/rte_deque.h
>   create mode 100644 lib/deque/rte_deque_core.h
>   create mode 100644 lib/deque/rte_deque_pvt.h
>   create mode 100644 lib/deque/rte_deque_zc.h
>   create mode 100644 lib/deque/version.map
> 
> diff --git a/.mailmap b/.mailmap
> index 3843868716..8e705ab6ab 100644
> --- a/.mailmap
> +++ b/.mailmap
> @@ -17,6 +17,7 @@ Adam Bynes <adambynes@outlook.com>
>   Adam Dybkowski <adamx.dybkowski@intel.com>
>   Adam Ludkiewicz <adam.ludkiewicz@intel.com>
>   Adham Masarwah <adham@nvidia.com> <adham@mellanox.com>
> +Aditya Ambadipudi <aditya.ambadipudi@arm.com>
>   Adrian Moreno <amorenoz@redhat.com>
>   Adrian Podlawski <adrian.podlawski@intel.com>
>   Adrien Mazarguil <adrien.mazarguil@6wind.com>
> diff --git a/devtools/build-dict.sh b/devtools/build-dict.sh
> index a8cac49029..595d8f9277 100755
> --- a/devtools/build-dict.sh
> +++ b/devtools/build-dict.sh
> @@ -17,6 +17,7 @@ sed '/^..->/d' |
>   sed '/^uint->/d' |
>   sed "/^doesn'->/d" |
>   sed '/^wasn->/d' |
> +sed '/^deque.*->/d' |
>   
>   # print to stdout
>   cat
> diff --git a/lib/deque/meson.build b/lib/deque/meson.build
> new file mode 100644
> index 0000000000..1ff45fc39f
> --- /dev/null
> +++ b/lib/deque/meson.build
> @@ -0,0 +1,11 @@
> +# SPDX-License-Identifier: BSD-3-Clause
> +# Copyright(c) 2024 Arm Limited
> +
> +sources = files('rte_deque.c')
> +headers = files('rte_deque.h')
> +# most sub-headers are not for direct inclusion
> +indirect_headers += files (
> +        'rte_deque_core.h',
> +        'rte_deque_pvt.h',
> +        'rte_deque_zc.h'
> +)
> diff --git a/lib/deque/rte_deque.c b/lib/deque/rte_deque.c
> new file mode 100644
> index 0000000000..b83a6c43c4
> --- /dev/null
> +++ b/lib/deque/rte_deque.c
> @@ -0,0 +1,193 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + * Copyright(c) 2024 Arm Limited
> + */
> +
> +#include <stdalign.h>
> +#include <string.h>
> +#include <stdint.h>
> +#include <inttypes.h>
> +#include <errno.h>
> +#include <sys/queue.h>
> +
> +#include <rte_common.h>
> +#include <rte_log.h>
> +#include <rte_memzone.h>
> +#include <rte_malloc.h>
> +#include <rte_eal_memconfig.h>
> +#include <rte_errno.h>
> +#include <rte_string_fns.h>
> +
> +#include "rte_deque.h"
> +
> +/* mask of all valid flag values to deque_create() */
> +#define __RTE_DEQUE_F_MASK (RTE_DEQUE_F_EXACT_SZ)
> +ssize_t
> +rte_deque_get_memsize_elem(unsigned int esize, unsigned int count)
> +{
> +	ssize_t sz;
> +
> +	/* Check if element size is a multiple of 4B */
> +	if (esize % 4 != 0) {
> +		rte_log(RTE_LOG_ERR, rte_deque_log_type,
> +			"%s(): element size is not a multiple of 4\n",
> +			__func__);
> +
> +		return -EINVAL;
> +	}
> +

Use RTE_ASSERT()/RTE_VERIFY() instead of returning an error code for API
contract violations. The application can't do anything useful with those
anyway. (If you think otherwise, please give an example of an app
recovering from one of these -EINVAL returns.)

> +	/* count must be a power of 2 */
> +	if ((!RTE_IS_POWER_OF_2(count)) || (count > RTE_DEQUE_SZ_MASK)) {
> +		rte_log(RTE_LOG_ERR, rte_deque_log_type,
> +			"%s(): Requested number of elements is invalid,"
> +			"must be power of 2, and not exceed %u\n",
> +			__func__, RTE_DEQUE_SZ_MASK);
> +
> +		return -EINVAL;
> +	}
> +
> +	sz = sizeof(struct rte_deque) + (ssize_t)count * esize;
> +	sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);

Why is the size cache-line aligned?

> +	return sz;
> +}
> +
> +void
> +rte_deque_reset(struct rte_deque *d)
> +{
> +	d->head = 0;
> +	d->tail = 0;
> +}
> +
> +int
> +rte_deque_init(struct rte_deque *d, const char *name, unsigned int count,
> +	unsigned int flags)
> +{
> +	int ret;
> +
> +	/* compilation-time checks */
> +	RTE_BUILD_BUG_ON((sizeof(struct rte_deque) &
> +			  RTE_CACHE_LINE_MASK) != 0);
> +
> +	/* future proof flags, only allow supported values */
> +	if (flags & ~__RTE_DEQUE_F_MASK) {

More RTE_VERIFY().

> +		rte_log(RTE_LOG_ERR, rte_deque_log_type,
> +			"%s(): Unsupported flags requested %#x\n",
> +			__func__, flags);
> +		return -EINVAL;
> +	}
> +
> +	/* init the deque structure */
> +	memset(d, 0, sizeof(*d));
> +	ret = strlcpy(d->name, name, sizeof(d->name));
> +	if (ret < 0 || ret >= (int)sizeof(d->name))
> +		return -ENAMETOOLONG;

Is the max name length known? In that case, RTE_ASSERT().

> +	d->flags = flags;
> +
> +	if (flags & RTE_DEQUE_F_EXACT_SZ) {
> +		d->size = rte_align32pow2(count + 1);
> +		d->mask = d->size - 1;
> +		d->capacity = count;
> +	} else {
> +		if ((!RTE_IS_POWER_OF_2(count)) || (count > RTE_DEQUE_SZ_MASK)) {
> +			rte_log(RTE_LOG_ERR, rte_deque_log_type,
> +				"%s(): Requested size is invalid, must be power"
> +				" of 2, and not exceed the size limit %u\n",
> +				__func__, RTE_DEQUE_SZ_MASK);
> +			return -EINVAL;
> +		}
> +		d->size = count;
> +		d->mask = count - 1;
> +		d->capacity = d->mask;
> +	}
> +
> +	return 0;
> +}
> +
> +/* create the deque for a given element size */
> +struct rte_deque *
> +rte_deque_create(const char *name, unsigned int esize, unsigned int count,
> +		int socket_id, unsigned int flags)
> +{
> +	char mz_name[RTE_MEMZONE_NAMESIZE];
> +	struct rte_deque *d;
> +	const struct rte_memzone *mz;
> +	ssize_t deque_size;
> +	int mz_flags = 0;
> +	const unsigned int requested_count = count;
> +	int ret;
> +
> +	/* for an exact size deque, round up from count to a power of two */
> +	if (flags & RTE_DEQUE_F_EXACT_SZ)
> +		count = rte_align32pow2(count + 1);
> +
> +	deque_size = rte_deque_get_memsize_elem(esize, count);
> +	if (deque_size < 0) {
> +		rte_errno = -deque_size;
> +		return NULL;
> +	}
> +
> +	ret = snprintf(mz_name, sizeof(mz_name), "%s%s",
> +		RTE_DEQUE_MZ_PREFIX, name);
> +	if (ret < 0 || ret >= (int)sizeof(mz_name)) {
> +		rte_errno = ENAMETOOLONG;
> +		return NULL;
> +	}
> +
> +	/* reserve a memory zone for this deque. If we can't get rte_config or
> +	 * we are secondary process, the memzone_reserve function will set
> +	 * rte_errno for us appropriately - hence no check in this function
> +	 */

Why not use rte_malloc()?

> +	mz = rte_memzone_reserve_aligned(mz_name, deque_size, socket_id,
> +					 mz_flags, alignof(struct rte_deque));
> +	if (mz != NULL) {
> +		d = mz->addr;
> +		/* no need to check return value here, we already checked the
> +		 * arguments above
> +		 */
> +		rte_deque_init(d, name, requested_count, flags);
> +		d->memzone = mz;
> +	} else {
> +		d = NULL;
> +		rte_log(RTE_LOG_ERR, rte_deque_log_type,
> +			"%s(): Cannot reserve memory\n", __func__);
> +	}
> +	return d;
> +}
> +
> +/* free the deque */
> +void
> +rte_deque_free(struct rte_deque *d)
> +{
> +	if (d == NULL)
> +		return;
> +
> +	/*
> +	 * Deque was not created with rte_deque_create,
> +	 * therefore, there is no memzone to free.
> +	 */

If it wasn't created with rte_deque_create(), it should not be freed, I
would argue. Add a separate function (deinit?) to reverse init.

> +	if (d->memzone == NULL) {
> +		rte_log(RTE_LOG_ERR, rte_deque_log_type,
> +			"%s(): Cannot free deque, not created "
> +			"with rte_deque_create()\n", __func__);
> +		return;
> +	}
> +
> +	if (rte_memzone_free(d->memzone) != 0)
> +		rte_log(RTE_LOG_ERR, rte_deque_log_type,
> +			"%s(): Cannot free memory\n", __func__);
> +}
> +
> +/* dump the status of the deque on the console */
> +void
> +rte_deque_dump(FILE *f, const struct rte_deque *d)
> +{
> +	fprintf(f, "deque <%s>@%p\n", d->name, d);
> +	fprintf(f, "  flags=%x\n", d->flags);
> +	fprintf(f, "  size=%"PRIu32"\n", d->size);
> +	fprintf(f, "  capacity=%"PRIu32"\n", d->capacity);
> +	fprintf(f, "  head=%"PRIu32"\n", d->head);
> +	fprintf(f, "  tail=%"PRIu32"\n", d->tail);
> +	fprintf(f, "  used=%u\n", rte_deque_count(d));
> +	fprintf(f, "  avail=%u\n", rte_deque_free_count(d));
> +}
> +
> +RTE_LOG_REGISTER_DEFAULT(rte_deque_log_type, ERR);
> diff --git a/lib/deque/rte_deque.h b/lib/deque/rte_deque.h
> new file mode 100644
> index 0000000000..6633eab377
> --- /dev/null
> +++ b/lib/deque/rte_deque.h
> @@ -0,0 +1,533 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + * Copyright(c) 2024 Arm Limited
> + */
> +
> +#ifndef _RTE_DEQUE_H_
> +#define _RTE_DEQUE_H_
> +
> +/**
> + * @file
> + * RTE double ended queue (Deque)
> + *
> + * This fixed-size queue does not provide concurrent access by
> + * multiple threads. If required, the application should use locks
> + * to protect the deque from concurrent access.
> + *
> + * - Double ended queue
> + * - Maximum size is fixed
> + * - Store objects of any size
> + * - Single/bulk/burst dequeue at tail or head
> + * - Single/bulk/burst enqueue at head or tail
> + *
> + */
> +
> +#ifdef __cplusplus
> +extern "C" {
> +#endif
> +
> +#include <rte_deque_core.h>
> +#include <rte_deque_pvt.h>
> +#include <rte_deque_zc.h>
> +
> +/**
> + * Calculate the memory size needed for a deque
> + *
> + * This function returns the number of bytes needed for a deque, given
> + * the number of objects and the object size. This value is the sum of
> + * the size of the structure rte_deque and the size of the memory needed
> + * by the objects. The value is aligned to a cache line size.
> + *
> + * @param esize
> + *   The size of deque object, in bytes. It must be a multiple of 4.
> + * @param count
> + *   The number of objects in the deque (must be a power of 2).
> + * @return
> + *   - The memory size needed for the deque on success.
> + *   - -EINVAL if count is not a power of 2.
> + */
> +__rte_experimental
> +ssize_t rte_deque_get_memsize_elem(unsigned int esize, unsigned int count);
> +
> +/**
> + * Initialize a deque structure.
> + *
> + * Initialize a deque structure in memory pointed by "d". The size of the
> + * memory area must be large enough to store the deque structure and the
> + * object table. It is advised to use rte_deque_get_memsize() to get the
> + * appropriate size.
> + *
> + * The deque size is set to *count*, which must be a power of two.
> + * The real usable deque size is *count-1* instead of *count* to
> + * differentiate a full deque from an empty deque.
> + *
> + * @param d
> + *   The pointer to the deque structure followed by the objects table.
> + * @param name
> + *   The name of the deque.
> + * @param count
> + *   The number of objects in the deque (must be a power of 2,
> + *   unless RTE_DEQUE_F_EXACT_SZ is set in flags).

What would be the performance implications of always having exact sizes, 
and exact-length allocations?

You can't have a mask, but do you need to?

> + * @param flags
> + *   - RTE_DEQUE_F_EXACT_SZ: If this flag is set, the deque will hold
> + *     exactly the requested number of objects, and the requested size
> + *     will be rounded up to the next power of two, but the usable space
> + *     will be exactly that requested. Worst case, if a power-of-2 size is
> + *     requested, half the deque space will be wasted.
> + *     Without this flag set, the deque size requested must be a power of 2,
> + *     and the usable space will be that size - 1.
> + * @return
> + *   0 on success, or a negative value on error.
> + */
> +__rte_experimental
> +int rte_deque_init(struct rte_deque *d, const char *name, unsigned int count,
> +		unsigned int flags);
> +
> +/**
> + * Create a new deque named *name* in memory.
> + *

Why do deques have names, when linked lists don't?

> + * This function uses ``memzone_reserve()`` to allocate memory. Then it
> + * calls rte_deque_init() to initialize an empty deque.
> + *
> + * The new deque size is set to *count*, which must be a power of two.
> + * The real usable deque size is *count-1* instead of *count* to
> + * differentiate a full deque from an empty deque.
> + *
> + * @param name
> + *   The name of the deque.
> + * @param esize
> + *   The size of deque object, in bytes. It must be a multiple of 4.
> + * @param count
> + *   The size of the deque (must be a power of 2,
> + *   unless RTE_DEQUE_F_EXACT_SZ is set in flags).
> + * @param socket_id
> + *   The *socket_id* argument is the socket identifier in case of
> + *   NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA
> + *   constraint for the reserved zone.
> + * @param flags
> + *   - RTE_DEQUE_F_EXACT_SZ: If this flag is set, the deque will hold exactly the
> + *     requested number of entries, and the requested size will be rounded up
> + *     to the next power of two, but the usable space will be exactly that
> + *     requested. Worst case, if a power-of-2 size is requested, half the
> + *     deque space will be wasted.
> + *     Without this flag set, the deque size requested must be a power of 2,
> + *     and the usable space will be that size - 1.
> + * @return
> + *   On success, the pointer to the new allocated deque. NULL on error with
> + *    rte_errno set appropriately. Possible errno values include:
> + *    - E_RTE_NO_CONFIG - function could not get pointer to rte_config structure
> + *    - EINVAL - count provided is not a power of 2
> + *    - ENOSPC - the maximum number of memzones has already been allocated
> + *    - EEXIST - a memzone with the same name already exists
> + *    - ENOMEM - no appropriate memory area found in which to create memzone
> + */
> +__rte_experimental
> +struct rte_deque *rte_deque_create(const char *name, unsigned int esize,
> +				unsigned int count, int socket_id,
> +				unsigned int flags);
> +
> +/**
> + * De-allocate all memory used by the deque.
> + *
> + * @param d
> + *   Deque to free.
> + *   If NULL then, the function does nothing.
> + */
> +__rte_experimental
> +void rte_deque_free(struct rte_deque *d);
> +
> +/**
> + * Dump the status of the deque to a file.
> + *
> + * @param f
> + *   A pointer to a file for output
> + * @param d
> + *   A pointer to the deque structure.
> + */
> +__rte_experimental
> +void rte_deque_dump(FILE *f, const struct rte_deque *d);
> +
> +/**
> + * Return the number of entries in a deque.
> + *
> + * @param d
> + *   A pointer to the deque structure.
> + * @return
> + *   The number of entries in the deque.
> + */
> +static inline unsigned int
> +rte_deque_count(const struct rte_deque *d)
> +{
> +	return (d->head - d->tail) & d->mask;
> +}
> +
> +/**
> + * Return the number of free entries in a deque.
> + *
> + * @param d
> + *   A pointer to the deque structure.
> + * @return
> + *   The number of free entries in the deque.
> + */
> +static inline unsigned int
> +rte_deque_free_count(const struct rte_deque *d)
> +{
> +	return d->capacity - rte_deque_count(d);
> +}
> +
> +/**
> + * Enqueue fixed number of objects on a deque at the head.
> + *
> + * This function copies the objects at the head of the deque and
> + * moves the head index.
> + *
> + * @param d
> + *   A pointer to the deque structure.
> + * @param obj_table
> + *   A pointer to a table of objects.

Use "array", not "table".

> + * @param esize
> + *   The size of deque object, in bytes. It must be a multiple of 4.
> + *   This must be the same value used while creating the deque. Otherwise
> + *   the results are undefined.
> + * @param n
> + *   The number of objects to add in the deque from the obj_table.
> + * @param free_space
> + *   Returns the amount of space in the deque after the enqueue operation
> + *   has finished.

I think you should remove the parameter. Just use the free count 
function if you need this information.

> + * @return
> + *   The number of objects enqueued, either 0 or n

Do we really need both a "bulk" and a "burst" function? Seems to me like 
burst-only would be good enough, and in case you want to know if you can 
fit the whole array, you can just check first. No concurrency issues, 
since this thingy is not MT safe.

> + */
> +__rte_experimental
> +static __rte_always_inline unsigned int
> +rte_deque_head_enqueue_bulk_elem(struct rte_deque *d,

Maybe use "push" and "pop" instead of "enqueue"/"dequeue"? Or maybe 
"append" and "pop" (like Python does). I think it makes sense to not copy 
too much of the rte_ring terminology and design, since this thing is 
something else, way simpler, non-MT safe. Python also uses "left" and 
"right", rather than head and tail. I guess in the deque case, what is 
head and what is tail is not entirely clear.

Also, doesn't "enqueue" imply the operation is working against the tail, 
not the head?

> +			const void *obj_table,
> +			unsigned int esize,
> +			unsigned int n,
> +			unsigned int *free_space)
> +{
> +	*free_space = rte_deque_free_count(d);
> +	if (unlikely(n > *free_space))
> +		return 0;
> +	*free_space -= n;
> +	return __rte_deque_enqueue_at_head(d, obj_table, esize, n);
> +}
> +
> +/**
> + * Enqueue up to a maximum number of objects on a deque at the head.
> + *
> + * This function copies the objects at the head of the deque and
> + * moves the head index. If fewer than n entries are free, only as many
> + * objects as fit are enqueued.
> + *
> + * @param d
> + *   A pointer to the deque structure.
> + * @param obj_table
> + *   A pointer to a table of objects.
> + * @param esize
> + *   The size of deque object, in bytes. It must be a multiple of 4.
> + *   This must be the same value used while creating the deque. Otherwise
> + *   the results are undefined.
> + * @param n
> + *   The maximum number of objects to add in the deque from the obj_table.
> + * @param free_space
> + *   Returns the amount of space in the deque after the enqueue operation
> + *   has finished.
> + * @return
> + *   Actual number of objects enqueued, between 0 and n.
> + */
> +__rte_experimental
> +static __rte_always_inline unsigned int
> +rte_deque_head_enqueue_burst_elem(struct rte_deque *d, const void *obj_table,
> +			unsigned int esize, unsigned int n,
> +			unsigned int *free_space)
> +{
> +	unsigned int avail_space = rte_deque_free_count(d);
> +	unsigned int to_be_enqueued = (n <= avail_space ? n : avail_space);
> +
> +	/* Subtract what is actually enqueued: subtracting n would wrap
> +	 * around whenever n exceeds the available space.
> +	 */
> +	*free_space = avail_space - to_be_enqueued;
> +	return __rte_deque_enqueue_at_head(d, obj_table, esize, to_be_enqueued);
> +}
> +
> +/**
> + * Enqueue exactly n objects on a deque at the tail, or none at all.
> + *
> + * The objects are copied at the tail of the deque and the tail index is
> + * moved (backwards).
> + *
> + * @param d
> + *   A pointer to the deque structure.
> + * @param obj_table
> + *   A pointer to a table of objects.
> + * @param esize
> + *   The size of deque object, in bytes. It must be a multiple of 4.
> + *   This must be the same value used while creating the deque. Otherwise
> + *   the results are undefined.
> + * @param n
> + *   The number of objects to add in the deque from the obj_table.
> + * @param free_space
> + *   Returns the amount of space in the deque after the enqueue operation
> + *   has finished.
> + * @return
> + *   The number of objects enqueued, either 0 or n
> + */
> +__rte_experimental
> +static __rte_always_inline unsigned int
> +rte_deque_tail_enqueue_bulk_elem(struct rte_deque *d,
> +				 const void *obj_table, unsigned int esize,
> +				 unsigned int n, unsigned int *free_space)
> +{
> +	const unsigned int room = rte_deque_free_count(d);
> +
> +	/* All or nothing: reject the request when it does not fit */
> +	if (unlikely(n > room)) {
> +		*free_space = room;
> +		return 0;
> +	}
> +	*free_space = room - n;
> +	return __rte_deque_enqueue_at_tail(d, obj_table, esize, n);
> +}
> +
> +/**
> + * Enqueue as many objects as fit, up to n, on a deque at the tail.
> + *
> + * The objects are copied at the tail of the deque and the tail index is
> + * moved (backwards).
> + *
> + * @param d
> + *   A pointer to the deque structure.
> + * @param obj_table
> + *   A pointer to a table of objects.
> + * @param esize
> + *   The size of deque object, in bytes. It must be a multiple of 4.
> + *   This must be the same value used while creating the deque. Otherwise
> + *   the results are undefined.
> + * @param n
> + *   The maximum number of objects to add in the deque from the obj_table.
> + * @param free_space
> + *   Returns the amount of space in the deque after the enqueue operation
> + *   has finished.
> + * @return
> + *   Actual number of objects enqueued.
> + */
> +__rte_experimental
> +static __rte_always_inline unsigned int
> +rte_deque_tail_enqueue_burst_elem(struct rte_deque *d,
> +				const void *obj_table, unsigned int esize,
> +				unsigned int n, unsigned int *free_space)
> +{
> +	const unsigned int room = rte_deque_free_count(d);
> +	unsigned int count = n;
> +
> +	/* Clamp the request to the space that is available */
> +	if (count > room)
> +		count = room;
> +	*free_space = room - count;
> +	return __rte_deque_enqueue_at_tail(d, obj_table, esize, count);
> +}
> +
> +/**
> + * Dequeue exactly n objects from a deque at the tail, or none at all.
> + *
> + * The objects are copied from the tail of the deque and the tail index
> + * is moved.
> + *
> + * @param d
> + *   A pointer to the deque structure.
> + * @param obj_table
> + *   A pointer to a table of objects that will be filled.
> + * @param esize
> + *   The size of deque object, in bytes. It must be a multiple of 4.
> + *   This must be the same value used while creating the deque. Otherwise
> + *   the results are undefined.
> + * @param n
> + *   The number of objects to dequeue from the deque to the obj_table.
> + * @param available
> + *   Returns the number of remaining deque entries after the dequeue
> + *   has finished.
> + * @return
> + *   The number of objects dequeued, either 0 or n
> + */
> +__rte_experimental
> +static __rte_always_inline unsigned int
> +rte_deque_tail_dequeue_bulk_elem(struct rte_deque *d, void *obj_table,
> +			unsigned int esize, unsigned int n,
> +			unsigned int *available)
> +{
> +	const unsigned int stored = rte_deque_count(d);
> +
> +	/* All or nothing: reject the request when too few entries exist */
> +	if (unlikely(n > stored)) {
> +		*available = stored;
> +		return 0;
> +	}
> +	*available = stored - n;
> +	return __rte_deque_dequeue_at_tail(d, obj_table, esize, n);
> +}
> +
> +/**
> + * Dequeue as many objects as exist, up to n, from a deque at the tail.
> + *
> + * The objects are copied from the tail of the deque and the tail index
> + * is moved.
> + *
> + * @param d
> + *   A pointer to the deque structure.
> + * @param obj_table
> + *   A pointer to a table of objects that will be filled.
> + * @param esize
> + *   The size of deque object, in bytes. It must be a multiple of 4.
> + *   This must be the same value used while creating the deque. Otherwise
> + *   the results are undefined.
> + * @param n
> + *   The maximum number of objects to dequeue from the deque to the
> + *   obj_table.
> + * @param available
> + *   Returns the number of remaining deque entries after the dequeue
> + *   has finished.
> + * @return
> + *   Number of objects dequeued
> + */
> +__rte_experimental
> +static __rte_always_inline unsigned int
> +rte_deque_tail_dequeue_burst_elem(struct rte_deque *d, void *obj_table,
> +			unsigned int esize, unsigned int n,
> +			unsigned int *available)
> +{
> +	const unsigned int stored = rte_deque_count(d);
> +	unsigned int count = n;
> +
> +	/* Clamp the request to the entries that are present */
> +	if (count > stored)
> +		count = stored;
> +	*available = stored - count;
> +	return __rte_deque_dequeue_at_tail(d, obj_table, esize, count);
> +}
> +
> +/**
> + * Dequeue exactly n objects from a deque at the head, or none at all.
> + *
> + * The objects are copied from the head of the deque and the head index
> + * is moved (backwards).
> + *
> + * @param d
> + *   A pointer to the deque structure.
> + * @param obj_table
> + *   A pointer to a table of objects that will be filled.
> + * @param esize
> + *   The size of deque object, in bytes. It must be a multiple of 4.
> + *   This must be the same value used while creating the deque. Otherwise
> + *   the results are undefined.
> + * @param n
> + *   The number of objects to dequeue from the deque to the obj_table.
> + * @param available
> + *   Returns the number of remaining deque entries after the dequeue
> + *   has finished.
> + * @return
> + *   The number of objects dequeued, either 0 or n
> + */
> +__rte_experimental
> +static __rte_always_inline unsigned int
> +rte_deque_head_dequeue_bulk_elem(struct rte_deque *d, void *obj_table,
> +			unsigned int esize, unsigned int n,
> +			unsigned int *available)
> +{
> +	const unsigned int stored = rte_deque_count(d);
> +
> +	/* All or nothing: reject the request when too few entries exist */
> +	if (unlikely(n > stored)) {
> +		*available = stored;
> +		return 0;
> +	}
> +	*available = stored - n;
> +	return __rte_deque_dequeue_at_head(d, obj_table, esize, n);
> +}
> +
> +/**
> + * Dequeue as many objects as exist, up to n, from a deque at the head.
> + *
> + * The objects are copied from the head of the deque and the head index
> + * is moved (backwards).
> + *
> + * @param d
> + *   A pointer to the deque structure.
> + * @param obj_table
> + *   A pointer to a table of objects that will be filled.
> + * @param esize
> + *   The size of deque object, in bytes. It must be a multiple of 4.
> + *   This must be the same value used while creating the deque. Otherwise
> + *   the results are undefined.
> + * @param n
> + *   The maximum number of objects to dequeue from the deque to the
> + *   obj_table.
> + * @param available
> + *   Returns the number of remaining deque entries after the dequeue
> + *   has finished.
> + * @return
> + *   Number of objects dequeued
> + */
> +__rte_experimental
> +static __rte_always_inline unsigned int
> +rte_deque_head_dequeue_burst_elem(struct rte_deque *d, void *obj_table,
> +			unsigned int esize, unsigned int n,
> +			unsigned int *available)
> +{
> +	const unsigned int stored = rte_deque_count(d);
> +	unsigned int count = n;
> +
> +	/* Clamp the request to the entries that are present */
> +	if (count > stored)
> +		count = stored;
> +	*available = stored - count;
> +	return __rte_deque_dequeue_at_head(d, obj_table, esize, count);
> +}
> +
> +/**
> + * Flush a deque.
> + *
> + * This function flushes all the objects in a deque.
> + *
> + * @warning
> + * Make sure the deque is not in use while calling this function.
> + *
> + * @param d
> + *   A pointer to the deque structure.
> + */
> +__rte_experimental
> +void rte_deque_reset(struct rte_deque *d);
> +
> +/**
> + * Check whether a deque has no free entries left.
> + *
> + * @param d
> + *   A pointer to the deque structure.
> + * @return
> + *   - 1: The deque is full.
> + *   - 0: The deque is not full.
> + */
> +static inline int
> +rte_deque_full(const struct rte_deque *d)
> +{
> +	const unsigned int room = rte_deque_free_count(d);
> +
> +	return room == 0;
> +}
> +
> +/**
> + * Check whether a deque holds no entries.
> + *
> + * @param d
> + *   A pointer to the deque structure.
> + * @return
> + *   - 1: The deque is empty.
> + *   - 0: The deque is not empty.
> + */
> +static inline int
> +rte_deque_empty(const struct rte_deque *d)
> +{
> +	/* Head and tail meet exactly when nothing is stored */
> +	return d->head == d->tail;
> +}
> +
> +/**
> + * Return the size of the deque.
> + *
> + * @param d
> + *   A pointer to the deque structure.
> + * @return
> + *   The size of the data store used by the deque.

What is the "data store"? And the size in what unit: elements or bytes?

> + *   NOTE: this is not the same as the usable space in the deque. To query that
> + *   use ``rte_deque_get_capacity()``.
> + */
> +static inline unsigned int
> +rte_deque_get_size(const struct rte_deque *d)
> +{
> +	/* Plain accessor for the size field recorded in the deque */
> +	const uint32_t total = d->size;
> +
> +	return total;
> +}
> +
> +/**
> + * Get the number of objects which can be stored in the deque.
> + *
> + * @param d
> + *   A pointer to the deque structure.
> + * @return
> + *   The usable size of the deque.
> + */
> +static inline unsigned int
> +rte_deque_get_capacity(const struct rte_deque *d)
> +{
> +	const uint32_t usable = d->capacity;
> +
> +	return usable;
> +}
> +
> +#ifdef __cplusplus
> +}
> +#endif
> +
> +#endif /* _RTE_DEQUE_H_ */
> diff --git a/lib/deque/rte_deque_core.h b/lib/deque/rte_deque_core.h
> new file mode 100644
> index 0000000000..0bb8695c8a
> --- /dev/null
> +++ b/lib/deque/rte_deque_core.h
> @@ -0,0 +1,81 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + * Copyright(c) 2024 Arm Limited
> + */
> +
> +#ifndef _RTE_DEQUE_CORE_H_
> +#define _RTE_DEQUE_CORE_H_
> +
> +/**
> + * @file
> + * This file contains definition of RTE deque structure, init flags and
> + * some related macros. This file should not be included directly,
> + * include rte_deque.h instead.
> + */
> +
> +#ifdef __cplusplus
> +extern "C" {
> +#endif
> +
> +#include <stdint.h>
> +#include <string.h>
> +#include <errno.h>
> +#include <rte_common.h>
> +#include <rte_config.h>
> +#include <rte_memory.h>
> +#include <rte_lcore.h>
> +#include <rte_atomic.h>
> +#include <rte_branch_prediction.h>
> +#include <rte_memzone.h>
> +#include <rte_pause.h>
> +#include <rte_debug.h>
> +
> +extern int rte_deque_log_type;
> +
> +#define RTE_DEQUE_MZ_PREFIX "DEQUE_"
> +/** The maximum length of a deque name. */
> +#define RTE_DEQUE_NAMESIZE (RTE_MEMZONE_NAMESIZE - \
> +			   sizeof(RTE_DEQUE_MZ_PREFIX) + 1)
> +
> +/**
> + * Double ended queue (deque) structure.
> + *
> + * The deque has a head index and a tail index. Both are unsigned 32-bit
> + * values, so the subtraction head - tail is always computed modulo 2^32.
> + * Masking that difference with (size - 1) gives the number of stored
> + * objects, which is between 0 and capacity. The indices are also masked
> + * whenever they are used to access the objects in the deque.
> + */
> +struct rte_deque {
> +	alignas(RTE_CACHE_LINE_SIZE) char name[RTE_DEQUE_NAMESIZE];
> +	/**< Name of the deque */
> +	int flags;
> +	/**< Flags supplied at creation. */
> +	const struct rte_memzone *memzone;
> +	/**< Memzone, if any, containing the rte_deque */
> +
> +	alignas(RTE_CACHE_LINE_SIZE) char pad0; /**< empty cache line */
> +
> +	uint32_t size;           /**< Size of deque. */
> +	uint32_t mask;           /**< Mask (size-1) of deque. */
> +	uint32_t capacity;       /**< Usable size of deque */
> +	/** Deque head and tail indices. */
> +	volatile uint32_t head;
> +	volatile uint32_t tail;

Remove volatile.

> +};
> +
> +/**
> + * Deque is to hold exactly requested number of entries.
> + * Without this flag set, the deque size requested must be a power of 2, and the
> + * usable space will be that size - 1. With the flag, the requested size will
> + * be rounded up to the next power of two, but the usable space will be exactly
> + * that requested. Worst case, if a power-of-2 size is requested, half the
> + * deque space will be wasted.
> + */
> +#define RTE_DEQUE_F_EXACT_SZ 0x0004
> +#define RTE_DEQUE_SZ_MASK  (0x7fffffffU) /**< Deque size mask */
> +
> +#ifdef __cplusplus
> +}
> +#endif
> +
> +#endif /* _RTE_DEQUE_CORE_H_ */
> diff --git a/lib/deque/rte_deque_pvt.h b/lib/deque/rte_deque_pvt.h
> new file mode 100644
> index 0000000000..931bbd4d19
> --- /dev/null
> +++ b/lib/deque/rte_deque_pvt.h
> @@ -0,0 +1,538 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + * Copyright(c) 2024 Arm Limited
> + */
> +
> +#ifndef _RTE_DEQUE_PVT_H_
> +#define _RTE_DEQUE_PVT_H_
> +
> +/* Number of objects currently stored in the deque pointed to by d.
> + * The argument is parenthesised so that any pointer expression can be passed.
> + */
> +#define __RTE_DEQUE_COUNT(d) (((d)->head - (d)->tail) & (d)->mask)
> +/* Number of objects that can still be stored in the deque pointed to by d. */
> +#define __RTE_DEQUE_FREE_SPACE(d) ((d)->capacity - __RTE_DEQUE_COUNT(d))
> +
> +/*
> + * Copy n 32-bit words from obj_table into the deque storage, starting at
> + * word index idx and moving towards higher indices.
> + *
> + * size, idx and n are all counted in 32-bit words. The storage is taken to
> + * begin right after the rte_deque header (&d[1]). When idx + n would run
> + * past size, the copy continues from word index 0.
> + */
> +static __rte_always_inline void
> +__rte_deque_enqueue_elems_head_32(struct rte_deque *d,
> +				const unsigned int size,
> +				uint32_t idx,
> +				const void *obj_table,
> +				unsigned int n)
> +{
> +	unsigned int i;
> +	uint32_t *deque = (uint32_t *)&d[1];
> +	const uint32_t *obj = (const uint32_t *)obj_table;
> +	if (likely(idx + n <= size)) {
> +		/* No wrap-around: copy 8 words per iteration */
> +		for (i = 0; i < (n & ~0x7); i += 8, idx += 8) {
> +			deque[idx] = obj[i];
> +			deque[idx + 1] = obj[i + 1];
> +			deque[idx + 2] = obj[i + 2];
> +			deque[idx + 3] = obj[i + 3];
> +			deque[idx + 4] = obj[i + 4];
> +			deque[idx + 5] = obj[i + 5];
> +			deque[idx + 6] = obj[i + 6];
> +			deque[idx + 7] = obj[i + 7];
> +		}
> +		/* Copy the remaining n % 8 words */
> +		switch (n & 0x7) {
> +		case 7:
> +			deque[idx++] = obj[i++]; /* fallthrough */
> +		case 6:
> +			deque[idx++] = obj[i++]; /* fallthrough */
> +		case 5:
> +			deque[idx++] = obj[i++]; /* fallthrough */
> +		case 4:
> +			deque[idx++] = obj[i++]; /* fallthrough */
> +		case 3:
> +			deque[idx++] = obj[i++]; /* fallthrough */
> +		case 2:
> +			deque[idx++] = obj[i++]; /* fallthrough */
> +		case 1:
> +			deque[idx++] = obj[i++]; /* fallthrough */
> +		}
> +	} else {
> +		/* Wrap-around: fill up to the end of the storage first */
> +		for (i = 0; idx < size; i++, idx++)
> +			deque[idx] = obj[i];
> +		/* Start at the beginning */
> +		for (idx = 0; i < n; i++, idx++)
> +			deque[idx] = obj[i];
> +	}
> +}
> +
> +/*
> + * Copy n 64-bit objects from obj_table into the deque storage, starting at
> + * slot (head & mask) and moving towards higher slots. The copy continues
> + * from slot 0 when it would run past d->size. The head index itself is not
> + * modified here.
> + */
> +static __rte_always_inline void
> +__rte_deque_enqueue_elems_head_64(struct rte_deque *d,
> +				const void *obj_table,
> +				unsigned int n)
> +{
> +	unsigned int i;
> +	const uint32_t size = d->size;
> +	uint32_t idx = (d->head & d->mask);
> +	uint64_t *deque = (uint64_t *)&d[1];
> +	const unaligned_uint64_t *obj = (const unaligned_uint64_t *)obj_table;
> +	if (likely(idx + n <= size)) {
> +		/* No wrap-around: copy 4 objects per iteration */
> +		for (i = 0; i < (n & ~0x3); i += 4, idx += 4) {
> +			deque[idx] = obj[i];
> +			deque[idx + 1] = obj[i + 1];
> +			deque[idx + 2] = obj[i + 2];
> +			deque[idx + 3] = obj[i + 3];
> +		}
> +		/* Copy the remaining n % 4 objects */
> +		switch (n & 0x3) {
> +		case 3:
> +			deque[idx++] = obj[i++]; /* fallthrough */
> +		case 2:
> +			deque[idx++] = obj[i++]; /* fallthrough */
> +		case 1:
> +			deque[idx++] = obj[i++]; /* fallthrough */
> +		}
> +	} else {
> +		/* Wrap-around: fill up to the end of the storage first */
> +		for (i = 0; idx < size; i++, idx++)
> +			deque[idx] = obj[i];
> +		/* Start at the beginning */
> +		for (idx = 0; i < n; i++, idx++)
> +			deque[idx] = obj[i];
> +	}
> +}
> +
> +/*
> + * Copy n 128-bit objects from obj_table into the deque storage, starting at
> + * slot (head & mask) and moving towards higher slots. The copy continues
> + * from slot 0 when it would run past d->size. The head index itself is not
> + * modified here.
> + */
> +static __rte_always_inline void
> +__rte_deque_enqueue_elems_head_128(struct rte_deque *d,
> +				const void *obj_table,
> +				unsigned int n)
> +{
> +	unsigned int i;
> +	const uint32_t size = d->size;
> +	uint32_t idx = (d->head & d->mask);
> +	rte_int128_t *deque = (rte_int128_t *)&d[1];
> +	const rte_int128_t *obj = (const rte_int128_t *)obj_table;
> +	if (likely(idx + n <= size)) {
> +		/* No wrap-around: 32 bytes, i.e. two objects, per memcpy */
> +		for (i = 0; i < (n & ~0x1); i += 2, idx += 2)
> +			memcpy((void *)(deque + idx),
> +				(const void *)(obj + i), 32);
> +		/* Copy the odd object left over, if any */
> +		switch (n & 0x1) {
> +		case 1:
> +			memcpy((void *)(deque + idx),
> +				(const void *)(obj + i), 16);
> +		}
> +	} else {
> +		/* Wrap-around: fill up to the end of the storage first */
> +		for (i = 0; idx < size; i++, idx++)
> +			memcpy((void *)(deque + idx),
> +				(const void *)(obj + i), 16);
> +		/* Start at the beginning */
> +		for (idx = 0; i < n; i++, idx++)
> +			memcpy((void *)(deque + idx),
> +				(const void *)(obj + i), 16);
> +	}
> +}
> +
> +/*
> + * Copy n objects of esize bytes from obj_table to the head of the deque and
> + * advance the head index by n. No space check is done here. Returns n.
> + */
> +static __rte_always_inline unsigned int
> +__rte_deque_enqueue_at_head(struct rte_deque *d,
> +			const void *obj_table,
> +			unsigned int esize,
> +			unsigned int n)
> +{
> +	/* 8B and 16B objects have dedicated copy routines because some
> +	 * platforms can copy them directly with 64 bit and 128 bit registers.
> +	 */
> +	switch (esize) {
> +	case 8:
> +		__rte_deque_enqueue_elems_head_64(d, obj_table, n);
> +		break;
> +	case 16:
> +		__rte_deque_enqueue_elems_head_128(d, obj_table, n);
> +		break;
> +	default: {
> +		/* Every other size is handled as a run of 32-bit words */
> +		const uint32_t words = esize / sizeof(uint32_t);
> +		const uint32_t slot = d->head & d->mask;
> +
> +		__rte_deque_enqueue_elems_head_32(d, d->size * words,
> +						slot * words, obj_table,
> +						n * words);
> +		break;
> +	}
> +	}
> +	d->head = (d->head + n) & d->mask;
> +	return n;
> +}
> +
> +/*
> + * Copy objects from obj_table into the deque storage, starting at word
> + * index idx and moving towards lower indices, one object of elem_size bytes
> + * per step.
> + *
> + * idx, n and mask are counted in 32-bit words; scale is the number of words
> + * per object, so every step moves idx down and i up by scale. When the
> + * descending run reaches the start of the storage, it continues from word
> + * index mask.
> + */
> +static __rte_always_inline void
> +__rte_deque_enqueue_elems_tail_32(struct rte_deque *d,
> +				const unsigned int mask,
> +				uint32_t idx,
> +				const void *obj_table,
> +				unsigned int n,
> +				const unsigned int scale,
> +				const unsigned int elem_size)
> +{
> +	unsigned int i;
> +	uint32_t *deque = (uint32_t *)&d[1];
> +	const uint32_t *obj = (const uint32_t *)obj_table;
> +
> +	if (likely(idx >= n)) {
> +		/* Enough room below idx: a single descending run */
> +		for (i = 0; i < n; idx -= scale, i += scale)
> +			memcpy(&deque[idx], &obj[i], elem_size);
> +	} else {
> +		/* Descend until idx, read as a signed value, drops below 0 */
> +		for (i = 0; (int32_t)idx >= 0; idx -= scale, i += scale)
> +			memcpy(&deque[idx], &obj[i], elem_size);
> +
> +		/* Start at the ending */
> +		idx = mask;
> +		for (; i < n; idx -= scale, i += scale)
> +			memcpy(&deque[idx], &obj[i], elem_size);
> +	}
> +}
> +
> +/*
> + * Copy n 64-bit objects from obj_table into the deque storage, starting at
> + * slot (tail & mask) and moving towards lower slots. The copy continues
> + * from slot d->mask once slot 0 has been written. The tail index itself is
> + * not modified here.
> + */
> +static __rte_always_inline void
> +__rte_deque_enqueue_elems_tail_64(struct rte_deque *d,
> +				const void *obj_table,
> +				unsigned int n)
> +{
> +	unsigned int i;
> +	uint32_t idx = (d->tail & d->mask);
> +	uint64_t *deque = (uint64_t *)&d[1];
> +	const unaligned_uint64_t *obj = (const unaligned_uint64_t *)obj_table;
> +	if (likely((int32_t)(idx - n) >= 0)) {
> +		/* No wrap-around: copy 4 objects per iteration, descending */
> +		for (i = 0; i < (n & ~0x3); i += 4, idx -= 4) {
> +			deque[idx] = obj[i];
> +			deque[idx - 1] = obj[i + 1];
> +			deque[idx - 2] = obj[i + 2];
> +			deque[idx - 3] = obj[i + 3];
> +		}
> +		/* Copy the remaining n % 4 objects */
> +		switch (n & 0x3) {
> +		case 3:
> +			deque[idx--] = obj[i++]; /* fallthrough */
> +		case 2:
> +			deque[idx--] = obj[i++]; /* fallthrough */
> +		case 1:
> +			deque[idx--] = obj[i++]; /* fallthrough */
> +		}
> +	} else {
> +		/* Descend until idx, read as a signed value, drops below 0 */
> +		for (i = 0; (int32_t)idx >= 0; i++, idx--)
> +			deque[idx] = obj[i];
> +		/* Start at the ending */
> +		for (idx = d->mask; i < n; i++, idx--)
> +			deque[idx] = obj[i];
> +	}
> +}
> +
> +/*
> + * Copy n 128-bit objects from obj_table into the deque storage, starting at
> + * slot (tail & mask) and moving towards lower slots. The copy continues
> + * from slot d->mask once slot 0 has been written. The tail index itself is
> + * not modified here.
> + */
> +static __rte_always_inline void
> +__rte_deque_enqueue_elems_tail_128(struct rte_deque *d,
> +				const void *obj_table,
> +				unsigned int n)
> +{
> +	unsigned int i;
> +	uint32_t idx = (d->tail & d->mask);
> +	rte_int128_t *deque = (rte_int128_t *)&d[1];
> +	const rte_int128_t *obj = (const rte_int128_t *)obj_table;
> +	if (likely((int32_t)(idx - n) >= 0)) {
> +		/* No wrap-around: copy 2 objects per iteration, descending */
> +		for (i = 0; i < (n & ~0x1); i += 2, idx -= 2) {
> +			deque[idx] = obj[i];
> +			deque[idx - 1] = obj[i + 1];
> +		}
> +		/* Copy the odd object left over, if any */
> +		switch (n & 0x1) {
> +		case 1:
> +			memcpy((void *)(deque + idx),
> +				(const void *)(obj + i), 16);
> +		}
> +	} else {
> +		/* Descend until idx, read as a signed value, drops below 0 */
> +		for (i = 0; (int32_t)idx >= 0; i++, idx--)
> +			memcpy((void *)(deque + idx),
> +				(const void *)(obj + i), 16);
> +		/* Start at the ending */
> +		for (idx = d->mask; i < n; i++, idx--)
> +			memcpy((void *)(deque + idx),
> +				(const void *)(obj + i), 16);
> +	}
> +}
> +
> +/*
> + * Copy n objects of esize bytes from obj_table to the tail of the deque and
> + * move the tail index back by n. No space check is done here. Returns n.
> + */
> +static __rte_always_inline unsigned int
> +__rte_deque_enqueue_at_tail(struct rte_deque *d,
> +			const void *obj_table,
> +			unsigned int esize,
> +			unsigned int n)
> +{
> +	/* Step back by one so the copy routines start on the cell just
> +	 * below the current tail.
> +	 */
> +	d->tail--;
> +
> +	/* 8B and 16B objects have dedicated copy routines because some
> +	 * platforms can copy them directly with 64 bit and 128 bit registers.
> +	 */
> +	switch (esize) {
> +	case 8:
> +		__rte_deque_enqueue_elems_tail_64(d, obj_table, n);
> +		break;
> +	case 16:
> +		__rte_deque_enqueue_elems_tail_128(d, obj_table, n);
> +		break;
> +	default: {
> +		/* Every other size is handled as a run of 32-bit words */
> +		const uint32_t words = esize / sizeof(uint32_t);
> +		const uint32_t slot = d->tail & d->mask;
> +
> +		__rte_deque_enqueue_elems_tail_32(d, d->mask * words,
> +						slot * words, obj_table,
> +						n * words, words, esize);
> +		break;
> +	}
> +	}
> +
> +	/* n cells were written downwards from the stepped-back tail, so the
> +	 * lowest written cell, which becomes the new tail, is at
> +	 * tail - n + 1.
> +	 */
> +	d->tail = (d->tail - n + 1) & d->mask;
> +	return n;
> +}
> +
> +/*
> + * Copy n 32-bit words out of the deque storage into obj_table, starting at
> + * word index idx and moving towards higher indices.
> + *
> + * size, idx and n are all counted in 32-bit words. When idx + n would run
> + * past size, the copy continues from word index 0.
> + */
> +static __rte_always_inline void
> +__rte_deque_dequeue_elems_32(struct rte_deque *d,
> +			const unsigned int size,
> +			uint32_t idx,
> +			void *obj_table,
> +			unsigned int n)
> +{
> +	unsigned int i;
> +	const uint32_t *deque = (const uint32_t *)&d[1];
> +	uint32_t *obj = (uint32_t *)obj_table;
> +	if (likely(idx + n <= size)) {
> +		/* No wrap-around: copy 8 words per iteration */
> +		for (i = 0; i < (n & ~0x7); i += 8, idx += 8) {
> +			obj[i] = deque[idx];
> +			obj[i + 1] = deque[idx + 1];
> +			obj[i + 2] = deque[idx + 2];
> +			obj[i + 3] = deque[idx + 3];
> +			obj[i + 4] = deque[idx + 4];
> +			obj[i + 5] = deque[idx + 5];
> +			obj[i + 6] = deque[idx + 6];
> +			obj[i + 7] = deque[idx + 7];
> +		}
> +		/* Copy the remaining n % 8 words */
> +		switch (n & 0x7) {
> +		case 7:
> +			obj[i++] = deque[idx++]; /* fallthrough */
> +		case 6:
> +			obj[i++] = deque[idx++]; /* fallthrough */
> +		case 5:
> +			obj[i++] = deque[idx++]; /* fallthrough */
> +		case 4:
> +			obj[i++] = deque[idx++]; /* fallthrough */
> +		case 3:
> +			obj[i++] = deque[idx++]; /* fallthrough */
> +		case 2:
> +			obj[i++] = deque[idx++]; /* fallthrough */
> +		case 1:
> +			obj[i++] = deque[idx++]; /* fallthrough */
> +		}
> +	} else {
> +		/* Wrap-around: read up to the end of the storage first */
> +		for (i = 0; idx < size; i++, idx++)
> +			obj[i] = deque[idx];
> +		/* Start at the beginning */
> +		for (idx = 0; i < n; i++, idx++)
> +			obj[i] = deque[idx];
> +	}
> +}
> +
> +/*
> + * Copy n 64-bit objects out of the deque storage into obj_table, starting
> + * at slot (tail & mask) and moving towards higher slots. The copy continues
> + * from slot 0 when it would run past d->size. The tail index itself is not
> + * modified here.
> + */
> +static __rte_always_inline void
> +__rte_deque_dequeue_elems_64(struct rte_deque *d, void *obj_table,
> +			unsigned int n)
> +{
> +	unsigned int i;
> +	const uint32_t size = d->size;
> +	uint32_t idx = (d->tail & d->mask);
> +	const uint64_t *deque = (const uint64_t *)&d[1];
> +	unaligned_uint64_t *obj = (unaligned_uint64_t *)obj_table;
> +	if (likely(idx + n <= size)) {
> +		/* No wrap-around: copy 4 objects per iteration */
> +		for (i = 0; i < (n & ~0x3); i += 4, idx += 4) {
> +			obj[i] = deque[idx];
> +			obj[i + 1] = deque[idx + 1];
> +			obj[i + 2] = deque[idx + 2];
> +			obj[i + 3] = deque[idx + 3];
> +		}
> +		/* Copy the remaining n % 4 objects */
> +		switch (n & 0x3) {
> +		case 3:
> +			obj[i++] = deque[idx++]; /* fallthrough */
> +		case 2:
> +			obj[i++] = deque[idx++]; /* fallthrough */
> +		case 1:
> +			obj[i++] = deque[idx++]; /* fallthrough */
> +		}
> +	} else {
> +		/* Wrap-around: read up to the end of the storage first */
> +		for (i = 0; idx < size; i++, idx++)
> +			obj[i] = deque[idx];
> +		/* Start at the beginning */
> +		for (idx = 0; i < n; i++, idx++)
> +			obj[i] = deque[idx];
> +	}
> +}
> +
> +/*
> + * Copy n 128-bit objects out of the deque storage into obj_table, starting
> + * at slot (tail & mask) and moving towards higher slots. The copy continues
> + * from slot 0 when it would run past d->size. The tail index itself is not
> + * modified here.
> + */
> +static __rte_always_inline void
> +__rte_deque_dequeue_elems_128(struct rte_deque *d,
> +			void *obj_table,
> +			unsigned int n)
> +{
> +	unsigned int i;
> +	const uint32_t size = d->size;
> +	uint32_t idx = (d->tail & d->mask);
> +	const rte_int128_t *deque = (const rte_int128_t *)&d[1];
> +	rte_int128_t *obj = (rte_int128_t *)obj_table;
> +	if (likely(idx + n <= size)) {
> +		/* No wrap-around: 32 bytes, i.e. two objects, per memcpy */
> +		for (i = 0; i < (n & ~0x1); i += 2, idx += 2)
> +			memcpy((void *)(obj + i),
> +				(const void *)(deque + idx), 32);
> +		/* Copy the odd object left over, if any */
> +		switch (n & 0x1) {
> +		case 1:
> +			memcpy((void *)(obj + i),
> +				(const void *)(deque + idx), 16);
> +		}
> +	} else {
> +		/* Wrap-around: read up to the end of the storage first */
> +		for (i = 0; idx < size; i++, idx++)
> +			memcpy((void *)(obj + i),
> +				(const void *)(deque + idx), 16);
> +		/* Start at the beginning */
> +		for (idx = 0; i < n; i++, idx++)
> +			memcpy((void *)(obj + i),
> +				(const void *)(deque + idx), 16);
> +	}
> +}
> +
> +/*
> + * Copy n objects of esize bytes from the tail of the deque into obj_table
> + * and advance the tail index by n. No availability check is done here.
> + * Returns n.
> + */
> +static __rte_always_inline unsigned int
> +__rte_deque_dequeue_at_tail(struct rte_deque *d,
> +			void *obj_table,
> +			unsigned int esize,
> +			unsigned int n)
> +{
> +	/* 8B and 16B objects have dedicated copy routines because some
> +	 * platforms can copy them directly with 64 bit and 128 bit registers.
> +	 */
> +	switch (esize) {
> +	case 8:
> +		__rte_deque_dequeue_elems_64(d, obj_table, n);
> +		break;
> +	case 16:
> +		__rte_deque_dequeue_elems_128(d, obj_table, n);
> +		break;
> +	default: {
> +		/* Every other size is handled as a run of 32-bit words */
> +		const uint32_t words = esize / sizeof(uint32_t);
> +		const uint32_t slot = d->tail & d->mask;
> +
> +		__rte_deque_dequeue_elems_32(d, d->size * words,
> +					slot * words, obj_table,
> +					n * words);
> +		break;
> +	}
> +	}
> +	d->tail = (d->tail + n) & d->mask;
> +	return n;
> +}
> +
> +/*
> + * Copy objects out of the deque storage into obj_table, starting at word
> + * index idx and moving towards lower indices, one object of elem_size bytes
> + * per step.
> + *
> + * idx, n and mask are counted in 32-bit words; scale is the number of words
> + * per object, so every step moves idx down and i up by scale. When the
> + * descending run reaches the start of the storage, it continues from word
> + * index mask.
> + */
> +static __rte_always_inline void
> +__rte_deque_dequeue_elems_head_32(struct rte_deque *d,
> +				const unsigned int mask,
> +				uint32_t idx,
> +				void *obj_table,
> +				unsigned int n,
> +				const unsigned int scale,
> +				const unsigned int elem_size)
> +{
> +	unsigned int i;
> +	const uint32_t *deque = (uint32_t *)&d[1];
> +	uint32_t *obj = (uint32_t *)obj_table;
> +
> +	if (likely(idx >= n)) {
> +		/* Enough entries below idx: a single descending run */
> +		for (i = 0; i < n; idx -= scale, i += scale)
> +			memcpy(&obj[i], &deque[idx], elem_size);
> +	} else {
> +		/* Descend until idx, read as a signed value, drops below 0 */
> +		for (i = 0; (int32_t)idx >= 0; idx -= scale, i += scale)
> +			memcpy(&obj[i], &deque[idx], elem_size);
> +		/* Start at the ending */
> +		idx = mask;
> +		for (; i < n; idx -= scale, i += scale)
> +			memcpy(&obj[i], &deque[idx], elem_size);
> +	}
> +}
> +
> +/*
> + * Copy n 64-bit objects out of the deque storage into obj_table, starting
> + * at slot (head & mask) and moving towards lower slots. The copy continues
> + * from slot d->mask once slot 0 has been read. The head index itself is not
> + * modified here.
> + */
> +static __rte_always_inline void
> +__rte_deque_dequeue_elems_head_64(struct rte_deque *d,
> +				void *obj_table,
> +				unsigned int n)
> +{
> +	unsigned int i;
> +	uint32_t idx = (d->head & d->mask);
> +	const uint64_t *deque = (uint64_t *)&d[1];
> +	unaligned_uint64_t *obj = (unaligned_uint64_t *)obj_table;
> +	if (likely((int32_t)(idx - n) >= 0)) {
> +		/* No wrap-around: copy 4 objects per iteration, descending */
> +		for (i = 0; i < (n & ~0x3); i += 4, idx -= 4) {
> +			obj[i] = deque[idx];
> +			obj[i + 1] = deque[idx - 1];
> +			obj[i + 2] = deque[idx - 2];
> +			obj[i + 3] = deque[idx - 3];
> +		}
> +		/* Copy the remaining n % 4 objects */
> +		switch (n & 0x3) {
> +		case 3:
> +			obj[i++] = deque[idx--];  /* fallthrough */
> +		case 2:
> +			obj[i++] = deque[idx--]; /* fallthrough */
> +		case 1:
> +			obj[i++] = deque[idx--]; /* fallthrough */
> +		}
> +	} else {
> +		/* Descend until idx, read as a signed value, drops below 0 */
> +		for (i = 0; (int32_t)idx >= 0; i++, idx--)
> +			obj[i] = deque[idx];
> +		/* Start at the ending */
> +		for (idx = d->mask; i < n; i++, idx--)
> +			obj[i] = deque[idx];
> +	}
> +}
> +
> +/*
> + * Copy n 128-bit objects out of the deque storage into obj_table, starting
> + * at slot (head & mask) and moving towards lower slots. The copy continues
> + * from slot d->mask once slot 0 has been read. The head index itself is not
> + * modified here.
> + */
> +static __rte_always_inline void
> +__rte_deque_dequeue_elems_head_128(struct rte_deque *d,
> +				void *obj_table,
> +				unsigned int n)
> +{
> +	unsigned int i;
> +	uint32_t idx = (d->head & d->mask);
> +	const rte_int128_t *deque = (rte_int128_t *)&d[1];
> +	rte_int128_t *obj = (rte_int128_t *)obj_table;
> +	if (likely((int32_t)(idx - n) >= 0)) {
> +		/* No wrap-around: copy 2 objects per iteration, descending */
> +		for (i = 0; i < (n & ~0x1); i += 2, idx -= 2) {
> +			obj[i] = deque[idx];
> +			obj[i + 1] = deque[idx - 1];
> +		}
> +		/* Copy the odd object left over, if any */
> +		switch (n & 0x1) {
> +		case 1:
> +			memcpy((void *)(obj + i),
> +				(const void *)(deque + idx), 16);
> +		}
> +	} else {
> +		/* Descend until idx, read as a signed value, drops below 0 */
> +		for (i = 0; (int32_t)idx >= 0; i++, idx--)
> +			memcpy((void *)(obj + i),
> +				(const void *)(deque + idx), 16);
> +		/* Start at the ending */
> +		for (idx = d->mask; i < n; i++, idx--)
> +			memcpy((void *)(obj + i),
> +				(const void *)(deque + idx), 16);
> +	}
> +}
> +
> +/*
> + * Copy n objects of esize bytes from the head of the deque into obj_table
> + * and move the head index back by n. No availability check is done here.
> + * Returns n.
> + */
> +static __rte_always_inline unsigned int
> +__rte_deque_dequeue_at_head(struct rte_deque *d,
> +			void *obj_table,
> +			unsigned int esize,
> +			unsigned int n)
> +{
> +	/* Step back by one so the copy routines start on the cell just
> +	 * below the current head, i.e. the most recently enqueued object.
> +	 */
> +	d->head--;
> +
> +	/* 8B and 16B objects have dedicated copy routines because some
> +	 * platforms can copy them directly with 64 bit and 128 bit registers.
> +	 */
> +	switch (esize) {
> +	case 8:
> +		__rte_deque_dequeue_elems_head_64(d, obj_table, n);
> +		break;
> +	case 16:
> +		__rte_deque_dequeue_elems_head_128(d, obj_table, n);
> +		break;
> +	default: {
> +		/* Every other size is handled as a run of 32-bit words */
> +		const uint32_t words = esize / sizeof(uint32_t);
> +		const uint32_t slot = d->head & d->mask;
> +
> +		__rte_deque_dequeue_elems_head_32(d, d->mask * words,
> +						slot * words, obj_table,
> +						n * words, words, esize);
> +		break;
> +	}
> +	}
> +
> +	/* n cells were read downwards from the stepped-back head, so the
> +	 * lowest cell read, which becomes the new head, is at head - n + 1.
> +	 */
> +	d->head = (d->head - n + 1) & d->mask;
> +	return n;
> +}
> +#endif /* _RTE_DEQUE_PVT_H_ */
> diff --git a/lib/deque/rte_deque_zc.h b/lib/deque/rte_deque_zc.h
> new file mode 100644
> index 0000000000..6d7167e158
> --- /dev/null
> +++ b/lib/deque/rte_deque_zc.h
> @@ -0,0 +1,430 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + * Copyright(c) 2024 Arm Limited
> + */
> +#ifndef _RTE_DEQUE_ZC_H_
> +#define _RTE_DEQUE_ZC_H_
> +
> +/**
> + * @file
> + * This file should not be included directly, include rte_deque.h instead.
> + *
> + * Deque Zero Copy APIs
> + * These APIs make it possible to split public enqueue/dequeue API
> + * into 3 parts:
> + * - enqueue/dequeue start
> + * - copy data to/from the deque
> + * - enqueue/dequeue finish
> + * These APIs provide the ability to avoid copying of the data to temporary area.
> + *
> + */
> +
> +#ifdef __cplusplus
> +extern "C" {
> +#endif
> +
> +/**
> + * Deque zero-copy information structure.
> + *
> + * This structure contains the pointers and length of the space
> + * reserved on the deque storage.
> + */
> +struct __rte_cache_aligned rte_deque_zc_data {
> +	/* Pointer to the first space in the deque */
> +	void *ptr1;
> +	/* Pointer to the second space in the deque if there is wrap-around.
> +	 * It contains a valid value only if wrap-around happens.
> +	 */
> +	void *ptr2;
> +	/* Number of elements available at ptr1. If this is equal to
> +	 * the number of elements requested, then ptr2 is NULL.
> +	 * Otherwise, subtracting n1 from the number of elements requested
> +	 * gives the number of elements available at ptr2.
> +	 */
> +	unsigned int n1;
> +};
> +
> +/**
> + * @internal Compute the storage address(es) for a zero-copy access.
> + *
> + * @param d
> + *   A pointer to the deque structure; the element storage is taken to start
> + *   immediately after it (&d[1]).
> + * @param pos
> + *   Position of the first element to access; it is masked with d->mask.
> + * @param esize
> + *   The size of deque element, in bytes. It must be a multiple of 4.
> + * @param num
> + *   The number of elements to access.
> + * @param dst1
> + *   Returns the address of the element at the masked position.
> + * @param n1
> + *   Returns the number of elements reachable from dst1 before the access
> + *   wraps around the storage; equal to num when there is no wrap-around.
> + * @param dst2
> + *   Returns where the access continues after a wrap-around (first element
> + *   for low_to_high, last element otherwise), or NULL when there is no
> + *   wrap-around.
> + * @param low_to_high
> + *   true to walk towards higher indices, false to walk towards lower ones.
> + */
> +static __rte_always_inline void
> +__rte_deque_get_elem_addr(struct rte_deque *d, uint32_t pos,
> +	uint32_t esize, uint32_t num, void **dst1, uint32_t *n1, void **dst2,
> +	bool low_to_high)
> +{
> +	uint32_t idx, scale, nr_idx;
> +	uint32_t *deque_ptr = (uint32_t *)&d[1];
> +
> +	/* Normalize to uint32_t: storage is indexed in 4B units */
> +	scale = esize / sizeof(uint32_t);
> +	idx = pos & d->mask;
> +	nr_idx = idx * scale;
> +
> +	*dst1 = deque_ptr + nr_idx;
> +	*n1 = num;
> +	*dst2 = NULL;
> +
> +	if (low_to_high) {
> +		if (idx + num > d->size) {
> +			*n1 = d->size - idx;
> +			*dst2 = deque_ptr;
> +		}
> +	} else {
> +		/* Walking down from idx covers idx + 1 elements before
> +		 * reaching index 0, so wrap only when num exceeds that.
> +		 */
> +		if (num > idx + 1) {
> +			*n1 = idx + 1;
> +			/* continue from the last element of the storage */
> +			*dst2 = (void *)&deque_ptr[d->mask * scale];
> +		}
> +	}
> +}
> +
> +/**
> + * Start to enqueue several objects on the deque.
> + * Note that no actual objects are put in the deque by this function,
> + * it just reserves space for the user on the deque.
> + * User has to copy objects into the deque using the returned pointers.
> + * User should call rte_deque_head_enqueue_zc_elem_finish to complete the
> + * enqueue operation.
> + *
> + * @param d
> + *   A pointer to the deque structure.
> + * @param esize
> + *   The size of deque element, in bytes. It must be a multiple of 4.
> + * @param n
> + *   The number of objects to add in the deque.
> + * @param zcd
> + *   Structure containing the pointers and length of the space
> + *   reserved on the deque storage.
> + * @param free_space
> + *   Returns the amount of space in the deque after the reservation operation
> + *   has finished.
> + * @return
> + *   The number of objects that can be enqueued, either 0 or n
> + */
> +__rte_experimental
> +static __rte_always_inline unsigned int
> +rte_deque_head_enqueue_zc_bulk_elem_start(struct rte_deque *d, unsigned int esize,
> +	unsigned int n, struct rte_deque_zc_data *zcd, unsigned int *free_space)
> +{
> +
> +	*free_space = __RTE_DEQUE_FREE_SPACE(d);
> +	if (unlikely(*free_space < n))
> +		return 0;
> +	__rte_deque_get_elem_addr(d, d->head, esize, n, &zcd->ptr1,
> +							&zcd->n1, &zcd->ptr2, true);
> +
> +	*free_space -= n;
> +	return n;
> +}
> +
> +/**
> + * Complete enqueuing several pointers to objects on the deque.
> + * Note that number of objects to enqueue should not exceed previous
> + * enqueue_start return value.
> + *
> + * @param d
> + *   A pointer to the deque structure.
> + * @param n
> + *   The number of pointers to objects to add to the deque.
> + */
> +__rte_experimental
> +static __rte_always_inline void
> +rte_deque_head_enqueue_zc_elem_finish(struct rte_deque *d, unsigned int n)
> +{
> +	d->head = (d->head + n) & d->mask;
> +}
> +
> +/**
> + * Start to enqueue several objects on the deque.
> + * Note that no actual objects are put in the queue by this function,
> + * it just reserves space for the user on the deque.
> + * User has to copy objects into the queue using the returned pointers.
> + * User should call rte_deque_head_enqueue_zc_elem_finish to complete the
> + * enqueue operation.
> + *
> + * @param d
> + *   A pointer to the deque structure.
> + * @param esize
> + *   The size of deque element, in bytes. It must be a multiple of 4.
> + * @param n
> + *   The number of objects to add in the deque.
> + * @param zcd
> + *   Structure containing the pointers and length of the space
> + *   reserved on the deque storage.
> + * @param free_space
> + *   Returns the amount of space in the deque after the reservation operation
> + *   has finished.
> + * @return
> + *   The number of objects that can be enqueued, between 0 and n
> + */
> +__rte_experimental
> +static __rte_always_inline unsigned int
> +rte_deque_head_enqueue_zc_burst_elem_start(struct rte_deque *d, unsigned int esize,
> +	unsigned int n, struct rte_deque_zc_data *zcd, unsigned int *free_space)
> +{
> +	*free_space = __RTE_DEQUE_FREE_SPACE(d);
> +	n = n > *free_space ? *free_space : n;
> +	return rte_deque_head_enqueue_zc_bulk_elem_start(d, esize, n, zcd, free_space);
> +}
> +
> +/**
> + * Start to enqueue several objects on the deque.
> + * Note that no actual objects are put in the deque by this function,
> + * it just reserves space for the user on the deque.
> + * User has to copy objects into the deque using the returned pointers.
> + * User should call rte_deque_tail_enqueue_zc_elem_finish to complete the
> + * enqueue operation.
> + *
> + * @param d
> + *   A pointer to the deque structure.
> + * @param esize
> + *   The size of deque element, in bytes. It must be a multiple of 4.
> + * @param n
> + *   The number of objects to add in the deque.
> + * @param zcd
> + *   Structure containing the pointers and length of the space
> + *   reserved on the deque storage.
> + * @param free_space
> + *   Returns the amount of space in the deque after the reservation operation
> + *   has finished.
> + * @return
> + *   The number of objects that can be enqueued, either 0 or n
> + */
> +__rte_experimental
> +static __rte_always_inline unsigned int
> +rte_deque_tail_enqueue_zc_bulk_elem_start(struct rte_deque *d, unsigned int esize,
> +	unsigned int n, struct rte_deque_zc_data *zcd, unsigned int *free_space)
> +{
> +	*free_space = __RTE_DEQUE_FREE_SPACE(d);
> +	if (unlikely(*free_space < n))
> +		return 0;
> +	__rte_deque_get_elem_addr(d, d->tail - 1, esize, n, &zcd->ptr1,
> +							  &zcd->n1, &zcd->ptr2, false);
> +
> +	*free_space -= n;
> +	return n;
> +}
> +
> +/**
> + * Complete enqueuing several pointers to objects on the deque.
> + * Note that number of objects to enqueue should not exceed previous
> + * enqueue_start return value.
> + *
> + * @param d
> + *   A pointer to the deque structure.
> + * @param n
> + *   The number of pointers to objects to add to the deque.
> + */
> +__rte_experimental
> +static __rte_always_inline void
> +rte_deque_tail_enqueue_zc_elem_finish(struct rte_deque *d, unsigned int n)
> +{
> +	d->tail = (d->tail - n) & d->mask;
> +}
> +
> +/**
> + * Start to enqueue several objects on the deque.
> + * Note that no actual objects are put in the queue by this function,
> + * it just reserves space for the user on the deque.
> + * User has to copy objects into the queue using the returned pointers.
> + * User should call rte_deque_tail_enqueue_zc_elem_finish to complete the
> + * enqueue operation.
> + *
> + * @param d
> + *   A pointer to the deque structure.
> + * @param esize
> + *   The size of deque element, in bytes. It must be a multiple of 4.
> + * @param n
> + *   The number of objects to add in the deque.
> + * @param zcd
> + *   Structure containing the pointers and length of the space
> + *   reserved on the deque storage.
> + * @param free_space
> + *   Returns the amount of space in the deque after the reservation operation
> + *   has finished.
> + * @return
> + *   The number of objects that can be enqueued, between 0 and n
> + */
> +__rte_experimental
> +static __rte_always_inline unsigned int
> +rte_deque_tail_enqueue_zc_burst_elem_start(struct rte_deque *d, unsigned int esize,
> +	unsigned int n, struct rte_deque_zc_data *zcd, unsigned int *free_space)
> +{
> +	*free_space = __RTE_DEQUE_FREE_SPACE(d);
> +	n = n > *free_space ? *free_space : n;
> +	return rte_deque_tail_enqueue_zc_bulk_elem_start(d, esize, n, zcd, free_space);
> +}
> +
> +/**
> + * Start to dequeue several objects from the deque.
> + * Note that no actual objects are copied from the queue by this function.
> + * User has to copy objects from the queue using the returned pointers.
> + * User should call rte_deque_tail_dequeue_zc_elem_finish to complete the
> + * dequeue operation.
> + *
> + * @param d
> + *   A pointer to the deque structure.
> + * @param esize
> + *   The size of deque element, in bytes. It must be a multiple of 4.
> + * @param n
> + *   The number of objects to remove from the deque.
> + * @param zcd
> + *   Structure containing the pointers and length of the space
> + *   reserved on the deque storage.
> + * @param available
> + *   Returns the number of remaining deque entries after the dequeue has
> + *   finished.
> + * @return
> + *   The number of objects that can be dequeued, either 0 or n.
> + */
> +__rte_experimental
> +static __rte_always_inline unsigned int
> +rte_deque_tail_dequeue_zc_bulk_elem_start(struct rte_deque *d, unsigned int esize,
> +	unsigned int n, struct rte_deque_zc_data *zcd, unsigned int *available)
> +{
> +	*available = __RTE_DEQUE_COUNT(d);
> +	if (unlikely(*available < n))
> +		return 0;
> +	__rte_deque_get_elem_addr(d, d->tail, esize, n, &zcd->ptr1,
> +							&zcd->n1, &zcd->ptr2, true);
> +
> +	*available -= n;
> +	return n;
> +}
> +
> +/**
> + * Complete dequeuing several objects from the deque.
> + * Note that the number of objects to dequeue should not exceed the previous
> + * dequeue_start return value.
> + *
> + * @param d
> + *   A pointer to the deque structure.
> + * @param n
> + *   The number of objects to remove from the deque.
> + */
> +__rte_experimental
> +static __rte_always_inline void
> +rte_deque_tail_dequeue_zc_elem_finish(struct rte_deque *d, unsigned int n)
> +{
> +	d->tail = (d->tail + n) & d->mask;
> +}
> +
> +/**
> + * Start to dequeue several objects from the deque.
> + * Note that no actual objects are copied from the queue by this function.

Why do you even need to copy elements out from the queue, ever?

Wouldn't it be better to return a reference to the objects, rather than 
to copy objects around? Or at least have a zero-copy option.

"Peek" functions, either single-object or "burst". (The benefit of 
"bursts" is not going to be very great for this data structure, provided 
you remove "volatile" on head and tail.)

Say you have rte_event as the element (24 bytes, if I recall correctly). 
Then you don't want to needlessly copy those around to stack-allocated 
arrays.

Rather, one would like to do something like:

for (;;) {
	struct rte_event *events;
	unsigned int n = rte_deque_peek(deque, &events, 16);
	if (n == 0)
		break;
	process_events(events, n);
	rte_deque_pop(deque, n);
}

My overall impression is that you should forget about rte_ring, and both 
reduce rte_deque complexity and optimize for the non-MT-safe case.

I'm not even sure you need the "copy out" variant of the API.

> + * User has to copy objects from the queue using the returned pointers.
> + * User should call rte_deque_tail_dequeue_zc_elem_finish to complete the
> + * dequeue operation.
> + *
> + * @param d
> + *   A pointer to the deque structure.
> + * @param esize
> + *   The size of deque element, in bytes. It must be a multiple of 4.
> + * @param n
> + *   The number of objects to remove from the deque.
> + * @param zcd
> + *   Structure containing the pointers and length of the space
> + *   reserved on the deque storage.
> + * @param available
> + *   Returns the number of remaining deque entries after the dequeue has
> + *   finished.
> + * @return
> + *   The number of objects that can be dequeued, between 0 and n.
> + */
> +__rte_experimental
> +static __rte_always_inline unsigned int
> +rte_deque_tail_dequeue_zc_burst_elem_start(struct rte_deque *d, unsigned int esize,
> +	unsigned int n, struct rte_deque_zc_data *zcd, unsigned int *available)
> +{
> +	*available = __RTE_DEQUE_COUNT(d);
> +	n = n > *available ? *available : n;
> +	return rte_deque_tail_dequeue_zc_bulk_elem_start(d, esize, n, zcd, available);
> +}
> +
> +/**
> + * Start to dequeue several objects from the deque.
> + * Note that no actual objects are copied from the queue by this function.
> + * User has to copy objects from the queue using the returned pointers.
> + * User should call rte_deque_head_dequeue_zc_elem_finish to complete the
> + * dequeue operation.
> + *
> + * @param d
> + *   A pointer to the deque structure.
> + * @param esize
> + *   The size of deque element, in bytes. It must be a multiple of 4.
> + * @param n
> + *   The number of objects to remove from the deque.
> + * @param zcd
> + *   Structure containing the pointers and length of the space
> + *   reserved on the deque storage.
> + * @param available
> + *   Returns the number of remaining deque entries after the dequeue has
> + *   finished.
> + * @return
> + *   The number of objects that can be dequeued, either 0 or n.
> + */
> +__rte_experimental
> +static __rte_always_inline unsigned int
> +rte_deque_head_dequeue_zc_bulk_elem_start(struct rte_deque *d, unsigned int esize,
> +	unsigned int n, struct rte_deque_zc_data *zcd, unsigned int *available)
> +{
> +	*available = __RTE_DEQUE_COUNT(d);
> +	if (unlikely(*available < n))
> +		return 0;
> +	__rte_deque_get_elem_addr(d, d->head - 1, esize, n, &zcd->ptr1,
> +							&zcd->n1, &zcd->ptr2, false);
> +
> +	*available -= n;
> +	return n;
> +}
> +
> +/**
> + * Complete dequeuing several objects from the deque.
> + * Note that the number of objects to dequeue should not exceed the previous
> + * dequeue_start return value.
> + *
> + * @param d
> + *   A pointer to the deque structure.
> + * @param n
> + *   The number of objects to remove from the deque.
> + */
> +__rte_experimental
> +static __rte_always_inline void
> +rte_deque_head_dequeue_zc_elem_finish(struct rte_deque *d, unsigned int n)
> +{
> +	d->head = (d->head - n) & d->mask;
> +}
> +
> +/**
> + * Start to dequeue several objects from the deque.
> + * Note that no actual objects are copied from the queue by this function.
> + * User has to copy objects from the queue using the returned pointers.
> + * User should call rte_deque_head_dequeue_zc_elem_finish to complete the
> + * dequeue operation.
> + *
> + * @param d
> + *   A pointer to the deque structure.
> + * @param esize
> + *   The size of deque element, in bytes. It must be a multiple of 4.
> + * @param n
> + *   The number of objects to remove from the deque.
> + * @param zcd
> + *   Structure containing the pointers and length of the space
> + *   reserved on the deque storage.
> + * @param available
> + *   Returns the number of remaining deque entries after the dequeue has
> + *   finished.
> + * @return
> + *   The number of objects that can be dequeued, between 0 and n.
> + */
> +__rte_experimental
> +static __rte_always_inline unsigned int
> +rte_deque_head_dequeue_zc_burst_elem_start(struct rte_deque *d, unsigned int esize,
> +	unsigned int n, struct rte_deque_zc_data *zcd, unsigned int *available)
> +{
> +	*available = __RTE_DEQUE_COUNT(d);
> +	n = n > *available ? *available : n;
> +	return rte_deque_head_dequeue_zc_bulk_elem_start(d, esize, n, zcd, available);
> +}
> +
> +#ifdef __cplusplus
> +}
> +#endif
> +
> +#endif /* _RTE_DEQUE_ZC_H_ */
> diff --git a/lib/deque/version.map b/lib/deque/version.map
> new file mode 100644
> index 0000000000..103fd3b512
> --- /dev/null
> +++ b/lib/deque/version.map
> @@ -0,0 +1,14 @@
> +EXPERIMENTAL {
> +	global:
> +
> +	# added in 24.07
> +	rte_deque_log_type;
> +	rte_deque_create;
> +	rte_deque_dump;
> +	rte_deque_free;
> +	rte_deque_get_memsize_elem;
> +	rte_deque_init;
> +	rte_deque_reset;
> +
> +	local: *;
> +};
> diff --git a/lib/meson.build b/lib/meson.build
> index 179a272932..82929b7a11 100644
> --- a/lib/meson.build
> +++ b/lib/meson.build
> @@ -14,6 +14,7 @@ libraries = [
>           'argparse',
>           'telemetry', # basic info querying
>           'eal', # everything depends on eal
> +        'deque',
>           'ring',
>           'rcu', # rcu depends on ring
>           'mempool',
> @@ -74,6 +75,7 @@ if is_ms_compiler
>               'kvargs',
>               'telemetry',
>               'eal',
> +            'deque',
>               'ring',
>       ]
>   endif
  
Ali Alnubani April 25, 2024, 7:43 a.m. UTC | #4
> From: Patrick Robb <probb@iol.unh.edu>
> Sent: Wednesday, April 24, 2024 8:21 PM
> To: Ali Alnubani <alialnu@nvidia.com>
> Cc: Aditya Ambadipudi <aditya.ambadipudi@arm.com>; dev@dpdk.org; Jack
> Min <jackmin@nvidia.com>; stephen@networkplumber.org; Matan Azrad
> <matan@nvidia.com>; Slava Ovsiienko <viacheslavo@nvidia.com>;
> roretzla@linux.microsoft.com; konstantin.ananyev@huawei.com;
> hofors@lysator.liu.se; wathsala.vithanage@arm.com;
> dhruv.tripathi@arm.com; honnappa.nagarahalli@arm.com; nd@arm.com;
> Morten Brørup <mb@smartsharesystems.com>
> Subject: Re: [PATCH v2 1/2] deque: add multi-thread unsafe double ended
> queue
> 
> Hi Ali,
> 
> Wathsala reached out asking how the checkpatch CI check can be updated so
> that this series passes checkpatch.
> 
> If building the dictionary is a 1 time operation for you, you may have to apply
> this patch and re-run devtools/build-dict.sh so that the new dictionary is in
> place for a V3 of this series.
> 
> It looks like these dictionary exceptions are submitted quite rarely. But, if it
> becomes more common in the future you could look at adding a step to your
> automation which produces a new dictionary every time you run checkpatch,
> based on any additions to the exception list which came with the patch. But
> it's probably not worth the effort with the low volume of word exception
> additions.
> 

Hello,

Applied the change to the dictionary and reran the check.
Still failing because of the GERRIT CHANGE_ID though:
https://mails.dpdk.org/archives/test-report/2024-April/650688.html

Regards,
Ali
  
Aditya Ambadipudi May 2, 2024, 8:29 p.m. UTC | #5
Hello Ali & Patrick.

Please use v3 of this patch to see if it fixes the "deque" spell check issue that you folks were helping me & Wathsala with. I have removed Gerrit change-id from this patch.

Thank you,
Aditya Ambadipudi
  
Thomas Monjalon June 27, 2024, 3:03 p.m. UTC | #6
02/05/2024 22:19, Aditya Ambadipudi:
> As previously discussed in the mailing list [1] we are sending out this
> patch that provides the implementation and unit test cases for the
> RTE_DEQUE library. This includes functions for creating a RTE_DEQUE 
> object. Allocating memory to it. Deleting that object and free'ing the
> memory associated with it. Enqueue/Dequeue functions. Functions for 
> zero-copy API.
> 
> Aditya Ambadipudi (1):
>   deque: add unit tests for the deque library
> 
> Honnappa Nagarahalli (1):
>   deque: add multi-thread unsafe double ended queue

There were many comments on previous versions,
and no ack on the v3, so I'm not sure all comments are addressed.
We probably need a new round of reviews on this new library.

Also, in order to show its benefits, would it be a good idea
to replace some existing code with calls to this lib,
inside this patch series?
  
Wathsala Wathawana Vithanage June 28, 2024, 8:05 p.m. UTC | #7
Hi Thomas,

Aditya, the original author of this patch, is no longer at Arm.
One of my colleagues will take over this patch, hence we will need some time to address these comments.

Thank you

> There were many comments on previous versions, and no ack on the v3, so I'm
> not sure all comments are addressed.
> We probably need a new round of reviews on this new library.
> 
> Also, in order to show its benefits, would it be a good idea to replace some
> existing code with calls to this lib, inside this patch series?
>
  

Patch

diff --git a/.mailmap b/.mailmap
index 3843868716..8e705ab6ab 100644
--- a/.mailmap
+++ b/.mailmap
@@ -17,6 +17,7 @@  Adam Bynes <adambynes@outlook.com>
 Adam Dybkowski <adamx.dybkowski@intel.com>
 Adam Ludkiewicz <adam.ludkiewicz@intel.com>
 Adham Masarwah <adham@nvidia.com> <adham@mellanox.com>
+Aditya Ambadipudi <aditya.ambadipudi@arm.com>
 Adrian Moreno <amorenoz@redhat.com>
 Adrian Podlawski <adrian.podlawski@intel.com>
 Adrien Mazarguil <adrien.mazarguil@6wind.com>
diff --git a/devtools/build-dict.sh b/devtools/build-dict.sh
index a8cac49029..595d8f9277 100755
--- a/devtools/build-dict.sh
+++ b/devtools/build-dict.sh
@@ -17,6 +17,7 @@  sed '/^..->/d' |
 sed '/^uint->/d' |
 sed "/^doesn'->/d" |
 sed '/^wasn->/d' |
+sed '/^deque.*->/d' |
 
 # print to stdout
 cat
diff --git a/lib/deque/meson.build b/lib/deque/meson.build
new file mode 100644
index 0000000000..1ff45fc39f
--- /dev/null
+++ b/lib/deque/meson.build
@@ -0,0 +1,11 @@ 
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2024 Arm Limited
+
+sources = files('rte_deque.c')
+headers = files('rte_deque.h')
+# most sub-headers are not for direct inclusion
+indirect_headers += files (
+        'rte_deque_core.h',
+        'rte_deque_pvt.h',
+        'rte_deque_zc.h'
+)
diff --git a/lib/deque/rte_deque.c b/lib/deque/rte_deque.c
new file mode 100644
index 0000000000..b83a6c43c4
--- /dev/null
+++ b/lib/deque/rte_deque.c
@@ -0,0 +1,193 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2024 Arm Limited
+ */
+
+#include <stdalign.h>
+#include <string.h>
+#include <stdint.h>
+#include <inttypes.h>
+#include <errno.h>
+#include <sys/queue.h>
+
+#include <rte_common.h>
+#include <rte_log.h>
+#include <rte_memzone.h>
+#include <rte_malloc.h>
+#include <rte_eal_memconfig.h>
+#include <rte_errno.h>
+#include <rte_string_fns.h>
+
+#include "rte_deque.h"
+
+/* mask of all valid flag values to deque_create() */
+#define __RTE_DEQUE_F_MASK (RTE_DEQUE_F_EXACT_SZ)
+ssize_t
+rte_deque_get_memsize_elem(unsigned int esize, unsigned int count)
+{
+	ssize_t sz;
+
+	/* Element size must be a multiple of 4B, otherwise -EINVAL */
+	if (esize % 4 != 0) {
+		rte_log(RTE_LOG_ERR, rte_deque_log_type,
+			"%s(): element size is not a multiple of 4\n",
+			__func__);
+
+		return -EINVAL;
+	}
+
+	/* count must be a power of 2 and must not exceed RTE_DEQUE_SZ_MASK */
+	if ((!RTE_IS_POWER_OF_2(count)) || (count > RTE_DEQUE_SZ_MASK)) {
+		rte_log(RTE_LOG_ERR, rte_deque_log_type,
+			"%s(): Requested number of elements is invalid, "
+			"must be power of 2, and not exceed %u\n",
+			__func__, RTE_DEQUE_SZ_MASK);
+
+		return -EINVAL;
+	}
+
+	sz = sizeof(struct rte_deque) + (ssize_t)count * esize;
+	sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
+	return sz;
+}
+
+void
+rte_deque_reset(struct rte_deque *d)
+{
+	d->head = 0;
+	d->tail = 0;
+}
+
+int
+rte_deque_init(struct rte_deque *d, const char *name, unsigned int count,
+	unsigned int flags)
+{
+	int ret;
+
+	/* compilation-time checks */
+	RTE_BUILD_BUG_ON((sizeof(struct rte_deque) &
+			  RTE_CACHE_LINE_MASK) != 0);
+
+	/* future proof flags, only allow supported values */
+	if (flags & ~__RTE_DEQUE_F_MASK) {
+		rte_log(RTE_LOG_ERR, rte_deque_log_type,
+			"%s(): Unsupported flags requested %#x\n",
+			__func__, flags);
+		return -EINVAL;
+	}
+
+	/* init the deque structure */
+	memset(d, 0, sizeof(*d));
+	ret = strlcpy(d->name, name, sizeof(d->name));
+	if (ret < 0 || ret >= (int)sizeof(d->name))
+		return -ENAMETOOLONG;
+	d->flags = flags;
+
+	if (flags & RTE_DEQUE_F_EXACT_SZ) {
+		d->size = rte_align32pow2(count + 1);
+		d->mask = d->size - 1;
+		d->capacity = count;
+	} else {
+		if ((!RTE_IS_POWER_OF_2(count)) || (count > RTE_DEQUE_SZ_MASK)) {
+			rte_log(RTE_LOG_ERR, rte_deque_log_type,
+				"%s(): Requested size is invalid, must be power"
+				" of 2, and not exceed the size limit %u\n",
+				__func__, RTE_DEQUE_SZ_MASK);
+			return -EINVAL;
+		}
+		d->size = count;
+		d->mask = count - 1;
+		d->capacity = d->mask;
+	}
+
+	return 0;
+}
+
+/* create a memzone-backed deque; returns NULL and sets rte_errno on failure */
+struct rte_deque *
+rte_deque_create(const char *name, unsigned int esize, unsigned int count,
+		int socket_id, unsigned int flags)
+{
+	char mz_name[RTE_MEMZONE_NAMESIZE];
+	struct rte_deque *d;
+	const struct rte_memzone *mz;
+	ssize_t deque_size;
+	int mz_flags = 0;
+	const unsigned int requested_count = count;
+	int ret;
+
+	/* for an exact size deque, round up from count to a power of two */
+	if (flags & RTE_DEQUE_F_EXACT_SZ)
+		count = rte_align32pow2(count + 1);
+
+	deque_size = rte_deque_get_memsize_elem(esize, count);
+	if (deque_size < 0) {
+		rte_errno = -deque_size;
+		return NULL;
+	}
+
+	ret = snprintf(mz_name, sizeof(mz_name), "%s%s",
+		RTE_DEQUE_MZ_PREFIX, name);
+	if (ret < 0 || ret >= (int)sizeof(mz_name)) {
+		rte_errno = ENAMETOOLONG;
+		return NULL;
+	}
+
+	/* on failure, rte_memzone_reserve_aligned() sets rte_errno for us */
+	mz = rte_memzone_reserve_aligned(mz_name, deque_size, socket_id,
+					 mz_flags, alignof(struct rte_deque));
+	if (mz == NULL) {
+		rte_log(RTE_LOG_ERR, rte_deque_log_type,
+			"%s(): Cannot reserve memory\n", __func__);
+		return NULL;
+	}
+
+	/* flags are not validated above, so initialization can still fail */
+	d = mz->addr;
+	ret = rte_deque_init(d, name, requested_count, flags);
+	if (ret != 0) {
+		rte_memzone_free(mz);
+		rte_errno = -ret;
+		return NULL;
+	}
+	d->memzone = mz;
+	return d;
+}
+
+/* free the deque */
+void
+rte_deque_free(struct rte_deque *d)
+{
+	if (d == NULL)
+		return;
+
+	/*
+	 * Deque was not created with rte_deque_create,
+	 * therefore, there is no memzone to free.
+	 */
+	if (d->memzone == NULL) {
+		rte_log(RTE_LOG_ERR, rte_deque_log_type,
+			"%s(): Cannot free deque, not created "
+			"with rte_deque_create()\n", __func__);
+		return;
+	}
+
+	if (rte_memzone_free(d->memzone) != 0)
+		rte_log(RTE_LOG_ERR, rte_deque_log_type,
+			"%s(): Cannot free memory\n", __func__);
+}
+
+/* dump the status of the deque on the console */
+void
+rte_deque_dump(FILE *f, const struct rte_deque *d)
+{
+	fprintf(f, "deque <%s>@%p\n", d->name, d);
+	fprintf(f, "  flags=%x\n", d->flags);
+	fprintf(f, "  size=%"PRIu32"\n", d->size);
+	fprintf(f, "  capacity=%"PRIu32"\n", d->capacity);
+	fprintf(f, "  head=%"PRIu32"\n", d->head);
+	fprintf(f, "  tail=%"PRIu32"\n", d->tail);
+	fprintf(f, "  used=%u\n", rte_deque_count(d));
+	fprintf(f, "  avail=%u\n", rte_deque_free_count(d));
+}
+
+RTE_LOG_REGISTER_DEFAULT(rte_deque_log_type, ERR);
diff --git a/lib/deque/rte_deque.h b/lib/deque/rte_deque.h
new file mode 100644
index 0000000000..6633eab377
--- /dev/null
+++ b/lib/deque/rte_deque.h
@@ -0,0 +1,533 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2024 Arm Limited
+ */
+
+#ifndef _RTE_DEQUE_H_
+#define _RTE_DEQUE_H_
+
+/**
+ * @file
+ * RTE double ended queue (Deque)
+ *
+ * This fixed-size queue does not provide concurrent access by
+ * multiple threads. If required, the application should use locks
+ * to protect the deque from concurrent access.
+ *
+ * - Double ended queue
+ * - Maximum size is fixed
+ * - Store objects of any size
+ * - Single/bulk/burst dequeue at tail or head
+ * - Single/bulk/burst enqueue at head or tail
+ *
+ */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <rte_deque_core.h>
+#include <rte_deque_pvt.h>
+#include <rte_deque_zc.h>
+
+/**
+ * Calculate the memory size needed for a deque
+ *
+ * This function returns the number of bytes needed for a deque, given
+ * the number of objects and the object size. This value is the sum of
+ * the size of the structure rte_deque and the size of the memory needed
+ * by the objects. The value is aligned to a cache line size.
+ *
+ * @param esize
+ *   The size of deque object, in bytes. It must be a multiple of 4.
+ * @param count
+ *   The number of objects in the deque (must be a power of 2).
+ * @return
+ *   - The memory size needed for the deque on success.
+ *   - -EINVAL if count is not a power of 2.
+ */
+__rte_experimental
+ssize_t rte_deque_get_memsize_elem(unsigned int esize, unsigned int count);
+
+/**
+ * Initialize a deque structure.
+ *
+ * Initialize a deque structure in memory pointed by "d". The size of the
+ * memory area must be large enough to store the deque structure and the
+ * object table. It is advised to use rte_deque_get_memsize_elem() to get
+ * the appropriate size.
+ *
+ * The deque size is set to *count*, which must be a power of two.
+ * The real usable deque size is *count-1* instead of *count* to
+ * differentiate a full deque from an empty deque.
+ *
+ * @param d
+ *   The pointer to the deque structure followed by the objects table.
+ * @param name
+ *   The name of the deque.
+ * @param count
+ *   The number of objects in the deque (must be a power of 2,
+ *   unless RTE_DEQUE_F_EXACT_SZ is set in flags).
+ * @param flags
+ *   - RTE_DEQUE_F_EXACT_SZ: If this flag is set, the deque will hold
+ *     exactly the requested number of objects, and the requested size
+ *     will be rounded up to the next power of two, but the usable space
+ *     will be exactly that requested. Worst case, if a power-of-2 size is
+ *     requested, half the deque space will be wasted.
+ *     Without this flag set, the deque size requested must be a power of 2,
+ *     and the usable space will be that size - 1.
+ * @return
+ *   0 on success, or a negative value on error.
+ */
+__rte_experimental
+int rte_deque_init(struct rte_deque *d, const char *name, unsigned int count,
+		unsigned int flags);
+
+/**
+ * Create a new deque named *name* in memory.
+ *
+ * This function uses ``rte_memzone_reserve_aligned()`` to allocate memory.
+ * Then it calls rte_deque_init() to initialize an empty deque.
+ *
+ * The new deque size is set to *count*, which must be a power of two.
+ * The real usable deque size is *count-1* instead of *count* to
+ * differentiate a full deque from an empty deque.
+ *
+ * @param name
+ *   The name of the deque.
+ * @param esize
+ *   The size of deque object, in bytes. It must be a multiple of 4.
+ * @param count
+ *   The size of the deque (must be a power of 2,
+ *   unless RTE_DEQUE_F_EXACT_SZ is set in flags).
+ * @param socket_id
+ *   The *socket_id* argument is the socket identifier in case of
+ *   NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA
+ *   constraint for the reserved zone.
+ * @param flags
+ *   - RTE_DEQUE_F_EXACT_SZ: If this flag is set, the deque will hold exactly the
+ *     requested number of entries, and the requested size will be rounded up
+ *     to the next power of two, but the usable space will be exactly that
+ *     requested. Worst case, if a power-of-2 size is requested, half the
+ *     deque space will be wasted.
+ *     Without this flag set, the deque size requested must be a power of 2,
+ *     and the usable space will be that size - 1.
+ * @return
+ *   On success, the pointer to the new allocated deque. NULL on error with
+ *    rte_errno set appropriately. Possible errno values include:
+ *    - E_RTE_NO_CONFIG - function could not get pointer to rte_config structure
+ *    - EINVAL - count provided is not a power of 2
+ *    - ENOSPC - the maximum number of memzones has already been allocated
+ *    - EEXIST - a memzone with the same name already exists
+ *    - ENOMEM - no appropriate memory area found in which to create memzone
+ */
+__rte_experimental
+struct rte_deque *rte_deque_create(const char *name, unsigned int esize,
+				unsigned int count, int socket_id,
+				unsigned int flags);
+
+/**
+ * De-allocate all memory used by the deque.
+ *
+ * @param d
+ *   Deque to free.
+ *   If NULL then, the function does nothing.
+ */
+__rte_experimental
+void rte_deque_free(struct rte_deque *d);
+
+/**
+ * Dump the status of the deque to a file.
+ *
+ * @param f
+ *   A pointer to a file for output
+ * @param d
+ *   A pointer to the deque structure.
+ */
+__rte_experimental
+void rte_deque_dump(FILE *f, const struct rte_deque *d);
+
+/**
+ * Return the number of entries in a deque.
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @return
+ *   The number of entries in the deque.
+ */
+static inline unsigned int
+rte_deque_count(const struct rte_deque *d)
+{
+	/* Distance from tail to head, reduced with the deque mask. */
+	const uint32_t span = d->head - d->tail;
+
+	return span & d->mask;
+}
+
+/**
+ * Return the number of free entries in a deque.
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @return
+ *   The number of free entries in the deque.
+ */
+static inline unsigned int
+rte_deque_free_count(const struct rte_deque *d)
+{
+	/* Usable capacity minus the entries currently held. */
+	const unsigned int used = rte_deque_count(d);
+
+	return d->capacity - used;
+}
+
+/**
+ * Enqueue fixed number of objects on a deque at the head.
+ *
+ * This function copies the objects at the head of the deque and
+ * moves the head index.
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @param obj_table
+ *   A pointer to a table of objects.
+ * @param esize
+ *   The size of deque object, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the deque. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to add in the deque from the obj_table.
+ * @param free_space
+ *   Returns the amount of space in the deque after the enqueue operation
+ *   has finished.
+ * @return
+ *   The number of objects enqueued, either 0 or n
+ */
+__rte_experimental
+static __rte_always_inline unsigned int
+rte_deque_head_enqueue_bulk_elem(struct rte_deque *d,
+			const void *obj_table,
+			unsigned int esize,
+			unsigned int n,
+			unsigned int *free_space)
+{
+	const unsigned int room = rte_deque_free_count(d);
+
+	/* All or nothing: without room for every object, report the
+	 * current free space and leave the deque untouched.
+	 */
+	if (unlikely(n > room)) {
+		*free_space = room;
+		return 0;
+	}
+
+	*free_space = room - n;
+	return __rte_deque_enqueue_at_head(d, obj_table, esize, n);
+}
+
+/**
+ * Enqueue up to a maximum number of objects on a deque at the head.
+ *
+ * This function copies the objects at the head of the deque and
+ * moves the head index. When fewer than n entries are free, only as many
+ * objects as fit are enqueued.
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @param obj_table
+ *   A pointer to a table of objects.
+ * @param esize
+ *   The size of deque object, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the deque. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to add in the deque from the obj_table.
+ * @param free_space
+ *   Returns the amount of space in the deque after the enqueue operation
+ *   has finished. It is written unconditionally and must not be NULL.
+ * @return
+ *   Actual number of objects enqueued, between 0 and n.
+ */
+__rte_experimental
+static __rte_always_inline unsigned int
+rte_deque_head_enqueue_burst_elem(struct rte_deque *d, const void *obj_table,
+			unsigned int esize, unsigned int n,
+			unsigned int *free_space)
+{
+	unsigned int avail_space = rte_deque_free_count(d);
+	unsigned int to_be_enqueued = (n <= avail_space ? n : avail_space);
+
+	/* Subtract what is actually enqueued, not what was requested:
+	 * avail_space - n wraps around whenever n exceeds the free space.
+	 */
+	*free_space = avail_space - to_be_enqueued;
+	return __rte_deque_enqueue_at_head(d, obj_table, esize, to_be_enqueued);
+}
+
+/**
+ * Enqueue fixed number of objects on a deque at the tail.
+ *
+ * This function copies the objects at the tail of the deque and
+ * moves the tail index (backwards).
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @param obj_table
+ *   A pointer to a table of objects.
+ * @param esize
+ *   The size of deque object, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the deque. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to add in the deque from the obj_table.
+ * @param free_space
+ *   Returns the amount of space in the deque after the enqueue operation
+ *   has finished.
+ * @return
+ *   The number of objects enqueued, either 0 or n
+ */
+__rte_experimental
+static __rte_always_inline unsigned int
+rte_deque_tail_enqueue_bulk_elem(struct rte_deque *d,
+				 const void *obj_table, unsigned int esize,
+				 unsigned int n, unsigned int *free_space)
+{
+	const unsigned int room = rte_deque_free_count(d);
+
+	/* All or nothing: without room for every object, report the
+	 * current free space and leave the deque untouched.
+	 */
+	if (unlikely(n > room)) {
+		*free_space = room;
+		return 0;
+	}
+
+	*free_space = room - n;
+	return __rte_deque_enqueue_at_tail(d, obj_table, esize, n);
+}
+
+/**
+ * Enqueue up to a maximum number of objects on a deque at the tail.
+ *
+ * This function copies the objects at the tail of the deque and
+ * moves the tail index (backwards).
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @param obj_table
+ *   A pointer to a table of objects.
+ * @param esize
+ *   The size of deque object, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the deque. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to add in the deque from the obj_table.
+ * @param free_space
+ *   Returns the amount of space in the deque after the enqueue operation
+ *   has finished.
+ * @return
+ *   - n: Actual number of objects enqueued.
+ */
+__rte_experimental
+static __rte_always_inline unsigned int
+rte_deque_tail_enqueue_burst_elem(struct rte_deque *d,
+				const void *obj_table, unsigned int esize,
+				unsigned int n, unsigned int *free_space)
+{
+	const unsigned int room = rte_deque_free_count(d);
+	unsigned int count = n;
+
+	/* Clamp the request to the space that is actually free. */
+	if (count > room)
+		count = room;
+
+	*free_space = room - count;
+	return __rte_deque_enqueue_at_tail(d, obj_table, esize, count);
+}
+
+/**
+ * Dequeue a fixed number of objects from a deque at tail.
+ *
+ * This function copies the objects from the tail of the deque and
+ * moves the tail index.
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @param obj_table
+ *   A pointer to a table of objects that will be filled.
+ * @param esize
+ *   The size of deque object, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the deque. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to dequeue from the deque to the obj_table.
+ * @param available
+ *   Returns the number of remaining deque entries after the dequeue
+ *   has finished.
+ * @return
+ *   The number of objects dequeued, either 0 or n
+ */
+__rte_experimental
+static __rte_always_inline unsigned int
+rte_deque_tail_dequeue_bulk_elem(struct rte_deque *d, void *obj_table,
+			unsigned int esize, unsigned int n,
+			unsigned int *available)
+{
+	const unsigned int held = rte_deque_count(d);
+
+	/* All or nothing: with fewer than n objects stored, report the
+	 * current count and dequeue nothing.
+	 */
+	if (unlikely(n > held)) {
+		*available = held;
+		return 0;
+	}
+
+	*available = held - n;
+	return __rte_deque_dequeue_at_tail(d, obj_table, esize, n);
+}
+
+/**
+ * Dequeue up to a maximum number of objects from a deque at tail.
+ *
+ * This function copies the objects from the tail of the deque and
+ * moves the tail index.
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @param obj_table
+ *   A pointer to a table of objects that will be filled.
+ * @param esize
+ *   The size of deque object, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the deque. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to dequeue from the deque to the obj_table.
+ * @param available
+ *   Returns the number of remaining deque entries after the dequeue
+ *   has finished.
+ * @return
+ *   - Number of objects dequeued
+ */
+__rte_experimental
+static __rte_always_inline unsigned int
+rte_deque_tail_dequeue_burst_elem(struct rte_deque *d, void *obj_table,
+			unsigned int esize, unsigned int n,
+			unsigned int *available)
+{
+	const unsigned int held = rte_deque_count(d);
+	unsigned int take = n;
+
+	/* Never hand out more objects than the deque holds. */
+	if (take > held)
+		take = held;
+
+	*available = held - take;
+	return __rte_deque_dequeue_at_tail(d, obj_table, esize, take);
+}
+
+/**
+ * Dequeue a fixed number of objects from a deque from the head.
+ *
+ * This function copies the objects from the head of the deque and
+ * moves the head index (backwards).
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @param obj_table
+ *   A pointer to a table of objects that will be filled.
+ * @param esize
+ *   The size of deque object, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the deque. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to dequeue from the deque to the obj_table.
+ * @param available
+ *   Returns the number of remaining deque entries after the dequeue
+ *   has finished.
+ * @return
+ *   The number of objects dequeued, either 0 or n
+ */
+__rte_experimental
+static __rte_always_inline unsigned int
+rte_deque_head_dequeue_bulk_elem(struct rte_deque *d, void *obj_table,
+			unsigned int esize, unsigned int n,
+			unsigned int *available)
+{
+	const unsigned int held = rte_deque_count(d);
+
+	/* All or nothing: with fewer than n objects stored, report the
+	 * current count and dequeue nothing.
+	 */
+	if (unlikely(n > held)) {
+		*available = held;
+		return 0;
+	}
+
+	*available = held - n;
+	return __rte_deque_dequeue_at_head(d, obj_table, esize, n);
+}
+
+/**
+ * Dequeue up to a maximum number of objects from a deque from the head.
+ *
+ * This function copies the objects from the head of the deque and
+ * moves the head index (backwards).
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @param obj_table
+ *   A pointer to a table of objects that will be filled.
+ * @param esize
+ *   The size of deque object, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the deque. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to dequeue from the deque to the obj_table.
+ * @param available
+ *   Returns the number of remaining deque entries after the dequeue
+ *   has finished.
+ * @return
+ *   - Number of objects dequeued
+ */
+__rte_experimental
+static __rte_always_inline unsigned int
+rte_deque_head_dequeue_burst_elem(struct rte_deque *d, void *obj_table,
+			unsigned int esize, unsigned int n,
+			unsigned int *available)
+{
+	const unsigned int held = rte_deque_count(d);
+	unsigned int take = n;
+
+	/* Never hand out more objects than the deque holds. */
+	if (take > held)
+		take = held;
+
+	*available = held - take;
+	return __rte_deque_dequeue_at_head(d, obj_table, esize, take);
+}
+
+/**
+ * Flush a deque.
+ *
+ * This function flushes all the objects in a deque.
+ *
+ * @warning
+ * Make sure the deque is not in use while calling this function.
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ */
+__rte_experimental
+void rte_deque_reset(struct rte_deque *d);
+
+/**
+ * Test if a deque is full.
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @return
+ *   - 1: The deque is full.
+ *   - 0: The deque is not full.
+ */
+static inline int
+rte_deque_full(const struct rte_deque *d)
+{
+	/* Full is reported when no free entries remain. */
+	const unsigned int room = rte_deque_free_count(d);
+
+	return room == 0;
+}
+
+/**
+ * Test if a deque is empty.
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @return
+ *   - 1: The deque is empty.
+ *   - 0: The deque is not empty.
+ */
+static inline int
+rte_deque_empty(const struct rte_deque *d)
+{
+	/* Empty is reported when the tail and head indices are equal. */
+	const uint32_t tail = d->tail;
+	const uint32_t head = d->head;
+
+	return tail == head;
+}
+
+/**
+ * Return the size of the deque.
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @return
+ *   The size of the data store used by the deque, read from the ``size``
+ *   field of the structure.
+ *   NOTE: this is not the same as the usable space in the deque. To query that
+ *   use ``rte_deque_get_capacity()``.
+ */
+static inline unsigned int
+rte_deque_get_size(const struct rte_deque *d)
+{
+	return d->size;
+}
+
+/**
+ * Return the number of objects which can be stored in the deque.
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @return
+ *   The usable size of the deque, read from the ``capacity`` field of the
+ *   structure.
+ */
+static inline unsigned int
+rte_deque_get_capacity(const struct rte_deque *d)
+{
+	return d->capacity;
+}
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _RTE_DEQUE_H_ */
diff --git a/lib/deque/rte_deque_core.h b/lib/deque/rte_deque_core.h
new file mode 100644
index 0000000000..0bb8695c8a
--- /dev/null
+++ b/lib/deque/rte_deque_core.h
@@ -0,0 +1,81 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2024 Arm Limited
+ */
+
+#ifndef _RTE_DEQUE_CORE_H_
+#define _RTE_DEQUE_CORE_H_
+
+/**
+ * @file
+ * This file contains definition of RTE deque structure, init flags and
+ * some related macros. This file should not be included directly,
+ * include rte_deque.h instead.
+ */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <stdint.h>
+#include <string.h>
+#include <errno.h>
+#include <rte_common.h>
+#include <rte_config.h>
+#include <rte_memory.h>
+#include <rte_lcore.h>
+#include <rte_atomic.h>
+#include <rte_branch_prediction.h>
+#include <rte_memzone.h>
+#include <rte_pause.h>
+#include <rte_debug.h>
+
+extern int rte_deque_log_type;
+
+#define RTE_DEQUE_MZ_PREFIX "DEQUE_"
+/** The maximum length of a deque name. */
+#define RTE_DEQUE_NAMESIZE (RTE_MEMZONE_NAMESIZE - \
+			   sizeof(RTE_DEQUE_MZ_PREFIX) + 1)
+
+/**
+ * Double ended queue (deque) structure.
+ *
+ * The deque keeps a head index and a tail index. As described here, these
+ * indices are not between 0 and size(deque)-1 but between 0 and 2^32 - 1,
+ * and their value is masked while accessing the objects in the deque.
+ * The indices are unsigned 32 bits, hence the result of the subtraction is
+ * always modulo 2^32 and it is between 0 and capacity.
+ *
+ * NOTE(review): the enqueue/dequeue helpers in rte_deque_pvt.h store head
+ * and tail already masked with *mask*, so in practice the indices appear to
+ * stay within 0..size-1 - confirm which invariant is intended.
+ *
+ * The object table is accessed as the memory that immediately follows this
+ * structure (&d[1] in the copy helpers).
+ */
+struct rte_deque {
+	alignas(RTE_CACHE_LINE_SIZE) char name[RTE_DEQUE_NAMESIZE];
+	/**< Name of the deque */
+	int flags;
+	/**< Flags supplied at creation. */
+	const struct rte_memzone *memzone;
+	/**< Memzone, if any, containing the rte_deque */
+
+	alignas(RTE_CACHE_LINE_SIZE) char pad0; /**< empty cache line */
+
+	uint32_t size;           /**< Size of deque. */
+	uint32_t mask;           /**< Mask (size-1) of deque. */
+	uint32_t capacity;       /**< Usable size of deque */
+	/** Deque head and tail indices. */
+	volatile uint32_t head;
+	volatile uint32_t tail;
+};
+
+/**
+ * Deque is to hold exactly requested number of entries.
+ * Without this flag set, the deque size requested must be a power of 2, and the
+ * usable space will be that size - 1. With the flag, the requested size will
+ * be rounded up to the next power of two, but the usable space will be exactly
+ * that requested. Worst case, if a power-of-2 size is requested, half the
+ * deque space will be wasted.
+ */
+#define RTE_DEQUE_F_EXACT_SZ 0x0004
+#define RTE_DEQUE_SZ_MASK  (0x7fffffffU) /**< Deque size mask */
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _RTE_DEQUE_CORE_H_ */
diff --git a/lib/deque/rte_deque_pvt.h b/lib/deque/rte_deque_pvt.h
new file mode 100644
index 0000000000..931bbd4d19
--- /dev/null
+++ b/lib/deque/rte_deque_pvt.h
@@ -0,0 +1,538 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2024 Arm Limited
+ */
+
+#ifndef _RTE_DEQUE_PVT_H_
+#define _RTE_DEQUE_PVT_H_
+
+/* Number of entries held in, and free in, the deque pointed to by d.
+ * The argument is parenthesized so that any pointer expression (for
+ * example &obj or base + i) can be passed; it is evaluated more than once,
+ * so it must not have side effects.
+ */
+#define __RTE_DEQUE_COUNT(d) (((d)->head - (d)->tail) & (d)->mask)
+#define __RTE_DEQUE_FREE_SPACE(d) ((d)->capacity - __RTE_DEQUE_COUNT(d))
+
+/* Copy n 32-bit words from obj_table into the deque storage, ascending.
+ *
+ * The storage is the memory immediately after the rte_deque structure
+ * (&d[1]), viewed as an array of uint32_t. Words are written from word
+ * index idx upward; size is the length of the storage in words. When
+ * idx + n exceeds size, the copy continues from word index 0.
+ */
+static __rte_always_inline void
+__rte_deque_enqueue_elems_head_32(struct rte_deque *d,
+				const unsigned int size,
+				uint32_t idx,
+				const void *obj_table,
+				unsigned int n)
+{
+	unsigned int i;
+	uint32_t *deque = (uint32_t *)&d[1];
+	const uint32_t *obj = (const uint32_t *)obj_table;
+	if (likely(idx + n <= size)) {
+		/* No wrap: copy 8 words per iteration, then the remainder. */
+		for (i = 0; i < (n & ~0x7); i += 8, idx += 8) {
+			deque[idx] = obj[i];
+			deque[idx + 1] = obj[i + 1];
+			deque[idx + 2] = obj[i + 2];
+			deque[idx + 3] = obj[i + 3];
+			deque[idx + 4] = obj[i + 4];
+			deque[idx + 5] = obj[i + 5];
+			deque[idx + 6] = obj[i + 6];
+			deque[idx + 7] = obj[i + 7];
+		}
+		switch (n & 0x7) {
+		case 7:
+			deque[idx++] = obj[i++]; /* fallthrough */
+		case 6:
+			deque[idx++] = obj[i++]; /* fallthrough */
+		case 5:
+			deque[idx++] = obj[i++]; /* fallthrough */
+		case 4:
+			deque[idx++] = obj[i++]; /* fallthrough */
+		case 3:
+			deque[idx++] = obj[i++]; /* fallthrough */
+		case 2:
+			deque[idx++] = obj[i++]; /* fallthrough */
+		case 1:
+			deque[idx++] = obj[i++]; /* fallthrough */
+		}
+	} else {
+		/* Wrap: fill up to the end of the storage first. */
+		for (i = 0; idx < size; i++, idx++)
+			deque[idx] = obj[i];
+		/* Start at the beginning */
+		for (idx = 0; i < n; i++, idx++)
+			deque[idx] = obj[i];
+	}
+}
+
+/* Copy n 64-bit objects from obj_table into the deque storage, ascending.
+ *
+ * The storage (&d[1]) is viewed as an array of uint64_t. Writing starts at
+ * slot (d->head & d->mask) and moves upward; when the run would pass
+ * d->size, it continues from slot 0. obj_table is read through
+ * unaligned_uint64_t. The head index itself is not modified here.
+ */
+static __rte_always_inline void
+__rte_deque_enqueue_elems_head_64(struct rte_deque *d,
+				const void *obj_table,
+				unsigned int n)
+{
+	unsigned int i;
+	const uint32_t size = d->size;
+	uint32_t idx = (d->head & d->mask);
+	uint64_t *deque = (uint64_t *)&d[1];
+	const unaligned_uint64_t *obj = (const unaligned_uint64_t *)obj_table;
+	if (likely(idx + n <= size)) {
+		/* No wrap: copy 4 objects per iteration, then the remainder. */
+		for (i = 0; i < (n & ~0x3); i += 4, idx += 4) {
+			deque[idx] = obj[i];
+			deque[idx + 1] = obj[i + 1];
+			deque[idx + 2] = obj[i + 2];
+			deque[idx + 3] = obj[i + 3];
+		}
+		switch (n & 0x3) {
+		case 3:
+			deque[idx++] = obj[i++]; /* fallthrough */
+		case 2:
+			deque[idx++] = obj[i++]; /* fallthrough */
+		case 1:
+			deque[idx++] = obj[i++]; /* fallthrough */
+		}
+	} else {
+		/* Wrap: fill up to the end of the storage first. */
+		for (i = 0; idx < size; i++, idx++)
+			deque[idx] = obj[i];
+		/* Start at the beginning */
+		for (idx = 0; i < n; i++, idx++)
+			deque[idx] = obj[i];
+	}
+}
+
+/* Copy n 16-byte objects from obj_table into the deque storage, ascending.
+ *
+ * The storage (&d[1]) is viewed as an array of rte_int128_t. Writing starts
+ * at slot (d->head & d->mask) and moves upward; when the run would pass
+ * d->size, it continues from slot 0. The head index itself is not modified
+ * here.
+ */
+static __rte_always_inline void
+__rte_deque_enqueue_elems_head_128(struct rte_deque *d,
+				const void *obj_table,
+				unsigned int n)
+{
+	unsigned int i;
+	const uint32_t size = d->size;
+	uint32_t idx = (d->head & d->mask);
+	rte_int128_t *deque = (rte_int128_t *)&d[1];
+	const rte_int128_t *obj = (const rte_int128_t *)obj_table;
+	if (likely(idx + n <= size)) {
+		/* No wrap: copy two objects (32 bytes) per iteration. */
+		for (i = 0; i < (n & ~0x1); i += 2, idx += 2)
+			memcpy((void *)(deque + idx),
+				(const void *)(obj + i), 32);
+		/* Odd n: one object (16 bytes) is left over. */
+		switch (n & 0x1) {
+		case 1:
+			memcpy((void *)(deque + idx),
+				(const void *)(obj + i), 16);
+		}
+	} else {
+		/* Wrap: fill up to the end of the storage first. */
+		for (i = 0; idx < size; i++, idx++)
+			memcpy((void *)(deque + idx),
+				(const void *)(obj + i), 16);
+		/* Start at the beginning */
+		for (idx = 0; i < n; i++, idx++)
+			memcpy((void *)(deque + idx),
+				(const void *)(obj + i), 16);
+	}
+}
+
+/* Copy n objects of esize bytes from obj_table into the deque starting at
+ * the head index, then advance head by n (stored masked). Returns n.
+ * No free-space check is made here.
+ */
+static __rte_always_inline unsigned int
+__rte_deque_enqueue_at_head(struct rte_deque *d,
+			const void *obj_table,
+			unsigned int esize,
+			unsigned int n)
+{
+	/* 8-byte and 16-byte objects have dedicated copy routines because
+	 * some platforms can move them with 64-bit and 128-bit registers.
+	 */
+	switch (esize) {
+	case 8:
+		__rte_deque_enqueue_elems_head_64(d, obj_table, n);
+		break;
+	case 16:
+		__rte_deque_enqueue_elems_head_128(d, obj_table, n);
+		break;
+	default: {
+		/* Any other size is copied as a run of 32-bit words. */
+		const uint32_t words_per_elem = esize / sizeof(uint32_t);
+		const uint32_t word_count = n * words_per_elem;
+		const uint32_t slot = d->head & d->mask;
+		const uint32_t word_start = slot * words_per_elem;
+		const uint32_t word_total = d->size * words_per_elem;
+
+		__rte_deque_enqueue_elems_head_32(d, word_total, word_start,
+						obj_table, word_count);
+		break;
+	}
+	}
+
+	d->head = (d->head + n) & d->mask;
+	return n;
+}
+
+/* Copy objects from obj_table into the deque storage, descending.
+ *
+ * The storage (&d[1]) is viewed as uint32_t words. Each object occupies
+ * scale words (elem_size bytes). idx is the word index of the first slot to
+ * fill and moves down by scale per object while the read position in
+ * obj_table moves up; n is the total number of words to copy. When the
+ * slots below idx run out, the copy continues downward from word index
+ * mask, which the caller in this file passes as d->mask * scale.
+ */
+static __rte_always_inline void
+__rte_deque_enqueue_elems_tail_32(struct rte_deque *d,
+				const unsigned int mask,
+				uint32_t idx,
+				const void *obj_table,
+				unsigned int n,
+				const unsigned int scale,
+				const unsigned int elem_size)
+{
+	unsigned int i;
+	uint32_t *deque = (uint32_t *)&d[1];
+	const uint32_t *obj = (const uint32_t *)obj_table;
+
+	if (likely(idx >= n)) {
+		for (i = 0; i < n; idx -= scale, i += scale)
+			memcpy(&deque[idx], &obj[i], elem_size);
+	} else {
+		/* idx wraps below zero after slot 0; the signed cast turns
+		 * that into the loop exit condition.
+		 */
+		for (i = 0; (int32_t)idx >= 0; idx -= scale, i += scale)
+			memcpy(&deque[idx], &obj[i], elem_size);
+
+		/* Start at the ending */
+		idx = mask;
+		for (; i < n; idx -= scale, i += scale)
+			memcpy(&deque[idx], &obj[i], elem_size);
+	}
+}
+
+/* Copy n 64-bit objects from obj_table into the deque storage, descending.
+ *
+ * The storage (&d[1]) is viewed as an array of uint64_t. Writing starts at
+ * slot (d->tail & d->mask) and moves downward while obj_table is read
+ * upward; after slot 0 the copy continues from slot d->mask. The tail index
+ * itself is not modified here.
+ */
+static __rte_always_inline void
+__rte_deque_enqueue_elems_tail_64(struct rte_deque *d,
+				const void *obj_table,
+				unsigned int n)
+{
+	unsigned int i;
+	uint32_t idx = (d->tail & d->mask);
+	uint64_t *deque = (uint64_t *)&d[1];
+	const unaligned_uint64_t *obj = (const unaligned_uint64_t *)obj_table;
+	if (likely((int32_t)(idx - n) >= 0)) {
+		/* No wrap: copy 4 objects per iteration, then the remainder. */
+		for (i = 0; i < (n & ~0x3); i += 4, idx -= 4) {
+			deque[idx] = obj[i];
+			deque[idx - 1] = obj[i + 1];
+			deque[idx - 2] = obj[i + 2];
+			deque[idx - 3] = obj[i + 3];
+		}
+		switch (n & 0x3) {
+		case 3:
+			deque[idx--] = obj[i++]; /* fallthrough */
+		case 2:
+			deque[idx--] = obj[i++]; /* fallthrough */
+		case 1:
+			deque[idx--] = obj[i++]; /* fallthrough */
+		}
+	} else {
+		/* Wrap: fill down to slot 0 first; idx wraps below zero
+		 * afterwards, which the signed cast detects.
+		 */
+		for (i = 0; (int32_t)idx >= 0; i++, idx--)
+			deque[idx] = obj[i];
+		/* Start at the ending */
+		for (idx = d->mask; i < n; i++, idx--)
+			deque[idx] = obj[i];
+	}
+}
+
+/* Copy n 16-byte objects from obj_table into the deque storage, descending.
+ *
+ * The storage (&d[1]) is viewed as an array of rte_int128_t. Writing starts
+ * at slot (d->tail & d->mask) and moves downward while obj_table is read
+ * upward; after slot 0 the copy continues from slot d->mask. The tail index
+ * itself is not modified here.
+ */
+static __rte_always_inline void
+__rte_deque_enqueue_elems_tail_128(struct rte_deque *d,
+				const void *obj_table,
+				unsigned int n)
+{
+	unsigned int i;
+	uint32_t idx = (d->tail & d->mask);
+	rte_int128_t *deque = (rte_int128_t *)&d[1];
+	const rte_int128_t *obj = (const rte_int128_t *)obj_table;
+	if (likely((int32_t)(idx - n) >= 0)) {
+		/* No wrap: copy two objects per iteration. */
+		for (i = 0; i < (n & ~0x1); i += 2, idx -= 2) {
+			deque[idx] = obj[i];
+			deque[idx - 1] = obj[i + 1];
+		}
+		/* Odd n: one object (16 bytes) is left over. */
+		switch (n & 0x1) {
+		case 1:
+			memcpy((void *)(deque + idx),
+				(const void *)(obj + i), 16);
+		}
+	} else {
+		/* Wrap: fill down to slot 0 first; idx wraps below zero
+		 * afterwards, which the signed cast detects.
+		 */
+		for (i = 0; (int32_t)idx >= 0; i++, idx--)
+			memcpy((void *)(deque + idx),
+				(const void *)(obj + i), 16);
+		/* Start at the ending */
+		for (idx = d->mask; i < n; i++, idx--)
+			memcpy((void *)(deque + idx),
+				(const void *)(obj + i), 16);
+	}
+}
+
+/* Copy n objects of esize bytes from obj_table into the deque at the tail
+ * end, moving the tail index backwards by n (stored masked). Returns n.
+ * No free-space check is made here.
+ */
+static __rte_always_inline unsigned int
+__rte_deque_enqueue_at_tail(struct rte_deque *d,
+			const void *obj_table,
+			unsigned int esize,
+			unsigned int n)
+{
+	/* Step tail back onto an empty slot; the copy routines start
+	 * writing at the slot tail designates and work downwards.
+	 */
+	d->tail -= 1;
+
+	/* 8-byte and 16-byte objects have dedicated copy routines because
+	 * some platforms can move them with 64-bit and 128-bit registers.
+	 */
+	switch (esize) {
+	case 8:
+		__rte_deque_enqueue_elems_tail_64(d, obj_table, n);
+		break;
+	case 16:
+		__rte_deque_enqueue_elems_tail_128(d, obj_table, n);
+		break;
+	default: {
+		/* Any other size is copied as a run of 32-bit words. */
+		const uint32_t words_per_elem = esize / sizeof(uint32_t);
+		const uint32_t word_count = n * words_per_elem;
+		const uint32_t slot = d->tail & d->mask;
+		const uint32_t word_start = slot * words_per_elem;
+		const uint32_t word_last = d->mask * words_per_elem;
+
+		__rte_deque_enqueue_elems_tail_32(d, word_last, word_start,
+						obj_table, word_count,
+						words_per_elem, esize);
+		break;
+	}
+	}
+
+	/* tail was already moved back by one above, so only n - 1 further
+	 * steps are needed for it to designate the last object written.
+	 */
+	d->tail = (d->tail - n + 1) & d->mask;
+	return n;
+}
+
+/* Copy n 32-bit words out of the deque storage into obj_table, ascending.
+ *
+ * The storage (&d[1]) is viewed as an array of uint32_t. Words are read
+ * from word index idx upward; size is the length of the storage in words.
+ * When idx + n exceeds size, the copy continues from word index 0.
+ */
+static __rte_always_inline void
+__rte_deque_dequeue_elems_32(struct rte_deque *d,
+			const unsigned int size,
+			uint32_t idx,
+			void *obj_table,
+			unsigned int n)
+{
+	unsigned int i;
+	const uint32_t *deque = (const uint32_t *)&d[1];
+	uint32_t *obj = (uint32_t *)obj_table;
+	if (likely(idx + n <= size)) {
+		/* No wrap: copy 8 words per iteration, then the remainder. */
+		for (i = 0; i < (n & ~0x7); i += 8, idx += 8) {
+			obj[i] = deque[idx];
+			obj[i + 1] = deque[idx + 1];
+			obj[i + 2] = deque[idx + 2];
+			obj[i + 3] = deque[idx + 3];
+			obj[i + 4] = deque[idx + 4];
+			obj[i + 5] = deque[idx + 5];
+			obj[i + 6] = deque[idx + 6];
+			obj[i + 7] = deque[idx + 7];
+		}
+		switch (n & 0x7) {
+		case 7:
+			obj[i++] = deque[idx++]; /* fallthrough */
+		case 6:
+			obj[i++] = deque[idx++]; /* fallthrough */
+		case 5:
+			obj[i++] = deque[idx++]; /* fallthrough */
+		case 4:
+			obj[i++] = deque[idx++]; /* fallthrough */
+		case 3:
+			obj[i++] = deque[idx++]; /* fallthrough */
+		case 2:
+			obj[i++] = deque[idx++]; /* fallthrough */
+		case 1:
+			obj[i++] = deque[idx++]; /* fallthrough */
+		}
+	} else {
+		/* Wrap: read up to the end of the storage first. */
+		for (i = 0; idx < size; i++, idx++)
+			obj[i] = deque[idx];
+		/* Start at the beginning */
+		for (idx = 0; i < n; i++, idx++)
+			obj[i] = deque[idx];
+	}
+}
+
+/* Copy n 64-bit objects out of the deque storage into obj_table, ascending.
+ *
+ * The storage (&d[1]) is viewed as an array of uint64_t. Reading starts at
+ * slot (d->tail & d->mask) and moves upward; when the run would pass
+ * d->size, it continues from slot 0. obj_table is written through
+ * unaligned_uint64_t. The tail index itself is not modified here.
+ */
+static __rte_always_inline void
+__rte_deque_dequeue_elems_64(struct rte_deque *d, void *obj_table,
+			unsigned int n)
+{
+	unsigned int i;
+	const uint32_t size = d->size;
+	uint32_t idx = (d->tail & d->mask);
+	const uint64_t *deque = (const uint64_t *)&d[1];
+	unaligned_uint64_t *obj = (unaligned_uint64_t *)obj_table;
+	if (likely(idx + n <= size)) {
+		/* No wrap: copy 4 objects per iteration, then the remainder. */
+		for (i = 0; i < (n & ~0x3); i += 4, idx += 4) {
+			obj[i] = deque[idx];
+			obj[i + 1] = deque[idx + 1];
+			obj[i + 2] = deque[idx + 2];
+			obj[i + 3] = deque[idx + 3];
+		}
+		switch (n & 0x3) {
+		case 3:
+			obj[i++] = deque[idx++]; /* fallthrough */
+		case 2:
+			obj[i++] = deque[idx++]; /* fallthrough */
+		case 1:
+			obj[i++] = deque[idx++]; /* fallthrough */
+		}
+	} else {
+		/* Wrap: read up to the end of the storage first. */
+		for (i = 0; idx < size; i++, idx++)
+			obj[i] = deque[idx];
+		/* Start at the beginning */
+		for (idx = 0; i < n; i++, idx++)
+			obj[i] = deque[idx];
+	}
+}
+
+/* Copy n 16-byte objects out of the deque storage into obj_table, ascending.
+ *
+ * The storage (&d[1]) is viewed as an array of rte_int128_t. Reading starts
+ * at slot (d->tail & d->mask) and moves upward; when the run would pass
+ * d->size, it continues from slot 0. The tail index itself is not modified
+ * here.
+ */
+static __rte_always_inline void
+__rte_deque_dequeue_elems_128(struct rte_deque *d,
+			void *obj_table,
+			unsigned int n)
+{
+	unsigned int i;
+	const uint32_t size = d->size;
+	uint32_t idx = (d->tail & d->mask);
+	const rte_int128_t *deque = (const rte_int128_t *)&d[1];
+	rte_int128_t *obj = (rte_int128_t *)obj_table;
+	if (likely(idx + n <= size)) {
+		/* No wrap: copy two objects (32 bytes) per iteration. */
+		for (i = 0; i < (n & ~0x1); i += 2, idx += 2)
+			memcpy((void *)(obj + i),
+				(const void *)(deque + idx), 32);
+		/* Odd n: one object (16 bytes) is left over. */
+		switch (n & 0x1) {
+		case 1:
+			memcpy((void *)(obj + i),
+				(const void *)(deque + idx), 16);
+		}
+	} else {
+		/* Wrap: read up to the end of the storage first. */
+		for (i = 0; idx < size; i++, idx++)
+			memcpy((void *)(obj + i),
+				(const void *)(deque + idx), 16);
+		/* Start at the beginning */
+		for (idx = 0; i < n; i++, idx++)
+			memcpy((void *)(obj + i),
+				(const void *)(deque + idx), 16);
+	}
+}
+
+/* Copy n objects of esize bytes out of the deque into obj_table, starting
+ * at the tail index, then advance tail by n (stored masked). Returns n.
+ * No check of the number of stored objects is made here.
+ */
+static __rte_always_inline unsigned int
+__rte_deque_dequeue_at_tail(struct rte_deque *d,
+			void *obj_table,
+			unsigned int esize,
+			unsigned int n)
+{
+	/* 8-byte and 16-byte objects have dedicated copy routines because
+	 * some platforms can move them with 64-bit and 128-bit registers.
+	 */
+	switch (esize) {
+	case 8:
+		__rte_deque_dequeue_elems_64(d, obj_table, n);
+		break;
+	case 16:
+		__rte_deque_dequeue_elems_128(d, obj_table, n);
+		break;
+	default: {
+		/* Any other size is copied as a run of 32-bit words. */
+		const uint32_t words_per_elem = esize / sizeof(uint32_t);
+		const uint32_t word_count = n * words_per_elem;
+		const uint32_t slot = d->tail & d->mask;
+		const uint32_t word_start = slot * words_per_elem;
+		const uint32_t word_total = d->size * words_per_elem;
+
+		__rte_deque_dequeue_elems_32(d, word_total, word_start,
+					obj_table, word_count);
+		break;
+	}
+	}
+
+	d->tail = (d->tail + n) & d->mask;
+	return n;
+}
+
+/* Copy objects out of the deque storage into obj_table, descending.
+ *
+ * The storage (&d[1]) is viewed as uint32_t words. Each object occupies
+ * scale words (elem_size bytes). idx is the word index of the first slot to
+ * read and moves down by scale per object while the write position in
+ * obj_table moves up; n is the total number of words to copy. When the
+ * slots below idx run out, the copy continues downward from word index
+ * mask, which the caller in this file passes as d->mask * scale.
+ */
+static __rte_always_inline void
+__rte_deque_dequeue_elems_head_32(struct rte_deque *d,
+				const unsigned int mask,
+				uint32_t idx,
+				void *obj_table,
+				unsigned int n,
+				const unsigned int scale,
+				const unsigned int elem_size)
+{
+	unsigned int i;
+	const uint32_t *deque = (uint32_t *)&d[1];
+	uint32_t *obj = (uint32_t *)obj_table;
+
+	if (likely(idx >= n)) {
+		for (i = 0; i < n; idx -= scale, i += scale)
+			memcpy(&obj[i], &deque[idx], elem_size);
+	} else {
+		/* idx wraps below zero after slot 0; the signed cast turns
+		 * that into the loop exit condition.
+		 */
+		for (i = 0; (int32_t)idx >= 0; idx -= scale, i += scale)
+			memcpy(&obj[i], &deque[idx], elem_size);
+		/* Start at the ending */
+		idx = mask;
+		for (; i < n; idx -= scale, i += scale)
+			memcpy(&obj[i], &deque[idx], elem_size);
+	}
+}
+
+/* Copy n 64-bit objects out of the deque storage into obj_table, descending.
+ *
+ * The storage (&d[1]) is viewed as an array of uint64_t. Reading starts at
+ * slot (d->head & d->mask) and moves downward while obj_table is written
+ * upward; after slot 0 the copy continues from slot d->mask. The head index
+ * itself is not modified here.
+ */
+static __rte_always_inline void
+__rte_deque_dequeue_elems_head_64(struct rte_deque *d,
+				void *obj_table,
+				unsigned int n)
+{
+	unsigned int i;
+	uint32_t idx = (d->head & d->mask);
+	const uint64_t *deque = (uint64_t *)&d[1];
+	unaligned_uint64_t *obj = (unaligned_uint64_t *)obj_table;
+	if (likely((int32_t)(idx - n) >= 0)) {
+		/* No wrap: copy 4 objects per iteration, then the remainder. */
+		for (i = 0; i < (n & ~0x3); i += 4, idx -= 4) {
+			obj[i] = deque[idx];
+			obj[i + 1] = deque[idx - 1];
+			obj[i + 2] = deque[idx - 2];
+			obj[i + 3] = deque[idx - 3];
+		}
+		switch (n & 0x3) {
+		case 3:
+			obj[i++] = deque[idx--];  /* fallthrough */
+		case 2:
+			obj[i++] = deque[idx--]; /* fallthrough */
+		case 1:
+			obj[i++] = deque[idx--]; /* fallthrough */
+		}
+	} else {
+		/* Wrap: read down to slot 0 first; idx wraps below zero
+		 * afterwards, which the signed cast detects.
+		 */
+		for (i = 0; (int32_t)idx >= 0; i++, idx--)
+			obj[i] = deque[idx];
+		/* Start at the ending */
+		for (idx = d->mask; i < n; i++, idx--)
+			obj[i] = deque[idx];
+	}
+}
+
+/* Copy n 16-byte objects out of the deque storage into obj_table,
+ * descending.
+ *
+ * The storage (&d[1]) is viewed as an array of rte_int128_t. Reading starts
+ * at slot (d->head & d->mask) and moves downward while obj_table is written
+ * upward; after slot 0 the copy continues from slot d->mask. The head index
+ * itself is not modified here.
+ */
+static __rte_always_inline void
+__rte_deque_dequeue_elems_head_128(struct rte_deque *d,
+				void *obj_table,
+				unsigned int n)
+{
+	unsigned int i;
+	uint32_t idx = (d->head & d->mask);
+	const rte_int128_t *deque = (rte_int128_t *)&d[1];
+	rte_int128_t *obj = (rte_int128_t *)obj_table;
+	if (likely((int32_t)(idx - n) >= 0)) {
+		/* No wrap: copy two objects per iteration. */
+		for (i = 0; i < (n & ~0x1); i += 2, idx -= 2) {
+			obj[i] = deque[idx];
+			obj[i + 1] = deque[idx - 1];
+		}
+		/* Odd n: one object (16 bytes) is left over. */
+		switch (n & 0x1) {
+		case 1:
+			memcpy((void *)(obj + i),
+				(const void *)(deque + idx), 16);
+		}
+	} else {
+		/* Wrap: read down to slot 0 first; idx wraps below zero
+		 * afterwards, which the signed cast detects.
+		 */
+		for (i = 0; (int32_t)idx >= 0; i++, idx--)
+			memcpy((void *)(obj + i),
+				(const void *)(deque + idx), 16);
+		/* Start at the ending */
+		for (idx = d->mask; i < n; i++, idx--)
+			memcpy((void *)(obj + i),
+				(const void *)(deque + idx), 16);
+	}
+}
+
+/* Copy n objects of esize bytes out of the deque into obj_table, starting
+ * just below the head index and working downwards, then move head back by
+ * n (stored masked). Returns n. No check of the number of stored objects is
+ * made here.
+ */
+static __rte_always_inline unsigned int
+__rte_deque_dequeue_at_head(struct rte_deque *d,
+			void *obj_table,
+			unsigned int esize,
+			unsigned int n)
+{
+	/* Step head back by one so that it designates the slot just below
+	 * the current head; the copy routines start reading there.
+	 */
+	d->head -= 1;
+
+	/* 8-byte and 16-byte objects have dedicated copy routines because
+	 * some platforms can move them with 64-bit and 128-bit registers.
+	 */
+	switch (esize) {
+	case 8:
+		__rte_deque_dequeue_elems_head_64(d, obj_table, n);
+		break;
+	case 16:
+		__rte_deque_dequeue_elems_head_128(d, obj_table, n);
+		break;
+	default: {
+		/* Any other size is copied as a run of 32-bit words. */
+		const uint32_t words_per_elem = esize / sizeof(uint32_t);
+		const uint32_t word_count = n * words_per_elem;
+		const uint32_t slot = d->head & d->mask;
+		const uint32_t word_start = slot * words_per_elem;
+		const uint32_t word_last = d->mask * words_per_elem;
+
+		__rte_deque_dequeue_elems_head_32(d, word_last, word_start,
+						obj_table, word_count,
+						words_per_elem, esize);
+		break;
+	}
+	}
+
+	/* head was already moved back by one above, so only n - 1 further
+	 * steps are needed for it to designate the slot of the last object
+	 * read, which is now empty.
+	 */
+	d->head = (d->head - n + 1) & d->mask;
+	return n;
+}
+#endif /* _RTE_DEQUE_PVT_H_ */
diff --git a/lib/deque/rte_deque_zc.h b/lib/deque/rte_deque_zc.h
new file mode 100644
index 0000000000..6d7167e158
--- /dev/null
+++ b/lib/deque/rte_deque_zc.h
@@ -0,0 +1,430 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2024 Arm Limited
+ */
+#ifndef _RTE_DEQUE_ZC_H_
+#define _RTE_DEQUE_ZC_H_
+
+/**
+ * @file
+ * This file should not be included directly, include rte_deque.h instead.
+ *
+ * Deque Zero Copy APIs
+ * These APIs make it possible to split public enqueue/dequeue API
+ * into 3 parts:
+ * - enqueue/dequeue start
+ * - copy data to/from the deque
+ * - enqueue/dequeue finish
+ * These APIs provide the ability to avoid copying of the data to temporary area.
+ *
+ */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+ * Deque zero-copy information structure.
+ *
+ * This structure contains the pointers and length of the space
+ * reserved on the Deque storage. It is filled in by the zero-copy
+ * "start" functions.
+ *
+ * For operations that walk the storage toward lower indexes (enqueue at
+ * tail, dequeue at head), ptr1 addresses the highest-index element of the
+ * first region and ptr2, when set, addresses the last slot of the storage.
+ */
+struct __rte_cache_aligned rte_deque_zc_data {
+	/** Pointer to the first space in the deque. */
+	void *ptr1;
+	/** Pointer to the second space in the deque if there is wrap-around.
+	 * It contains valid value only if wrap-around happens.
+	 */
+	void *ptr2;
+	/** Number of elements in the first pointer. If this is equal to
+	 * the number of elements requested, then ptr2 is NULL.
+	 * Otherwise, subtracting n1 from number of elements requested
+	 * will give the number of elements available at ptr2.
+	 */
+	unsigned int n1;
+};
+
+/**
+ * @internal Locate a run of num elements in the deque storage, splitting it
+ * in two regions when it crosses the end (or the start) of the storage.
+ *
+ * @param d
+ *   A pointer to the deque structure; the element storage starts right
+ *   after it.
+ * @param pos
+ *   Position of the first element of the run; it is masked with d->mask.
+ * @param esize
+ *   The size of a deque element, in bytes. It must be a multiple of 4.
+ * @param num
+ *   The number of elements in the run.
+ * @param dst1
+ *   Receives the address of the element at the masked position.
+ * @param n1
+ *   Receives the number of elements in the first region. It equals num
+ *   when the run does not wrap.
+ * @param dst2
+ *   Receives the address of the second region when the run wraps, NULL
+ *   otherwise. For a low-to-high run this is the first slot of the storage,
+ *   for a high-to-low run it is the last slot.
+ * @param low_to_high
+ *   true if the run extends toward higher indexes, false if it extends
+ *   toward lower indexes.
+ */
+static __rte_always_inline void
+__rte_deque_get_elem_addr(struct rte_deque *d, uint32_t pos,
+	uint32_t esize, uint32_t num, void **dst1, uint32_t *n1, void **dst2,
+	bool low_to_high)
+{
+	uint32_t idx, scale, nr_idx;
+	uint32_t *deque_ptr = (uint32_t *)&d[1];
+
+	/* Normalize to uint32_t */
+	scale = esize / sizeof(uint32_t);
+	idx = pos & d->mask;
+	nr_idx = idx * scale;
+
+	/* Default: the whole run is contiguous */
+	*dst1 = deque_ptr + nr_idx;
+	*n1 = num;
+	*dst2 = NULL;
+
+	if (low_to_high) {
+		if (idx + num > d->size) {
+			/* Slots idx..size-1 first, then restart at slot 0 */
+			*n1 = d->size - idx;
+			*dst2 = deque_ptr;
+		}
+	} else if (num > idx + 1) {
+		/* Slots idx..0 hold idx + 1 elements. Only a longer run wraps:
+		 * a run of exactly idx + 1 elements ends at slot 0 and must
+		 * leave dst2 NULL so that n1 == num implies no second region.
+		 */
+		*n1 = idx + 1;
+		*dst2 = (void *)&deque_ptr[d->mask * scale];
+	}
+}
+
+/**
+ * Start to enqueue exactly n objects at the head of the deque.
+ * Note that no actual objects are put in the deque by this function,
+ * it just reserves space for the user on the deque.
+ * User has to copy objects into the deque using the returned pointers.
+ * User should call rte_deque_head_enqueue_zc_elem_finish to complete the
+ * enqueue operation.
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @param esize
+ *   The size of deque element, in bytes. It must be a multiple of 4.
+ * @param n
+ *   The number of objects to add in the deque.
+ * @param zcd
+ *   Structure containing the pointers and length of the space
+ *   reserved on the deque storage. It is left untouched when 0 is returned.
+ * @param free_space
+ *   On success, returns the amount of space left in the deque once the n
+ *   objects are accounted for. When 0 is returned because of lack of space,
+ *   returns the current amount of free space.
+ * @return
+ *   The number of objects that can be enqueued, either 0 or n
+ */
+__rte_experimental
+static __rte_always_inline unsigned int
+rte_deque_head_enqueue_zc_bulk_elem_start(struct rte_deque *d, unsigned int esize,
+	unsigned int n, struct rte_deque_zc_data *zcd, unsigned int *free_space)
+{
+	const unsigned int room = __RTE_DEQUE_FREE_SPACE(d);
+
+	/* All or nothing: refuse the request if it does not fit entirely */
+	if (unlikely(n > room)) {
+		*free_space = room;
+		return 0;
+	}
+
+	/* Head enqueue fills slots toward higher indexes, starting at head */
+	__rte_deque_get_elem_addr(d, d->head, esize, n, &zcd->ptr1,
+				  &zcd->n1, &zcd->ptr2, true);
+
+	*free_space = room - n;
+	return n;
+}
+
+/**
+ * Complete enqueuing several objects at the head of the deque.
+ * Note that the number of objects to enqueue should not exceed the return
+ * value of the preceding head enqueue start call.
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @param n
+ *   The number of objects to add to the deque.
+ */
+__rte_experimental
+static __rte_always_inline void
+rte_deque_head_enqueue_zc_elem_finish(struct rte_deque *d, unsigned int n)
+{
+	/* Advance the head past the new objects, wrapping on the mask */
+	d->head += n;
+	d->head &= d->mask;
+}
+
+/**
+ * Start to enqueue up to n objects at the head of the deque.
+ * Note that no actual objects are put in the deque by this function,
+ * it just reserves space for the user on the deque.
+ * User has to copy objects into the deque using the returned pointers.
+ * User should call rte_deque_head_enqueue_zc_elem_finish to complete the
+ * enqueue operation.
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @param esize
+ *   The size of deque element, in bytes. It must be a multiple of 4.
+ * @param n
+ *   The maximum number of objects to add in the deque. The request is
+ *   trimmed to the free space currently available.
+ * @param zcd
+ *   Structure containing the pointers and length of the space
+ *   reserved on the deque storage.
+ * @param free_space
+ *   Returns the amount of space in the deque after the reservation operation
+ *   has finished.
+ * @return
+ *   The number of objects that can be enqueued, between 0 and n
+ */
+__rte_experimental
+static __rte_always_inline unsigned int
+rte_deque_head_enqueue_zc_burst_elem_start(struct rte_deque *d, unsigned int esize,
+	unsigned int n, struct rte_deque_zc_data *zcd, unsigned int *free_space)
+{
+	const unsigned int room = __RTE_DEQUE_FREE_SPACE(d);
+
+	*free_space = room;
+	/* Clamp the request so the bulk reservation below cannot fail */
+	if (n > room)
+		n = room;
+
+	return rte_deque_head_enqueue_zc_bulk_elem_start(d, esize, n, zcd,
+							 free_space);
+}
+
+/**
+ * Start to enqueue exactly n objects at the tail of the deque.
+ * Note that no actual objects are put in the deque by this function,
+ * it just reserves space for the user on the deque.
+ * User has to copy objects into the deque using the returned pointers.
+ * User should call rte_deque_tail_enqueue_zc_elem_finish to complete the
+ * enqueue operation.
+ *
+ * The reserved run starts at the slot just below the tail and extends
+ * toward lower indexes.
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @param esize
+ *   The size of deque element, in bytes. It must be a multiple of 4.
+ * @param n
+ *   The number of objects to add in the deque.
+ * @param zcd
+ *   Structure containing the pointers and length of the space
+ *   reserved on the deque storage. It is left untouched when 0 is returned.
+ * @param free_space
+ *   On success, returns the amount of space left in the deque once the n
+ *   objects are accounted for. When 0 is returned because of lack of space,
+ *   returns the current amount of free space.
+ * @return
+ *   The number of objects that can be enqueued, either 0 or n
+ */
+__rte_experimental
+static __rte_always_inline unsigned int
+rte_deque_tail_enqueue_zc_bulk_elem_start(struct rte_deque *d, unsigned int esize,
+	unsigned int n, struct rte_deque_zc_data *zcd, unsigned int *free_space)
+{
+	const unsigned int room = __RTE_DEQUE_FREE_SPACE(d);
+
+	/* All or nothing: refuse the request if it does not fit entirely */
+	if (unlikely(n > room)) {
+		*free_space = room;
+		return 0;
+	}
+
+	/* Tail enqueue fills slots toward lower indexes, from tail - 1 down */
+	__rte_deque_get_elem_addr(d, d->tail - 1, esize, n, &zcd->ptr1,
+				  &zcd->n1, &zcd->ptr2, false);
+
+	*free_space = room - n;
+	return n;
+}
+
+/**
+ * Complete enqueuing several objects at the tail of the deque.
+ * Note that the number of objects to enqueue should not exceed the return
+ * value of the preceding tail enqueue start call.
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @param n
+ *   The number of objects to add to the deque.
+ */
+__rte_experimental
+static __rte_always_inline void
+rte_deque_tail_enqueue_zc_elem_finish(struct rte_deque *d, unsigned int n)
+{
+	/* Move the tail down over the new objects, wrapping on the mask */
+	d->tail -= n;
+	d->tail &= d->mask;
+}
+
+/**
+ * Start to enqueue up to n objects at the tail of the deque.
+ * Note that no actual objects are put in the deque by this function,
+ * it just reserves space for the user on the deque.
+ * User has to copy objects into the deque using the returned pointers.
+ * User should call rte_deque_tail_enqueue_zc_elem_finish to complete the
+ * enqueue operation.
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @param esize
+ *   The size of deque element, in bytes. It must be a multiple of 4.
+ * @param n
+ *   The maximum number of objects to add in the deque. The request is
+ *   trimmed to the free space currently available.
+ * @param zcd
+ *   Structure containing the pointers and length of the space
+ *   reserved on the deque storage.
+ * @param free_space
+ *   Returns the amount of space in the deque after the reservation operation
+ *   has finished.
+ * @return
+ *   The number of objects that can be enqueued, between 0 and n
+ */
+__rte_experimental
+static __rte_always_inline unsigned int
+rte_deque_tail_enqueue_zc_burst_elem_start(struct rte_deque *d, unsigned int esize,
+	unsigned int n, struct rte_deque_zc_data *zcd, unsigned int *free_space)
+{
+	const unsigned int room = __RTE_DEQUE_FREE_SPACE(d);
+
+	*free_space = room;
+	/* Clamp the request so the bulk reservation below cannot fail */
+	if (n > room)
+		n = room;
+
+	return rte_deque_tail_enqueue_zc_bulk_elem_start(d, esize, n, zcd,
+							 free_space);
+}
+
+/**
+ * Start to dequeue exactly n objects from the tail of the deque.
+ * Note that no actual objects are copied from the deque by this function.
+ * User has to copy objects from the deque using the returned pointers.
+ * User should call rte_deque_tail_dequeue_zc_elem_finish to complete the
+ * dequeue operation.
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @param esize
+ *   The size of deque element, in bytes. It must be a multiple of 4.
+ * @param n
+ *   The number of objects to remove from the deque.
+ * @param zcd
+ *   Structure containing the pointers and length of the space
+ *   reserved on the deque storage. It is left untouched when 0 is returned.
+ * @param available
+ *   On success, returns the number of deque entries remaining once the n
+ *   objects are accounted for. When 0 is returned because too few entries
+ *   are present, returns the current number of entries.
+ * @return
+ *   The number of objects that can be dequeued, either 0 or n.
+ */
+__rte_experimental
+static __rte_always_inline unsigned int
+rte_deque_tail_dequeue_zc_bulk_elem_start(struct rte_deque *d, unsigned int esize,
+	unsigned int n, struct rte_deque_zc_data *zcd, unsigned int *available)
+{
+	const unsigned int count = __RTE_DEQUE_COUNT(d);
+
+	/* All or nothing: refuse the request if the deque holds fewer than n */
+	if (unlikely(n > count)) {
+		*available = count;
+		return 0;
+	}
+
+	/* Tail dequeue reads slots toward higher indexes, starting at tail */
+	__rte_deque_get_elem_addr(d, d->tail, esize, n, &zcd->ptr1,
+				  &zcd->n1, &zcd->ptr2, true);
+
+	*available = count - n;
+	return n;
+}
+
+/**
+ * Complete dequeuing several objects from the tail of the deque.
+ * Note that the number of objects to dequeue should not exceed the return
+ * value of the preceding tail dequeue start call.
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @param n
+ *   The number of objects to remove from the deque.
+ */
+__rte_experimental
+static __rte_always_inline void
+rte_deque_tail_dequeue_zc_elem_finish(struct rte_deque *d, unsigned int n)
+{
+	/* Advance the tail past the consumed objects, wrapping on the mask */
+	d->tail += n;
+	d->tail &= d->mask;
+}
+
+/**
+ * Start to dequeue up to n objects from the tail of the deque.
+ * Note that no actual objects are copied from the deque by this function.
+ * User has to copy objects from the deque using the returned pointers.
+ * User should call rte_deque_tail_dequeue_zc_elem_finish to complete the
+ * dequeue operation.
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @param esize
+ *   The size of deque element, in bytes. It must be a multiple of 4.
+ * @param n
+ *   The maximum number of objects to remove from the deque. The request is
+ *   trimmed to the number of entries currently present.
+ * @param zcd
+ *   Structure containing the pointers and length of the space
+ *   reserved on the deque storage.
+ * @param available
+ *   Returns the number of remaining deque entries after the dequeue has
+ *   finished.
+ * @return
+ *   The number of objects that can be dequeued, between 0 and n.
+ */
+__rte_experimental
+static __rte_always_inline unsigned int
+rte_deque_tail_dequeue_zc_burst_elem_start(struct rte_deque *d, unsigned int esize,
+	unsigned int n, struct rte_deque_zc_data *zcd, unsigned int *available)
+{
+	const unsigned int count = __RTE_DEQUE_COUNT(d);
+
+	*available = count;
+	/* Clamp the request so the bulk reservation below cannot fail */
+	if (n > count)
+		n = count;
+
+	return rte_deque_tail_dequeue_zc_bulk_elem_start(d, esize, n, zcd,
+							 available);
+}
+
+/**
+ * Start to dequeue exactly n objects from the head of the deque.
+ * Note that no actual objects are copied from the deque by this function.
+ * User has to copy objects from the deque using the returned pointers.
+ * User should call rte_deque_head_dequeue_zc_elem_finish to complete the
+ * dequeue operation.
+ *
+ * The returned run starts at the slot just below the head and extends
+ * toward lower indexes.
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @param esize
+ *   The size of deque element, in bytes. It must be a multiple of 4.
+ * @param n
+ *   The number of objects to remove from the deque.
+ * @param zcd
+ *   Structure containing the pointers and length of the space
+ *   reserved on the deque storage. It is left untouched when 0 is returned.
+ * @param available
+ *   On success, returns the number of deque entries remaining once the n
+ *   objects are accounted for. When 0 is returned because too few entries
+ *   are present, returns the current number of entries.
+ * @return
+ *   The number of objects that can be dequeued, either 0 or n.
+ */
+__rte_experimental
+static __rte_always_inline unsigned int
+rte_deque_head_dequeue_zc_bulk_elem_start(struct rte_deque *d, unsigned int esize,
+	unsigned int n, struct rte_deque_zc_data *zcd, unsigned int *available)
+{
+	const unsigned int count = __RTE_DEQUE_COUNT(d);
+
+	/* All or nothing: refuse the request if the deque holds fewer than n */
+	if (unlikely(n > count)) {
+		*available = count;
+		return 0;
+	}
+
+	/* Head dequeue reads slots toward lower indexes, from head - 1 down */
+	__rte_deque_get_elem_addr(d, d->head - 1, esize, n, &zcd->ptr1,
+				  &zcd->n1, &zcd->ptr2, false);
+
+	*available = count - n;
+	return n;
+}
+
+/**
+ * Complete dequeuing several objects from the head of the deque.
+ * Note that the number of objects to dequeue should not exceed the return
+ * value of the preceding head dequeue start call.
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @param n
+ *   The number of objects to remove from the deque.
+ */
+__rte_experimental
+static __rte_always_inline void
+rte_deque_head_dequeue_zc_elem_finish(struct rte_deque *d, unsigned int n)
+{
+	/* Move the head down over the consumed objects, wrapping on the mask */
+	d->head -= n;
+	d->head &= d->mask;
+}
+
+/**
+ * Start to dequeue up to n objects from the head of the deque.
+ * Note that no actual objects are copied from the deque by this function.
+ * User has to copy objects from the deque using the returned pointers.
+ * User should call rte_deque_head_dequeue_zc_elem_finish to complete the
+ * dequeue operation.
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @param esize
+ *   The size of deque element, in bytes. It must be a multiple of 4.
+ * @param n
+ *   The maximum number of objects to remove from the deque. The request is
+ *   trimmed to the number of entries currently present.
+ * @param zcd
+ *   Structure containing the pointers and length of the space
+ *   reserved on the deque storage.
+ * @param available
+ *   Returns the number of remaining deque entries after the dequeue has
+ *   finished.
+ * @return
+ *   The number of objects that can be dequeued, between 0 and n.
+ */
+__rte_experimental
+static __rte_always_inline unsigned int
+rte_deque_head_dequeue_zc_burst_elem_start(struct rte_deque *d, unsigned int esize,
+	unsigned int n, struct rte_deque_zc_data *zcd, unsigned int *available)
+{
+	const unsigned int count = __RTE_DEQUE_COUNT(d);
+
+	*available = count;
+	/* Clamp the request so the bulk reservation below cannot fail */
+	if (n > count)
+		n = count;
+
+	return rte_deque_head_dequeue_zc_bulk_elem_start(d, esize, n, zcd,
+							 available);
+}
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _RTE_DEQUE_ZC_H_ */
diff --git a/lib/deque/version.map b/lib/deque/version.map
new file mode 100644
index 0000000000..103fd3b512
--- /dev/null
+++ b/lib/deque/version.map
@@ -0,0 +1,14 @@ 
+EXPERIMENTAL {
+	global:
+
+	# added in 24.07
+	rte_deque_log_type;
+	rte_deque_create;
+	rte_deque_dump;
+	rte_deque_free;
+	rte_deque_get_memsize_elem;
+	rte_deque_init;
+	rte_deque_reset;
+
+	local: *;
+};
diff --git a/lib/meson.build b/lib/meson.build
index 179a272932..82929b7a11 100644
--- a/lib/meson.build
+++ b/lib/meson.build
@@ -14,6 +14,7 @@  libraries = [
         'argparse',
         'telemetry', # basic info querying
         'eal', # everything depends on eal
+        'deque',
         'ring',
         'rcu', # rcu depends on ring
         'mempool',
@@ -74,6 +75,7 @@  if is_ms_compiler
             'kvargs',
             'telemetry',
             'eal',
+            'deque',
             'ring',
     ]
 endif