From patchwork Wed Aug 19 04:05:37 2020 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Dharmik Thakkar X-Patchwork-Id: 75701 X-Patchwork-Delegate: david.marchand@redhat.com Return-Path: X-Original-To: patchwork@inbox.dpdk.org Delivered-To: patchwork@inbox.dpdk.org Received: from dpdk.org (dpdk.org [92.243.14.124]) by inbox.dpdk.org (Postfix) with ESMTP id 857B0A04AF; Wed, 19 Aug 2020 06:06:05 +0200 (CEST) Received: from [92.243.14.124] (localhost [127.0.0.1]) by dpdk.org (Postfix) with ESMTP id B7D0B5B30; Wed, 19 Aug 2020 06:06:04 +0200 (CEST) Received: from foss.arm.com (foss.arm.com [217.140.110.172]) by dpdk.org (Postfix) with ESMTP id 26EDE2C02 for ; Wed, 19 Aug 2020 06:06:03 +0200 (CEST) Received: from usa-sjc-imap-foss1.foss.arm.com (unknown [10.121.207.14]) by usa-sjc-mx-foss1.foss.arm.com (Postfix) with ESMTP id 7263431B; Tue, 18 Aug 2020 21:06:02 -0700 (PDT) Received: from localhost.localdomain (2p2660v4-1.austin.arm.com [10.118.12.95]) by usa-sjc-imap-foss1.foss.arm.com (Postfix) with ESMTPSA id 668CF3F71F; Tue, 18 Aug 2020 21:06:02 -0700 (PDT) From: Dharmik Thakkar To: Yipeng Wang , Sameh Gobriel , Bruce Richardson , Ray Kinsella , Neil Horman Cc: dev@dpdk.org, nd@arm.com, Dharmik Thakkar Date: Tue, 18 Aug 2020 23:05:37 -0500 Message-Id: <20200819040537.1792-1-dharmik.thakkar@arm.com> X-Mailer: git-send-email 2.17.1 In-Reply-To: <20190901065810.15137-1-dharmik.thakkar@arm.com> References: <20190901065810.15137-1-dharmik.thakkar@arm.com> Subject: [dpdk-dev] [RFC v2] lib/hash: integrate RCU QSBR X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: DPDK patches and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dev-bounces@dpdk.org Sender: "dev" Integrate RCU QSBR to make it easier for applications to use the lock-free algorithm. 
The resource reclamation implementation was split from the original series and is already part of the RCU library. Rework the series to base the hash integration on the RCU reclamation APIs. Refer to 'Resource reclamation framework for DPDK', available at [1], to understand the various aspects of integrating the RCU library into other libraries. [1] https://doc.dpdk.org/guides/prog_guide/rcu_lib.html Introduce a new API, rte_hash_rcu_qsbr_add, for applications to register an RCU variable that the hash library will use. Suggested-by: Honnappa Nagarahalli Signed-off-by: Dharmik Thakkar Reviewed-by: Ruifeng Wang --- v2: - Remove defer queue related functions and use resource reclamation APIs from the RCU QSBR library instead - Remove patch (net/ixgbe: avoid multpile definitions of 'bool') from the series as it is already accepted --- lib/Makefile | 2 +- lib/librte_hash/Makefile | 2 +- lib/librte_hash/meson.build | 1 + lib/librte_hash/rte_cuckoo_hash.c | 291 +++++++++++++++++++++------ lib/librte_hash/rte_cuckoo_hash.h | 8 + lib/librte_hash/rte_hash.h | 75 ++++++- lib/librte_hash/rte_hash_version.map | 2 +- 7 files changed, 308 insertions(+), 73 deletions(-) diff --git a/lib/Makefile b/lib/Makefile index 8f5b68a2d469..dffe31c829f0 100644 --- a/lib/Makefile +++ b/lib/Makefile @@ -51,7 +51,7 @@ DIRS-$(CONFIG_RTE_LIBRTE_VHOST) += librte_vhost DEPDIRS-librte_vhost := librte_eal librte_mempool librte_mbuf librte_ethdev \ librte_net librte_hash librte_cryptodev DIRS-$(CONFIG_RTE_LIBRTE_HASH) += librte_hash -DEPDIRS-librte_hash := librte_eal librte_ring +DEPDIRS-librte_hash := librte_eal librte_ring librte_rcu DIRS-$(CONFIG_RTE_LIBRTE_EFD) += librte_efd DEPDIRS-librte_efd := librte_eal librte_ring librte_hash DIRS-$(CONFIG_RTE_LIBRTE_RIB) += librte_rib diff --git a/lib/librte_hash/Makefile b/lib/librte_hash/Makefile index ec9f86499262..10e697f48652 100644 --- a/lib/librte_hash/Makefile +++ b/lib/librte_hash/Makefile @@ -8,7 +8,7 @@ LIB = librte_hash.a CFLAGS += -O3 CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) 
-LDLIBS += -lrte_eal -lrte_ring +LDLIBS += -lrte_eal -lrte_ring -lrte_rcu EXPORT_MAP := rte_hash_version.map diff --git a/lib/librte_hash/meson.build b/lib/librte_hash/meson.build index 6ab46ae9d768..0977a63fd279 100644 --- a/lib/librte_hash/meson.build +++ b/lib/librte_hash/meson.build @@ -10,3 +10,4 @@ headers = files('rte_crc_arm64.h', sources = files('rte_cuckoo_hash.c', 'rte_fbk_hash.c') deps += ['ring'] +deps += ['rcu'] diff --git a/lib/librte_hash/rte_cuckoo_hash.c b/lib/librte_hash/rte_cuckoo_hash.c index 0a6d474713a2..01c2cbe0e38b 100644 --- a/lib/librte_hash/rte_cuckoo_hash.c +++ b/lib/librte_hash/rte_cuckoo_hash.c @@ -52,6 +52,11 @@ static struct rte_tailq_elem rte_hash_tailq = { }; EAL_REGISTER_TAILQ(rte_hash_tailq) +struct __rte_hash_rcu_dq_entry { + uint32_t key_idx; + uint32_t ext_bkt_idx; /**< Extended bkt index */ +}; + struct rte_hash * rte_hash_find_existing(const char *name) { @@ -210,7 +215,10 @@ rte_hash_create(const struct rte_hash_parameters *params) if (params->extra_flag & RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF) { readwrite_concur_lf_support = 1; - /* Enable not freeing internal memory/index on delete */ + /* Enable not freeing internal memory/index on delete. + * If internal RCU is enabled, freeing of internal memory/index + * is done on delete + */ no_free_on_del = 1; } @@ -505,6 +513,10 @@ rte_hash_free(struct rte_hash *h) rte_mcfg_tailq_write_unlock(); + /* RCU clean up. */ + if (h->dq) + rte_rcu_qsbr_dq_delete(h->dq); + if (h->use_local_cache) rte_free(h->local_free_slots); if (h->writer_takes_lock) @@ -612,6 +624,16 @@ rte_hash_reset(struct rte_hash *h) return; __hash_rw_writer_lock(h); + + /* RCU clean up. 
*/ + if (h->hash_rcu_cfg && h->hash_rcu_cfg->v) { + rte_rcu_qsbr_dq_delete(h->dq); + h->dq = NULL; + if (rte_hash_rcu_qsbr_add(h, h->hash_rcu_cfg) != 0) { + RTE_LOG(ERR, HASH, "RCU init failed\n"); + return; + } + } memset(h->buckets, 0, h->num_buckets * sizeof(struct rte_hash_bucket)); memset(h->key_store, 0, h->key_entry_size * (h->entries + 1)); *h->tbl_chng_cnt = 0; @@ -952,6 +974,37 @@ rte_hash_cuckoo_make_space_mw(const struct rte_hash *h, return -ENOSPC; } +static inline uint32_t +alloc_slot(const struct rte_hash *h, struct lcore_cache *cached_free_slots) +{ + unsigned int n_slots; + uint32_t slot_id; + if (h->use_local_cache) { + /* Try to get a free slot from the local cache */ + if (cached_free_slots->len == 0) { + /* Need to get another burst of free slots from global ring */ + n_slots = rte_ring_mc_dequeue_burst_elem(h->free_slots, + cached_free_slots->objs, + sizeof(uint32_t), + LCORE_CACHE_SIZE, NULL); + if (n_slots == 0) + return EMPTY_SLOT; + + cached_free_slots->len += n_slots; + } + + /* Get a free slot from the local cache */ + cached_free_slots->len--; + slot_id = cached_free_slots->objs[cached_free_slots->len]; + } else { + if (rte_ring_sc_dequeue_elem(h->free_slots, &slot_id, + sizeof(uint32_t)) != 0) + return EMPTY_SLOT; + } + + return slot_id; +} + static inline int32_t __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key, hash_sig_t sig, void *data) @@ -963,7 +1016,6 @@ __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key, uint32_t ext_bkt_id = 0; uint32_t slot_id; int ret; - unsigned n_slots; unsigned lcore_id; unsigned int i; struct lcore_cache *cached_free_slots = NULL; @@ -1001,28 +1053,19 @@ __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key, if (h->use_local_cache) { lcore_id = rte_lcore_id(); cached_free_slots = &h->local_free_slots[lcore_id]; - /* Try to get a free slot from the local cache */ - if (cached_free_slots->len == 0) { - /* Need to get another burst of free slots from global ring */ - 
n_slots = rte_ring_mc_dequeue_burst_elem(h->free_slots, - cached_free_slots->objs, - sizeof(uint32_t), - LCORE_CACHE_SIZE, NULL); - if (n_slots == 0) { - return -ENOSPC; + } + slot_id = alloc_slot(h, cached_free_slots); + if (slot_id == EMPTY_SLOT) { + if (h->hash_rcu_cfg && h->hash_rcu_cfg->v) { + if (rte_rcu_qsbr_dq_reclaim(h->dq, + h->hash_rcu_cfg->reclaim_max, + NULL, NULL, NULL) + == 0) { + slot_id = alloc_slot(h, cached_free_slots); } - - cached_free_slots->len += n_slots; } - - /* Get a free slot from the local cache */ - cached_free_slots->len--; - slot_id = cached_free_slots->objs[cached_free_slots->len]; - } else { - if (rte_ring_sc_dequeue_elem(h->free_slots, &slot_id, - sizeof(uint32_t)) != 0) { + if (slot_id == EMPTY_SLOT) return -ENOSPC; - } } new_k = RTE_PTR_ADD(keys, slot_id * h->key_entry_size); @@ -1118,8 +1161,20 @@ __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key, if (rte_ring_sc_dequeue_elem(h->free_ext_bkts, &ext_bkt_id, sizeof(uint32_t)) != 0 || ext_bkt_id == 0) { - ret = -ENOSPC; - goto failure; + if (h->hash_rcu_cfg && h->hash_rcu_cfg->v) { + if (rte_rcu_qsbr_dq_reclaim(h->dq, + h->hash_rcu_cfg->reclaim_max, + NULL, NULL, NULL) + == 0) { + rte_ring_sc_dequeue_elem(h->free_ext_bkts, + &ext_bkt_id, + sizeof(uint32_t)); + } + } + if (ext_bkt_id == 0) { + ret = -ENOSPC; + goto failure; + } } /* Use the first location of the new bucket */ @@ -1395,12 +1450,12 @@ rte_hash_lookup_data(const struct rte_hash *h, const void *key, void **data) return __rte_hash_lookup_with_hash(h, key, rte_hash_hash(h, key), data); } -static inline void -remove_entry(const struct rte_hash *h, struct rte_hash_bucket *bkt, unsigned i) +static int +free_slot(const struct rte_hash *h, uint32_t slot_id) { unsigned lcore_id, n_slots; struct lcore_cache *cached_free_slots; - + /* Return key indexes to free slot ring */ if (h->use_local_cache) { lcore_id = rte_lcore_id(); cached_free_slots = &h->local_free_slots[lcore_id]; @@ -1411,18 +1466,122 @@ remove_entry(const struct rte_hash *h, 
struct rte_hash_bucket *bkt, unsigned i) cached_free_slots->objs, sizeof(uint32_t), LCORE_CACHE_SIZE, NULL); - ERR_IF_TRUE((n_slots == 0), - "%s: could not enqueue free slots in global ring\n", - __func__); + RETURN_IF_TRUE((n_slots == 0), -EFAULT); cached_free_slots->len -= n_slots; } - /* Put index of new free slot in cache. */ - cached_free_slots->objs[cached_free_slots->len] = - bkt->key_idx[i]; - cached_free_slots->len++; + } + + enqueue_slot_back(h, cached_free_slots, slot_id); + return 0; +} + +static void +__hash_rcu_qsbr_free_resource(void *p, void *e, unsigned int n) +{ + void *key_data = NULL; + int ret; + struct rte_hash_key *keys, *k; + struct rte_hash *h = (struct rte_hash *)p; + struct __rte_hash_rcu_dq_entry rcu_dq_entry = + *((struct __rte_hash_rcu_dq_entry *)e); + + RTE_SET_USED(n); + keys = h->key_store; + + k = (struct rte_hash_key *) ((char *)keys + + rcu_dq_entry.key_idx * h->key_entry_size); + key_data = k->pdata; + if (h->hash_rcu_cfg->free_key_data_func) + h->hash_rcu_cfg->free_key_data_func(h->hash_rcu_cfg->key_data_ptr, + key_data); + + if (h->ext_table_support && rcu_dq_entry.ext_bkt_idx != EMPTY_SLOT) + /* Recycle empty ext bkt to free list. */ + rte_ring_sp_enqueue_elem(h->free_ext_bkts, + &rcu_dq_entry.ext_bkt_idx, sizeof(uint32_t)); + + /* Return key indexes to free slot ring */ + ret = free_slot(h, rcu_dq_entry.key_idx); + if (ret < 0) { + RTE_LOG(ERR, HASH, + "%s: could not enqueue free slots in global ring\n", + __func__); + } +} + +int +rte_hash_rcu_qsbr_add(struct rte_hash *h, + struct rte_hash_rcu_config *cfg) +{ + struct rte_rcu_qsbr_dq_parameters params = {0}; + char rcu_dq_name[RTE_RCU_QSBR_DQ_NAMESIZE]; + struct rte_hash_rcu_config *hash_rcu_cfg = NULL; + + const uint32_t total_entries = h->use_local_cache ? 
+ h->entries + (RTE_MAX_LCORE - 1) * (LCORE_CACHE_SIZE - 1) + 1 + : h->entries + 1; + + if ((h == NULL) || cfg == NULL) { + rte_errno = EINVAL; + return 1; + } + + hash_rcu_cfg = rte_zmalloc(NULL, sizeof(struct rte_hash_rcu_config), 0); + if (hash_rcu_cfg == NULL) { + RTE_LOG(ERR, HASH, "memory allocation failed\n"); + return 1; + } + + if (cfg->mode == RTE_HASH_QSBR_MODE_SYNC) { + /* No other things to do. */ + } else if (cfg->mode == RTE_HASH_QSBR_MODE_DQ) { + /* Init QSBR defer queue. */ + snprintf(rcu_dq_name, sizeof(rcu_dq_name), + "HASH_RCU_%s", h->name); + params.name = rcu_dq_name; + params.size = cfg->dq_size; + if (params.size == 0) + params.size = total_entries; + params.trigger_reclaim_limit = cfg->reclaim_thd; + params.max_reclaim_size = cfg->reclaim_max ? cfg->reclaim_max : + RTE_HASH_RCU_DQ_RECLAIM_MAX; + params.esize = sizeof(struct __rte_hash_rcu_dq_entry); + params.free_fn = __hash_rcu_qsbr_free_resource; + params.p = h; + params.v = cfg->v; + h->dq = rte_rcu_qsbr_dq_create(&params); + if (h->dq == NULL) { + rte_free(hash_rcu_cfg); + RTE_LOG(ERR, HASH, "HASH defer queue creation failed\n"); + return 1; + } } else { - rte_ring_sp_enqueue_elem(h->free_slots, - &bkt->key_idx[i], sizeof(uint32_t)); + rte_free(hash_rcu_cfg); + rte_errno = EINVAL; + return 1; + } + + hash_rcu_cfg->v = cfg->v; + hash_rcu_cfg->mode = cfg->mode; + hash_rcu_cfg->dq_size = cfg->dq_size; + hash_rcu_cfg->reclaim_thd = cfg->reclaim_thd; + hash_rcu_cfg->reclaim_max = cfg->reclaim_max; + hash_rcu_cfg->free_key_data_func = cfg->free_key_data_func; + hash_rcu_cfg->key_data_ptr = cfg->key_data_ptr; + + h->hash_rcu_cfg = hash_rcu_cfg; + + return 0; +} + +static inline void +remove_entry(const struct rte_hash *h, struct rte_hash_bucket *bkt, unsigned i) +{ + int ret = free_slot(h, bkt->key_idx[i]); + if (ret < 0) { + RTE_LOG(ERR, HASH, + "%s: could not enqueue free slots in global ring\n", + __func__); } } @@ -1521,6 +1680,8 @@ __rte_hash_del_key_with_hash(const struct rte_hash *h, const void 
*key, int pos; int32_t ret, i; uint16_t short_sig; + uint32_t index = EMPTY_SLOT; + struct __rte_hash_rcu_dq_entry rcu_dq_entry; short_sig = get_short_sig(sig); prim_bucket_idx = get_prim_bucket_index(h, sig); @@ -1555,10 +1716,9 @@ __rte_hash_del_key_with_hash(const struct rte_hash *h, const void *key, /* Search last bucket to see if empty to be recycled */ return_bkt: - if (!last_bkt) { - __hash_rw_writer_unlock(h); - return ret; - } + if (!last_bkt) + goto return_key; + while (last_bkt->next) { prev_bkt = last_bkt; last_bkt = last_bkt->next; @@ -1571,11 +1731,11 @@ __rte_hash_del_key_with_hash(const struct rte_hash *h, const void *key, /* found empty bucket and recycle */ if (i == RTE_HASH_BUCKET_ENTRIES) { prev_bkt->next = NULL; - uint32_t index = last_bkt - h->buckets_ext + 1; + index = last_bkt - h->buckets_ext + 1; /* Recycle the empty bkt if * no_free_on_del is disabled. */ - if (h->no_free_on_del) + if (h->no_free_on_del) { /* Store index of an empty ext bkt to be recycled * on calling rte_hash_del_xxx APIs. * When lock free read-write concurrency is enabled, @@ -1583,12 +1743,32 @@ __rte_hash_del_key_with_hash(const struct rte_hash *h, const void *key, * immediately (as readers might be using it still). * Hence freeing of the ext bkt is piggy-backed to * freeing of the key index. + * If using external RCU, store this index in an array. */ - h->ext_bkt_to_free[ret] = index; - else + if (!h->hash_rcu_cfg || !h->hash_rcu_cfg->v) + h->ext_bkt_to_free[ret] = index; + } else rte_ring_sp_enqueue_elem(h->free_ext_bkts, &index, sizeof(uint32_t)); } + +return_key: + /* Using internal RCU QSBR */ + if (h->hash_rcu_cfg && h->hash_rcu_cfg->v) { + /* Key index where key is stored, adding the first dummy index */ + rcu_dq_entry.key_idx = ret + 1; + rcu_dq_entry.ext_bkt_idx = index; + if (h->hash_rcu_cfg->mode == RTE_HASH_QSBR_MODE_SYNC) { + /* Wait for quiescent state change. 
*/ + rte_rcu_qsbr_synchronize(h->hash_rcu_cfg->v, + RTE_QSBR_THRID_INVALID); + __hash_rcu_qsbr_free_resource((void *)((uintptr_t)h), + &rcu_dq_entry, 1); + } else if (h->hash_rcu_cfg->mode == RTE_HASH_QSBR_MODE_DQ) + /* Push into QSBR FIFO. */ + if (rte_rcu_qsbr_dq_enqueue(h->dq, &rcu_dq_entry) != 0) + RTE_LOG(ERR, HASH, "Failed to push QSBR FIFO\n"); + } __hash_rw_writer_unlock(h); return ret; } @@ -1637,8 +1817,6 @@ rte_hash_free_key_with_position(const struct rte_hash *h, RETURN_IF_TRUE(((h == NULL) || (key_idx == EMPTY_SLOT)), -EINVAL); - unsigned int lcore_id, n_slots; - struct lcore_cache *cached_free_slots; const uint32_t total_entries = h->use_local_cache ? h->entries + (RTE_MAX_LCORE - 1) * (LCORE_CACHE_SIZE - 1) + 1 : h->entries + 1; @@ -1656,28 +1834,9 @@ rte_hash_free_key_with_position(const struct rte_hash *h, } } - if (h->use_local_cache) { - lcore_id = rte_lcore_id(); - cached_free_slots = &h->local_free_slots[lcore_id]; - /* Cache full, need to free it. */ - if (cached_free_slots->len == LCORE_CACHE_SIZE) { - /* Need to enqueue the free slots in global ring. */ - n_slots = rte_ring_mp_enqueue_burst_elem(h->free_slots, - cached_free_slots->objs, - sizeof(uint32_t), - LCORE_CACHE_SIZE, NULL); - RETURN_IF_TRUE((n_slots == 0), -EFAULT); - cached_free_slots->len -= n_slots; - } - /* Put index of new free slot in cache. */ - cached_free_slots->objs[cached_free_slots->len] = key_idx; - cached_free_slots->len++; - } else { - rte_ring_sp_enqueue_elem(h->free_slots, &key_idx, - sizeof(uint32_t)); - } + /* Enqueue slot to cache/ring of free slots. 
*/ + return free_slot(h, key_idx); - return 0; } static inline void diff --git a/lib/librte_hash/rte_cuckoo_hash.h b/lib/librte_hash/rte_cuckoo_hash.h index 345de6bf9cfd..85be49d3bbe7 100644 --- a/lib/librte_hash/rte_cuckoo_hash.h +++ b/lib/librte_hash/rte_cuckoo_hash.h @@ -168,6 +168,11 @@ struct rte_hash { struct lcore_cache *local_free_slots; /**< Local cache per lcore, storing some indexes of the free slots */ + /* RCU config */ + struct rte_hash_rcu_config *hash_rcu_cfg; + /**< HASH RCU QSBR configuration structure */ + struct rte_rcu_qsbr_dq *dq; /**< RCU QSBR defer queue. */ + /* Fields used in lookup */ uint32_t key_len __rte_cache_aligned; @@ -230,4 +235,7 @@ struct queue_node { int prev_slot; /* Parent(slot) in search path */ }; +/** @internal Default RCU defer queue entries to reclaim in one go. */ +#define RTE_HASH_RCU_DQ_RECLAIM_MAX 16 + #endif diff --git a/lib/librte_hash/rte_hash.h b/lib/librte_hash/rte_hash.h index bff40251bc98..5431bcf4aeb1 100644 --- a/lib/librte_hash/rte_hash.h +++ b/lib/librte_hash/rte_hash.h @@ -15,6 +15,7 @@ #include #include +#include #ifdef __cplusplus extern "C" { @@ -45,7 +46,8 @@ extern "C" { /** Flag to disable freeing of key index on hash delete. * Refer to rte_hash_del_xxx APIs for more details. * This is enabled by default when RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF - * is enabled. + * is enabled. However, if internal RCU is enabled, freeing of internal + * memory/index is done on delete */ #define RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL 0x10 @@ -67,6 +69,13 @@ typedef uint32_t (*rte_hash_function)(const void *key, uint32_t key_len, /** Type of function used to compare the hash key. */ typedef int (*rte_hash_cmp_eq_t)(const void *key1, const void *key2, size_t key_len); +/** + * Type of function used to free data stored in the key. + * Required when using internal RCU to allow application to free key-data once + * the key is returned to the ring of free key-slots. 
+ */ +typedef void (*rte_hash_free_key_data)(void *p, void *key_data); + /** * Parameters used when creating the hash table. */ @@ -81,6 +90,39 @@ struct rte_hash_parameters { uint8_t extra_flag; /**< Indicate if additional parameters are present. */ }; +/** RCU reclamation modes */ +enum rte_hash_qsbr_mode { + /** Create defer queue for reclaim. */ + RTE_HASH_QSBR_MODE_DQ = 0, + /** Use blocking mode reclaim. No defer queue created. */ + RTE_HASH_QSBR_MODE_SYNC +}; + +/** HASH RCU QSBR configuration structure. */ +struct rte_hash_rcu_config { + struct rte_rcu_qsbr *v; /**< RCU QSBR variable. */ + enum rte_hash_qsbr_mode mode; + /**< Mode of RCU QSBR. RTE_HASH_QSBR_MODE_xxx + * '0' for default: create defer queue for reclaim. + */ + uint32_t dq_size; + /**< RCU defer queue size. + * default: total hash table entries. + */ + uint32_t reclaim_thd; /**< Threshold to trigger auto reclaim. */ + uint32_t reclaim_max; + /**< Max entries to reclaim in one go. + * default: RTE_HASH_RCU_DQ_RECLAIM_MAX. + */ + void *key_data_ptr; + /**< Pointer passed to the free function. Typically, this is the + * pointer to the data structure to which the resource to free + * (key-data) belongs. This can be NULL. + */ + rte_hash_free_key_data free_key_data_func; + /**< Function to call to free the resource (key-data). */ +}; + /** @internal A hash table structure. */ struct rte_hash; @@ -287,7 +329,8 @@ rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key, hash_sig_t * Thread safety can be enabled by setting flag during * table creation. * If RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL or - * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled, + * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled and + * internal RCU is NOT enabled, * the key index returned by rte_hash_add_key_xxx APIs will not be * freed by this API. rte_hash_free_key_with_position API must be called * additionally to free the index associated with the key. 
@@ -316,7 +359,8 @@ rte_hash_del_key(const struct rte_hash *h, const void *key); * Thread safety can be enabled by setting flag during * table creation. * If RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL or - * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled, + * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled and + * internal RCU is NOT enabled, * the key index returned by rte_hash_add_key_xxx APIs will not be * freed by this API. rte_hash_free_key_with_position API must be called * additionally to free the index associated with the key. @@ -370,7 +414,8 @@ rte_hash_get_key_with_position(const struct rte_hash *h, const int32_t position, * only be called from one thread by default. Thread safety * can be enabled by setting flag during table creation. * If RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL or - * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled, + * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled and + * internal RCU is NOT enabled, * the key index returned by rte_hash_del_key_xxx APIs must be freed * using this API. This API should be called after all the readers * have stopped referencing the entry corresponding to this key. @@ -625,6 +670,28 @@ rte_hash_lookup_bulk(const struct rte_hash *h, const void **keys, */ int32_t rte_hash_iterate(const struct rte_hash *h, const void **key, void **data, uint32_t *next); + +/** + * @warning + * @b EXPERIMENTAL: this API may change without prior notice + * + * Associate an RCU QSBR variable with a hash object. + * + * @param h + * the hash object to add RCU QSBR + * @param cfg + * RCU QSBR configuration + * @return + * On success - 0 + * On error - 1 with error code set in rte_errno. 
+ * Possible rte_errno codes are: + * - EINVAL - invalid pointer + * - EEXIST - already added QSBR + * - ENOMEM - memory allocation failure + */ +__rte_experimental +int rte_hash_rcu_qsbr_add(struct rte_hash *h, + struct rte_hash_rcu_config *cfg); #ifdef __cplusplus } #endif diff --git a/lib/librte_hash/rte_hash_version.map b/lib/librte_hash/rte_hash_version.map index c0db81014ff9..c6d73080f478 100644 --- a/lib/librte_hash/rte_hash_version.map +++ b/lib/librte_hash/rte_hash_version.map @@ -36,5 +36,5 @@ EXPERIMENTAL { rte_hash_lookup_with_hash_bulk; rte_hash_lookup_with_hash_bulk_data; rte_hash_max_key_id; - + rte_hash_rcu_qsbr_add; };