get:
Show a patch.

patch:
Partially update a patch (only the fields supplied are changed).

put:
Update a patch.
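The GET endpoint returns a JSON document like the transcript below. As a minimal, self-contained sketch, a client can parse that response and pull out the fields it typically needs (`state`, `submitter`, and the `mbox` URL that can be fed to `git am`). To keep the example offline, it parses a trimmed copy of the response rather than fetching it over HTTP; field names are taken verbatim from the response shown below.

```python
import json

# A trimmed sample of the JSON returned by GET /api/patches/<id>/
# (field names match the Patchwork REST API response captured below).
response_text = """
{
    "id": 81392,
    "name": "[v4,1/3] lib/hash: integrate RCU QSBR",
    "state": "superseded",
    "archived": true,
    "submitter": {"name": "Dharmik Thakkar", "email": "dharmik.thakkar@arm.com"},
    "mbox": "http://patches.dpdk.org/project/dpdk/patch/20201019163519.28180-2-dharmik.thakkar@arm.com/mbox/"
}
"""

patch = json.loads(response_text)

# Collect the fields a review/CI client usually cares about.
summary = {
    "id": patch["id"],
    "state": patch["state"],
    "submitter": patch["submitter"]["email"],
    "mbox": patch["mbox"],
}

print(summary)
```

In practice the document would come from an HTTP client (e.g. `requests.get("http://patches.dpdk.org/api/patches/81392/").json()` — `requests` is an assumption, any HTTP library works), and the `mbox` URL can be downloaded and applied with `git am`.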

GET /api/patches/81392/?format=api
HTTP 200 OK
Allow: GET, PUT, PATCH, HEAD, OPTIONS
Content-Type: application/json
Vary: Accept

{
    "id": 81392,
    "url": "http://patches.dpdk.org/api/patches/81392/?format=api",
    "web_url": "http://patches.dpdk.org/project/dpdk/patch/20201019163519.28180-2-dharmik.thakkar@arm.com/",
    "project": {
        "id": 1,
        "url": "http://patches.dpdk.org/api/projects/1/?format=api",
        "name": "DPDK",
        "link_name": "dpdk",
        "list_id": "dev.dpdk.org",
        "list_email": "dev@dpdk.org",
        "web_url": "http://core.dpdk.org",
        "scm_url": "git://dpdk.org/dpdk",
        "webscm_url": "http://git.dpdk.org/dpdk",
        "list_archive_url": "https://inbox.dpdk.org/dev",
        "list_archive_url_format": "https://inbox.dpdk.org/dev/{}",
        "commit_url_format": ""
    },
    "msgid": "<20201019163519.28180-2-dharmik.thakkar@arm.com>",
    "list_archive_url": "https://inbox.dpdk.org/dev/20201019163519.28180-2-dharmik.thakkar@arm.com",
    "date": "2020-10-19T16:35:17",
    "name": "[v4,1/3] lib/hash: integrate RCU QSBR",
    "commit_ref": null,
    "pull_url": null,
    "state": "superseded",
    "archived": true,
    "hash": "7a45113c2d8655e0b367a8f21ee8b1cd086bccce",
    "submitter": {
        "id": 1108,
        "url": "http://patches.dpdk.org/api/people/1108/?format=api",
        "name": "Dharmik Thakkar",
        "email": "dharmik.thakkar@arm.com"
    },
    "delegate": {
        "id": 24651,
        "url": "http://patches.dpdk.org/api/users/24651/?format=api",
        "username": "dmarchand",
        "first_name": "David",
        "last_name": "Marchand",
        "email": "david.marchand@redhat.com"
    },
    "mbox": "http://patches.dpdk.org/project/dpdk/patch/20201019163519.28180-2-dharmik.thakkar@arm.com/mbox/",
    "series": [
        {
            "id": 13109,
            "url": "http://patches.dpdk.org/api/series/13109/?format=api",
            "web_url": "http://patches.dpdk.org/project/dpdk/list/?series=13109",
            "date": "2020-10-19T16:35:17",
            "name": "hash: integrate RCU QSBR",
            "version": 4,
            "mbox": "http://patches.dpdk.org/series/13109/mbox/"
        }
    ],
    "comments": "http://patches.dpdk.org/api/patches/81392/comments/",
    "check": "warning",
    "checks": "http://patches.dpdk.org/api/patches/81392/checks/",
    "tags": {},
    "related": [],
    "headers": {
        "Return-Path": "<dev-bounces@dpdk.org>",
        "X-Original-To": "patchwork@inbox.dpdk.org",
        "Delivered-To": "patchwork@inbox.dpdk.org",
        "Received": [
            "from dpdk.org (dpdk.org [92.243.14.124])\n\tby inbox.dpdk.org (Postfix) with ESMTP id 5EDE5A04DC;\n\tMon, 19 Oct 2020 18:35:34 +0200 (CEST)",
            "from [92.243.14.124] (localhost [127.0.0.1])\n\tby dpdk.org (Postfix) with ESMTP id A16F1E277;\n\tMon, 19 Oct 2020 18:35:32 +0200 (CEST)",
            "from foss.arm.com (foss.arm.com [217.140.110.172])\n by dpdk.org (Postfix) with ESMTP id 22446E22A\n for <dev@dpdk.org>; Mon, 19 Oct 2020 18:35:31 +0200 (CEST)",
            "from usa-sjc-imap-foss1.foss.arm.com (unknown [10.121.207.14])\n by usa-sjc-mx-foss1.foss.arm.com (Postfix) with ESMTP id 87759D6E;\n Mon, 19 Oct 2020 09:35:29 -0700 (PDT)",
            "from 2p2660v4-1.austin.arm.com (2p2660v4-1.austin.arm.com\n [10.118.12.95])\n by usa-sjc-imap-foss1.foss.arm.com (Postfix) with ESMTPSA id 7CA1C3F66B;\n Mon, 19 Oct 2020 09:35:29 -0700 (PDT)"
        ],
        "From": "Dharmik Thakkar <dharmik.thakkar@arm.com>",
        "To": "Yipeng Wang <yipeng1.wang@intel.com>,\n Sameh Gobriel <sameh.gobriel@intel.com>,\n Bruce Richardson <bruce.richardson@intel.com>,\n Ray Kinsella <mdr@ashroe.eu>, Neil Horman <nhorman@tuxdriver.com>",
        "Cc": "dev@dpdk.org,\n\tnd@arm.com,\n\tDharmik Thakkar <dharmik.thakkar@arm.com>",
        "Date": "Mon, 19 Oct 2020 11:35:17 -0500",
        "Message-Id": "<20201019163519.28180-2-dharmik.thakkar@arm.com>",
        "X-Mailer": "git-send-email 2.17.1",
        "In-Reply-To": "<20201019163519.28180-1-dharmik.thakkar@arm.com>",
        "References": "<20201016173858.1134-1-dharmik.thakkar@arm.com>\n <20201019163519.28180-1-dharmik.thakkar@arm.com>",
        "Subject": "[dpdk-dev] [PATCH v4 1/3] lib/hash: integrate RCU QSBR",
        "X-BeenThere": "dev@dpdk.org",
        "X-Mailman-Version": "2.1.15",
        "Precedence": "list",
        "List-Id": "DPDK patches and discussions <dev.dpdk.org>",
        "List-Unsubscribe": "<https://mails.dpdk.org/options/dev>,\n <mailto:dev-request@dpdk.org?subject=unsubscribe>",
        "List-Archive": "<http://mails.dpdk.org/archives/dev/>",
        "List-Post": "<mailto:dev@dpdk.org>",
        "List-Help": "<mailto:dev-request@dpdk.org?subject=help>",
        "List-Subscribe": "<https://mails.dpdk.org/listinfo/dev>,\n <mailto:dev-request@dpdk.org?subject=subscribe>",
        "Errors-To": "dev-bounces@dpdk.org",
        "Sender": "\"dev\" <dev-bounces@dpdk.org>"
    },
    "content": "Currently, users have to use external RCU mechanisms to free resources\nwhen using lock free hash algorithm.\n\nIntegrate RCU QSBR process to make it easier for the applications to use \nlock free algorithm.\nRefer to RCU documentation to understand various aspects of\nintegrating RCU library into other libraries.\n\nSuggested-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>\nSigned-off-by: Dharmik Thakkar <dharmik.thakkar@arm.com>\nReviewed-by: Ruifeng Wang <ruifeng.wang@arm.com>\nAcked-by: Ray Kinsella <mdr@ashroe.eu>\n---\n doc/guides/prog_guide/hash_lib.rst   |  11 +-\n lib/librte_hash/meson.build          |   1 +\n lib/librte_hash/rte_cuckoo_hash.c    | 302 +++++++++++++++++++++------\n lib/librte_hash/rte_cuckoo_hash.h    |   8 +\n lib/librte_hash/rte_hash.h           |  77 ++++++-\n lib/librte_hash/rte_hash_version.map |   2 +-\n 6 files changed, 325 insertions(+), 76 deletions(-)",
    "diff": "diff --git a/doc/guides/prog_guide/hash_lib.rst b/doc/guides/prog_guide/hash_lib.rst\nindex d06c7de2ead1..63e183ed1f08 100644\n--- a/doc/guides/prog_guide/hash_lib.rst\n+++ b/doc/guides/prog_guide/hash_lib.rst\n@@ -102,6 +102,9 @@ For concurrent writes, and concurrent reads and writes the following flag values\n *  If the 'do not free on delete' (RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL) flag is set, the position of the entry in the hash table is not freed upon calling delete(). This flag is enabled\n    by default when the lock free read/write concurrency flag is set. The application should free the position after all the readers have stopped referencing the position.\n    Where required, the application can make use of RCU mechanisms to determine when the readers have stopped referencing the position.\n+   RCU QSBR process is integrated within the Hash library for safe freeing of the position. Application has certain responsibilities while using this feature.\n+   Please refer to resource reclamation framework of :ref:`RCU library <RCU_Library>` for more details.\n+\n \n Extendable Bucket Functionality support\n ----------------------------------------\n@@ -109,8 +112,8 @@ An extra flag is used to enable this functionality (flag is not set by default).\n in the very unlikely case due to excessive hash collisions that a key has failed to be inserted, the hash table bucket is extended with a linked\n list to insert these failed keys. This feature is important for the workloads (e.g. 
telco workloads) that need to insert up to 100% of the\n hash table size and can't tolerate any key insertion failure (even if very few).\n-Please note that with the 'lock free read/write concurrency' flag enabled, users need to call 'rte_hash_free_key_with_position' API in order to free the empty buckets and\n-deleted keys, to maintain the 100% capacity guarantee.\n+Please note that with the 'lock free read/write concurrency' flag enabled, users need to call 'rte_hash_free_key_with_position' API or configure integrated RCU QSBR\n+(or use external RCU mechanisms) in order to free the empty buckets and deleted keys, to maintain the 100% capacity guarantee.\n \n Implementation Details (non Extendable Bucket Case)\n ---------------------------------------------------\n@@ -172,7 +175,7 @@ Example of deletion:\n Similar to lookup, the key is searched in its primary and secondary buckets. If the key is found, the\n entry is marked as empty. If the hash table was configured with 'no free on delete' or 'lock free read/write concurrency',\n the position of the key is not freed. It is the responsibility of the user to free the position after\n-readers are not referencing the position anymore.\n+readers are not referencing the position anymore. User can configure integrated RCU QSBR or use external RCU mechanisms to safely free the position on delete\n \n \n Implementation Details (with Extendable Bucket)\n@@ -286,6 +289,8 @@ The flow table operations on the application side are described below:\n *   Free flow: Free flow key position. If 'no free on delete' or 'lock-free read/write concurrency' flags are set,\n     wait till the readers are not referencing the position returned during add/delete flow and then free the position.\n     RCU mechanisms can be used to find out when the readers are not referencing the position anymore.\n+    RCU QSBR process is integrated within the Hash library for safe freeing of the position. 
Application has certain responsibilities while using this feature.\n+    Please refer to resource reclamation framework of :ref:`RCU library <RCU_Library>` for more details.\n \n *   Lookup flow: Lookup for the flow key in the hash.\n     If the returned position is valid (flow lookup hit), use the returned position to access the flow entry in the flow table.\ndiff --git a/lib/librte_hash/meson.build b/lib/librte_hash/meson.build\nindex 6ab46ae9d768..0977a63fd279 100644\n--- a/lib/librte_hash/meson.build\n+++ b/lib/librte_hash/meson.build\n@@ -10,3 +10,4 @@ headers = files('rte_crc_arm64.h',\n \n sources = files('rte_cuckoo_hash.c', 'rte_fbk_hash.c')\n deps += ['ring']\n+deps += ['rcu']\ndiff --git a/lib/librte_hash/rte_cuckoo_hash.c b/lib/librte_hash/rte_cuckoo_hash.c\nindex aad0c965be5e..b9e4d82a0c14 100644\n--- a/lib/librte_hash/rte_cuckoo_hash.c\n+++ b/lib/librte_hash/rte_cuckoo_hash.c\n@@ -52,6 +52,11 @@ static struct rte_tailq_elem rte_hash_tailq = {\n };\n EAL_REGISTER_TAILQ(rte_hash_tailq)\n \n+struct __rte_hash_rcu_dq_entry {\n+\tuint32_t key_idx;\n+\tuint32_t ext_bkt_idx; /**< Extended bkt index */\n+};\n+\n struct rte_hash *\n rte_hash_find_existing(const char *name)\n {\n@@ -210,7 +215,10 @@ rte_hash_create(const struct rte_hash_parameters *params)\n \n \tif (params->extra_flag & RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF) {\n \t\treadwrite_concur_lf_support = 1;\n-\t\t/* Enable not freeing internal memory/index on delete */\n+\t\t/* Enable not freeing internal memory/index on delete.\n+\t\t * If internal RCU is enabled, freeing of internal memory/index\n+\t\t * is done on delete\n+\t\t */\n \t\tno_free_on_del = 1;\n \t}\n \n@@ -505,6 +513,10 @@ rte_hash_free(struct rte_hash *h)\n \n \trte_mcfg_tailq_write_unlock();\n \n+\t/* RCU clean up. 
*/\n+\tif (h->dq)\n+\t\trte_rcu_qsbr_dq_delete(h->dq);\n+\n \tif (h->use_local_cache)\n \t\trte_free(h->local_free_slots);\n \tif (h->writer_takes_lock)\n@@ -607,11 +619,21 @@ void\n rte_hash_reset(struct rte_hash *h)\n {\n \tuint32_t tot_ring_cnt, i;\n+\tunsigned int pending;\n \n \tif (h == NULL)\n \t\treturn;\n \n \t__hash_rw_writer_lock(h);\n+\n+\t/* RCU QSBR clean up. */\n+\tif (h->dq) {\n+\t\t/* Reclaim all the resources */\n+\t\trte_rcu_qsbr_dq_reclaim(h->dq, ~0, NULL, &pending, NULL);\n+\t\tif (pending != 0)\n+\t\t\tRTE_LOG(ERR, HASH, \"RCU reclaim all resources failed\\n\");\n+\t}\n+\n \tmemset(h->buckets, 0, h->num_buckets * sizeof(struct rte_hash_bucket));\n \tmemset(h->key_store, 0, h->key_entry_size * (h->entries + 1));\n \t*h->tbl_chng_cnt = 0;\n@@ -952,6 +974,37 @@ rte_hash_cuckoo_make_space_mw(const struct rte_hash *h,\n \treturn -ENOSPC;\n }\n \n+static inline uint32_t\n+alloc_slot(const struct rte_hash *h, struct lcore_cache *cached_free_slots)\n+{\n+\tunsigned int  n_slots;\n+\tuint32_t slot_id;\n+\tif (h->use_local_cache) {\n+\t\t/* Try to get a free slot from the local cache */\n+\t\tif (cached_free_slots->len == 0) {\n+\t\t\t/* Need to get another burst of free slots from global ring */\n+\t\t\tn_slots = rte_ring_mc_dequeue_burst_elem(h->free_slots,\n+\t\t\t\t\tcached_free_slots->objs,\n+\t\t\t\t\tsizeof(uint32_t),\n+\t\t\t\t\tLCORE_CACHE_SIZE, NULL);\n+\t\t\tif (n_slots == 0)\n+\t\t\t\treturn EMPTY_SLOT;\n+\n+\t\t\tcached_free_slots->len += n_slots;\n+\t\t}\n+\n+\t\t/* Get a free slot from the local cache */\n+\t\tcached_free_slots->len--;\n+\t\tslot_id = cached_free_slots->objs[cached_free_slots->len];\n+\t} else {\n+\t\tif (rte_ring_sc_dequeue_elem(h->free_slots, &slot_id,\n+\t\t\t\t\t\tsizeof(uint32_t)) != 0)\n+\t\t\treturn EMPTY_SLOT;\n+\t}\n+\n+\treturn slot_id;\n+}\n+\n static inline int32_t\n __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,\n \t\t\t\t\t\thash_sig_t sig, void *data)\n@@ -963,7 +1016,6 @@ 
__rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,\n \tuint32_t ext_bkt_id = 0;\n \tuint32_t slot_id;\n \tint ret;\n-\tunsigned n_slots;\n \tunsigned lcore_id;\n \tunsigned int i;\n \tstruct lcore_cache *cached_free_slots = NULL;\n@@ -1001,28 +1053,20 @@ __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,\n \tif (h->use_local_cache) {\n \t\tlcore_id = rte_lcore_id();\n \t\tcached_free_slots = &h->local_free_slots[lcore_id];\n-\t\t/* Try to get a free slot from the local cache */\n-\t\tif (cached_free_slots->len == 0) {\n-\t\t\t/* Need to get another burst of free slots from global ring */\n-\t\t\tn_slots = rte_ring_mc_dequeue_burst_elem(h->free_slots,\n-\t\t\t\t\tcached_free_slots->objs,\n-\t\t\t\t\tsizeof(uint32_t),\n-\t\t\t\t\tLCORE_CACHE_SIZE, NULL);\n-\t\t\tif (n_slots == 0) {\n-\t\t\t\treturn -ENOSPC;\n-\t\t\t}\n-\n-\t\t\tcached_free_slots->len += n_slots;\n+\t}\n+\tslot_id = alloc_slot(h, cached_free_slots);\n+\tif (slot_id == EMPTY_SLOT) {\n+\t\tif (h->dq) {\n+\t\t\t__hash_rw_writer_lock(h);\n+\t\t\tret = rte_rcu_qsbr_dq_reclaim(h->dq,\n+\t\t\t\t\th->hash_rcu_cfg->max_reclaim_size,\n+\t\t\t\t\tNULL, NULL, NULL);\n+\t\t\t__hash_rw_writer_unlock(h);\n+\t\t\tif (ret == 0)\n+\t\t\t\tslot_id = alloc_slot(h, cached_free_slots);\n \t\t}\n-\n-\t\t/* Get a free slot from the local cache */\n-\t\tcached_free_slots->len--;\n-\t\tslot_id = cached_free_slots->objs[cached_free_slots->len];\n-\t} else {\n-\t\tif (rte_ring_sc_dequeue_elem(h->free_slots, &slot_id,\n-\t\t\t\t\t\tsizeof(uint32_t)) != 0) {\n+\t\tif (slot_id == EMPTY_SLOT)\n \t\t\treturn -ENOSPC;\n-\t\t}\n \t}\n \n \tnew_k = RTE_PTR_ADD(keys, slot_id * h->key_entry_size);\n@@ -1118,8 +1162,19 @@ __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,\n \tif (rte_ring_sc_dequeue_elem(h->free_ext_bkts, &ext_bkt_id,\n \t\t\t\t\t\tsizeof(uint32_t)) != 0 ||\n \t\t\t\t\text_bkt_id == 0) {\n-\t\tret = -ENOSPC;\n-\t\tgoto failure;\n+\t\tif (h->dq) {\n+\t\t\tif 
(rte_rcu_qsbr_dq_reclaim(h->dq,\n+\t\t\t\t\th->hash_rcu_cfg->max_reclaim_size,\n+\t\t\t\t\tNULL, NULL, NULL) == 0) {\n+\t\t\t\trte_ring_sc_dequeue_elem(h->free_ext_bkts,\n+\t\t\t\t\t\t\t &ext_bkt_id,\n+\t\t\t\t\t\t\t sizeof(uint32_t));\n+\t\t\t}\n+\t\t}\n+\t\tif (ext_bkt_id == 0) {\n+\t\t\tret = -ENOSPC;\n+\t\t\tgoto failure;\n+\t\t}\n \t}\n \n \t/* Use the first location of the new bucket */\n@@ -1395,12 +1450,12 @@ rte_hash_lookup_data(const struct rte_hash *h, const void *key, void **data)\n \treturn __rte_hash_lookup_with_hash(h, key, rte_hash_hash(h, key), data);\n }\n \n-static inline void\n-remove_entry(const struct rte_hash *h, struct rte_hash_bucket *bkt, unsigned i)\n+static int\n+free_slot(const struct rte_hash *h, uint32_t slot_id)\n {\n \tunsigned lcore_id, n_slots;\n-\tstruct lcore_cache *cached_free_slots;\n-\n+\tstruct lcore_cache *cached_free_slots = NULL;\n+\t/* Return key indexes to free slot ring */\n \tif (h->use_local_cache) {\n \t\tlcore_id = rte_lcore_id();\n \t\tcached_free_slots = &h->local_free_slots[lcore_id];\n@@ -1411,18 +1466,127 @@ remove_entry(const struct rte_hash *h, struct rte_hash_bucket *bkt, unsigned i)\n \t\t\t\t\t\tcached_free_slots->objs,\n \t\t\t\t\t\tsizeof(uint32_t),\n \t\t\t\t\t\tLCORE_CACHE_SIZE, NULL);\n-\t\t\tERR_IF_TRUE((n_slots == 0),\n-\t\t\t\t\"%s: could not enqueue free slots in global ring\\n\",\n-\t\t\t\t__func__);\n+\t\t\tRETURN_IF_TRUE((n_slots == 0), -EFAULT);\n \t\t\tcached_free_slots->len -= n_slots;\n \t\t}\n-\t\t/* Put index of new free slot in cache. 
*/\n-\t\tcached_free_slots->objs[cached_free_slots->len] =\n-\t\t\t\t\t\t\tbkt->key_idx[i];\n-\t\tcached_free_slots->len++;\n+\t}\n+\n+\tenqueue_slot_back(h, cached_free_slots, slot_id);\n+\treturn 0;\n+}\n+\n+static void\n+__hash_rcu_qsbr_free_resource(void *p, void *e, unsigned int n)\n+{\n+\tvoid *key_data = NULL;\n+\tint ret;\n+\tstruct rte_hash_key *keys, *k;\n+\tstruct rte_hash *h = (struct rte_hash *)p;\n+\tstruct __rte_hash_rcu_dq_entry rcu_dq_entry =\n+\t\t\t*((struct __rte_hash_rcu_dq_entry *)e);\n+\n+\tRTE_SET_USED(n);\n+\tkeys = h->key_store;\n+\n+\tk = (struct rte_hash_key *) ((char *)keys +\n+\t\t\t\trcu_dq_entry.key_idx * h->key_entry_size);\n+\tkey_data = k->pdata;\n+\tif (h->hash_rcu_cfg->free_key_data_func)\n+\t\th->hash_rcu_cfg->free_key_data_func(h->hash_rcu_cfg->key_data_ptr,\n+\t\t\t\t\t\t    key_data);\n+\n+\tif (h->ext_table_support && rcu_dq_entry.ext_bkt_idx != EMPTY_SLOT)\n+\t\t/* Recycle empty ext bkt to free list. */\n+\t\trte_ring_sp_enqueue_elem(h->free_ext_bkts,\n+\t\t\t&rcu_dq_entry.ext_bkt_idx, sizeof(uint32_t));\n+\n+\t/* Return key indexes to free slot ring */\n+\tret = free_slot(h, rcu_dq_entry.key_idx);\n+\tif (ret < 0) {\n+\t\tRTE_LOG(ERR, HASH,\n+\t\t\t\"%s: could not enqueue free slots in global ring\\n\",\n+\t\t\t\t__func__);\n+\t}\n+}\n+\n+int\n+rte_hash_rcu_qsbr_add(struct rte_hash *h,\n+\t\t\t\tstruct rte_hash_rcu_config *cfg)\n+{\n+\tstruct rte_rcu_qsbr_dq_parameters params = {0};\n+\tchar rcu_dq_name[RTE_RCU_QSBR_DQ_NAMESIZE];\n+\tstruct rte_hash_rcu_config *hash_rcu_cfg = NULL;\n+\n+\tconst uint32_t total_entries = h->use_local_cache ?\n+\t\th->entries + (RTE_MAX_LCORE - 1) * (LCORE_CACHE_SIZE - 1) + 1\n+\t\t\t\t\t\t\t: h->entries + 1;\n+\n+\tif ((h == NULL) || cfg == NULL || cfg->v == NULL) {\n+\t\trte_errno = EINVAL;\n+\t\treturn 1;\n+\t}\n+\n+\tif (h->hash_rcu_cfg) {\n+\t\trte_errno = EEXIST;\n+\t\treturn 1;\n+\t}\n+\n+\thash_rcu_cfg = rte_zmalloc(NULL, sizeof(struct rte_hash_rcu_config), 0);\n+\tif (hash_rcu_cfg 
== NULL) {\n+\t\tRTE_LOG(ERR, HASH, \"memory allocation failed\\n\");\n+\t\treturn 1;\n+\t}\n+\n+\tif (cfg->mode == RTE_HASH_QSBR_MODE_SYNC) {\n+\t\t/* No other things to do. */\n+\t} else if (cfg->mode == RTE_HASH_QSBR_MODE_DQ) {\n+\t\t/* Init QSBR defer queue. */\n+\t\tsnprintf(rcu_dq_name, sizeof(rcu_dq_name),\n+\t\t\t\t\t\"HASH_RCU_%s\", h->name);\n+\t\tparams.name = rcu_dq_name;\n+\t\tparams.size = cfg->dq_size;\n+\t\tif (params.size == 0)\n+\t\t\tparams.size = total_entries;\n+\t\tparams.trigger_reclaim_limit = cfg->trigger_reclaim_limit;\n+\t\tif (params.max_reclaim_size == 0)\n+\t\t\tparams.max_reclaim_size = RTE_HASH_RCU_DQ_RECLAIM_MAX;\n+\t\tparams.esize = sizeof(struct __rte_hash_rcu_dq_entry);\n+\t\tparams.free_fn = __hash_rcu_qsbr_free_resource;\n+\t\tparams.p = h;\n+\t\tparams.v = cfg->v;\n+\t\th->dq = rte_rcu_qsbr_dq_create(&params);\n+\t\tif (h->dq == NULL) {\n+\t\t\trte_free(hash_rcu_cfg);\n+\t\t\tRTE_LOG(ERR, HASH, \"HASH defer queue creation failed\\n\");\n+\t\t\treturn 1;\n+\t\t}\n \t} else {\n-\t\trte_ring_sp_enqueue_elem(h->free_slots,\n-\t\t\t\t&bkt->key_idx[i], sizeof(uint32_t));\n+\t\trte_free(hash_rcu_cfg);\n+\t\trte_errno = EINVAL;\n+\t\treturn 1;\n+\t}\n+\n+\thash_rcu_cfg->v = cfg->v;\n+\thash_rcu_cfg->mode = cfg->mode;\n+\thash_rcu_cfg->dq_size = params.size;\n+\thash_rcu_cfg->trigger_reclaim_limit = params.trigger_reclaim_limit;\n+\thash_rcu_cfg->max_reclaim_size = params.max_reclaim_size;\n+\thash_rcu_cfg->free_key_data_func = cfg->free_key_data_func;\n+\thash_rcu_cfg->key_data_ptr = cfg->key_data_ptr;\n+\n+\th->hash_rcu_cfg = hash_rcu_cfg;\n+\n+\treturn 0;\n+}\n+\n+static inline void\n+remove_entry(const struct rte_hash *h, struct rte_hash_bucket *bkt, unsigned i)\n+{\n+\tint ret = free_slot(h, bkt->key_idx[i]);\n+\tif (ret < 0) {\n+\t\tRTE_LOG(ERR, HASH,\n+\t\t\t\"%s: could not enqueue free slots in global ring\\n\",\n+\t\t\t\t__func__);\n \t}\n }\n \n@@ -1521,6 +1685,8 @@ __rte_hash_del_key_with_hash(const struct rte_hash *h, const 
void *key,\n \tint pos;\n \tint32_t ret, i;\n \tuint16_t short_sig;\n+\tuint32_t index = EMPTY_SLOT;\n+\tstruct __rte_hash_rcu_dq_entry rcu_dq_entry;\n \n \tshort_sig = get_short_sig(sig);\n \tprim_bucket_idx = get_prim_bucket_index(h, sig);\n@@ -1555,10 +1721,9 @@ __rte_hash_del_key_with_hash(const struct rte_hash *h, const void *key,\n \n /* Search last bucket to see if empty to be recycled */\n return_bkt:\n-\tif (!last_bkt) {\n-\t\t__hash_rw_writer_unlock(h);\n-\t\treturn ret;\n-\t}\n+\tif (!last_bkt)\n+\t\tgoto return_key;\n+\n \twhile (last_bkt->next) {\n \t\tprev_bkt = last_bkt;\n \t\tlast_bkt = last_bkt->next;\n@@ -1571,11 +1736,11 @@ __rte_hash_del_key_with_hash(const struct rte_hash *h, const void *key,\n \t/* found empty bucket and recycle */\n \tif (i == RTE_HASH_BUCKET_ENTRIES) {\n \t\tprev_bkt->next = NULL;\n-\t\tuint32_t index = last_bkt - h->buckets_ext + 1;\n+\t\tindex = last_bkt - h->buckets_ext + 1;\n \t\t/* Recycle the empty bkt if\n \t\t * no_free_on_del is disabled.\n \t\t */\n-\t\tif (h->no_free_on_del)\n+\t\tif (h->no_free_on_del) {\n \t\t\t/* Store index of an empty ext bkt to be recycled\n \t\t\t * on calling rte_hash_del_xxx APIs.\n \t\t\t * When lock free read-write concurrency is enabled,\n@@ -1583,12 +1748,34 @@ __rte_hash_del_key_with_hash(const struct rte_hash *h, const void *key,\n \t\t\t * immediately (as readers might be using it still).\n \t\t\t * Hence freeing of the ext bkt is piggy-backed to\n \t\t\t * freeing of the key index.\n+\t\t\t * If using external RCU, store this index in an array.\n \t\t\t */\n-\t\t\th->ext_bkt_to_free[ret] = index;\n-\t\telse\n+\t\t\tif (h->hash_rcu_cfg == NULL)\n+\t\t\t\th->ext_bkt_to_free[ret] = index;\n+\t\t} else\n \t\t\trte_ring_sp_enqueue_elem(h->free_ext_bkts, &index,\n \t\t\t\t\t\t\tsizeof(uint32_t));\n \t}\n+\n+return_key:\n+\t/* Using internal RCU QSBR */\n+\tif (h->hash_rcu_cfg) {\n+\t\t/* Key index where key is stored, adding the first dummy index */\n+\t\trcu_dq_entry.key_idx = ret + 
1;\n+\t\trcu_dq_entry.ext_bkt_idx = index;\n+\t\tif (h->dq == NULL) {\n+\t\t\t/* Wait for quiescent state change if using\n+\t\t\t * RTE_HASH_QSBR_MODE_SYNC\n+\t\t\t */\n+\t\t\trte_rcu_qsbr_synchronize(h->hash_rcu_cfg->v,\n+\t\t\t\t\t\t RTE_QSBR_THRID_INVALID);\n+\t\t\t__hash_rcu_qsbr_free_resource((void *)((uintptr_t)h),\n+\t\t\t\t\t\t      &rcu_dq_entry, 1);\n+\t\t} else if (h->dq)\n+\t\t\t/* Push into QSBR FIFO if using RTE_HASH_QSBR_MODE_DQ */\n+\t\t\tif (rte_rcu_qsbr_dq_enqueue(h->dq, &rcu_dq_entry) != 0)\n+\t\t\t\tRTE_LOG(ERR, HASH, \"Failed to push QSBR FIFO\\n\");\n+\t}\n \t__hash_rw_writer_unlock(h);\n \treturn ret;\n }\n@@ -1637,8 +1824,6 @@ rte_hash_free_key_with_position(const struct rte_hash *h,\n \n \tRETURN_IF_TRUE(((h == NULL) || (key_idx == EMPTY_SLOT)), -EINVAL);\n \n-\tunsigned int lcore_id, n_slots;\n-\tstruct lcore_cache *cached_free_slots;\n \tconst uint32_t total_entries = h->use_local_cache ?\n \t\th->entries + (RTE_MAX_LCORE - 1) * (LCORE_CACHE_SIZE - 1) + 1\n \t\t\t\t\t\t\t: h->entries + 1;\n@@ -1656,28 +1841,9 @@ rte_hash_free_key_with_position(const struct rte_hash *h,\n \t\t}\n \t}\n \n-\tif (h->use_local_cache) {\n-\t\tlcore_id = rte_lcore_id();\n-\t\tcached_free_slots = &h->local_free_slots[lcore_id];\n-\t\t/* Cache full, need to free it. */\n-\t\tif (cached_free_slots->len == LCORE_CACHE_SIZE) {\n-\t\t\t/* Need to enqueue the free slots in global ring. */\n-\t\t\tn_slots = rte_ring_mp_enqueue_burst_elem(h->free_slots,\n-\t\t\t\t\t\tcached_free_slots->objs,\n-\t\t\t\t\t\tsizeof(uint32_t),\n-\t\t\t\t\t\tLCORE_CACHE_SIZE, NULL);\n-\t\t\tRETURN_IF_TRUE((n_slots == 0), -EFAULT);\n-\t\t\tcached_free_slots->len -= n_slots;\n-\t\t}\n-\t\t/* Put index of new free slot in cache. */\n-\t\tcached_free_slots->objs[cached_free_slots->len] = key_idx;\n-\t\tcached_free_slots->len++;\n-\t} else {\n-\t\trte_ring_sp_enqueue_elem(h->free_slots, &key_idx,\n-\t\t\t\t\t\tsizeof(uint32_t));\n-\t}\n+\t/* Enqueue slot to cache/ring of free slots. 
*/\n+\treturn free_slot(h, key_idx);\n \n-\treturn 0;\n }\n \n static inline void\ndiff --git a/lib/librte_hash/rte_cuckoo_hash.h b/lib/librte_hash/rte_cuckoo_hash.h\nindex 345de6bf9cfd..85be49d3bbe7 100644\n--- a/lib/librte_hash/rte_cuckoo_hash.h\n+++ b/lib/librte_hash/rte_cuckoo_hash.h\n@@ -168,6 +168,11 @@ struct rte_hash {\n \tstruct lcore_cache *local_free_slots;\n \t/**< Local cache per lcore, storing some indexes of the free slots */\n \n+\t/* RCU config */\n+\tstruct rte_hash_rcu_config *hash_rcu_cfg;\n+\t/**< HASH RCU QSBR configuration structure */\n+\tstruct rte_rcu_qsbr_dq *dq;\t/**< RCU QSBR defer queue. */\n+\n \t/* Fields used in lookup */\n \n \tuint32_t key_len __rte_cache_aligned;\n@@ -230,4 +235,7 @@ struct queue_node {\n \tint prev_slot;               /* Parent(slot) in search path */\n };\n \n+/** @internal Default RCU defer queue entries to reclaim in one go. */\n+#define RTE_HASH_RCU_DQ_RECLAIM_MAX\t16\n+\n #endif\ndiff --git a/lib/librte_hash/rte_hash.h b/lib/librte_hash/rte_hash.h\nindex bff40251bc98..3d28f177f14a 100644\n--- a/lib/librte_hash/rte_hash.h\n+++ b/lib/librte_hash/rte_hash.h\n@@ -15,6 +15,7 @@\n #include <stddef.h>\n \n #include <rte_compat.h>\n+#include <rte_rcu_qsbr.h>\n \n #ifdef __cplusplus\n extern \"C\" {\n@@ -45,7 +46,8 @@ extern \"C\" {\n /** Flag to disable freeing of key index on hash delete.\n  * Refer to rte_hash_del_xxx APIs for more details.\n  * This is enabled by default when RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF\n- * is enabled.\n+ * is enabled. However, if internal RCU is enabled, freeing of internal\n+ * memory/index is done on delete\n  */\n #define RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL 0x10\n \n@@ -67,6 +69,13 @@ typedef uint32_t (*rte_hash_function)(const void *key, uint32_t key_len,\n /** Type of function used to compare the hash key. 
*/\n typedef int (*rte_hash_cmp_eq_t)(const void *key1, const void *key2, size_t key_len);\n \n+/**\n+ * Type of function used to free data stored in the key.\n+ * Required when using internal RCU to allow application to free key-data once\n+ * the key is returned to the the ring of free key-slots.\n+ */\n+typedef void (*rte_hash_free_key_data)(void *p, void *key_data);\n+\n /**\n  * Parameters used when creating the hash table.\n  */\n@@ -81,6 +90,39 @@ struct rte_hash_parameters {\n \tuint8_t extra_flag;\t\t/**< Indicate if additional parameters are present. */\n };\n \n+/** RCU reclamation modes */\n+enum rte_hash_qsbr_mode {\n+\t/** Create defer queue for reclaim. */\n+\tRTE_HASH_QSBR_MODE_DQ = 0,\n+\t/** Use blocking mode reclaim. No defer queue created. */\n+\tRTE_HASH_QSBR_MODE_SYNC\n+};\n+\n+/** HASH RCU QSBR configuration structure. */\n+struct rte_hash_rcu_config {\n+\tstruct rte_rcu_qsbr *v;\t\t/**< RCU QSBR variable. */\n+\tenum rte_hash_qsbr_mode mode;\n+\t/**< Mode of RCU QSBR. RTE_HASH_QSBR_MODE_xxx\n+\t * '0' for default: create defer queue for reclaim.\n+\t */\n+\tuint32_t dq_size;\n+\t/**< RCU defer queue size.\n+\t * default: total hash table entries.\n+\t */\n+\tuint32_t trigger_reclaim_limit;\t/**< Threshold to trigger auto reclaim. */\n+\tuint32_t max_reclaim_size;\n+\t/**< Max entries to reclaim in one go.\n+\t * default: RTE_HASH_RCU_DQ_RECLAIM_MAX.\n+\t */\n+\tvoid *key_data_ptr;\n+\t/**< Pointer passed to the free function. Typically, this is the\n+\t * pointer to the data structure to which the resource to free\n+\t * (key-data) belongs. This can be NULL.\n+\t */\n+\trte_hash_free_key_data free_key_data_func;\n+\t/**< Function to call to free the resource (key-data). */\n+};\n+\n /** @internal A hash table structure. 
*/\n struct rte_hash;\n \n@@ -287,7 +329,8 @@ rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key, hash_sig_t\n  * Thread safety can be enabled by setting flag during\n  * table creation.\n  * If RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL or\n- * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled,\n+ * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled and\n+ * internal RCU is NOT enabled,\n  * the key index returned by rte_hash_add_key_xxx APIs will not be\n  * freed by this API. rte_hash_free_key_with_position API must be called\n  * additionally to free the index associated with the key.\n@@ -316,7 +359,8 @@ rte_hash_del_key(const struct rte_hash *h, const void *key);\n  * Thread safety can be enabled by setting flag during\n  * table creation.\n  * If RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL or\n- * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled,\n+ * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled and\n+ * internal RCU is NOT enabled,\n  * the key index returned by rte_hash_add_key_xxx APIs will not be\n  * freed by this API. rte_hash_free_key_with_position API must be called\n  * additionally to free the index associated with the key.\n@@ -370,7 +414,8 @@ rte_hash_get_key_with_position(const struct rte_hash *h, const int32_t position,\n  * only be called from one thread by default. Thread safety\n  * can be enabled by setting flag during table creation.\n  * If RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL or\n- * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled,\n+ * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled and\n+ * internal RCU is NOT enabled,\n  * the key index returned by rte_hash_del_key_xxx APIs must be freed\n  * using this API. 
This API should be called after all the readers\n  * have stopped referencing the entry corresponding to this key.\n@@ -625,6 +670,30 @@ rte_hash_lookup_bulk(const struct rte_hash *h, const void **keys,\n  */\n int32_t\n rte_hash_iterate(const struct rte_hash *h, const void **key, void **data, uint32_t *next);\n+\n+/**\n+ * @warning\n+ * @b EXPERIMENTAL: this API may change without prior notice\n+ *\n+ * Associate RCU QSBR variable with an Hash object.\n+ * This API should be called to enable the integrated RCU QSBR support and\n+ * should be called immediately after creating the Hash object.\n+ *\n+ * @param h\n+ *   the hash object to add RCU QSBR\n+ * @param cfg\n+ *   RCU QSBR configuration\n+ * @return\n+ *   On success - 0\n+ *   On error - 1 with error code set in rte_errno.\n+ *   Possible rte_errno codes are:\n+ *   - EINVAL - invalid pointer\n+ *   - EEXIST - already added QSBR\n+ *   - ENOMEM - memory allocation failure\n+ */\n+__rte_experimental\n+int rte_hash_rcu_qsbr_add(struct rte_hash *h,\n+\t\t\t\tstruct rte_hash_rcu_config *cfg);\n #ifdef __cplusplus\n }\n #endif\ndiff --git a/lib/librte_hash/rte_hash_version.map b/lib/librte_hash/rte_hash_version.map\nindex c0db81014ff9..c6d73080f478 100644\n--- a/lib/librte_hash/rte_hash_version.map\n+++ b/lib/librte_hash/rte_hash_version.map\n@@ -36,5 +36,5 @@ EXPERIMENTAL {\n \trte_hash_lookup_with_hash_bulk;\n \trte_hash_lookup_with_hash_bulk_data;\n \trte_hash_max_key_id;\n-\n+\trte_hash_rcu_qsbr_add;\n };\n",
    "prefixes": [
        "v4",
        "1/3"
    ]
}