get:
Show a patch.

patch:
Update a patch.

put:
Update a patch.

GET /api/patches/81167/?format=api
HTTP 200 OK
Allow: GET, PUT, PATCH, HEAD, OPTIONS
Content-Type: application/json
Vary: Accept

{
    "id": 81167,
    "url": "http://patches.dpdk.org/api/patches/81167/?format=api",
    "web_url": "http://patches.dpdk.org/project/dpdk/patch/20201017030701.16134-5-l.wojciechow@partner.samsung.com/",
    "project": {
        "id": 1,
        "url": "http://patches.dpdk.org/api/projects/1/?format=api",
        "name": "DPDK",
        "link_name": "dpdk",
        "list_id": "dev.dpdk.org",
        "list_email": "dev@dpdk.org",
        "web_url": "http://core.dpdk.org",
        "scm_url": "git://dpdk.org/dpdk",
        "webscm_url": "http://git.dpdk.org/dpdk",
        "list_archive_url": "https://inbox.dpdk.org/dev",
        "list_archive_url_format": "https://inbox.dpdk.org/dev/{}",
        "commit_url_format": ""
    },
    "msgid": "<20201017030701.16134-5-l.wojciechow@partner.samsung.com>",
    "list_archive_url": "https://inbox.dpdk.org/dev/20201017030701.16134-5-l.wojciechow@partner.samsung.com",
    "date": "2020-10-17T03:06:48",
    "name": "[v8,04/17] distributor: handle worker shutdown in burst mode",
    "commit_ref": null,
    "pull_url": null,
    "state": "accepted",
    "archived": true,
    "hash": "ce33ee7b3e2dba4998ef0375e6ba70e2527b0295",
    "submitter": {
        "id": 1628,
        "url": "http://patches.dpdk.org/api/people/1628/?format=api",
        "name": "Lukasz Wojciechowski",
        "email": "l.wojciechow@partner.samsung.com"
    },
    "delegate": {
        "id": 24651,
        "url": "http://patches.dpdk.org/api/users/24651/?format=api",
        "username": "dmarchand",
        "first_name": "David",
        "last_name": "Marchand",
        "email": "david.marchand@redhat.com"
    },
    "mbox": "http://patches.dpdk.org/project/dpdk/patch/20201017030701.16134-5-l.wojciechow@partner.samsung.com/mbox/",
    "series": [
        {
            "id": 13072,
            "url": "http://patches.dpdk.org/api/series/13072/?format=api",
            "web_url": "http://patches.dpdk.org/project/dpdk/list/?series=13072",
            "date": "2020-10-17T03:06:44",
            "name": "fix distributor synchronization issues",
            "version": 8,
            "mbox": "http://patches.dpdk.org/series/13072/mbox/"
        }
    ],
    "comments": "http://patches.dpdk.org/api/patches/81167/comments/",
    "check": "success",
    "checks": "http://patches.dpdk.org/api/patches/81167/checks/",
    "tags": {},
    "related": [],
    "headers": {
        "Return-Path": "<dev-bounces@dpdk.org>",
        "X-Original-To": "patchwork@inbox.dpdk.org",
        "Delivered-To": "patchwork@inbox.dpdk.org",
        "Received": [
            "from dpdk.org (dpdk.org [92.243.14.124])\n\tby inbox.dpdk.org (Postfix) with ESMTP id A415BA04DB;\n\tSat, 17 Oct 2020 05:08:36 +0200 (CEST)",
            "from [92.243.14.124] (localhost [127.0.0.1])\n\tby dpdk.org (Postfix) with ESMTP id 4483EE2AB;\n\tSat, 17 Oct 2020 05:07:41 +0200 (CEST)",
            "from mailout1.w1.samsung.com (mailout1.w1.samsung.com\n [210.118.77.11]) by dpdk.org (Postfix) with ESMTP id 3CA2DE29E\n for <dev@dpdk.org>; Sat, 17 Oct 2020 05:07:31 +0200 (CEST)",
            "from eucas1p1.samsung.com (unknown [182.198.249.206])\n by mailout1.w1.samsung.com (KnoxPortal) with ESMTP id\n 20201017030720euoutp0101b4efac82adb84a1b275ea636a805bb~_qLtdc3LP1627716277euoutp01U\n for <dev@dpdk.org>; Sat, 17 Oct 2020 03:07:20 +0000 (GMT)",
            "from eusmges3new.samsung.com (unknown [203.254.199.245]) by\n eucas1p2.samsung.com (KnoxPortal) with ESMTP id\n 20201017030713eucas1p2740872b5aade23eb8f922eaf5e22ee83~_qLnhlT1g2440824408eucas1p2w;\n Sat, 17 Oct 2020 03:07:13 +0000 (GMT)",
            "from eucas1p1.samsung.com ( [182.198.249.206]) by\n eusmges3new.samsung.com (EUCPMTA) with SMTP id E7.D5.06318.1EF5A8F5; Sat, 17\n Oct 2020 04:07:13 +0100 (BST)",
            "from eusmtrp1.samsung.com (unknown [182.198.249.138]) by\n eucas1p1.samsung.com (KnoxPortal) with ESMTPA id\n 20201017030712eucas1p1ce19efadc60ed2888dc615cbb2549bdc~_qLmhjR0U2253222532eucas1p17;\n Sat, 17 Oct 2020 03:07:12 +0000 (GMT)",
            "from eusmgms2.samsung.com (unknown [182.198.249.180]) by\n eusmtrp1.samsung.com (KnoxPortal) with ESMTP id\n 20201017030712eusmtrp1028ad8c0eef5a9511bdf965ed1336934~_qLmhCawn3018830188eusmtrp1U;\n Sat, 17 Oct 2020 03:07:12 +0000 (GMT)",
            "from eusmtip1.samsung.com ( [203.254.199.221]) by\n eusmgms2.samsung.com (EUCPMTA) with SMTP id 0D.DE.06017.0EF5A8F5; Sat, 17\n Oct 2020 04:07:12 +0100 (BST)",
            "from localhost.localdomain (unknown [106.210.88.70]) by\n eusmtip1.samsung.com (KnoxPortal) with ESMTPA id\n 20201017030711eusmtip12127a0ab18f1695d30587d8cf12ce849~_qLl3oN7z0424404244eusmtip1N;\n Sat, 17 Oct 2020 03:07:11 +0000 (GMT)"
        ],
        "DKIM-Filter": "OpenDKIM Filter v2.11.0 mailout1.w1.samsung.com\n 20201017030720euoutp0101b4efac82adb84a1b275ea636a805bb~_qLtdc3LP1627716277euoutp01U",
        "DKIM-Signature": "v=1; a=rsa-sha256; c=relaxed/relaxed; d=samsung.com;\n s=mail20170921; t=1602904040;\n bh=4Fg38YzAOQjDzzsTx+jLEB0J7F07DOC8klhoNTn9h6M=;\n h=From:To:Cc:Subject:Date:In-Reply-To:References:From;\n b=Mj6jDmB5YgHoCDqpBYhJNM2Xi+RqLc48TXTEqGDPCigWtBVg1UykNE60kjjtHzRJL\n lc6GFoYlF93wnFAz5hz3WIw4t5eWHU2tn7GjfPNdf6/XvUI9PhB7VFRQko8ngBVRje\n uoX82dFjFiDBZVuL9Ajm+iYY6h6U4yIbEbAsaNec=",
        "X-AuditID": "cbfec7f5-371ff700000018ae-91-5f8a5fe170b8",
        "From": "Lukasz Wojciechowski <l.wojciechow@partner.samsung.com>",
        "To": "David Hunt <david.hunt@intel.com>, Bruce Richardson\n <bruce.richardson@intel.com>",
        "Cc": "dev@dpdk.org, l.wojciechow@partner.samsung.com, stable@dpdk.org",
        "Date": "Sat, 17 Oct 2020 05:06:48 +0200",
        "Message-Id": "<20201017030701.16134-5-l.wojciechow@partner.samsung.com>",
        "X-Mailer": "git-send-email 2.17.1",
        "In-Reply-To": "<20201017030701.16134-1-l.wojciechow@partner.samsung.com>",
        "X-Brightmail-Tracker": [
            "\n H4sIAAAAAAAAA+NgFnrCIsWRmVeSWpSXmKPExsWy7djPc7oP47viDWa8UrW4screom/SRyaL\n d5+2M1k861nHaPGv4w+7A6vHrwVLWT0W73nJ5HHw3R6mAOYoLpuU1JzMstQifbsErowL1yYz\n FlzOr5h16i1TA+OTiC5GTg4JAROJTXOOMncxcnEICaxglJj9cSYbhPOFUeLOwTOsIFVCAp8Z\n JaZfMoPpuHe2BapjOaPE61nPmOCKWjaygNhsArYSR2Z+BWsWEQiTaG7eCxTn4GAWcJZ48pUN\n JCws4C3x+P5KJpAwi4CqRM89TZAwr4CrxNZfM5ghVslLrN5wgBmkhFPATaLxuCvIVgmB62wS\n /642sEDUuEjsv7acHcIWlnh1fAuULSNxenIPC0TDNkaJq79/MkI4+xklrveugKqyljj87zcb\n xG2aEut36UOEHSVaz4DUcwDZfBI33gqChJmBzEnbpjNDhHklOtqEIKr1JJ72TGWEWftn7ROo\n 0zwkphzfAQ3Oq4wSDz9MY53AKD8LYdkCRsZVjOKppcW56anFxnmp5XrFibnFpXnpesn5uZsY\n gXF/+t/xrzsY9/1JOsQowMGoxMPLsbQzXog1say4MvcQowQHs5IIr9PZ03FCvCmJlVWpRfnx\n RaU5qcWHGKU5WJTEeY0XvYwVEkhPLEnNTk0tSC2CyTJxcEo1MBb23gwwnCaTf8l5Bt8PseJ1\n 2j9vHxJit9fLUal+8uRUeZWb6f8ZtxKZhNw19kmeVb7yKCzh2oLjh9b+1z/oJ3V1bdyDvRxx\n aV/3GS10vua9cdGKE/w78+0mH+eR1f1xTnkrZx5//qE35nnZPcaOtxe2K+W+vnMwqGbKV9FS\n 6XlZzWK1c9JM/imxFGckGmoxFxUnAgCsw4n59wIAAA==",
            "\n H4sIAAAAAAAAA+NgFjrGLMWRmVeSWpSXmKPExsVy+t/xu7oP4rviDc7dULC4screom/SRyaL\n d5+2M1k861nHaPGv4w+7A6vHrwVLWT0W73nJ5HHw3R6mAOYoPZui/NKSVIWM/OISW6VoQwsj\n PUNLCz0jE0s9Q2PzWCsjUyV9O5uU1JzMstQifbsEvYwL1yYzFlzOr5h16i1TA+OTiC5GTg4J\n AROJe2dbmLsYuTiEBJYyStw/8YCpi5EDKCEj8eGSAESNsMSfa11sEDUfGSXWHJnMDpJgE7CV\n ODLzKytIvYhAmMSJlf4gYWYBd4kti6cyg9jCAt4Sj++vBBvJIqAq0XNPEyTMK+AqsfXXDGaI\n 8fISqzccYAYp4RRwk2g87gqxqZFR4uysdSwTGPkWMDKsYhRJLS3OTc8tNtIrTswtLs1L10vO\n z93ECAzDbcd+btnB2PUu+BCjAAejEg/vhkWd8UKsiWXFlbmHGCU4mJVEeJ3Ono4T4k1JrKxK\n LcqPLyrNSS0+xGgKdNNEZinR5HxgjOSVxBuaGppbWBqaG5sbm1koifN2CByMERJITyxJzU5N\n LUgtgulj4uCUamCM27VNnIu7a77PzyYfiQlF8nYMBfv6bQRDM86U1l486PG37v8luzPp/F5z\n K68+ObEpr9zvSm2ToLP2huyAilvX/Lx42SX3/dMT+Wvpo3tNbcpr3h+HhJbkTsiI8L5+X8U7\n /5VWgMLaUr5ZN302xapwvpA2Zzn7t+uJfvzxnZv2yvGGMFX+3aTEUpyRaKjFXFScCADupt30\n WQIAAA=="
        ],
        "X-CMS-MailID": "20201017030712eucas1p1ce19efadc60ed2888dc615cbb2549bdc",
        "X-Msg-Generator": "CA",
        "Content-Type": "text/plain; charset=\"utf-8\"",
        "X-RootMTR": "20201017030712eucas1p1ce19efadc60ed2888dc615cbb2549bdc",
        "X-EPHeader": "CA",
        "CMS-TYPE": "201P",
        "X-CMS-RootMailID": "20201017030712eucas1p1ce19efadc60ed2888dc615cbb2549bdc",
        "References": "<20201010160508.19709-1-l.wojciechow@partner.samsung.com>\n <20201017030701.16134-1-l.wojciechow@partner.samsung.com>\n <CGME20201017030712eucas1p1ce19efadc60ed2888dc615cbb2549bdc@eucas1p1.samsung.com>",
        "Subject": "[dpdk-dev] [PATCH v8 04/17] distributor: handle worker shutdown in\n\tburst mode",
        "X-BeenThere": "dev@dpdk.org",
        "X-Mailman-Version": "2.1.15",
        "Precedence": "list",
        "List-Id": "DPDK patches and discussions <dev.dpdk.org>",
        "List-Unsubscribe": "<https://mails.dpdk.org/options/dev>,\n <mailto:dev-request@dpdk.org?subject=unsubscribe>",
        "List-Archive": "<http://mails.dpdk.org/archives/dev/>",
        "List-Post": "<mailto:dev@dpdk.org>",
        "List-Help": "<mailto:dev-request@dpdk.org?subject=help>",
        "List-Subscribe": "<https://mails.dpdk.org/listinfo/dev>,\n <mailto:dev-request@dpdk.org?subject=subscribe>",
        "Errors-To": "dev-bounces@dpdk.org",
        "Sender": "\"dev\" <dev-bounces@dpdk.org>"
    },
    "content": "The burst version of distributor implementation was missing proper\nhandling of worker shutdown. A worker processing packets received\nfrom distributor can call rte_distributor_return_pkt() function\ninforming distributor that it want no more packets. Further calls to\nrte_distributor_request_pkt() or rte_distributor_get_pkt() however\nshould inform distributor that new packets are requested again.\n\nLack of the proper implementation has caused that even after worker\ninformed about returning last packets, new packets were still sent\nfrom distributor causing deadlocks as no one could get them on worker\nside.\n\nThis patch adds handling shutdown of the worker in following way:\n1) It fixes usage of RTE_DISTRIB_VALID_BUF handshake flag. This flag\nwas formerly unused in burst implementation and now it is used\nfor marking valid packets in retptr64 replacing invalid use\nof RTE_DISTRIB_RETURN_BUF flag.\n2) Uses RTE_DISTRIB_RETURN_BUF as a worker to distributor handshake\nin retptr64 to indicate that worker has shutdown.\n3) Worker that shuts down blocks also bufptr for itself with\nRTE_DISTRIB_RETURN_BUF flag allowing distributor to retrieve any\nin flight packets.\n4) When distributor receives information about shutdown of a worker,\nit: marks worker as not active; retrieves any in flight and backlog\npackets and process them to different workers; unlocks bufptr64\nby clearing RTE_DISTRIB_RETURN_BUF flag and allowing use in\nthe future if worker requests any new packages.\n5) Do not allow to: send or add to backlog any packets for not\nactive workers. Such workers are also ignored if matched.\n6) Adjust calls to handle_returns() and tags matching procedure\nto react for possible activation deactivation of workers.\n\nFixes: 775003ad2f96 (\"distributor: add new burst-capable library\")\nCc: david.hunt@intel.com\nCc: stable@dpdk.org\n\nSigned-off-by: Lukasz Wojciechowski <l.wojciechow@partner.samsung.com>\nAcked-by: David Hunt <david.hunt@intel.com>\n---\n lib/librte_distributor/distributor_private.h |   3 +\n lib/librte_distributor/rte_distributor.c     | 175 +++++++++++++++----\n 2 files changed, 146 insertions(+), 32 deletions(-)",
    "diff": "diff --git a/lib/librte_distributor/distributor_private.h b/lib/librte_distributor/distributor_private.h\nindex 489aef2ac..689fe3e18 100644\n--- a/lib/librte_distributor/distributor_private.h\n+++ b/lib/librte_distributor/distributor_private.h\n@@ -155,6 +155,9 @@ struct rte_distributor {\n \tenum rte_distributor_match_function dist_match_fn;\n \n \tstruct rte_distributor_single *d_single;\n+\n+\tuint8_t active[RTE_DISTRIB_MAX_WORKERS];\n+\tuint8_t activesum;\n };\n \n void\ndiff --git a/lib/librte_distributor/rte_distributor.c b/lib/librte_distributor/rte_distributor.c\nindex 93c90cf54..7aa079d53 100644\n--- a/lib/librte_distributor/rte_distributor.c\n+++ b/lib/librte_distributor/rte_distributor.c\n@@ -51,7 +51,7 @@ rte_distributor_request_pkt(struct rte_distributor *d,\n \t * Sync with worker on GET_BUF flag.\n \t */\n \twhile (unlikely(__atomic_load_n(retptr64, __ATOMIC_ACQUIRE)\n-\t\t\t& RTE_DISTRIB_GET_BUF)) {\n+\t\t\t& (RTE_DISTRIB_GET_BUF | RTE_DISTRIB_RETURN_BUF))) {\n \t\trte_pause();\n \t\tuint64_t t = rte_rdtsc()+100;\n \n@@ -67,11 +67,11 @@ rte_distributor_request_pkt(struct rte_distributor *d,\n \tfor (i = count; i < RTE_DIST_BURST_SIZE; i++)\n \t\tbuf->retptr64[i] = 0;\n \n-\t/* Set Return bit for each packet returned */\n+\t/* Set VALID_BUF bit for each packet returned */\n \tfor (i = count; i-- > 0; )\n \t\tbuf->retptr64[i] =\n \t\t\t(((int64_t)(uintptr_t)(oldpkt[i])) <<\n-\t\t\tRTE_DISTRIB_FLAG_BITS) | RTE_DISTRIB_RETURN_BUF;\n+\t\t\tRTE_DISTRIB_FLAG_BITS) | RTE_DISTRIB_VALID_BUF;\n \n \t/*\n \t * Finally, set the GET_BUF  to signal to distributor that cache\n@@ -97,11 +97,13 @@ rte_distributor_poll_pkt(struct rte_distributor *d,\n \t\treturn (pkts[0]) ? 1 : 0;\n \t}\n \n-\t/* If bit is set, return\n+\t/* If any of below bits is set, return.\n+\t * GET_BUF is set when distributor hasn't sent any packets yet\n+\t * RETURN_BUF is set when distributor must retrieve in-flight packets\n \t * Sync with distributor to acquire bufptrs\n \t */\n \tif (__atomic_load_n(&(buf->bufptr64[0]), __ATOMIC_ACQUIRE)\n-\t\t& RTE_DISTRIB_GET_BUF)\n+\t\t& (RTE_DISTRIB_GET_BUF | RTE_DISTRIB_RETURN_BUF))\n \t\treturn -1;\n \n \t/* since bufptr64 is signed, this should be an arithmetic shift */\n@@ -113,7 +115,7 @@ rte_distributor_poll_pkt(struct rte_distributor *d,\n \t}\n \n \t/*\n-\t * so now we've got the contents of the cacheline into an  array of\n+\t * so now we've got the contents of the cacheline into an array of\n \t * mbuf pointers, so toggle the bit so scheduler can start working\n \t * on the next cacheline while we're working.\n \t * Sync with distributor on GET_BUF flag. Release bufptrs.\n@@ -173,7 +175,7 @@ rte_distributor_return_pkt(struct rte_distributor *d,\n \t * Sync with worker on GET_BUF flag.\n \t */\n \twhile (unlikely(__atomic_load_n(&(buf->retptr64[0]), __ATOMIC_RELAXED)\n-\t\t\t& RTE_DISTRIB_GET_BUF)) {\n+\t\t\t& (RTE_DISTRIB_GET_BUF | RTE_DISTRIB_RETURN_BUF))) {\n \t\trte_pause();\n \t\tuint64_t t = rte_rdtsc()+100;\n \n@@ -185,17 +187,25 @@ rte_distributor_return_pkt(struct rte_distributor *d,\n \t__atomic_thread_fence(__ATOMIC_ACQUIRE);\n \tfor (i = 0; i < RTE_DIST_BURST_SIZE; i++)\n \t\t/* Switch off the return bit first */\n-\t\tbuf->retptr64[i] &= ~RTE_DISTRIB_RETURN_BUF;\n+\t\tbuf->retptr64[i] = 0;\n \n \tfor (i = num; i-- > 0; )\n \t\tbuf->retptr64[i] = (((int64_t)(uintptr_t)oldpkt[i]) <<\n-\t\t\tRTE_DISTRIB_FLAG_BITS) | RTE_DISTRIB_RETURN_BUF;\n+\t\t\tRTE_DISTRIB_FLAG_BITS) | RTE_DISTRIB_VALID_BUF;\n+\n+\t/* Use RETURN_BUF on bufptr64 to notify distributor that\n+\t * we won't read any mbufs from there even if GET_BUF is set.\n+\t * This allows distributor to retrieve in-flight already sent packets.\n+\t */\n+\t__atomic_or_fetch(&(buf->bufptr64[0]), RTE_DISTRIB_RETURN_BUF,\n+\t\t__ATOMIC_ACQ_REL);\n \n-\t/* set the GET_BUF but even if we got no returns.\n-\t * Sync with distributor on GET_BUF flag. Release retptrs.\n+\t/* set the RETURN_BUF on retptr64 even if we got no returns.\n+\t * Sync with distributor on RETURN_BUF flag. Release retptrs.\n+\t * Notify distributor that we don't request more packets any more.\n \t */\n \t__atomic_store_n(&(buf->retptr64[0]),\n-\t\tbuf->retptr64[0] | RTE_DISTRIB_GET_BUF, __ATOMIC_RELEASE);\n+\t\tbuf->retptr64[0] | RTE_DISTRIB_RETURN_BUF, __ATOMIC_RELEASE);\n \n \treturn 0;\n }\n@@ -265,6 +275,59 @@ find_match_scalar(struct rte_distributor *d,\n \t */\n }\n \n+/*\n+ * When worker called rte_distributor_return_pkt()\n+ * and passed RTE_DISTRIB_RETURN_BUF handshake through retptr64,\n+ * distributor must retrieve both inflight and backlog packets assigned\n+ * to the worker and reprocess them to another worker.\n+ */\n+static void\n+handle_worker_shutdown(struct rte_distributor *d, unsigned int wkr)\n+{\n+\tstruct rte_distributor_buffer *buf = &(d->bufs[wkr]);\n+\t/* double BURST size for storing both inflights and backlog */\n+\tstruct rte_mbuf *pkts[RTE_DIST_BURST_SIZE * 2];\n+\tunsigned int pkts_count = 0;\n+\tunsigned int i;\n+\n+\t/* If GET_BUF is cleared there are in-flight packets sent\n+\t * to worker which does not require new packets.\n+\t * They must be retrieved and assigned to another worker.\n+\t */\n+\tif (!(__atomic_load_n(&(buf->bufptr64[0]), __ATOMIC_ACQUIRE)\n+\t\t& RTE_DISTRIB_GET_BUF))\n+\t\tfor (i = 0; i < RTE_DIST_BURST_SIZE; i++)\n+\t\t\tif (buf->bufptr64[i] & RTE_DISTRIB_VALID_BUF)\n+\t\t\t\tpkts[pkts_count++] = (void *)((uintptr_t)\n+\t\t\t\t\t(buf->bufptr64[i]\n+\t\t\t\t\t\t>> RTE_DISTRIB_FLAG_BITS));\n+\n+\t/* Make following operations on handshake flags on bufptr64:\n+\t * - set GET_BUF to indicate that distributor can overwrite buffer\n+\t *     with new packets if worker will make a new request.\n+\t * - clear RETURN_BUF to unlock reads on worker side.\n+\t */\n+\t__atomic_store_n(&(buf->bufptr64[0]), RTE_DISTRIB_GET_BUF,\n+\t\t__ATOMIC_RELEASE);\n+\n+\t/* Collect backlog packets from worker */\n+\tfor (i = 0; i < d->backlog[wkr].count; i++)\n+\t\tpkts[pkts_count++] = (void *)((uintptr_t)\n+\t\t\t(d->backlog[wkr].pkts[i] >> RTE_DISTRIB_FLAG_BITS));\n+\n+\td->backlog[wkr].count = 0;\n+\n+\t/* Clear both inflight and backlog tags */\n+\tfor (i = 0; i < RTE_DIST_BURST_SIZE; i++) {\n+\t\td->in_flight_tags[wkr][i] = 0;\n+\t\td->backlog[wkr].tags[i] = 0;\n+\t}\n+\n+\t/* Recursive call */\n+\tif (pkts_count > 0)\n+\t\trte_distributor_process(d, pkts, pkts_count);\n+}\n+\n \n /*\n  * When the handshake bits indicate that there are packets coming\n@@ -283,19 +346,33 @@ handle_returns(struct rte_distributor *d, unsigned int wkr)\n \n \t/* Sync on GET_BUF flag. Acquire retptrs. */\n \tif (__atomic_load_n(&(buf->retptr64[0]), __ATOMIC_ACQUIRE)\n-\t\t& RTE_DISTRIB_GET_BUF) {\n+\t\t& (RTE_DISTRIB_GET_BUF | RTE_DISTRIB_RETURN_BUF)) {\n \t\tfor (i = 0; i < RTE_DIST_BURST_SIZE; i++) {\n-\t\t\tif (buf->retptr64[i] & RTE_DISTRIB_RETURN_BUF) {\n+\t\t\tif (buf->retptr64[i] & RTE_DISTRIB_VALID_BUF) {\n \t\t\t\toldbuf = ((uintptr_t)(buf->retptr64[i] >>\n \t\t\t\t\tRTE_DISTRIB_FLAG_BITS));\n \t\t\t\t/* store returns in a circular buffer */\n \t\t\t\tstore_return(oldbuf, d, &ret_start, &ret_count);\n \t\t\t\tcount++;\n-\t\t\t\tbuf->retptr64[i] &= ~RTE_DISTRIB_RETURN_BUF;\n+\t\t\t\tbuf->retptr64[i] &= ~RTE_DISTRIB_VALID_BUF;\n \t\t\t}\n \t\t}\n \t\td->returns.start = ret_start;\n \t\td->returns.count = ret_count;\n+\n+\t\t/* If worker requested packets with GET_BUF, set it to active\n+\t\t * otherwise (RETURN_BUF), set it to not active.\n+\t\t */\n+\t\td->activesum -= d->active[wkr];\n+\t\td->active[wkr] = !!(buf->retptr64[0] & RTE_DISTRIB_GET_BUF);\n+\t\td->activesum += d->active[wkr];\n+\n+\t\t/* If worker returned packets without requesting new ones,\n+\t\t * handle all in-flights and backlog packets assigned to it.\n+\t\t */\n+\t\tif (unlikely(buf->retptr64[0] & RTE_DISTRIB_RETURN_BUF))\n+\t\t\thandle_worker_shutdown(d, wkr);\n+\n \t\t/* Clear for the worker to populate with more returns.\n \t\t * Sync with distributor on GET_BUF flag. Release retptrs.\n \t\t */\n@@ -320,11 +397,15 @@ release(struct rte_distributor *d, unsigned int wkr)\n \tunsigned int i;\n \n \thandle_returns(d, wkr);\n+\tif (unlikely(!d->active[wkr]))\n+\t\treturn 0;\n \n \t/* Sync with worker on GET_BUF flag */\n \twhile (!(__atomic_load_n(&(d->bufs[wkr].bufptr64[0]), __ATOMIC_ACQUIRE)\n \t\t& RTE_DISTRIB_GET_BUF)) {\n \t\thandle_returns(d, wkr);\n+\t\tif (unlikely(!d->active[wkr]))\n+\t\t\treturn 0;\n \t\trte_pause();\n \t}\n \n@@ -364,7 +445,7 @@ rte_distributor_process(struct rte_distributor *d,\n \tint64_t next_value = 0;\n \tuint16_t new_tag = 0;\n \tuint16_t flows[RTE_DIST_BURST_SIZE] __rte_cache_aligned;\n-\tunsigned int i, j, w, wid;\n+\tunsigned int i, j, w, wid, matching_required;\n \n \tif (d->alg_type == RTE_DIST_ALG_SINGLE) {\n \t\t/* Call the old API */\n@@ -372,11 +453,13 @@ rte_distributor_process(struct rte_distributor *d,\n \t\t\tmbufs, num_mbufs);\n \t}\n \n+\tfor (wid = 0 ; wid < d->num_workers; wid++)\n+\t\thandle_returns(d, wid);\n+\n \tif (unlikely(num_mbufs == 0)) {\n \t\t/* Flush out all non-full cache-lines to workers. */\n \t\tfor (wid = 0 ; wid < d->num_workers; wid++) {\n \t\t\t/* Sync with worker on GET_BUF flag. */\n-\t\t\thandle_returns(d, wid);\n \t\t\tif (__atomic_load_n(&(d->bufs[wid].bufptr64[0]),\n \t\t\t\t__ATOMIC_ACQUIRE) & RTE_DISTRIB_GET_BUF) {\n \t\t\t\trelease(d, wid);\n@@ -386,6 +469,9 @@ rte_distributor_process(struct rte_distributor *d,\n \t\treturn 0;\n \t}\n \n+\tif (unlikely(!d->activesum))\n+\t\treturn 0;\n+\n \twhile (next_idx < num_mbufs) {\n \t\tuint16_t matches[RTE_DIST_BURST_SIZE];\n \t\tunsigned int pkts;\n@@ -410,22 +496,30 @@ rte_distributor_process(struct rte_distributor *d,\n \t\tfor (; i < RTE_DIST_BURST_SIZE; i++)\n \t\t\tflows[i] = 0;\n \n-\t\tswitch (d->dist_match_fn) {\n-\t\tcase RTE_DIST_MATCH_VECTOR:\n-\t\t\tfind_match_vec(d, &flows[0], &matches[0]);\n-\t\t\tbreak;\n-\t\tdefault:\n-\t\t\tfind_match_scalar(d, &flows[0], &matches[0]);\n-\t\t}\n+\t\tmatching_required = 1;\n \n+\t\tfor (j = 0; j < pkts; j++) {\n+\t\t\tif (unlikely(!d->activesum))\n+\t\t\t\treturn next_idx;\n+\n+\t\t\tif (unlikely(matching_required)) {\n+\t\t\t\tswitch (d->dist_match_fn) {\n+\t\t\t\tcase RTE_DIST_MATCH_VECTOR:\n+\t\t\t\t\tfind_match_vec(d, &flows[0],\n+\t\t\t\t\t\t&matches[0]);\n+\t\t\t\t\tbreak;\n+\t\t\t\tdefault:\n+\t\t\t\t\tfind_match_scalar(d, &flows[0],\n+\t\t\t\t\t\t&matches[0]);\n+\t\t\t\t}\n+\t\t\t\tmatching_required = 0;\n+\t\t\t}\n \t\t/*\n \t\t * Matches array now contain the intended worker ID (+1) of\n \t\t * the incoming packets. Any zeroes need to be assigned\n \t\t * workers.\n \t\t */\n \n-\t\tfor (j = 0; j < pkts; j++) {\n-\n \t\t\tnext_mb = mbufs[next_idx++];\n \t\t\tnext_value = (((int64_t)(uintptr_t)next_mb) <<\n \t\t\t\t\tRTE_DISTRIB_FLAG_BITS);\n@@ -445,12 +539,18 @@ rte_distributor_process(struct rte_distributor *d,\n \t\t\t */\n \t\t\t/* matches[j] = 0; */\n \n-\t\t\tif (matches[j]) {\n+\t\t\tif (matches[j] && d->active[matches[j]-1]) {\n \t\t\t\tstruct rte_distributor_backlog *bl =\n \t\t\t\t\t\t&d->backlog[matches[j]-1];\n \t\t\t\tif (unlikely(bl->count ==\n \t\t\t\t\t\tRTE_DIST_BURST_SIZE)) {\n \t\t\t\t\trelease(d, matches[j]-1);\n+\t\t\t\t\tif (!d->active[matches[j]-1]) {\n+\t\t\t\t\t\tj--;\n+\t\t\t\t\t\tnext_idx--;\n+\t\t\t\t\t\tmatching_required = 1;\n+\t\t\t\t\t\tcontinue;\n+\t\t\t\t\t}\n \t\t\t\t}\n \n \t\t\t\t/* Add to worker that already has flow */\n@@ -460,11 +560,21 @@ rte_distributor_process(struct rte_distributor *d,\n \t\t\t\tbl->pkts[idx] = next_value;\n \n \t\t\t} else {\n-\t\t\t\tstruct rte_distributor_backlog *bl =\n-\t\t\t\t\t\t&d->backlog[wkr];\n+\t\t\t\tstruct rte_distributor_backlog *bl;\n+\n+\t\t\t\twhile (unlikely(!d->active[wkr]))\n+\t\t\t\t\twkr = (wkr + 1) % d->num_workers;\n+\t\t\t\tbl = &d->backlog[wkr];\n+\n \t\t\t\tif (unlikely(bl->count ==\n \t\t\t\t\t\tRTE_DIST_BURST_SIZE)) {\n \t\t\t\t\trelease(d, wkr);\n+\t\t\t\t\tif (!d->active[wkr]) {\n+\t\t\t\t\t\tj--;\n+\t\t\t\t\t\tnext_idx--;\n+\t\t\t\t\t\tmatching_required = 1;\n+\t\t\t\t\t\tcontinue;\n+\t\t\t\t\t}\n \t\t\t\t}\n \n \t\t\t\t/* Add to current worker worker */\n@@ -483,9 +593,7 @@ rte_distributor_process(struct rte_distributor *d,\n \t\t\t\t\t\tmatches[w] = wkr+1;\n \t\t\t}\n \t\t}\n-\t\twkr++;\n-\t\tif (wkr >= d->num_workers)\n-\t\t\twkr = 0;\n+\t\twkr = (wkr + 1) % d->num_workers;\n \t}\n \n \t/* Flush out all non-full cache-lines to workers. */\n@@ -661,6 +769,9 @@ rte_distributor_create(const char *name,\n \tfor (i = 0 ; i < num_workers ; i++)\n \t\td->backlog[i].tags = &d->in_flight_tags[i][RTE_DIST_BURST_SIZE];\n \n+\tmemset(d->active, 0, sizeof(d->active));\n+\td->activesum = 0;\n+\n \tdist_burst_list = RTE_TAILQ_CAST(rte_dist_burst_tailq.head,\n \t\t\t\t\t  rte_dist_burst_list);\n \n",
    "prefixes": [
        "v8",
        "04/17"
    ]
}