get:
Show a patch.

patch:
Partially update a patch (only the supplied fields are changed).

put:
Update a patch.
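
A minimal sketch of driving these endpoints programmatically (assuming the Python requests library; the token header format and the writable fields such as "state" follow the usual Patchwork conventions and are assumptions, not something this page confirms):

import requests

url = "http://patches.dpdk.org/api/patches/51307/"

# get: fetch the patch as JSON (the "?format=api" view shown below is the
# same data rendered for the browser).
patch = requests.get(url).json()
print(patch["name"], patch["state"], patch["check"])

# patch/put: updating a patch requires maintainer credentials; Patchwork
# accepts token authentication. "<api-token>" is a placeholder.
# requests.patch(url,
#                headers={"Authorization": "Token <api-token>"},
#                json={"state": "accepted"})

The raw response returned for the get case looks like the following.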

GET /api/patches/51307/?format=api
HTTP 200 OK
Allow: GET, PUT, PATCH, HEAD, OPTIONS
Content-Type: application/json
Vary: Accept

{
    "id": 51307,
    "url": "http://patches.dpdk.org/api/patches/51307/?format=api",
    "web_url": "http://patches.dpdk.org/project/dpdk/patch/20190319012010.16793-4-gage.eads@intel.com/",
    "project": {
        "id": 1,
        "url": "http://patches.dpdk.org/api/projects/1/?format=api",
        "name": "DPDK",
        "link_name": "dpdk",
        "list_id": "dev.dpdk.org",
        "list_email": "dev@dpdk.org",
        "web_url": "http://core.dpdk.org",
        "scm_url": "git://dpdk.org/dpdk",
        "webscm_url": "http://git.dpdk.org/dpdk",
        "list_archive_url": "https://inbox.dpdk.org/dev",
        "list_archive_url_format": "https://inbox.dpdk.org/dev/{}",
        "commit_url_format": ""
    },
    "msgid": "<20190319012010.16793-4-gage.eads@intel.com>",
    "list_archive_url": "https://inbox.dpdk.org/dev/20190319012010.16793-4-gage.eads@intel.com",
    "date": "2019-03-19T01:20:07",
    "name": "[v8,3/6] ring: add a lock-free implementation",
    "commit_ref": null,
    "pull_url": null,
    "state": "rejected",
    "archived": true,
    "hash": "1503155813471e407fa70ea33f1cf275bc40bc47",
    "submitter": {
        "id": 586,
        "url": "http://patches.dpdk.org/api/people/586/?format=api",
        "name": "Eads, Gage",
        "email": "gage.eads@intel.com"
    },
    "delegate": {
        "id": 1,
        "url": "http://patches.dpdk.org/api/users/1/?format=api",
        "username": "tmonjalo",
        "first_name": "Thomas",
        "last_name": "Monjalon",
        "email": "thomas@monjalon.net"
    },
    "mbox": "http://patches.dpdk.org/project/dpdk/patch/20190319012010.16793-4-gage.eads@intel.com/mbox/",
    "series": [
        {
            "id": 3791,
            "url": "http://patches.dpdk.org/api/series/3791/?format=api",
            "web_url": "http://patches.dpdk.org/project/dpdk/list/?series=3791",
            "date": "2019-03-19T01:20:04",
            "name": "Add lock-free ring and mempool handler",
            "version": 8,
            "mbox": "http://patches.dpdk.org/series/3791/mbox/"
        }
    ],
    "comments": "http://patches.dpdk.org/api/patches/51307/comments/",
    "check": "fail",
    "checks": "http://patches.dpdk.org/api/patches/51307/checks/",
    "tags": {},
    "related": [],
    "headers": {
        "Return-Path": "<dev-bounces@dpdk.org>",
        "X-Original-To": "patchwork@dpdk.org",
        "Delivered-To": "patchwork@dpdk.org",
        "Received": [
            "from [92.243.14.124] (localhost [127.0.0.1])\n\tby dpdk.org (Postfix) with ESMTP id 005F34C9F;\n\tTue, 19 Mar 2019 02:20:54 +0100 (CET)",
            "from mga02.intel.com (mga02.intel.com [134.134.136.20])\n\tby dpdk.org (Postfix) with ESMTP id B93B2324D\n\tfor <dev@dpdk.org>; Tue, 19 Mar 2019 02:20:44 +0100 (CET)",
            "from orsmga008.jf.intel.com ([10.7.209.65])\n\tby orsmga101.jf.intel.com with ESMTP/TLS/DHE-RSA-AES256-GCM-SHA384;\n\t18 Mar 2019 18:20:43 -0700",
            "from txasoft-yocto.an.intel.com ([10.123.72.192])\n\tby orsmga008.jf.intel.com with ESMTP; 18 Mar 2019 18:20:42 -0700"
        ],
        "X-Amp-Result": "SKIPPED(no attachment in message)",
        "X-Amp-File-Uploaded": "False",
        "X-ExtLoop1": "1",
        "X-IronPort-AV": "E=Sophos;i=\"5.58,495,1544515200\"; d=\"scan'208\";a=\"126564255\"",
        "From": "Gage Eads <gage.eads@intel.com>",
        "To": "dev@dpdk.org",
        "Cc": "olivier.matz@6wind.com, arybchenko@solarflare.com,\n\tbruce.richardson@intel.com, konstantin.ananyev@intel.com,\n\tstephen@networkplumber.org, jerinj@marvell.com, mczekaj@marvell.com, \n\tnd@arm.com, Ola.Liljedahl@arm.com, gage.eads@intel.com",
        "Date": "Mon, 18 Mar 2019 20:20:07 -0500",
        "Message-Id": "<20190319012010.16793-4-gage.eads@intel.com>",
        "X-Mailer": "git-send-email 2.13.6",
        "In-Reply-To": "<20190319012010.16793-1-gage.eads@intel.com>",
        "References": "<20190318213555.17345-1-gage.eads@intel.com>\n\t<20190319012010.16793-1-gage.eads@intel.com>",
        "Subject": "[dpdk-dev] [PATCH v8 3/6] ring: add a lock-free implementation",
        "X-BeenThere": "dev@dpdk.org",
        "X-Mailman-Version": "2.1.15",
        "Precedence": "list",
        "List-Id": "DPDK patches and discussions <dev.dpdk.org>",
        "List-Unsubscribe": "<https://mails.dpdk.org/options/dev>,\n\t<mailto:dev-request@dpdk.org?subject=unsubscribe>",
        "List-Archive": "<http://mails.dpdk.org/archives/dev/>",
        "List-Post": "<mailto:dev@dpdk.org>",
        "List-Help": "<mailto:dev-request@dpdk.org?subject=help>",
        "List-Subscribe": "<https://mails.dpdk.org/listinfo/dev>,\n\t<mailto:dev-request@dpdk.org?subject=subscribe>",
        "Errors-To": "dev-bounces@dpdk.org",
        "Sender": "\"dev\" <dev-bounces@dpdk.org>"
    },
    "content": "This commit adds support for lock-free circular ring enqueue and dequeue\nfunctions. The ring is supported on 32- and 64-bit architectures, however\nit uses a 128-bit compare-and-swap instruction when run on a 64-bit\narchitecture, and thus is currently limited to x86_64.\n\nThe algorithm is based on Ola Liljedahl's lfring, modified to fit within\nthe rte ring API. With no contention, an enqueue of n pointers uses (1 + n)\nCAS operations and a dequeue of n pointers uses 1. This algorithm has worse\naverage-case performance than the regular rte ring (particularly a\nhighly-contended ring with large bulk accesses), however:\n- For applications with preemptible pthreads, the regular rte ring's\n  worst-case performance (i.e. one thread being preempted in the\n  update_tail() critical section) is much worse than the lock-free ring's.\n- Software caching can mitigate the average case performance for ring-based\n  algorithms. For example, a lock-free ring based mempool (a likely use\n  case for this ring) with per-thread caching.\n\nTo avoid the ABA problem, each ring entry contains a modification counter.\nOn a 64-bit architecture, the chance of ABA occurring are effectively zero;\na 64-bit counter will take many years to wrap at current CPU frequencies.\nOn a 32-bit architectures, a lock-free ring must be at least 1024-entries\ndeep; assuming 100 cycles per ring entry access, this guarantees the ring's\nmodification counters will wrap on the order of days.\n\nThe lock-free ring is enabled via a new flag, RING_F_LF. Because the ring's\nmemsize is now a function of its flags (the lock-free ring requires 128b\nfor each entry), this commit adds a new argument ('flags') to\nrte_ring_get_memsize(). An API deprecation notice will be sent in a\nseparate commit.\n\nFor ease-of-use, existing ring enqueue and dequeue functions work on both\nregular and lock-free rings. This introduces an additional branch in the\ndatapath, but this should be a highly predictable branch.\nring_perf_autotest shows a negligible performance impact; it's hard to\ndistinguish a real difference versus system noise.\n\n                                  | ring_perf_autotest cycles with branch -\n             Test                 |   ring_perf_autotest cycles without\n------------------------------------------------------------------\nSP/SC single enq/dequeue          | 0.33\nMP/MC single enq/dequeue          | -4.00\nSP/SC burst enq/dequeue (size 8)  | 0.00\nMP/MC burst enq/dequeue (size 8)  | 0.00\nSP/SC burst enq/dequeue (size 32) | 0.00\nMP/MC burst enq/dequeue (size 32) | 0.00\nSC empty dequeue                  | 1.00\nMC empty dequeue                  | 0.00\n\nSingle lcore:\nSP/SC bulk enq/dequeue (size 8)   | 0.49\nMP/MC bulk enq/dequeue (size 8)   | 0.08\nSP/SC bulk enq/dequeue (size 32)  | 0.07\nMP/MC bulk enq/dequeue (size 32)  | 0.09\n\nTwo physical cores:\nSP/SC bulk enq/dequeue (size 8)   | 0.19\nMP/MC bulk enq/dequeue (size 8)   | -0.37\nSP/SC bulk enq/dequeue (size 32)  | 0.09\nMP/MC bulk enq/dequeue (size 32)  | -0.05\n\nTwo NUMA nodes:\nSP/SC bulk enq/dequeue (size 8)   | -1.96\nMP/MC bulk enq/dequeue (size 8)   | 0.88\nSP/SC bulk enq/dequeue (size 32)  | 0.10\nMP/MC bulk enq/dequeue (size 32)  | 0.46\n\nTest setup: x86_64 build with default config, dual-socket Xeon E5-2699 v4,\nrunning on isolcpus cores with a tickless scheduler. 
Each test run three\ntimes and the results averaged.\n\nSigned-off-by: Gage Eads <gage.eads@intel.com>\n---\n lib/librte_ring/rte_ring.c           |  92 +++++++--\n lib/librte_ring/rte_ring.h           | 308 ++++++++++++++++++++++++++---\n lib/librte_ring/rte_ring_c11_mem.h   | 366 ++++++++++++++++++++++++++++++++++-\n lib/librte_ring/rte_ring_generic.h   | 357 +++++++++++++++++++++++++++++++++-\n lib/librte_ring/rte_ring_version.map |   7 +\n 5 files changed, 1082 insertions(+), 48 deletions(-)",
    "diff": "diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c\nindex d215acecc..d4a176f57 100644\n--- a/lib/librte_ring/rte_ring.c\n+++ b/lib/librte_ring/rte_ring.c\n@@ -45,9 +45,9 @@ EAL_REGISTER_TAILQ(rte_ring_tailq)\n \n /* return the size of memory occupied by a ring */\n ssize_t\n-rte_ring_get_memsize(unsigned count)\n+rte_ring_get_memsize_v1905(unsigned int count, unsigned int flags)\n {\n-\tssize_t sz;\n+\tssize_t sz, elt_sz;\n \n \t/* count must be a power of 2 */\n \tif ((!POWEROF2(count)) || (count > RTE_RING_SZ_MASK )) {\n@@ -57,10 +57,23 @@ rte_ring_get_memsize(unsigned count)\n \t\treturn -EINVAL;\n \t}\n \n-\tsz = sizeof(struct rte_ring) + count * sizeof(void *);\n+\telt_sz = (flags & RING_F_LF) ? 2 * sizeof(void *) : sizeof(void *);\n+\n+\tsz = sizeof(struct rte_ring) + count * elt_sz;\n \tsz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);\n \treturn sz;\n }\n+BIND_DEFAULT_SYMBOL(rte_ring_get_memsize, _v1905, 19.05);\n+MAP_STATIC_SYMBOL(ssize_t rte_ring_get_memsize(unsigned int count,\n+\t\t\t\t\t       unsigned int flags),\n+\t\t  rte_ring_get_memsize_v1905);\n+\n+ssize_t\n+rte_ring_get_memsize_v20(unsigned int count)\n+{\n+\treturn rte_ring_get_memsize_v1905(count, 0);\n+}\n+VERSION_SYMBOL(rte_ring_get_memsize, _v20, 2.0);\n \n int\n rte_ring_init(struct rte_ring *r, const char *name, unsigned count,\n@@ -75,6 +88,8 @@ rte_ring_init(struct rte_ring *r, const char *name, unsigned count,\n \t\t\t  RTE_CACHE_LINE_MASK) != 0);\n \tRTE_BUILD_BUG_ON((offsetof(struct rte_ring, prod) &\n \t\t\t  RTE_CACHE_LINE_MASK) != 0);\n+\tRTE_BUILD_BUG_ON(sizeof(struct rte_ring_lf_entry) !=\n+\t\t\t 2 * sizeof(void *));\n \n \t/* init the ring structure */\n \tmemset(r, 0, sizeof(*r));\n@@ -82,8 +97,6 @@ rte_ring_init(struct rte_ring *r, const char *name, unsigned count,\n \tif (ret < 0 || ret >= (int)sizeof(r->name))\n \t\treturn -ENAMETOOLONG;\n \tr->flags = flags;\n-\tr->prod.single = (flags & RING_F_SP_ENQ) ? __IS_SP : __IS_MP;\n-\tr->cons.single = (flags & RING_F_SC_DEQ) ? __IS_SC : __IS_MC;\n \n \tif (flags & RING_F_EXACT_SZ) {\n \t\tr->size = rte_align32pow2(count + 1);\n@@ -100,12 +113,46 @@ rte_ring_init(struct rte_ring *r, const char *name, unsigned count,\n \t\tr->mask = count - 1;\n \t\tr->capacity = r->mask;\n \t}\n-\tr->prod.head = r->cons.head = 0;\n-\tr->prod.tail = r->cons.tail = 0;\n+\n+\tr->log2_size = rte_log2_u64(r->size);\n+\n+\tif (flags & RING_F_LF) {\n+\t\tuint32_t i;\n+\n+\t\tr->prod_ptr.single =\n+\t\t\t(flags & RING_F_SP_ENQ) ? __IS_SP : __IS_MP;\n+\t\tr->cons_ptr.single =\n+\t\t\t(flags & RING_F_SC_DEQ) ? __IS_SC : __IS_MC;\n+\t\tr->prod_ptr.head = r->cons_ptr.head = 0;\n+\t\tr->prod_ptr.tail = r->cons_ptr.tail = 0;\n+\n+\t\tfor (i = 0; i < r->size; i++) {\n+\t\t\tstruct rte_ring_lf_entry *ring_ptr, *base;\n+\n+\t\t\tbase = (struct rte_ring_lf_entry *)&r->ring;\n+\n+\t\t\tring_ptr = &base[i & r->mask];\n+\n+\t\t\tring_ptr->cnt = 0;\n+\t\t}\n+\t} else {\n+\t\tr->prod.single = (flags & RING_F_SP_ENQ) ? __IS_SP : __IS_MP;\n+\t\tr->cons.single = (flags & RING_F_SC_DEQ) ? __IS_SC : __IS_MC;\n+\t\tr->prod.head = r->cons.head = 0;\n+\t\tr->prod.tail = r->cons.tail = 0;\n+\t}\n \n \treturn 0;\n }\n \n+/* If a ring entry is written on average every M cycles, then a ring entry is\n+ * reused every M*count cycles, and a ring entry's counter repeats every\n+ * M*count*2^32 cycles. If M=100 on a 2GHz system, then a 1024-entry ring's\n+ * counters would repeat every 2.37 days. 
The likelihood of ABA occurring is\n+ * considered sufficiently low for 1024-entry and larger rings.\n+ */\n+#define MIN_32_BIT_LF_RING_SIZE 1024\n+\n /* create the ring */\n struct rte_ring *\n rte_ring_create(const char *name, unsigned count, int socket_id,\n@@ -123,11 +170,25 @@ rte_ring_create(const char *name, unsigned count, int socket_id,\n \n \tring_list = RTE_TAILQ_CAST(rte_ring_tailq.head, rte_ring_list);\n \n+#ifdef RTE_ARCH_64\n+#if !defined(RTE_ARCH_X86_64)\n+\tprintf(\"This platform does not support the atomic operation required for RING_F_LF\\n\");\n+\trte_errno = EINVAL;\n+\treturn NULL;\n+#endif\n+#else\n+\tif ((flags & RING_F_LF) && count < MIN_32_BIT_LF_RING_SIZE) {\n+\t\tprintf(\"RING_F_LF is only supported on 32-bit platforms for rings with at least 1024 entries.\\n\");\n+\t\trte_errno = EINVAL;\n+\t\treturn NULL;\n+\t}\n+#endif\n+\n \t/* for an exact size ring, round up from count to a power of two */\n \tif (flags & RING_F_EXACT_SZ)\n \t\tcount = rte_align32pow2(count + 1);\n \n-\tring_size = rte_ring_get_memsize(count);\n+\tring_size = rte_ring_get_memsize(count, flags);\n \tif (ring_size < 0) {\n \t\trte_errno = ring_size;\n \t\treturn NULL;\n@@ -227,10 +288,17 @@ rte_ring_dump(FILE *f, const struct rte_ring *r)\n \tfprintf(f, \"  flags=%x\\n\", r->flags);\n \tfprintf(f, \"  size=%\"PRIu32\"\\n\", r->size);\n \tfprintf(f, \"  capacity=%\"PRIu32\"\\n\", r->capacity);\n-\tfprintf(f, \"  ct=%\"PRIu32\"\\n\", r->cons.tail);\n-\tfprintf(f, \"  ch=%\"PRIu32\"\\n\", r->cons.head);\n-\tfprintf(f, \"  pt=%\"PRIu32\"\\n\", r->prod.tail);\n-\tfprintf(f, \"  ph=%\"PRIu32\"\\n\", r->prod.head);\n+\tif (r->flags & RING_F_LF) {\n+\t\tfprintf(f, \"  ct=%\"PRIuPTR\"\\n\", r->cons_ptr.tail);\n+\t\tfprintf(f, \"  ch=%\"PRIuPTR\"\\n\", r->cons_ptr.head);\n+\t\tfprintf(f, \"  pt=%\"PRIuPTR\"\\n\", r->prod_ptr.tail);\n+\t\tfprintf(f, \"  ph=%\"PRIuPTR\"\\n\", r->prod_ptr.head);\n+\t} else {\n+\t\tfprintf(f, \"  ct=%\"PRIu32\"\\n\", r->cons.tail);\n+\t\tfprintf(f, \"  ch=%\"PRIu32\"\\n\", r->cons.head);\n+\t\tfprintf(f, \"  pt=%\"PRIu32\"\\n\", r->prod.tail);\n+\t\tfprintf(f, \"  ph=%\"PRIu32\"\\n\", r->prod.head);\n+\t}\n \tfprintf(f, \"  used=%u\\n\", rte_ring_count(r));\n \tfprintf(f, \"  avail=%u\\n\", rte_ring_free_count(r));\n }\ndiff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h\nindex f16d77b8a..200d7b2a0 100644\n--- a/lib/librte_ring/rte_ring.h\n+++ b/lib/librte_ring/rte_ring.h\n@@ -20,7 +20,7 @@\n  *\n  * - FIFO (First In First Out)\n  * - Maximum size is fixed; the pointers are stored in a table.\n- * - Lockless implementation.\n+ * - Lockless (and optionally, non-blocking/lock-free) implementation.\n  * - Multi- or single-consumer dequeue.\n  * - Multi- or single-producer enqueue.\n  * - Bulk dequeue.\n@@ -98,6 +98,7 @@ struct rte_ring {\n \tconst struct rte_memzone *memzone;\n \t\t\t/**< Memzone, if any, containing the rte_ring */\n \tuint32_t size;           /**< Size of ring. */\n+\tuint32_t log2_size;      /**< log2(size of ring) */\n \tuint32_t mask;           /**< Mask (size-1) of ring. */\n \tuint32_t capacity;       /**< Usable size of ring */\n \n@@ -133,6 +134,18 @@ struct rte_ring {\n  */\n #define RING_F_EXACT_SZ 0x0004\n #define RTE_RING_SZ_MASK  (0x7fffffffU) /**< Ring size mask */\n+/**\n+ * The ring uses lock-free enqueue and dequeue functions. These functions\n+ * do not have the \"non-preemptive\" constraint of a regular rte ring, and thus\n+ * are suited for applications using preemptible pthreads. 
However, the\n+ * lock-free functions have worse average-case performance than their regular\n+ * rte ring counterparts. When used as the handler for a mempool, per-thread\n+ * caching can mitigate the performance difference by reducing the number (and\n+ * contention) of ring accesses.\n+ *\n+ * This flag is only supported on 32-bit and x86_64 platforms.\n+ */\n+#define RING_F_LF 0x0008\n \n /* @internal defines for passing to the enqueue dequeue worker functions */\n #define __IS_SP 1\n@@ -150,11 +163,15 @@ struct rte_ring {\n  *\n  * @param count\n  *   The number of elements in the ring (must be a power of 2).\n+ * @param flags\n+ *   The flags the ring will be created with.\n  * @return\n  *   - The memory size needed for the ring on success.\n  *   - -EINVAL if count is not a power of 2.\n  */\n-ssize_t rte_ring_get_memsize(unsigned count);\n+ssize_t rte_ring_get_memsize(unsigned int count, unsigned int flags);\n+ssize_t rte_ring_get_memsize_v20(unsigned int count);\n+ssize_t rte_ring_get_memsize_v1905(unsigned int count, unsigned int flags);\n \n /**\n  * Initialize a ring structure.\n@@ -187,6 +204,10 @@ ssize_t rte_ring_get_memsize(unsigned count);\n  *    - RING_F_SC_DEQ: If this flag is set, the default behavior when\n  *      using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``\n  *      is \"single-consumer\". Otherwise, it is \"multi-consumers\".\n+ *    - RING_F_EXACT_SZ: If this flag is set, count can be a non-power-of-2\n+ *      number, but up to half the ring space may be wasted.\n+ *    - RING_F_LF: If this flag is set, the ring uses lock-free variants of the\n+ *      dequeue and enqueue functions.\n  * @return\n  *   0 on success, or a negative value on error.\n  */\n@@ -222,12 +243,17 @@ int rte_ring_init(struct rte_ring *r, const char *name, unsigned count,\n  *    - RING_F_SC_DEQ: If this flag is set, the default behavior when\n  *      using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``\n  *      is \"single-consumer\". Otherwise, it is \"multi-consumers\".\n+ *    - RING_F_EXACT_SZ: If this flag is set, count can be a non-power-of-2\n+ *      number, but up to half the ring space may be wasted.\n+ *    - RING_F_LF: If this flag is set, the ring uses lock-free variants of the\n+ *      dequeue and enqueue functions.\n  * @return\n  *   On success, the pointer to the new allocated ring. NULL on error with\n  *    rte_errno set appropriately. 
Possible errno values include:\n  *    - E_RTE_NO_CONFIG - function could not get pointer to rte_config structure\n  *    - E_RTE_SECONDARY - function was called from a secondary process instance\n- *    - EINVAL - count provided is not a power of 2\n+ *    - EINVAL - count provided is not a power of 2, or RING_F_LF is used on an\n+ *      unsupported platform\n  *    - ENOSPC - the maximum number of memzones has already been allocated\n  *    - EEXIST - a memzone with the same name already exists\n  *    - ENOMEM - no appropriate memory area found in which to create memzone\n@@ -283,6 +309,50 @@ void rte_ring_dump(FILE *f, const struct rte_ring *r);\n \t} \\\n } while (0)\n \n+/* The actual enqueue of pointers on the lock-free ring, used by the\n+ * single-producer lock-free enqueue function.\n+ */\n+#define ENQUEUE_PTRS_LF(r, base, prod_head, obj_table, n) do { \\\n+\tunsigned int i; \\\n+\tconst uint32_t size = (r)->size; \\\n+\tsize_t idx = prod_head & (r)->mask; \\\n+\tsize_t new_cnt = prod_head + size; \\\n+\tstruct rte_ring_lf_entry *ring = (struct rte_ring_lf_entry *)base; \\\n+\tunsigned int mask = ~0x3; \\\n+\tif (likely(idx + n < size)) { \\\n+\t\tfor (i = 0; i < (n & mask); i += 4, idx += 4) { \\\n+\t\t\tring[idx].ptr = obj_table[i]; \\\n+\t\t\tring[idx].cnt = (new_cnt + i) >> r->log2_size; \\\n+\t\t\tring[idx + 1].ptr = obj_table[i + 1]; \\\n+\t\t\tring[idx + 1].cnt = (new_cnt + i + 1) >> r->log2_size; \\\n+\t\t\tring[idx + 2].ptr = obj_table[i + 2]; \\\n+\t\t\tring[idx + 2].cnt = (new_cnt + i + 2) >> r->log2_size; \\\n+\t\t\tring[idx + 3].ptr = obj_table[i + 3]; \\\n+\t\t\tring[idx + 3].cnt = (new_cnt + i + 3) >> r->log2_size; \\\n+\t\t} \\\n+\t\tswitch (n & 0x3) { \\\n+\t\tcase 3: \\\n+\t\t\tring[idx].cnt = (new_cnt + i) >> r->log2_size; \\\n+\t\t\tring[idx++].ptr = obj_table[i++]; /* fallthrough */ \\\n+\t\tcase 2: \\\n+\t\t\tring[idx].cnt = (new_cnt + i) >> r->log2_size; \\\n+\t\t\tring[idx++].ptr = obj_table[i++]; /* fallthrough */ \\\n+\t\tcase 1: \\\n+\t\t\tring[idx].cnt = (new_cnt + i) >> r->log2_size; \\\n+\t\t\tring[idx++].ptr = obj_table[i++]; \\\n+\t\t} \\\n+\t} else { \\\n+\t\tfor (i = 0; idx < size; i++, idx++) { \\\n+\t\t\tring[idx].cnt = (new_cnt + i) >> r->log2_size; \\\n+\t\t\tring[idx].ptr = obj_table[i]; \\\n+\t\t} \\\n+\t\tfor (idx = 0; i < n; i++, idx++) {    \\\n+\t\t\tring[idx].cnt = (new_cnt + i) >> r->log2_size; \\\n+\t\t\tring[idx].ptr = obj_table[i]; \\\n+\t\t} \\\n+\t} \\\n+} while (0)\n+\n /* the actual copy of pointers on the ring to obj_table.\n  * Placed here since identical code needed in both\n  * single and multi consumer dequeue functions */\n@@ -314,6 +384,43 @@ void rte_ring_dump(FILE *f, const struct rte_ring *r);\n \t} \\\n } while (0)\n \n+/* The actual copy of pointers on the lock-free ring to obj_table. 
*/\n+#define DEQUEUE_PTRS_LF(r, base, cons_head, obj_table, n) do { \\\n+\tunsigned int i; \\\n+\tsize_t idx = cons_head & (r)->mask; \\\n+\tconst uint32_t size = (r)->size; \\\n+\tstruct rte_ring_lf_entry *ring = (struct rte_ring_lf_entry *)base; \\\n+\tunsigned int mask = ~0x3; \\\n+\tif (likely(idx + n < size)) { \\\n+\t\tfor (i = 0; i < (n & mask); i += 4, idx += 4) {\\\n+\t\t\tobj_table[i] = ring[idx].ptr; \\\n+\t\t\tobj_table[i + 1] = ring[idx + 1].ptr; \\\n+\t\t\tobj_table[i + 2] = ring[idx + 2].ptr; \\\n+\t\t\tobj_table[i + 3] = ring[idx + 3].ptr; \\\n+\t\t} \\\n+\t\tswitch (n & 0x3) { \\\n+\t\tcase 3: \\\n+\t\t\tobj_table[i++] = ring[idx++].ptr; /* fallthrough */ \\\n+\t\tcase 2: \\\n+\t\t\tobj_table[i++] = ring[idx++].ptr; /* fallthrough */ \\\n+\t\tcase 1: \\\n+\t\t\tobj_table[i++] = ring[idx++].ptr; \\\n+\t\t} \\\n+\t} else { \\\n+\t\tfor (i = 0; idx < size; i++, idx++) \\\n+\t\t\tobj_table[i] = ring[idx].ptr; \\\n+\t\tfor (idx = 0; i < n; i++, idx++) \\\n+\t\t\tobj_table[i] = ring[idx].ptr; \\\n+\t} \\\n+} while (0)\n+\n+\n+/* @internal 128-bit structure used by the lock-free ring */\n+struct rte_ring_lf_entry {\n+\tvoid *ptr; /**< Data pointer */\n+\tuintptr_t cnt; /**< Modification counter */\n+};\n+\n /* Between load and load. there might be cpu reorder in weak model\n  * (powerpc/arm).\n  * There are 2 choices for the users\n@@ -330,6 +437,70 @@ void rte_ring_dump(FILE *f, const struct rte_ring *r);\n #endif\n \n /**\n+ * @internal Enqueue several objects on the lock-free ring\n+ *\n+ * @param r\n+ *   A pointer to the ring structure.\n+ * @param obj_table\n+ *   A pointer to a table of void * pointers (objects).\n+ * @param n\n+ *   The number of objects to add in the ring from the obj_table.\n+ * @param behavior\n+ *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to the ring\n+ *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring\n+ * @param is_sp\n+ *   Indicates whether to use single producer or multi-producer head update\n+ * @param free_space\n+ *   returns the amount of space after the enqueue operation has finished\n+ * @return\n+ *   Actual number of objects enqueued.\n+ *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.\n+ */\n+static __rte_always_inline unsigned int\n+__rte_ring_do_lf_enqueue(struct rte_ring *r, void * const *obj_table,\n+\t\t\t unsigned int n, enum rte_ring_queue_behavior behavior,\n+\t\t\t unsigned int is_sp, unsigned int *free_space)\n+{\n+\tif (is_sp)\n+\t\treturn __rte_ring_do_lf_enqueue_sp(r, obj_table, n,\n+\t\t\t\t\t\t   behavior, free_space);\n+\telse\n+\t\treturn __rte_ring_do_lf_enqueue_mp(r, obj_table, n,\n+\t\t\t\t\t\t   behavior, free_space);\n+}\n+\n+/**\n+ * @internal Dequeue several objects from the lock-free ring\n+ *\n+ * @param r\n+ *   A pointer to the ring structure.\n+ * @param obj_table\n+ *   A pointer to a table of void * pointers (objects).\n+ * @param n\n+ *   The number of objects to pull from the ring.\n+ * @param behavior\n+ *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from the ring\n+ *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring\n+ * @param available\n+ *   returns the number of remaining ring entries after the dequeue has finished\n+ * @return\n+ *   - Actual number of objects dequeued.\n+ *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.\n+ */\n+static __rte_always_inline unsigned int\n+__rte_ring_do_lf_dequeue(struct rte_ring *r, void **obj_table,\n+\t\t unsigned int n, enum 
rte_ring_queue_behavior behavior,\n+\t\t unsigned int is_sc, unsigned int *available)\n+{\n+\tif (is_sc)\n+\t\treturn __rte_ring_do_lf_dequeue_sc(r, obj_table, n,\n+\t\t\t\t\t\t   behavior, available);\n+\telse\n+\t\treturn __rte_ring_do_lf_dequeue_mc(r, obj_table, n,\n+\t\t\t\t\t\t   behavior, available);\n+}\n+\n+/**\n  * @internal Enqueue several objects on the ring\n  *\n   * @param r\n@@ -436,8 +607,14 @@ static __rte_always_inline unsigned int\n rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,\n \t\t\t unsigned int n, unsigned int *free_space)\n {\n-\treturn __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,\n-\t\t\t__IS_MP, free_space);\n+\tif (r->flags & RING_F_LF)\n+\t\treturn __rte_ring_do_lf_enqueue(r, obj_table, n,\n+\t\t\t\t\t\tRTE_RING_QUEUE_FIXED, __IS_MP,\n+\t\t\t\t\t\tfree_space);\n+\telse\n+\t\treturn __rte_ring_do_enqueue(r, obj_table, n,\n+\t\t\t\t\t     RTE_RING_QUEUE_FIXED, __IS_MP,\n+\t\t\t\t\t     free_space);\n }\n \n /**\n@@ -459,8 +636,14 @@ static __rte_always_inline unsigned int\n rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,\n \t\t\t unsigned int n, unsigned int *free_space)\n {\n-\treturn __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,\n-\t\t\t__IS_SP, free_space);\n+\tif (r->flags & RING_F_LF)\n+\t\treturn __rte_ring_do_lf_enqueue(r, obj_table, n,\n+\t\t\t\t\t\tRTE_RING_QUEUE_FIXED, __IS_SP,\n+\t\t\t\t\t\tfree_space);\n+\telse\n+\t\treturn __rte_ring_do_enqueue(r, obj_table, n,\n+\t\t\t\t\t     RTE_RING_QUEUE_FIXED, __IS_SP,\n+\t\t\t\t\t     free_space);\n }\n \n /**\n@@ -486,8 +669,14 @@ static __rte_always_inline unsigned int\n rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table,\n \t\t      unsigned int n, unsigned int *free_space)\n {\n-\treturn __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,\n-\t\t\tr->prod.single, free_space);\n+\tif (r->flags & RING_F_LF)\n+\t\treturn __rte_ring_do_lf_enqueue(r, obj_table, n,\n+\t\t\t\t\t\tRTE_RING_QUEUE_FIXED,\n+\t\t\t\t\t\tr->prod_ptr.single, free_space);\n+\telse\n+\t\treturn __rte_ring_do_enqueue(r, obj_table, n,\n+\t\t\t\t\t     RTE_RING_QUEUE_FIXED,\n+\t\t\t\t\t     r->prod.single, free_space);\n }\n \n /**\n@@ -570,8 +759,14 @@ static __rte_always_inline unsigned int\n rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table,\n \t\tunsigned int n, unsigned int *available)\n {\n-\treturn __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,\n-\t\t\t__IS_MC, available);\n+\tif (r->flags & RING_F_LF)\n+\t\treturn __rte_ring_do_lf_dequeue(r, obj_table, n,\n+\t\t\t\t\t\tRTE_RING_QUEUE_FIXED, __IS_MC,\n+\t\t\t\t\t\tavailable);\n+\telse\n+\t\treturn __rte_ring_do_dequeue(r, obj_table, n,\n+\t\t\t\t\t     RTE_RING_QUEUE_FIXED, __IS_MC,\n+\t\t\t\t\t     available);\n }\n \n /**\n@@ -594,8 +789,14 @@ static __rte_always_inline unsigned int\n rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table,\n \t\tunsigned int n, unsigned int *available)\n {\n-\treturn __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,\n-\t\t\t__IS_SC, available);\n+\tif (r->flags & RING_F_LF)\n+\t\treturn __rte_ring_do_lf_dequeue(r, obj_table, n,\n+\t\t\t\t\t\tRTE_RING_QUEUE_FIXED, __IS_SC,\n+\t\t\t\t\t\tavailable);\n+\telse\n+\t\treturn __rte_ring_do_dequeue(r, obj_table, n,\n+\t\t\t\t\t     RTE_RING_QUEUE_FIXED, __IS_SC,\n+\t\t\t\t\t     available);\n }\n \n /**\n@@ -621,8 +822,14 @@ static __rte_always_inline unsigned int\n rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n,\n \t\tunsigned int 
*available)\n {\n-\treturn __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,\n-\t\t\t\tr->cons.single, available);\n+\tif (r->flags & RING_F_LF)\n+\t\treturn __rte_ring_do_lf_dequeue(r, obj_table, n,\n+\t\t\t\t\t\tRTE_RING_QUEUE_FIXED,\n+\t\t\t\t\t\tr->cons_ptr.single, available);\n+\telse\n+\t\treturn __rte_ring_do_dequeue(r, obj_table, n,\n+\t\t\t\t\t     RTE_RING_QUEUE_FIXED,\n+\t\t\t\t\t     r->cons.single, available);\n }\n \n /**\n@@ -697,9 +904,13 @@ rte_ring_dequeue(struct rte_ring *r, void **obj_p)\n static inline unsigned\n rte_ring_count(const struct rte_ring *r)\n {\n-\tuint32_t prod_tail = r->prod.tail;\n-\tuint32_t cons_tail = r->cons.tail;\n-\tuint32_t count = (prod_tail - cons_tail) & r->mask;\n+\tuint32_t count;\n+\n+\tif (r->flags & RING_F_LF)\n+\t\tcount = (r->prod_ptr.tail - r->cons_ptr.tail) & r->mask;\n+\telse\n+\t\tcount = (r->prod.tail - r->cons.tail) & r->mask;\n+\n \treturn (count > r->capacity) ? r->capacity : count;\n }\n \n@@ -819,8 +1030,14 @@ static __rte_always_inline unsigned\n rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table,\n \t\t\t unsigned int n, unsigned int *free_space)\n {\n-\treturn __rte_ring_do_enqueue(r, obj_table, n,\n-\t\t\tRTE_RING_QUEUE_VARIABLE, __IS_MP, free_space);\n+\tif (r->flags & RING_F_LF)\n+\t\treturn __rte_ring_do_lf_enqueue(r, obj_table, n,\n+\t\t\t\t\t\tRTE_RING_QUEUE_VARIABLE,\n+\t\t\t\t\t\t__IS_MP, free_space);\n+\telse\n+\t\treturn __rte_ring_do_enqueue(r, obj_table, n,\n+\t\t\t\t\t     RTE_RING_QUEUE_VARIABLE,\n+\t\t\t\t\t     __IS_MP, free_space);\n }\n \n /**\n@@ -842,8 +1059,14 @@ static __rte_always_inline unsigned\n rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table,\n \t\t\t unsigned int n, unsigned int *free_space)\n {\n-\treturn __rte_ring_do_enqueue(r, obj_table, n,\n-\t\t\tRTE_RING_QUEUE_VARIABLE, __IS_SP, free_space);\n+\tif (r->flags & RING_F_LF)\n+\t\treturn __rte_ring_do_lf_enqueue(r, obj_table, n,\n+\t\t\t\t\t\tRTE_RING_QUEUE_VARIABLE,\n+\t\t\t\t\t\t__IS_SP, free_space);\n+\telse\n+\t\treturn __rte_ring_do_enqueue(r, obj_table, n,\n+\t\t\t\t\t     RTE_RING_QUEUE_VARIABLE,\n+\t\t\t\t\t     __IS_SP, free_space);\n }\n \n /**\n@@ -869,8 +1092,14 @@ static __rte_always_inline unsigned\n rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,\n \t\t      unsigned int n, unsigned int *free_space)\n {\n-\treturn __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE,\n-\t\t\tr->prod.single, free_space);\n+\tif (r->flags & RING_F_LF)\n+\t\treturn __rte_ring_do_lf_enqueue(r, obj_table, n,\n+\t\t\t\t\t\tRTE_RING_QUEUE_VARIABLE,\n+\t\t\t\t\t\tr->prod_ptr.single, free_space);\n+\telse\n+\t\treturn __rte_ring_do_enqueue(r, obj_table, n,\n+\t\t\t\t\t     RTE_RING_QUEUE_VARIABLE,\n+\t\t\t\t\t     r->prod.single, free_space);\n }\n \n /**\n@@ -897,8 +1126,14 @@ static __rte_always_inline unsigned\n rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table,\n \t\tunsigned int n, unsigned int *available)\n {\n-\treturn __rte_ring_do_dequeue(r, obj_table, n,\n-\t\t\tRTE_RING_QUEUE_VARIABLE, __IS_MC, available);\n+\tif (r->flags & RING_F_LF)\n+\t\treturn __rte_ring_do_lf_dequeue(r, obj_table, n,\n+\t\t\t\t\t\tRTE_RING_QUEUE_VARIABLE,\n+\t\t\t\t\t\t__IS_MC, available);\n+\telse\n+\t\treturn __rte_ring_do_dequeue(r, obj_table, n,\n+\t\t\t\t\t     RTE_RING_QUEUE_VARIABLE,\n+\t\t\t\t\t     __IS_MC, available);\n }\n \n /**\n@@ -922,8 +1157,14 @@ static __rte_always_inline unsigned\n rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table,\n \t\tunsigned 
int n, unsigned int *available)\n {\n-\treturn __rte_ring_do_dequeue(r, obj_table, n,\n-\t\t\tRTE_RING_QUEUE_VARIABLE, __IS_SC, available);\n+\tif (r->flags & RING_F_LF)\n+\t\treturn __rte_ring_do_lf_dequeue(r, obj_table, n,\n+\t\t\t\t\t\tRTE_RING_QUEUE_VARIABLE,\n+\t\t\t\t\t\t__IS_SC, available);\n+\telse\n+\t\treturn __rte_ring_do_dequeue(r, obj_table, n,\n+\t\t\t\t\t     RTE_RING_QUEUE_VARIABLE,\n+\t\t\t\t\t     __IS_SC, available);\n }\n \n /**\n@@ -949,9 +1190,14 @@ static __rte_always_inline unsigned\n rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table,\n \t\tunsigned int n, unsigned int *available)\n {\n-\treturn __rte_ring_do_dequeue(r, obj_table, n,\n-\t\t\t\tRTE_RING_QUEUE_VARIABLE,\n-\t\t\t\tr->cons.single, available);\n+\tif (r->flags & RING_F_LF)\n+\t\treturn __rte_ring_do_lf_dequeue(r, obj_table, n,\n+\t\t\t\t\t\tRTE_RING_QUEUE_VARIABLE,\n+\t\t\t\t\t\tr->cons_ptr.single, available);\n+\telse\n+\t\treturn __rte_ring_do_dequeue(r, obj_table, n,\n+\t\t\t\t\t     RTE_RING_QUEUE_VARIABLE,\n+\t\t\t\t\t     r->cons.single, available);\n }\n \n #ifdef __cplusplus\ndiff --git a/lib/librte_ring/rte_ring_c11_mem.h b/lib/librte_ring/rte_ring_c11_mem.h\nindex 545caf257..3e94ac2c4 100644\n--- a/lib/librte_ring/rte_ring_c11_mem.h\n+++ b/lib/librte_ring/rte_ring_c11_mem.h\n@@ -1,5 +1,7 @@\n /* SPDX-License-Identifier: BSD-3-Clause\n  *\n+ * Copyright (c) 2018-2019 Intel Corporation\n+ * Copyright (c) 2018-2019 Arm Limited\n  * Copyright (c) 2017,2018 HXT-semitech Corporation.\n  * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org\n  * All rights reserved.\n@@ -221,8 +223,8 @@ __rte_ring_move_prod_head_ptr(struct rte_ring *r, unsigned int is_sp,\n \t\t/* Ensure the head is read before tail */\n \t\t__atomic_thread_fence(__ATOMIC_ACQUIRE);\n \n-\t\t/* load-acquire synchronize with store-release of ht->tail\n-\t\t * in update_tail.\n+\t\t/* load-acquire synchronize with store-release of tail in\n+\t\t * __rte_ring_do_lf_dequeue_{sc, mc}.\n \t\t */\n \t\tcons_tail = __atomic_load_n(&r->cons_ptr.tail,\n \t\t\t\t\t__ATOMIC_ACQUIRE);\n@@ -247,6 +249,7 @@ __rte_ring_move_prod_head_ptr(struct rte_ring *r, unsigned int is_sp,\n \t\t\t\t\t0, __ATOMIC_RELAXED,\n \t\t\t\t\t__ATOMIC_RELAXED);\n \t} while (unlikely(success == 0));\n+\n \treturn n;\n }\n \n@@ -293,8 +296,8 @@ __rte_ring_move_cons_head_ptr(struct rte_ring *r, unsigned int is_sc,\n \t\t/* Ensure the head is read before tail */\n \t\t__atomic_thread_fence(__ATOMIC_ACQUIRE);\n \n-\t\t/* this load-acquire synchronize with store-release of ht->tail\n-\t\t * in update_tail.\n+\t\t/* load-acquire synchronize with store-release of tail in\n+\t\t * __rte_ring_do_lf_enqueue_{sp, mp}.\n \t\t */\n \t\tprod_tail = __atomic_load_n(&r->prod_ptr.tail,\n \t\t\t\t\t__ATOMIC_ACQUIRE);\n@@ -318,6 +321,361 @@ __rte_ring_move_cons_head_ptr(struct rte_ring *r, unsigned int is_sc,\n \t\t\t\t\t\t\t0, __ATOMIC_RELAXED,\n \t\t\t\t\t\t\t__ATOMIC_RELAXED);\n \t} while (unlikely(success == 0));\n+\n+\treturn n;\n+}\n+\n+/**\n+ * @internal\n+ *   Enqueue several objects on the lock-free ring (single-producer only)\n+ *\n+ * @param r\n+ *   A pointer to the ring structure.\n+ * @param obj_table\n+ *   A pointer to a table of void * pointers (objects).\n+ * @param n\n+ *   The number of objects to add in the ring from the obj_table.\n+ * @param behavior\n+ *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to the ring\n+ *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring\n+ * @param free_space\n+ *   returns the amount of space 
after the enqueue operation has finished\n+ * @return\n+ *   Actual number of objects enqueued.\n+ *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.\n+ */\n+static __rte_always_inline unsigned int\n+__rte_ring_do_lf_enqueue_sp(struct rte_ring *r, void * const *obj_table,\n+\t\t\t    unsigned int n,\n+\t\t\t    enum rte_ring_queue_behavior behavior,\n+\t\t\t    unsigned int *free_space)\n+{\n+\tuint32_t free_entries;\n+\tuintptr_t head, next;\n+\n+\tn = __rte_ring_move_prod_head_ptr(r, 1, n, behavior,\n+\t\t\t\t\t  &head, &next, &free_entries);\n+\tif (n == 0)\n+\t\tgoto end;\n+\n+\tENQUEUE_PTRS_LF(r, &r->ring, head, obj_table, n);\n+\n+\t__atomic_store_n(&r->prod_ptr.tail,\n+\t\t\t r->prod_ptr.tail + n,\n+\t\t\t __ATOMIC_RELEASE);\n+end:\n+\tif (free_space != NULL)\n+\t\t*free_space = free_entries - n;\n+\treturn n;\n+}\n+\n+/* This macro defines the number of times an enqueueing thread can fail to find\n+ * a free ring slot before reloading its producer tail index.\n+ */\n+#define ENQ_RETRY_LIMIT 32\n+\n+/**\n+ * @internal\n+ *   Get the next producer tail index.\n+ *\n+ * @param r\n+ *   A pointer to the ring structure.\n+ * @param idx\n+ *   The local tail index\n+ * @return\n+ *   If the ring's tail is ahead of the local tail, return the shared tail.\n+ *   Else, return tail + 1.\n+ */\n+static __rte_always_inline uintptr_t\n+__rte_ring_lf_load_tail(struct rte_ring *r, uintptr_t idx)\n+{\n+\tuintptr_t fresh = __atomic_load_n(&r->prod_ptr.tail, __ATOMIC_RELAXED);\n+\n+\tif ((intptr_t)(idx - fresh) < 0)\n+\t\tidx = fresh; /* fresh is after idx, use it instead */\n+\telse\n+\t\tidx++; /* Continue with next slot */\n+\n+\treturn idx;\n+}\n+\n+/**\n+ * @internal\n+ *   Update the ring's producer tail index. If another thread already updated\n+ *   the index beyond the caller's tail value, do nothing.\n+ *\n+ * @param r\n+ *   A pointer to the ring structure.\n+ * @param idx\n+ *   The local tail index\n+ * @return\n+ *   If the shared tail is ahead of the local tail, return the shared tail.\n+ *   Else, return tail + 1.\n+ */\n+static __rte_always_inline void\n+__rte_ring_lf_update_tail(struct rte_ring *r, uintptr_t val)\n+{\n+\tvolatile uintptr_t *loc = &r->prod_ptr.tail;\n+\tuintptr_t old = __atomic_load_n(loc, __ATOMIC_RELAXED);\n+\n+\tdo {\n+\t\t/* Check if the tail has already been updated. 
*/\n+\t\tif ((intptr_t)(val - old) < 0)\n+\t\t\treturn;\n+\n+\t\t/* Else val >= old, need to update *loc */\n+\t} while (!__atomic_compare_exchange_n(loc, &old, val,\n+\t\t\t\t\t      1, __ATOMIC_RELEASE,\n+\t\t\t\t\t      __ATOMIC_RELAXED));\n+}\n+\n+/**\n+ * @internal\n+ *   Enqueue several objects on the lock-free ring (multi-producer safe)\n+ *\n+ * @param r\n+ *   A pointer to the ring structure.\n+ * @param obj_table\n+ *   A pointer to a table of void * pointers (objects).\n+ * @param n\n+ *   The number of objects to add in the ring from the obj_table.\n+ * @param behavior\n+ *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to the ring\n+ *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring\n+ * @param free_space\n+ *   returns the amount of space after the enqueue operation has finished\n+ * @return\n+ *   Actual number of objects enqueued.\n+ *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.\n+ */\n+static __rte_always_inline unsigned int\n+__rte_ring_do_lf_enqueue_mp(struct rte_ring *r, void * const *obj_table,\n+\t\t\t    unsigned int n,\n+\t\t\t    enum rte_ring_queue_behavior behavior,\n+\t\t\t    unsigned int *free_space)\n+{\n+#if !defined(ALLOW_EXPERIMENTAL_API)\n+\tRTE_SET_USED(r);\n+\tRTE_SET_USED(obj_table);\n+\tRTE_SET_USED(n);\n+\tRTE_SET_USED(behavior);\n+\tRTE_SET_USED(free_space);\n+\tprintf(\"[%s()] RING_F_LF requires an experimental API.\"\n+\t       \" Recompile with ALLOW_EXPERIMENTAL_API to use it.\\n\"\n+\t       , __func__);\n+\treturn 0;\n+#else\n+\tstruct rte_ring_lf_entry *base;\n+\tuintptr_t head, next, tail;\n+\tunsigned int i;\n+\tuint32_t avail;\n+\n+\t/* Atomically update the prod head to reserve n slots. The prod tail\n+\t * is modified at the end of the function.\n+\t */\n+\tn = __rte_ring_move_prod_head_ptr(r, 0, n, behavior,\n+\t\t\t\t\t  &head, &next, &avail);\n+\n+\ttail = __atomic_load_n(&r->prod_ptr.tail, __ATOMIC_RELAXED);\n+\thead = __atomic_load_n(&r->cons_ptr.tail, __ATOMIC_ACQUIRE);\n+\n+\tif (unlikely(n == 0))\n+\t\tgoto end;\n+\n+\tbase = (struct rte_ring_lf_entry *)&r->ring;\n+\n+\tfor (i = 0; i < n; i++) {\n+\t\tunsigned int retries = 0;\n+\t\tint success = 0;\n+\n+\t\t/* Enqueue to the tail entry. If another thread wins the race,\n+\t\t * retry with the new tail.\n+\t\t */\n+\t\tdo {\n+\t\t\tstruct rte_ring_lf_entry old_value, new_value;\n+\t\t\tstruct rte_ring_lf_entry *ring_ptr;\n+\n+\t\t\tring_ptr = &base[tail & r->mask];\n+\n+\t\t\told_value = *ring_ptr;\n+\n+\t\t\tif (old_value.cnt != (tail >> r->log2_size)) {\n+\t\t\t\t/* This slot has already been used. Depending\n+\t\t\t\t * on how far behind this thread is, either go\n+\t\t\t\t * to the next slot or reload the tail.\n+\t\t\t\t */\n+\t\t\t\tuintptr_t next_tail;\n+\n+\t\t\t\tnext_tail = (tail + r->size) >> r->log2_size;\n+\n+\t\t\t\tif (old_value.cnt != next_tail ||\n+\t\t\t\t    ++retries == ENQ_RETRY_LIMIT) {\n+\t\t\t\t\t/* This thread either fell 2+ laps\n+\t\t\t\t\t * behind or hit the retry limit, so\n+\t\t\t\t\t * reload the tail index.\n+\t\t\t\t\t */\n+\t\t\t\t\ttail = __rte_ring_lf_load_tail(r, tail);\n+\t\t\t\t\tretries = 0;\n+\t\t\t\t} else {\n+\t\t\t\t\t/* Slot already used, try the next. */\n+\t\t\t\t\ttail++;\n+\n+\t\t\t\t}\n+\n+\t\t\t\tcontinue;\n+\t\t\t}\n+\n+\t\t\t/* Found a free slot, try to enqueue next element. 
*/\n+\t\t\tnew_value.ptr = obj_table[i];\n+\t\t\tnew_value.cnt = (tail + r->size) >> r->log2_size;\n+\n+#ifdef RTE_ARCH_64\n+\t\t\tsuccess = rte_atomic128_cmp_exchange(\n+\t\t\t\t\t(rte_int128_t *)ring_ptr,\n+\t\t\t\t\t(rte_int128_t *)&old_value,\n+\t\t\t\t\t(rte_int128_t *)&new_value,\n+\t\t\t\t\t1, __ATOMIC_RELEASE,\n+\t\t\t\t\t__ATOMIC_RELAXED);\n+#else\n+\t\t\tsuccess = __atomic_compare_exchange(\n+\t\t\t\t\t(uint64_t *)ring_ptr,\n+\t\t\t\t\t&old_value,\n+\t\t\t\t\t&new_value,\n+\t\t\t\t\t1, __ATOMIC_RELEASE,\n+\t\t\t\t\t__ATOMIC_RELAXED);\n+#endif\n+\t\t} while (success == 0);\n+\n+\t\t/* Only increment tail if the CAS succeeds, since it can\n+\t\t * spuriously fail on some architectures.\n+\t\t */\n+\t\ttail++;\n+\t}\n+\n+end:\n+\t__rte_ring_lf_update_tail(r, tail);\n+\n+\tif (free_space != NULL)\n+\t\t*free_space = avail - n;\n+\treturn n;\n+#endif\n+}\n+\n+/**\n+ * @internal\n+ *   Dequeue several objects from the lock-free ring (single-consumer only)\n+ *\n+ * @param r\n+ *   A pointer to the ring structure.\n+ * @param obj_table\n+ *   A pointer to a table of void * pointers (objects).\n+ * @param n\n+ *   The number of objects to pull from the ring.\n+ * @param behavior\n+ *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from the ring\n+ *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring\n+ * @param available\n+ *   returns the number of remaining ring entries after the dequeue has finished\n+ * @return\n+ *   - Actual number of objects dequeued.\n+ *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.\n+ */\n+static __rte_always_inline unsigned int\n+__rte_ring_do_lf_dequeue_sc(struct rte_ring *r, void **obj_table,\n+\t\t\t    unsigned int n,\n+\t\t\t    enum rte_ring_queue_behavior behavior,\n+\t\t\t    unsigned int *available)\n+{\n+\tuintptr_t cons_tail, prod_tail, avail;\n+\n+\tcons_tail = __atomic_load_n(&r->cons_ptr.tail, __ATOMIC_RELAXED);\n+\tprod_tail = __atomic_load_n(&r->prod_ptr.tail, __ATOMIC_ACQUIRE);\n+\n+\tavail = prod_tail - cons_tail;\n+\n+\t/* Set the actual entries for dequeue */\n+\tif (unlikely(avail < n))\n+\t\tn = (behavior == RTE_RING_QUEUE_FIXED) ? 
0 : avail;\n+\n+\tif (unlikely(n == 0))\n+\t\tgoto end;\n+\n+\tDEQUEUE_PTRS_LF(r, &r->ring, cons_tail, obj_table, n);\n+\n+\t/* Use a read barrier and store-relaxed so we don't unnecessarily order\n+\t * writes.\n+\t */\n+\trte_smp_rmb();\n+\n+\t__atomic_store_n(&r->cons_ptr.tail, cons_tail + n, __ATOMIC_RELAXED);\n+end:\n+\tif (available != NULL)\n+\t\t*available = avail - n;\n+\n+\treturn n;\n+}\n+\n+/**\n+ * @internal\n+ *   Dequeue several objects from the lock-free ring (multi-consumer safe)\n+ *\n+ * @param r\n+ *   A pointer to the ring structure.\n+ * @param obj_table\n+ *   A pointer to a table of void * pointers (objects).\n+ * @param n\n+ *   The number of objects to pull from the ring.\n+ * @param behavior\n+ *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from the ring\n+ *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring\n+ * @param available\n+ *   returns the number of remaining ring entries after the dequeue has finished\n+ * @return\n+ *   - Actual number of objects dequeued.\n+ *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.\n+ */\n+static __rte_always_inline unsigned int\n+__rte_ring_do_lf_dequeue_mc(struct rte_ring *r, void **obj_table,\n+\t\t\t    unsigned int n,\n+\t\t\t    enum rte_ring_queue_behavior behavior,\n+\t\t\t    unsigned int *available)\n+{\n+\tuintptr_t cons_tail, prod_tail, avail;\n+\n+\tcons_tail = __atomic_load_n(&r->cons_ptr.tail, __ATOMIC_RELAXED);\n+\n+\tdo {\n+\t\t/* Load tail on every iteration to avoid spurious queue empty\n+\t\t * situations.\n+\t\t */\n+\t\tprod_tail = __atomic_load_n(&r->prod_ptr.tail,\n+\t\t\t\t\t    __ATOMIC_ACQUIRE);\n+\n+\t\tavail = prod_tail - cons_tail;\n+\n+\t\t/* Set the actual entries for dequeue */\n+\t\tif (unlikely(avail < n))\n+\t\t\tn = (behavior == RTE_RING_QUEUE_FIXED) ? 
0 : avail;\n+\n+\t\tif (unlikely(n == 0))\n+\t\t\tgoto end;\n+\n+\t\tDEQUEUE_PTRS_LF(r, &r->ring, cons_tail, obj_table, n);\n+\n+\t\t/* Use a read barrier and store-relaxed so we don't\n+\t\t * unnecessarily order writes.\n+\t\t */\n+\t\trte_smp_rmb();\n+\n+\t} while (!__atomic_compare_exchange_n(&r->cons_ptr.tail,\n+\t\t\t\t\t      &cons_tail, cons_tail + n,\n+\t\t\t\t\t      0, __ATOMIC_RELAXED,\n+\t\t\t\t\t      __ATOMIC_RELAXED));\n+\n+end:\n+\tif (available != NULL)\n+\t\t*available = avail - n;\n+\n \treturn n;\n }\n \ndiff --git a/lib/librte_ring/rte_ring_generic.h b/lib/librte_ring/rte_ring_generic.h\nindex 6a0e1bbfb..322d003ad 100644\n--- a/lib/librte_ring/rte_ring_generic.h\n+++ b/lib/librte_ring/rte_ring_generic.h\n@@ -1,6 +1,7 @@\n /* SPDX-License-Identifier: BSD-3-Clause\n  *\n- * Copyright (c) 2010-2017 Intel Corporation\n+ * Copyright (c) 2010-2019 Intel Corporation\n+ * Copyright (c) 2018-2019 Arm Limited\n  * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org\n  * All rights reserved.\n  * Derived from FreeBSD's bufring.h\n@@ -297,4 +298,358 @@ __rte_ring_move_cons_head_ptr(struct rte_ring *r, unsigned int is_sc,\n \treturn n;\n }\n \n+/**\n+ * @internal\n+ *   Enqueue several objects on the lock-free ring (single-producer only)\n+ *\n+ * @param r\n+ *   A pointer to the ring structure.\n+ * @param obj_table\n+ *   A pointer to a table of void * pointers (objects).\n+ * @param n\n+ *   The number of objects to add in the ring from the obj_table.\n+ * @param behavior\n+ *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to the ring\n+ *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring\n+ * @param free_space\n+ *   returns the amount of space after the enqueue operation has finished\n+ * @return\n+ *   Actual number of objects enqueued.\n+ *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.\n+ */\n+static __rte_always_inline unsigned int\n+__rte_ring_do_lf_enqueue_sp(struct rte_ring *r, void * const *obj_table,\n+\t\t\t    unsigned int n,\n+\t\t\t    enum rte_ring_queue_behavior behavior,\n+\t\t\t    unsigned int *free_space)\n+{\n+\tuint32_t free_entries;\n+\tuintptr_t head, next;\n+\n+\tn = __rte_ring_move_prod_head_ptr(r, 1, n, behavior,\n+\t\t\t\t\t  &head, &next, &free_entries);\n+\tif (n == 0)\n+\t\tgoto end;\n+\n+\tENQUEUE_PTRS_LF(r, &r->ring, head, obj_table, n);\n+\n+\trte_smp_wmb();\n+\n+\tr->prod_ptr.tail += n;\n+end:\n+\tif (free_space != NULL)\n+\t\t*free_space = free_entries - n;\n+\treturn n;\n+}\n+\n+/* This macro defines the number of times an enqueueing thread can fail to find\n+ * a free ring slot before reloading its producer tail index.\n+ */\n+#define ENQ_RETRY_LIMIT 32\n+\n+/**\n+ * @internal\n+ *   Get the next producer tail index.\n+ *\n+ * @param r\n+ *   A pointer to the ring structure.\n+ * @param idx\n+ *   The local tail index\n+ * @return\n+ *   If the ring's tail is ahead of the local tail, return the shared tail.\n+ *   Else, return tail + 1.\n+ */\n+static __rte_always_inline uintptr_t\n+__rte_ring_lf_load_tail(struct rte_ring *r, uintptr_t idx)\n+{\n+\tuintptr_t fresh = r->prod_ptr.tail;\n+\n+\tif ((intptr_t)(idx - fresh) < 0)\n+\t\t/* fresh is after idx, use it instead */\n+\t\tidx = fresh;\n+\telse\n+\t\t/* Continue with next slot */\n+\t\tidx++;\n+\n+\treturn idx;\n+}\n+\n+/**\n+ * @internal\n+ *   Update the ring's producer tail index. 
If another thread already updated\n+ *   the index beyond the caller's tail value, do nothing.\n+ *\n+ * @param r\n+ *   A pointer to the ring structure.\n+ * @param idx\n+ *   The local tail index\n+ * @return\n+ *   If the shared tail is ahead of the local tail, return the shared tail.\n+ *   Else, return tail + 1.\n+ */\n+static __rte_always_inline void\n+__rte_ring_lf_update_tail(struct rte_ring *r, uintptr_t val)\n+{\n+\tvolatile uintptr_t *loc = &r->prod_ptr.tail;\n+\tuintptr_t old;\n+\n+\tdo {\n+\t\told = *loc;\n+\n+\t\t/* Check if the tail has already been updated. */\n+\t\tif ((intptr_t)(val - old) < 0)\n+\t\t\treturn;\n+\n+\t\t/* Else val >= old, need to update *loc */\n+\t} while (!__sync_bool_compare_and_swap(loc, old, val));\n+}\n+\n+/**\n+ * @internal\n+ *   Enqueue several objects on the lock-free ring (multi-producer safe)\n+ *\n+ * @param r\n+ *   A pointer to the ring structure.\n+ * @param obj_table\n+ *   A pointer to a table of void * pointers (objects).\n+ * @param n\n+ *   The number of objects to add in the ring from the obj_table.\n+ * @param behavior\n+ *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to the ring\n+ *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring\n+ * @param free_space\n+ *   returns the amount of space after the enqueue operation has finished\n+ * @return\n+ *   Actual number of objects enqueued.\n+ *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.\n+ */\n+static __rte_always_inline unsigned int\n+__rte_ring_do_lf_enqueue_mp(struct rte_ring *r, void * const *obj_table,\n+\t\t\t    unsigned int n,\n+\t\t\t    enum rte_ring_queue_behavior behavior,\n+\t\t\t    unsigned int *free_space)\n+{\n+#if !defined(ALLOW_EXPERIMENTAL_API)\n+\tRTE_SET_USED(r);\n+\tRTE_SET_USED(obj_table);\n+\tRTE_SET_USED(n);\n+\tRTE_SET_USED(behavior);\n+\tRTE_SET_USED(free_space);\n+\tprintf(\"[%s()] RING_F_LF requires an experimental API.\"\n+\t       \" Recompile with ALLOW_EXPERIMENTAL_API to use it.\\n\"\n+\t       , __func__);\n+\treturn 0;\n+#else\n+\tstruct rte_ring_lf_entry *base;\n+\tuintptr_t head, next, tail;\n+\tunsigned int i;\n+\tuint32_t avail;\n+\n+\t/* Atomically update the prod head to reserve n slots. The prod tail\n+\t * is modified at the end of the function.\n+\t */\n+\tn = __rte_ring_move_prod_head_ptr(r, 0, n, behavior,\n+\t\t\t\t\t  &head, &next, &avail);\n+\n+\ttail = r->prod_ptr.tail;\n+\n+\trte_smp_rmb();\n+\n+\thead = r->cons_ptr.tail;\n+\n+\tif (unlikely(n == 0))\n+\t\tgoto end;\n+\n+\tbase = (struct rte_ring_lf_entry *)&r->ring;\n+\n+\tfor (i = 0; i < n; i++) {\n+\t\tunsigned int retries = 0;\n+\t\tint success = 0;\n+\n+\t\t/* Enqueue to the tail entry. If another thread wins the race,\n+\t\t * retry with the new tail.\n+\t\t */\n+\t\tdo {\n+\t\t\tstruct rte_ring_lf_entry old_value, new_value;\n+\t\t\tstruct rte_ring_lf_entry *ring_ptr;\n+\n+\t\t\tring_ptr = &base[tail & r->mask];\n+\n+\t\t\told_value = *ring_ptr;\n+\n+\t\t\tif (old_value.cnt != (tail >> r->log2_size)) {\n+\t\t\t\t/* This slot has already been used. 
Depending\n+\t\t\t\t * on how far behind this thread is, either go\n+\t\t\t\t * to the next slot or reload the tail.\n+\t\t\t\t */\n+\t\t\t\tuintptr_t next_tail;\n+\n+\t\t\t\tnext_tail = (tail + r->size) >> r->log2_size;\n+\n+\t\t\t\tif (old_value.cnt != next_tail ||\n+\t\t\t\t    ++retries == ENQ_RETRY_LIMIT) {\n+\t\t\t\t\t/* This thread either fell 2+ laps\n+\t\t\t\t\t * behind or hit the retry limit, so\n+\t\t\t\t\t * reload the tail index.\n+\t\t\t\t\t */\n+\t\t\t\t\ttail = __rte_ring_lf_load_tail(r, tail);\n+\t\t\t\t\tretries = 0;\n+\t\t\t\t} else {\n+\t\t\t\t\t/* Slot already used, try the next. */\n+\t\t\t\t\ttail++;\n+\n+\t\t\t\t}\n+\n+\t\t\t\tcontinue;\n+\t\t\t}\n+\n+\t\t\t/* Found a free slot, try to enqueue next element. */\n+\t\t\tnew_value.ptr = obj_table[i];\n+\t\t\tnew_value.cnt = (tail + r->size) >> r->log2_size;\n+\n+#ifdef RTE_ARCH_64\n+\t\t\tsuccess = rte_atomic128_cmp_exchange(\n+\t\t\t\t\t(rte_int128_t *)ring_ptr,\n+\t\t\t\t\t(rte_int128_t *)&old_value,\n+\t\t\t\t\t(rte_int128_t *)&new_value,\n+\t\t\t\t\t1, __ATOMIC_RELEASE,\n+\t\t\t\t\t__ATOMIC_RELAXED);\n+#else\n+\t\t\tuint64_t *old_ptr = (uint64_t *)&old_value;\n+\t\t\tuint64_t *new_ptr = (uint64_t *)&new_value;\n+\n+\t\t\tsuccess = rte_atomic64_cmpset(\n+\t\t\t\t\t(volatile uint64_t *)ring_ptr,\n+\t\t\t\t\t*old_ptr, *new_ptr);\n+#endif\n+\t\t} while (success == 0);\n+\n+\t\t/* Only increment tail if the CAS succeeds, since it can\n+\t\t * spuriously fail on some architectures.\n+\t\t */\n+\t\ttail++;\n+\t}\n+\n+end:\n+\n+\t__rte_ring_lf_update_tail(r, tail);\n+\n+\tif (free_space != NULL)\n+\t\t*free_space = avail - n;\n+\treturn n;\n+#endif\n+}\n+\n+/**\n+ * @internal\n+ *   Dequeue several objects from the lock-free ring (single-consumer only)\n+ *\n+ * @param r\n+ *   A pointer to the ring structure.\n+ * @param obj_table\n+ *   A pointer to a table of void * pointers (objects).\n+ * @param n\n+ *   The number of objects to pull from the ring.\n+ * @param behavior\n+ *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from the ring\n+ *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring\n+ * @param available\n+ *   returns the number of remaining ring entries after the dequeue has finished\n+ * @return\n+ *   - Actual number of objects dequeued.\n+ *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.\n+ */\n+static __rte_always_inline unsigned int\n+__rte_ring_do_lf_dequeue_sc(struct rte_ring *r, void **obj_table,\n+\t\t\t    unsigned int n,\n+\t\t\t    enum rte_ring_queue_behavior behavior,\n+\t\t\t    unsigned int *available)\n+{\n+\tuintptr_t cons_tail, prod_tail, avail;\n+\n+\tcons_tail = r->cons_ptr.tail;\n+\n+\trte_smp_rmb();\n+\n+\tprod_tail = r->prod_ptr.tail;\n+\n+\tavail = prod_tail - cons_tail;\n+\n+\t/* Set the actual entries for dequeue */\n+\tif (unlikely(avail < n))\n+\t\tn = (behavior == RTE_RING_QUEUE_FIXED) ? 
0 : avail;\n+\n+\tif (unlikely(n == 0))\n+\t\tgoto end;\n+\n+\tDEQUEUE_PTRS_LF(r, &r->ring, cons_tail, obj_table, n);\n+\n+\trte_smp_rmb();\n+\n+\tr->cons_ptr.tail += n;\n+end:\n+\tif (available != NULL)\n+\t\t*available = avail - n;\n+\n+\treturn n;\n+}\n+\n+/**\n+ * @internal\n+ *   Dequeue several objects from the lock-free ring (multi-consumer safe)\n+ *\n+ * @param r\n+ *   A pointer to the ring structure.\n+ * @param obj_table\n+ *   A pointer to a table of void * pointers (objects).\n+ * @param n\n+ *   The number of objects to pull from the ring.\n+ * @param behavior\n+ *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from the ring\n+ *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring\n+ * @param available\n+ *   returns the number of remaining ring entries after the dequeue has finished\n+ * @return\n+ *   - Actual number of objects dequeued.\n+ *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.\n+ */\n+static __rte_always_inline unsigned int\n+__rte_ring_do_lf_dequeue_mc(struct rte_ring *r, void **obj_table,\n+\t\t\t    unsigned int n,\n+\t\t\t    enum rte_ring_queue_behavior behavior,\n+\t\t\t    unsigned int *available)\n+{\n+\tuintptr_t cons_tail, prod_tail, avail;\n+\n+\tdo {\n+\t\tcons_tail = r->cons_ptr.tail;\n+\n+\t\trte_smp_rmb();\n+\n+\t\t/* Load tail on every iteration to avoid spurious queue empty\n+\t\t * situations.\n+\t\t */\n+\t\tprod_tail = r->prod_ptr.tail;\n+\n+\t\tavail = prod_tail - cons_tail;\n+\n+\t\t/* Set the actual entries for dequeue */\n+\t\tif (unlikely(avail < n))\n+\t\t\tn = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : avail;\n+\n+\t\tif (unlikely(n == 0))\n+\t\t\tgoto end;\n+\n+\t\tDEQUEUE_PTRS_LF(r, &r->ring, cons_tail, obj_table, n);\n+\n+\t} while (!__sync_bool_compare_and_swap(&r->cons_ptr.tail,\n+\t\t\t\t\t       cons_tail, cons_tail + n));\n+\n+end:\n+\tif (available != NULL)\n+\t\t*available = avail - n;\n+\n+\treturn n;\n+}\n+\n #endif /* _RTE_RING_GENERIC_H_ */\ndiff --git a/lib/librte_ring/rte_ring_version.map b/lib/librte_ring/rte_ring_version.map\nindex d935efd0d..8969467af 100644\n--- a/lib/librte_ring/rte_ring_version.map\n+++ b/lib/librte_ring/rte_ring_version.map\n@@ -17,3 +17,10 @@ DPDK_2.2 {\n \trte_ring_free;\n \n } DPDK_2.0;\n+\n+DPDK_19.05 {\n+\tglobal:\n+\n+\trte_ring_get_memsize;\n+\n+} DPDK_2.2;\n",
    "prefixes": [
        "v8",
        "3/6"
    ]
}
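
The JSON above already carries the URLs needed for follow-up requests. A hedged example (again assuming the requests library; the field names on the individual check objects are assumptions) of pulling the raw mbox and the per-patch CI results behind "check": "fail":

import requests

patch = requests.get("http://patches.dpdk.org/api/patches/51307/").json()

# "mbox" points at the raw patch email, suitable for `git am`.
with open("v8-3-6-ring-lock-free.mbox", "w") as f:
    f.write(requests.get(patch["mbox"]).text)

# "checks" lists the individual CI results behind the aggregate "check" value.
for check in requests.get(patch["checks"]).json():
    print(check.get("context"), check.get("state"))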