From patchwork Mon Apr 20 12:28:22 2020 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: "Ananyev, Konstantin" X-Patchwork-Id: 68952 X-Patchwork-Delegate: david.marchand@redhat.com Return-Path: X-Original-To: patchwork@inbox.dpdk.org Delivered-To: patchwork@inbox.dpdk.org Received: from dpdk.org (dpdk.org [92.243.14.124]) by inbox.dpdk.org (Postfix) with ESMTP id 4B079A0561; Mon, 20 Apr 2020 14:28:51 +0200 (CEST) Received: from [92.243.14.124] (localhost [127.0.0.1]) by dpdk.org (Postfix) with ESMTP id 3E50E1D5CD; Mon, 20 Apr 2020 14:28:48 +0200 (CEST) Received: from mga17.intel.com (mga17.intel.com [192.55.52.151]) by dpdk.org (Postfix) with ESMTP id 0EABB1D5C9 for ; Mon, 20 Apr 2020 14:28:43 +0200 (CEST) IronPort-SDR: tK/3sU2YriMVVkWMWMspip0PrOVizP5GXnjxaCA+g9vtG3FMEwID5xMrT30Ba1FpCkNXComQ4l /qMhizEbTCbQ== X-Amp-Result: SKIPPED(no attachment in message) X-Amp-File-Uploaded: False Received: from orsmga003.jf.intel.com ([10.7.209.27]) by fmsmga107.fm.intel.com with ESMTP/TLS/ECDHE-RSA-AES256-GCM-SHA384; 20 Apr 2020 05:28:43 -0700 IronPort-SDR: HmF++8n1RRzkjF9Eotnmolp6X10Uh234xKXaG9fH+m+slOUVtuUhJ0RhHwMd88dhvdvmNWv1ae RFIdrn8ZUWdQ== X-ExtLoop1: 1 X-IronPort-AV: E=Sophos;i="5.72,406,1580803200"; d="scan'208";a="254923871" Received: from sivswdev08.ir.intel.com ([10.237.217.47]) by orsmga003.jf.intel.com with ESMTP; 20 Apr 2020 05:28:42 -0700 From: Konstantin Ananyev To: dev@dpdk.org Cc: honnappa.nagarahalli@arm.com, david.marchand@redhat.com, jielong.zjl@antfin.com, Konstantin Ananyev Date: Mon, 20 Apr 2020 13:28:22 +0100 Message-Id: <20200420122831.16973-2-konstantin.ananyev@intel.com> X-Mailer: git-send-email 2.18.0 In-Reply-To: <20200420122831.16973-1-konstantin.ananyev@intel.com> References: <20200420121113.9327-1-konstantin.ananyev@intel.com> <20200420122831.16973-1-konstantin.ananyev@intel.com> Subject: [dpdk-dev] [PATCH v7 01/10] test/ring: add contention stress test X-BeenThere: dev@dpdk.org 
X-Mailman-Version: 2.1.15 Precedence: list List-Id: DPDK patches and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dev-bounces@dpdk.org Sender: "dev" Introduce stress test for ring enqueue/dequeue operations. Performs the following pattern on each slave worker: dequeue/read-write data from the dequeued objects/enqueue. Serves as both functional and performance test of ring enqueue/dequeue operations under high contention (for both over committed and non-over committed scenarios). Signed-off-by: Konstantin Ananyev Acked-by: Honnappa Nagarahalli --- app/test/Makefile | 2 + app/test/meson.build | 2 + app/test/test_ring_mpmc_stress.c | 31 +++ app/test/test_ring_stress.c | 48 ++++ app/test/test_ring_stress.h | 35 +++ app/test/test_ring_stress_impl.h | 396 +++++++++++++++++++++++++++++++ 6 files changed, 514 insertions(+) create mode 100644 app/test/test_ring_mpmc_stress.c create mode 100644 app/test/test_ring_stress.c create mode 100644 app/test/test_ring_stress.h create mode 100644 app/test/test_ring_stress_impl.h diff --git a/app/test/Makefile b/app/test/Makefile index be53d33c3..a23a011df 100644 --- a/app/test/Makefile +++ b/app/test/Makefile @@ -77,7 +77,9 @@ SRCS-y += test_external_mem.c SRCS-y += test_rand_perf.c SRCS-y += test_ring.c +SRCS-y += test_ring_mpmc_stress.c SRCS-y += test_ring_perf.c +SRCS-y += test_ring_stress.c SRCS-y += test_pmd_perf.c ifeq ($(CONFIG_RTE_LIBRTE_TABLE),y) diff --git a/app/test/meson.build b/app/test/meson.build index 04b59cffa..8824f366c 100644 --- a/app/test/meson.build +++ b/app/test/meson.build @@ -100,7 +100,9 @@ test_sources = files('commands.c', 'test_rib.c', 'test_rib6.c', 'test_ring.c', + 'test_ring_mpmc_stress.c', 'test_ring_perf.c', + 'test_ring_stress.c', 'test_rwlock.c', 'test_sched.c', 'test_service_cores.c', diff --git a/app/test/test_ring_mpmc_stress.c b/app/test/test_ring_mpmc_stress.c new file mode 100644 index 000000000..1524b0248 --- /dev/null +++ 
b/app/test/test_ring_mpmc_stress.c @@ -0,0 +1,31 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * Copyright(c) 2020 Intel Corporation + */ + +#include "test_ring_stress_impl.h" + +static inline uint32_t +_st_ring_dequeue_bulk(struct rte_ring *r, void **obj, uint32_t n, + uint32_t *avail) +{ + return rte_ring_mc_dequeue_bulk(r, obj, n, avail); +} + +static inline uint32_t +_st_ring_enqueue_bulk(struct rte_ring *r, void * const *obj, uint32_t n, + uint32_t *free) +{ + return rte_ring_mp_enqueue_bulk(r, obj, n, free); +} + +static int +_st_ring_init(struct rte_ring *r, const char *name, uint32_t num) +{ + return rte_ring_init(r, name, num, 0); +} + +const struct test test_ring_mpmc_stress = { + .name = "MP/MC", + .nb_case = RTE_DIM(tests), + .cases = tests, +}; diff --git a/app/test/test_ring_stress.c b/app/test/test_ring_stress.c new file mode 100644 index 000000000..60706f799 --- /dev/null +++ b/app/test/test_ring_stress.c @@ -0,0 +1,48 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * Copyright(c) 2020 Intel Corporation + */ + +#include "test_ring_stress.h" + +static int +run_test(const struct test *test) +{ + int32_t rc; + uint32_t i, k; + + for (i = 0, k = 0; i != test->nb_case; i++) { + + printf("TEST-CASE %s %s START\n", + test->name, test->cases[i].name); + + rc = test->cases[i].func(test->cases[i].wfunc); + k += (rc == 0); + + if (rc != 0) + printf("TEST-CASE %s %s FAILED\n", + test->name, test->cases[i].name); + else + printf("TEST-CASE %s %s OK\n", + test->name, test->cases[i].name); + } + + return k; +} + +static int +test_ring_stress(void) +{ + uint32_t n, k; + + n = 0; + k = 0; + + n += test_ring_mpmc_stress.nb_case; + k += run_test(&test_ring_mpmc_stress); + + printf("Number of tests:\t%u\nSuccess:\t%u\nFailed:\t%u\n", + n, k, n - k); + return (k != n); +} + +REGISTER_TEST_COMMAND(ring_stress_autotest, test_ring_stress); diff --git a/app/test/test_ring_stress.h b/app/test/test_ring_stress.h new file mode 100644 index 000000000..60eac6216 --- /dev/null 
+++ b/app/test/test_ring_stress.h @@ -0,0 +1,35 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * Copyright(c) 2020 Intel Corporation + */ + + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "test.h" + +struct test_case { + const char *name; + int (*func)(int (*)(void *)); + int (*wfunc)(void *arg); +}; + +struct test { + const char *name; + uint32_t nb_case; + const struct test_case *cases; +}; + +extern const struct test test_ring_mpmc_stress; diff --git a/app/test/test_ring_stress_impl.h b/app/test/test_ring_stress_impl.h new file mode 100644 index 000000000..222d62bc4 --- /dev/null +++ b/app/test/test_ring_stress_impl.h @@ -0,0 +1,396 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * Copyright(c) 2020 Intel Corporation + */ + +#include "test_ring_stress.h" + +/** + * Stress test for ring enqueue/dequeue operations. + * Performs the following pattern on each slave worker: + * dequeue/read-write data from the dequeued objects/enqueue. + * Serves as both functional and performance test of ring + * enqueue/dequeue operations under high contention + * (for both over committed and non-over committed scenarios). 
+ */ + +#define RING_NAME "RING_STRESS" +#define BULK_NUM 32 +#define RING_SIZE (2 * BULK_NUM * RTE_MAX_LCORE) + +enum { + WRK_CMD_STOP, + WRK_CMD_RUN, +}; + +static volatile uint32_t wrk_cmd __rte_cache_aligned; + +/* test run-time in seconds */ +static const uint32_t run_time = 60; +static const uint32_t verbose; + +struct lcore_stat { + uint64_t nb_cycle; + struct { + uint64_t nb_call; + uint64_t nb_obj; + uint64_t nb_cycle; + uint64_t max_cycle; + uint64_t min_cycle; + } op; +}; + +struct lcore_arg { + struct rte_ring *rng; + struct lcore_stat stats; +} __rte_cache_aligned; + +struct ring_elem { + uint32_t cnt[RTE_CACHE_LINE_SIZE / sizeof(uint32_t)]; +} __rte_cache_aligned; + +/* + * redefinable functions + */ +static uint32_t +_st_ring_dequeue_bulk(struct rte_ring *r, void **obj, uint32_t n, + uint32_t *avail); + +static uint32_t +_st_ring_enqueue_bulk(struct rte_ring *r, void * const *obj, uint32_t n, + uint32_t *free); + +static int +_st_ring_init(struct rte_ring *r, const char *name, uint32_t num); + + +static void +lcore_stat_update(struct lcore_stat *ls, uint64_t call, uint64_t obj, + uint64_t tm, int32_t prcs) +{ + ls->op.nb_call += call; + ls->op.nb_obj += obj; + ls->op.nb_cycle += tm; + if (prcs) { + ls->op.max_cycle = RTE_MAX(ls->op.max_cycle, tm); + ls->op.min_cycle = RTE_MIN(ls->op.min_cycle, tm); + } +} + +static void +lcore_op_stat_aggr(struct lcore_stat *ms, const struct lcore_stat *ls) +{ + + ms->op.nb_call += ls->op.nb_call; + ms->op.nb_obj += ls->op.nb_obj; + ms->op.nb_cycle += ls->op.nb_cycle; + ms->op.max_cycle = RTE_MAX(ms->op.max_cycle, ls->op.max_cycle); + ms->op.min_cycle = RTE_MIN(ms->op.min_cycle, ls->op.min_cycle); +} + +static void +lcore_stat_aggr(struct lcore_stat *ms, const struct lcore_stat *ls) +{ + ms->nb_cycle = RTE_MAX(ms->nb_cycle, ls->nb_cycle); + lcore_op_stat_aggr(ms, ls); +} + +static void +lcore_stat_dump(FILE *f, uint32_t lc, const struct lcore_stat *ls) +{ + long double st; + + st = (long double)rte_get_timer_hz() / 
US_PER_S; + + if (lc == UINT32_MAX) + fprintf(f, "%s(AGGREGATE)={\n", __func__); + else + fprintf(f, "%s(lcore=%u)={\n", __func__, lc); + + fprintf(f, "\tnb_cycle=%" PRIu64 "(%.2Lf usec),\n", + ls->nb_cycle, (long double)ls->nb_cycle / st); + + fprintf(f, "\tDEQ+ENQ={\n"); + + fprintf(f, "\t\tnb_call=%" PRIu64 ",\n", ls->op.nb_call); + fprintf(f, "\t\tnb_obj=%" PRIu64 ",\n", ls->op.nb_obj); + fprintf(f, "\t\tnb_cycle=%" PRIu64 ",\n", ls->op.nb_cycle); + fprintf(f, "\t\tobj/call(avg): %.2Lf\n", + (long double)ls->op.nb_obj / ls->op.nb_call); + fprintf(f, "\t\tcycles/obj(avg): %.2Lf\n", + (long double)ls->op.nb_cycle / ls->op.nb_obj); + fprintf(f, "\t\tcycles/call(avg): %.2Lf\n", + (long double)ls->op.nb_cycle / ls->op.nb_call); + + /* if min/max cycles per call stats were collected */ + if (ls->op.min_cycle != UINT64_MAX) { + fprintf(f, "\t\tmax cycles/call=%" PRIu64 "(%.2Lf usec),\n", + ls->op.max_cycle, + (long double)ls->op.max_cycle / st); + fprintf(f, "\t\tmin cycles/call=%" PRIu64 "(%.2Lf usec),\n", + ls->op.min_cycle, + (long double)ls->op.min_cycle / st); + } + + fprintf(f, "\t},\n"); + fprintf(f, "};\n"); +} + +static void +fill_ring_elm(struct ring_elem *elm, uint32_t fill) +{ + uint32_t i; + + for (i = 0; i != RTE_DIM(elm->cnt); i++) + elm->cnt[i] = fill; +} + +static int32_t +check_updt_elem(struct ring_elem *elm[], uint32_t num, + const struct ring_elem *check, const struct ring_elem *fill) +{ + uint32_t i; + + static rte_spinlock_t dump_lock; + + for (i = 0; i != num; i++) { + if (memcmp(check, elm[i], sizeof(*check)) != 0) { + rte_spinlock_lock(&dump_lock); + printf("%s(lc=%u, num=%u) failed at %u-th iter, " + "offending object: %p\n", + __func__, rte_lcore_id(), num, i, elm[i]); + rte_memdump(stdout, "expected", check, sizeof(*check)); + rte_memdump(stdout, "result", elm[i], sizeof(*elm[i])); + rte_spinlock_unlock(&dump_lock); + return -EINVAL; + } + memcpy(elm[i], fill, sizeof(*elm[i])); + } + + return 0; +} + +static int +check_ring_op(uint32_t exp, 
uint32_t res, uint32_t lc, + const char *fname, const char *opname) +{ + if (exp != res) { + printf("%s(lc=%u) failure: %s expected: %u, returned %u\n", + fname, lc, opname, exp, res); + return -ENOSPC; + } + return 0; +} + +static int +test_worker(void *arg, const char *fname, int32_t prcs) +{ + int32_t rc; + uint32_t lc, n, num; + uint64_t cl, tm0, tm1; + struct lcore_arg *la; + struct ring_elem def_elm, loc_elm; + struct ring_elem *obj[2 * BULK_NUM]; + + la = arg; + lc = rte_lcore_id(); + + fill_ring_elm(&def_elm, UINT32_MAX); + fill_ring_elm(&loc_elm, lc); + + while (wrk_cmd != WRK_CMD_RUN) { + rte_smp_rmb(); + rte_pause(); + } + + cl = rte_rdtsc_precise(); + + do { + /* num in interval [7/8, 11/8] of BULK_NUM */ + num = 7 * BULK_NUM / 8 + rte_rand() % (BULK_NUM / 2); + + /* reset all pointer values */ + memset(obj, 0, sizeof(obj)); + + /* dequeue num elems */ + tm0 = (prcs != 0) ? rte_rdtsc_precise() : 0; + n = _st_ring_dequeue_bulk(la->rng, (void **)obj, num, NULL); + tm0 = (prcs != 0) ? rte_rdtsc_precise() - tm0 : 0; + + /* check return value and objects */ + rc = check_ring_op(num, n, lc, fname, + RTE_STR(_st_ring_dequeue_bulk)); + if (rc == 0) + rc = check_updt_elem(obj, num, &def_elm, &loc_elm); + if (rc != 0) + break; + + /* enqueue num elems */ + rte_compiler_barrier(); + rc = check_updt_elem(obj, num, &loc_elm, &def_elm); + if (rc != 0) + break; + + tm1 = (prcs != 0) ? rte_rdtsc_precise() : 0; + n = _st_ring_enqueue_bulk(la->rng, (void **)obj, num, NULL); + tm1 = (prcs != 0) ? 
rte_rdtsc_precise() - tm1 : 0; + + /* check return value */ + rc = check_ring_op(num, n, lc, fname, + RTE_STR(_st_ring_enqueue_bulk)); + if (rc != 0) + break; + + lcore_stat_update(&la->stats, 1, num, tm0 + tm1, prcs); + + } while (wrk_cmd == WRK_CMD_RUN); + + cl = rte_rdtsc_precise() - cl; + if (prcs == 0) + lcore_stat_update(&la->stats, 0, 0, cl, 0); + la->stats.nb_cycle = cl; + return rc; +} +static int +test_worker_prcs(void *arg) +{ + return test_worker(arg, __func__, 1); +} + +static int +test_worker_avg(void *arg) +{ + return test_worker(arg, __func__, 0); +} + +static void +mt1_fini(struct rte_ring *rng, void *data) +{ + rte_free(rng); + rte_free(data); +} + +static int +mt1_init(struct rte_ring **rng, void **data, uint32_t num) +{ + int32_t rc; + size_t sz; + uint32_t i, nr; + struct rte_ring *r; + struct ring_elem *elm; + void *p; + + *rng = NULL; + *data = NULL; + + sz = num * sizeof(*elm); + elm = rte_zmalloc(NULL, sz, __alignof__(*elm)); + if (elm == NULL) { + printf("%s: alloc(%zu) for %u elems data failed\n", + __func__, sz, num); + return -ENOMEM; + } + + *data = elm; + + /* alloc ring */ + nr = 2 * num; + sz = rte_ring_get_memsize(nr); + r = rte_zmalloc(NULL, sz, __alignof__(*r)); + if (r == NULL) { + printf("%s: alloc(%zu) for FIFO with %u elems failed\n", + __func__, sz, nr); + return -ENOMEM; + } + + *rng = r; + + rc = _st_ring_init(r, RING_NAME, nr); + if (rc != 0) { + printf("%s: _st_ring_init(%p, %u) failed, error: %d(%s)\n", + __func__, r, nr, rc, strerror(-rc)); + return rc; + } + + for (i = 0; i != num; i++) { + fill_ring_elm(elm + i, UINT32_MAX); + p = elm + i; + if (_st_ring_enqueue_bulk(r, &p, 1, NULL) != 1) + break; + } + + if (i != num) { + printf("%s: _st_ring_enqueue(%p, %u) returned %u\n", + __func__, r, num, i); + return -ENOSPC; + } + + return 0; +} + +static int +test_mt1(int (*test)(void *)) +{ + int32_t rc; + uint32_t lc, mc; + struct rte_ring *r; + void *data; + struct lcore_arg arg[RTE_MAX_LCORE]; + + static const struct 
lcore_stat init_stat = { + .op.min_cycle = UINT64_MAX, + }; + + rc = mt1_init(&r, &data, RING_SIZE); + if (rc != 0) { + mt1_fini(r, data); + return rc; + } + + memset(arg, 0, sizeof(arg)); + + /* launch on all slaves */ + RTE_LCORE_FOREACH_SLAVE(lc) { + arg[lc].rng = r; + arg[lc].stats = init_stat; + rte_eal_remote_launch(test, &arg[lc], lc); + } + + /* signal workers to start test */ + wrk_cmd = WRK_CMD_RUN; + rte_smp_wmb(); + + usleep(run_time * US_PER_S); + + /* signal workers to stop test */ + wrk_cmd = WRK_CMD_STOP; + rte_smp_wmb(); + + /* wait for slaves and collect stats. */ + mc = rte_lcore_id(); + arg[mc].stats = init_stat; + + rc = 0; + RTE_LCORE_FOREACH_SLAVE(lc) { + rc |= rte_eal_wait_lcore(lc); + lcore_stat_aggr(&arg[mc].stats, &arg[lc].stats); + if (verbose != 0) + lcore_stat_dump(stdout, lc, &arg[lc].stats); + } + + lcore_stat_dump(stdout, UINT32_MAX, &arg[mc].stats); + mt1_fini(r, data); + return rc; +} + +static const struct test_case tests[] = { + { + .name = "MT-WRK_ENQ_DEQ-MST_NONE-PRCS", + .func = test_mt1, + .wfunc = test_worker_prcs, + }, + { + .name = "MT-WRK_ENQ_DEQ-MST_NONE-AVG", + .func = test_mt1, + .wfunc = test_worker_avg, + }, +};
From patchwork Mon Apr 20 12:28:23 2020 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: "Ananyev, Konstantin" X-Patchwork-Id: 68953 X-Patchwork-Delegate: david.marchand@redhat.com Return-Path: X-Original-To: patchwork@inbox.dpdk.org Delivered-To: patchwork@inbox.dpdk.org Received: from dpdk.org (dpdk.org [92.243.14.124]) by inbox.dpdk.org (Postfix) with ESMTP id 467EEA0561; Mon, 20 Apr 2020 14:29:04 +0200 (CEST) Received: from [92.243.14.124] (localhost [127.0.0.1]) by dpdk.org (Postfix) with ESMTP id E71A91D5C9; Mon, 20 Apr 2020 14:28:54 +0200 (CEST) Received: from mga17.intel.com (mga17.intel.com [192.55.52.151]) by dpdk.org (Postfix) with ESMTP id 1DD821D5C9 for ; Mon, 20 Apr 2020 14:28:46 +0200 (CEST) IronPort-SDR: 
+gsj0lFL7VAtx7TMJz8+DQH6GqduNpVvfuhDJ9tAuLne7ayou6O7RFjLJKAnD6teMj6N1ajmGZ IaA4lvQv5G9A== X-Amp-Result: SKIPPED(no attachment in message) X-Amp-File-Uploaded: False Received: from orsmga003.jf.intel.com ([10.7.209.27]) by fmsmga107.fm.intel.com with ESMTP/TLS/ECDHE-RSA-AES256-GCM-SHA384; 20 Apr 2020 05:28:46 -0700 IronPort-SDR: Zv06YEYgq2CVv3iechiZdp1U9IU9JiCa0/luIC5itygvj7IHJky9tkeH4VzcG0PRlRGtyUrI8e V8q+YjLfMXqw== X-ExtLoop1: 1 X-IronPort-AV: E=Sophos;i="5.72,406,1580803200"; d="scan'208";a="254923878" Received: from sivswdev08.ir.intel.com ([10.237.217.47]) by orsmga003.jf.intel.com with ESMTP; 20 Apr 2020 05:28:44 -0700 From: Konstantin Ananyev To: dev@dpdk.org Cc: honnappa.nagarahalli@arm.com, david.marchand@redhat.com, jielong.zjl@antfin.com, Konstantin Ananyev Date: Mon, 20 Apr 2020 13:28:23 +0100 Message-Id: <20200420122831.16973-3-konstantin.ananyev@intel.com> X-Mailer: git-send-email 2.18.0 In-Reply-To: <20200420122831.16973-1-konstantin.ananyev@intel.com> References: <20200420121113.9327-1-konstantin.ananyev@intel.com> <20200420122831.16973-1-konstantin.ananyev@intel.com> Subject: [dpdk-dev] [PATCH v7 02/10] ring: prepare ring to allow new sync schemes X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: DPDK patches and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dev-bounces@dpdk.org Sender: "dev" To make these preparations two main things are done: - Change from *single* to *sync_type* to allow different synchronisation schemes to be applied. Mark *single* as deprecated in comments. Add new functions to allow user to query ring sync types. Replace direct access to *single* with appropriate function call. - Move actual rte_ring and related structures definitions into a separate file: . It allows to refer contents of from without introducing a circular dependency. 
Signed-off-by: Konstantin Ananyev Acked-by: Honnappa Nagarahalli --- app/test/test_pdump.c | 6 +- lib/librte_pdump/rte_pdump.c | 2 +- lib/librte_port/rte_port_ring.c | 12 +-- lib/librte_ring/Makefile | 1 + lib/librte_ring/meson.build | 1 + lib/librte_ring/rte_ring.c | 6 +- lib/librte_ring/rte_ring.h | 170 ++++++++++++++------------------ lib/librte_ring/rte_ring_core.h | 132 +++++++++++++++++++++++++ lib/librte_ring/rte_ring_elem.h | 42 +++----- 9 files changed, 234 insertions(+), 138 deletions(-) create mode 100644 lib/librte_ring/rte_ring_core.h diff --git a/app/test/test_pdump.c b/app/test/test_pdump.c index ad183184c..6a1180bcb 100644 --- a/app/test/test_pdump.c +++ b/app/test/test_pdump.c @@ -57,8 +57,7 @@ run_pdump_client_tests(void) if (ret < 0) return -1; mp->flags = 0x0000; - ring_client = rte_ring_create("SR0", RING_SIZE, rte_socket_id(), - RING_F_SP_ENQ | RING_F_SC_DEQ); + ring_client = rte_ring_create("SR0", RING_SIZE, rte_socket_id(), 0); if (ring_client == NULL) { printf("rte_ring_create SR0 failed"); return -1; @@ -71,9 +70,6 @@ run_pdump_client_tests(void) } rte_eth_dev_probing_finish(eth_dev); - ring_client->prod.single = 0; - ring_client->cons.single = 0; - printf("\n***** flags = RTE_PDUMP_FLAG_TX *****\n"); for (itr = 0; itr < NUM_ITR; itr++) { diff --git a/lib/librte_pdump/rte_pdump.c b/lib/librte_pdump/rte_pdump.c index 8a01ac510..f96709f95 100644 --- a/lib/librte_pdump/rte_pdump.c +++ b/lib/librte_pdump/rte_pdump.c @@ -380,7 +380,7 @@ pdump_validate_ring_mp(struct rte_ring *ring, struct rte_mempool *mp) rte_errno = EINVAL; return -1; } - if (ring->prod.single || ring->cons.single) { + if (rte_ring_is_prod_single(ring) || rte_ring_is_cons_single(ring)) { PDUMP_LOG(ERR, "ring with either SP or SC settings" " is not valid for pdump, should have MP and MC settings\n"); rte_errno = EINVAL; diff --git a/lib/librte_port/rte_port_ring.c b/lib/librte_port/rte_port_ring.c index 47fcdd06a..52b2d8e55 100644 --- a/lib/librte_port/rte_port_ring.c +++ 
b/lib/librte_port/rte_port_ring.c @@ -44,8 +44,8 @@ rte_port_ring_reader_create_internal(void *params, int socket_id, /* Check input parameters */ if ((conf == NULL) || (conf->ring == NULL) || - (conf->ring->cons.single && is_multi) || - (!(conf->ring->cons.single) && !is_multi)) { + (rte_ring_is_cons_single(conf->ring) && is_multi) || + (!rte_ring_is_cons_single(conf->ring) && !is_multi)) { RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__); return NULL; } @@ -171,8 +171,8 @@ rte_port_ring_writer_create_internal(void *params, int socket_id, /* Check input parameters */ if ((conf == NULL) || (conf->ring == NULL) || - (conf->ring->prod.single && is_multi) || - (!(conf->ring->prod.single) && !is_multi) || + (rte_ring_is_prod_single(conf->ring) && is_multi) || + (!rte_ring_is_prod_single(conf->ring) && !is_multi) || (conf->tx_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX)) { RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__); return NULL; @@ -440,8 +440,8 @@ rte_port_ring_writer_nodrop_create_internal(void *params, int socket_id, /* Check input parameters */ if ((conf == NULL) || (conf->ring == NULL) || - (conf->ring->prod.single && is_multi) || - (!(conf->ring->prod.single) && !is_multi) || + (rte_ring_is_prod_single(conf->ring) && is_multi) || + (!rte_ring_is_prod_single(conf->ring) && !is_multi) || (conf->tx_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX)) { RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__); return NULL; diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile index 28368e6d1..6572768c9 100644 --- a/lib/librte_ring/Makefile +++ b/lib/librte_ring/Makefile @@ -16,6 +16,7 @@ SRCS-$(CONFIG_RTE_LIBRTE_RING) := rte_ring.c # install includes SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h \ + rte_ring_core.h \ rte_ring_elem.h \ rte_ring_generic.h \ rte_ring_c11_mem.h diff --git a/lib/librte_ring/meson.build b/lib/librte_ring/meson.build index 05402e4f0..c656781da 100644 --- a/lib/librte_ring/meson.build +++ b/lib/librte_ring/meson.build 
@@ -3,6 +3,7 @@ sources = files('rte_ring.c') headers = files('rte_ring.h', + 'rte_ring_core.h', 'rte_ring_elem.h', 'rte_ring_c11_mem.h', 'rte_ring_generic.h') diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c index 77e5de099..fa5733907 100644 --- a/lib/librte_ring/rte_ring.c +++ b/lib/librte_ring/rte_ring.c @@ -106,8 +106,10 @@ rte_ring_init(struct rte_ring *r, const char *name, unsigned count, if (ret < 0 || ret >= (int)sizeof(r->name)) return -ENAMETOOLONG; r->flags = flags; - r->prod.single = (flags & RING_F_SP_ENQ) ? __IS_SP : __IS_MP; - r->cons.single = (flags & RING_F_SC_DEQ) ? __IS_SC : __IS_MC; + r->prod.sync_type = (flags & RING_F_SP_ENQ) ? + RTE_RING_SYNC_ST : RTE_RING_SYNC_MT; + r->cons.sync_type = (flags & RING_F_SC_DEQ) ? + RTE_RING_SYNC_ST : RTE_RING_SYNC_MT; if (flags & RING_F_EXACT_SZ) { r->size = rte_align32pow2(count + 1); diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h index 18fc5d845..35ee4491c 100644 --- a/lib/librte_ring/rte_ring.h +++ b/lib/librte_ring/rte_ring.h @@ -36,91 +36,7 @@ extern "C" { #endif -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#define RTE_TAILQ_RING_NAME "RTE_RING" - -enum rte_ring_queue_behavior { - RTE_RING_QUEUE_FIXED = 0, /* Enq/Deq a fixed number of items from a ring */ - RTE_RING_QUEUE_VARIABLE /* Enq/Deq as many items as possible from ring */ -}; - -#define RTE_RING_MZ_PREFIX "RG_" -/** The maximum length of a ring name. */ -#define RTE_RING_NAMESIZE (RTE_MEMZONE_NAMESIZE - \ - sizeof(RTE_RING_MZ_PREFIX) + 1) - -/* structure to hold a pair of head/tail values and other metadata */ -struct rte_ring_headtail { - volatile uint32_t head; /**< Prod/consumer head. */ - volatile uint32_t tail; /**< Prod/consumer tail. */ - uint32_t single; /**< True if single prod/cons */ -}; - -/** - * An RTE ring structure. - * - * The producer and the consumer have a head and a tail index. 
The particularity - * of these index is that they are not between 0 and size(ring). These indexes - * are between 0 and 2^32, and we mask their value when we access the ring[] - * field. Thanks to this assumption, we can do subtractions between 2 index - * values in a modulo-32bit base: that's why the overflow of the indexes is not - * a problem. - */ -struct rte_ring { - /* - * Note: this field kept the RTE_MEMZONE_NAMESIZE size due to ABI - * compatibility requirements, it could be changed to RTE_RING_NAMESIZE - * next time the ABI changes - */ - char name[RTE_MEMZONE_NAMESIZE] __rte_cache_aligned; /**< Name of the ring. */ - int flags; /**< Flags supplied at creation. */ - const struct rte_memzone *memzone; - /**< Memzone, if any, containing the rte_ring */ - uint32_t size; /**< Size of ring. */ - uint32_t mask; /**< Mask (size-1) of ring. */ - uint32_t capacity; /**< Usable size of ring */ - - char pad0 __rte_cache_aligned; /**< empty cache line */ - - /** Ring producer status. */ - struct rte_ring_headtail prod __rte_cache_aligned; - char pad1 __rte_cache_aligned; /**< empty cache line */ - - /** Ring consumer status. */ - struct rte_ring_headtail cons __rte_cache_aligned; - char pad2 __rte_cache_aligned; /**< empty cache line */ -}; - -#define RING_F_SP_ENQ 0x0001 /**< The default enqueue is "single-producer". */ -#define RING_F_SC_DEQ 0x0002 /**< The default dequeue is "single-consumer". */ -/** - * Ring is to hold exactly requested number of entries. - * Without this flag set, the ring size requested must be a power of 2, and the - * usable space will be that size - 1. With the flag, the requested size will - * be rounded up to the next power of two, but the usable space will be exactly - * that requested. Worst case, if a power-of-2 size is requested, half the - * ring space will be wasted. 
- */ -#define RING_F_EXACT_SZ 0x0004 -#define RTE_RING_SZ_MASK (0x7fffffffU) /**< Ring size mask */ - -/* @internal defines for passing to the enqueue dequeue worker functions */ -#define __IS_SP 1 -#define __IS_MP 0 -#define __IS_SC 1 -#define __IS_MC 0 +#include /** * Calculate the memory size needed for a ring @@ -420,7 +336,7 @@ rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, unsigned int n, unsigned int *free_space) { return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED, - __IS_MP, free_space); + RTE_RING_SYNC_MT, free_space); } /** @@ -443,9 +359,13 @@ rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, unsigned int n, unsigned int *free_space) { return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED, - __IS_SP, free_space); + RTE_RING_SYNC_ST, free_space); } +#ifdef ALLOW_EXPERIMENTAL_API +#include +#endif + /** * Enqueue several objects on a ring. * @@ -470,7 +390,7 @@ rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table, unsigned int n, unsigned int *free_space) { return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED, - r->prod.single, free_space); + r->prod.sync_type, free_space); } /** @@ -554,7 +474,7 @@ rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n, unsigned int *available) { return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED, - __IS_MC, available); + RTE_RING_SYNC_MT, available); } /** @@ -578,7 +498,7 @@ rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n, unsigned int *available) { return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED, - __IS_SC, available); + RTE_RING_SYNC_ST, available); } /** @@ -605,7 +525,7 @@ rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n, unsigned int *available) { return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED, - r->cons.single, available); + r->cons.sync_type, available); } /** @@ -777,6 +697,62 @@ 
rte_ring_get_capacity(const struct rte_ring *r) return r->capacity; } +/** + * Return sync type used by producer in the ring. + * + * @param r + * A pointer to the ring structure. + * @return + * Producer sync type value. + */ +static inline enum rte_ring_sync_type +rte_ring_get_prod_sync_type(const struct rte_ring *r) +{ + return r->prod.sync_type; +} + +/** + * Check whether the ring is single producer. + * + * @param r + * A pointer to the ring structure. + * @return + * Non-zero if ring is SP, zero otherwise. + */ +static inline int +rte_ring_is_prod_single(const struct rte_ring *r) +{ + return (rte_ring_get_prod_sync_type(r) == RTE_RING_SYNC_ST); +} + +/** + * Return sync type used by consumer in the ring. + * + * @param r + * A pointer to the ring structure. + * @return + * Consumer sync type value. + */ +static inline enum rte_ring_sync_type +rte_ring_get_cons_sync_type(const struct rte_ring *r) +{ + return r->cons.sync_type; +} + +/** + * Check whether the ring is single consumer. + * + * @param r + * A pointer to the ring structure. + * @return + * Non-zero if ring is SC, zero otherwise. 
+ */ +static inline int +rte_ring_is_cons_single(const struct rte_ring *r) +{ + return (rte_ring_get_cons_sync_type(r) == RTE_RING_SYNC_ST); +} + /** * Dump the status of all rings on the console * @@ -820,7 +796,7 @@ rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table, unsigned int n, unsigned int *free_space) { return __rte_ring_do_enqueue(r, obj_table, n, - RTE_RING_QUEUE_VARIABLE, __IS_MP, free_space); + RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_MT, free_space); } /** @@ -843,7 +819,7 @@ rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table, unsigned int n, unsigned int *free_space) { return __rte_ring_do_enqueue(r, obj_table, n, - RTE_RING_QUEUE_VARIABLE, __IS_SP, free_space); + RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_ST, free_space); } /** @@ -870,7 +846,7 @@ rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table, unsigned int n, unsigned int *free_space) { return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE, - r->prod.single, free_space); + r->prod.sync_type, free_space); } /** @@ -898,7 +874,7 @@ rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned int n, unsigned int *available) { return __rte_ring_do_dequeue(r, obj_table, n, - RTE_RING_QUEUE_VARIABLE, __IS_MC, available); + RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_MT, available); } /** @@ -923,7 +899,7 @@ rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned int n, unsigned int *available) { return __rte_ring_do_dequeue(r, obj_table, n, - RTE_RING_QUEUE_VARIABLE, __IS_SC, available); + RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_ST, available); } /** @@ -951,7 +927,7 @@ rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table, { return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE, - r->cons.single, available); + r->cons.sync_type, available); } #ifdef __cplusplus diff --git a/lib/librte_ring/rte_ring_core.h b/lib/librte_ring/rte_ring_core.h new file mode 100644 index 000000000..d9cef763f --- 
/dev/null +++ b/lib/librte_ring/rte_ring_core.h @@ -0,0 +1,132 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * + * Copyright (c) 2010-2020 Intel Corporation + * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org + * All rights reserved. + * Derived from FreeBSD's bufring.h + * Used as BSD-3 Licensed with permission from Kip Macy. + */ + +#ifndef _RTE_RING_CORE_H_ +#define _RTE_RING_CORE_H_ + +/** + * @file + * This file contains the definition of the RTE ring structure itself, + * init flags and some related macros. + * For majority of DPDK entities, it is not recommended to include + * this file directly, use include or + * instead. + */ + +#ifdef __cplusplus +extern "C" { +#endif + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define RTE_TAILQ_RING_NAME "RTE_RING" + +/** enqueue/dequeue behavior types */ +enum rte_ring_queue_behavior { + /** Enq/Deq a fixed number of items from a ring */ + RTE_RING_QUEUE_FIXED = 0, + /** Enq/Deq as many items as possible from ring */ + RTE_RING_QUEUE_VARIABLE +}; + +#define RTE_RING_MZ_PREFIX "RG_" +/** The maximum length of a ring name. */ +#define RTE_RING_NAMESIZE (RTE_MEMZONE_NAMESIZE - \ + sizeof(RTE_RING_MZ_PREFIX) + 1) + +/** prod/cons sync types */ +enum rte_ring_sync_type { + RTE_RING_SYNC_MT, /**< multi-thread safe (default mode) */ + RTE_RING_SYNC_ST, /**< single thread only */ +}; + +/** + * structures to hold a pair of head/tail values and other metadata. + * Depending on sync_type format of that structure might be different, + * but offset for *sync_type* and *tail* values should remain the same. + */ +struct rte_ring_headtail { + volatile uint32_t head; /**< prod/consumer head. */ + volatile uint32_t tail; /**< prod/consumer tail. 
*/ + RTE_STD_C11 + union { + /** sync type of prod/cons */ + enum rte_ring_sync_type sync_type; + /** deprecated - True if single prod/cons */ + uint32_t single; + }; +}; + +/** + * An RTE ring structure. + * + * The producer and the consumer have a head and a tail index. The particularity + * of these index is that they are not between 0 and size(ring). These indexes + * are between 0 and 2^32, and we mask their value when we access the ring[] + * field. Thanks to this assumption, we can do subtractions between 2 index + * values in a modulo-32bit base: that's why the overflow of the indexes is not + * a problem. + */ +struct rte_ring { + /* + * Note: this field kept the RTE_MEMZONE_NAMESIZE size due to ABI + * compatibility requirements, it could be changed to RTE_RING_NAMESIZE + * next time the ABI changes + */ + char name[RTE_MEMZONE_NAMESIZE] __rte_cache_aligned; + /**< Name of the ring. */ + int flags; /**< Flags supplied at creation. */ + const struct rte_memzone *memzone; + /**< Memzone, if any, containing the rte_ring */ + uint32_t size; /**< Size of ring. */ + uint32_t mask; /**< Mask (size-1) of ring. */ + uint32_t capacity; /**< Usable size of ring */ + + char pad0 __rte_cache_aligned; /**< empty cache line */ + + /** Ring producer status. */ + struct rte_ring_headtail prod __rte_cache_aligned; + char pad1 __rte_cache_aligned; /**< empty cache line */ + + /** Ring consumer status. */ + struct rte_ring_headtail cons __rte_cache_aligned; + char pad2 __rte_cache_aligned; /**< empty cache line */ +}; + +#define RING_F_SP_ENQ 0x0001 /**< The default enqueue is "single-producer". */ +#define RING_F_SC_DEQ 0x0002 /**< The default dequeue is "single-consumer". */ +/** + * Ring is to hold exactly requested number of entries. + * Without this flag set, the ring size requested must be a power of 2, and the + * usable space will be that size - 1. 
With the flag, the requested size will + * be rounded up to the next power of two, but the usable space will be exactly + * that requested. Worst case, if a power-of-2 size is requested, half the + * ring space will be wasted. + */ +#define RING_F_EXACT_SZ 0x0004 +#define RTE_RING_SZ_MASK (0x7fffffffU) /**< Ring size mask */ + +#ifdef __cplusplus +} +#endif + +#endif /* _RTE_RING_CORE_H_ */ diff --git a/lib/librte_ring/rte_ring_elem.h b/lib/librte_ring/rte_ring_elem.h index 663addc73..7406c0b0f 100644 --- a/lib/librte_ring/rte_ring_elem.h +++ b/lib/librte_ring/rte_ring_elem.h @@ -20,21 +20,7 @@ extern "C" { #endif -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "rte_ring.h" +#include /** * @warning @@ -510,7 +496,7 @@ rte_ring_mp_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table, unsigned int esize, unsigned int n, unsigned int *free_space) { return __rte_ring_do_enqueue_elem(r, obj_table, esize, n, - RTE_RING_QUEUE_FIXED, __IS_MP, free_space); + RTE_RING_QUEUE_FIXED, RTE_RING_SYNC_MT, free_space); } /** @@ -539,7 +525,7 @@ rte_ring_sp_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table, unsigned int esize, unsigned int n, unsigned int *free_space) { return __rte_ring_do_enqueue_elem(r, obj_table, esize, n, - RTE_RING_QUEUE_FIXED, __IS_SP, free_space); + RTE_RING_QUEUE_FIXED, RTE_RING_SYNC_ST, free_space); } /** @@ -570,7 +556,7 @@ rte_ring_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table, unsigned int esize, unsigned int n, unsigned int *free_space) { return __rte_ring_do_enqueue_elem(r, obj_table, esize, n, - RTE_RING_QUEUE_FIXED, r->prod.single, free_space); + RTE_RING_QUEUE_FIXED, r->prod.sync_type, free_space); } /** @@ -675,7 +661,7 @@ rte_ring_mc_dequeue_bulk_elem(struct rte_ring *r, void *obj_table, unsigned int esize, unsigned int n, unsigned int *available) { return __rte_ring_do_dequeue_elem(r, obj_table, esize, n, - 
RTE_RING_QUEUE_FIXED, __IS_MC, available); + RTE_RING_QUEUE_FIXED, RTE_RING_SYNC_MT, available); } /** @@ -703,7 +689,7 @@ rte_ring_sc_dequeue_bulk_elem(struct rte_ring *r, void *obj_table, unsigned int esize, unsigned int n, unsigned int *available) { return __rte_ring_do_dequeue_elem(r, obj_table, esize, n, - RTE_RING_QUEUE_FIXED, __IS_SC, available); + RTE_RING_QUEUE_FIXED, RTE_RING_SYNC_ST, available); } /** @@ -734,7 +720,7 @@ rte_ring_dequeue_bulk_elem(struct rte_ring *r, void *obj_table, unsigned int esize, unsigned int n, unsigned int *available) { return __rte_ring_do_dequeue_elem(r, obj_table, esize, n, - RTE_RING_QUEUE_FIXED, r->cons.single, available); + RTE_RING_QUEUE_FIXED, r->cons.sync_type, available); } /** @@ -842,7 +828,7 @@ rte_ring_mp_enqueue_burst_elem(struct rte_ring *r, const void *obj_table, unsigned int esize, unsigned int n, unsigned int *free_space) { return __rte_ring_do_enqueue_elem(r, obj_table, esize, n, - RTE_RING_QUEUE_VARIABLE, __IS_MP, free_space); + RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_MT, free_space); } /** @@ -871,7 +857,7 @@ rte_ring_sp_enqueue_burst_elem(struct rte_ring *r, const void *obj_table, unsigned int esize, unsigned int n, unsigned int *free_space) { return __rte_ring_do_enqueue_elem(r, obj_table, esize, n, - RTE_RING_QUEUE_VARIABLE, __IS_SP, free_space); + RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_ST, free_space); } /** @@ -902,7 +888,7 @@ rte_ring_enqueue_burst_elem(struct rte_ring *r, const void *obj_table, unsigned int esize, unsigned int n, unsigned int *free_space) { return __rte_ring_do_enqueue_elem(r, obj_table, esize, n, - RTE_RING_QUEUE_VARIABLE, r->prod.single, free_space); + RTE_RING_QUEUE_VARIABLE, r->prod.sync_type, free_space); } /** @@ -934,7 +920,7 @@ rte_ring_mc_dequeue_burst_elem(struct rte_ring *r, void *obj_table, unsigned int esize, unsigned int n, unsigned int *available) { return __rte_ring_do_dequeue_elem(r, obj_table, esize, n, - RTE_RING_QUEUE_VARIABLE, __IS_MC, available); + 
RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_MT, available); } /** @@ -963,7 +949,7 @@ rte_ring_sc_dequeue_burst_elem(struct rte_ring *r, void *obj_table, unsigned int esize, unsigned int n, unsigned int *available) { return __rte_ring_do_dequeue_elem(r, obj_table, esize, n, - RTE_RING_QUEUE_VARIABLE, __IS_SC, available); + RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_ST, available); } /** @@ -995,9 +981,11 @@ rte_ring_dequeue_burst_elem(struct rte_ring *r, void *obj_table, { return __rte_ring_do_dequeue_elem(r, obj_table, esize, n, RTE_RING_QUEUE_VARIABLE, - r->cons.single, available); + r->cons.sync_type, available); } +#include + #ifdef __cplusplus } #endif From patchwork Mon Apr 20 12:28:24 2020 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: "Ananyev, Konstantin" X-Patchwork-Id: 68954 X-Patchwork-Delegate: david.marchand@redhat.com Return-Path: X-Original-To: patchwork@inbox.dpdk.org Delivered-To: patchwork@inbox.dpdk.org Received: from dpdk.org (dpdk.org [92.243.14.124]) by inbox.dpdk.org (Postfix) with ESMTP id C0EDFA0561; Mon, 20 Apr 2020 14:29:13 +0200 (CEST) Received: from [92.243.14.124] (localhost [127.0.0.1]) by dpdk.org (Postfix) with ESMTP id 2C8B61D5E8; Mon, 20 Apr 2020 14:28:56 +0200 (CEST) Received: from mga17.intel.com (mga17.intel.com [192.55.52.151]) by dpdk.org (Postfix) with ESMTP id 0C1FD1D5E1 for ; Mon, 20 Apr 2020 14:28:49 +0200 (CEST) IronPort-SDR: 2crEq0quR5gnDOYVJSFC6eDWY33Xnp053YWwYxzjOvWIe9rW2jlpT5tifQ11O0CQM4Oip/0zmm bLZxApTfirQg== X-Amp-Result: SKIPPED(no attachment in message) X-Amp-File-Uploaded: False Received: from orsmga003.jf.intel.com ([10.7.209.27]) by fmsmga107.fm.intel.com with ESMTP/TLS/ECDHE-RSA-AES256-GCM-SHA384; 20 Apr 2020 05:28:49 -0700 IronPort-SDR: vEq25GnHD9UuMLvyUVvumUnv1/57G+ja4KDX3KRQSsuBwzYzmtMi/ll6BN0JUsxCjtM8mDAAQr ubxOHMA744lg== X-ExtLoop1: 1 X-IronPort-AV: E=Sophos;i="5.72,406,1580803200"; d="scan'208";a="254923883" Received: from 
sivswdev08.ir.intel.com ([10.237.217.47]) by orsmga003.jf.intel.com with ESMTP; 20 Apr 2020 05:28:47 -0700
From: Konstantin Ananyev
To: dev@dpdk.org
Cc: honnappa.nagarahalli@arm.com, david.marchand@redhat.com, jielong.zjl@antfin.com, Konstantin Ananyev
Date: Mon, 20 Apr 2020 13:28:24 +0100
Message-Id: <20200420122831.16973-4-konstantin.ananyev@intel.com>
X-Mailer: git-send-email 2.18.0
In-Reply-To: <20200420122831.16973-1-konstantin.ananyev@intel.com>
References: <20200420121113.9327-1-konstantin.ananyev@intel.com> <20200420122831.16973-1-konstantin.ananyev@intel.com>
Subject: [dpdk-dev] [PATCH v7 03/10] ring: introduce RTS ring mode
X-BeenThere: dev@dpdk.org
X-Mailman-Version: 2.1.15
Precedence: list
List-Id: DPDK patches and discussions
List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: ,
Errors-To: dev-bounces@dpdk.org
Sender: "dev"

Introduce relaxed tail sync (RTS) mode for MT ring synchronization.
Aim to reduce stall times when the ring is used on overcommitted CPUs
(multiple active threads on the same CPU).
The main difference from the original MP/MC algorithm is that the tail value
is increased not by every thread that finished enqueue/dequeue,
but only by the last one.
That allows threads to avoid spinning on the ring tail value,
leaving the actual tail value change to the last thread in the update queue.

Signed-off-by: Konstantin Ananyev
Acked-by: Honnappa Nagarahalli
---
This patch depends on the following patch:
"meson: add libatomic as a global dependency for i686 clang"
(http://patches.dpdk.org/patch/68876/)

check-abi.sh reports what I believe is a false-positive about
ring cons/prod changes. As a workaround, devtools/libabigail.abignore is
updated to suppress *struct ring* related errors.
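
Illustration only, not part of the patch: the tail-update rule described in
the commit message can be sketched in plain C11 roughly as follows. This is a
minimal single-threaded walk-through under stated assumptions. The names
poscnt, rts_headtail and sketch_* are hypothetical, ring capacity checks and
the head/tail distance limit are left out, and the real code lives in the
rte_ring_rts*.h headers added by this patch.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdint.h>

/* Hypothetical illustration type: position and update counter in 64 bits. */
union poscnt {
	uint64_t raw;         /* both fields, read/written as one atomic value */
	struct {
		uint32_t cnt; /* updates started (head) or finished (tail) */
		uint32_t pos; /* ring position */
	} val;
};

/* Hypothetical illustration type, not the layout used by the patch. */
struct rts_headtail {
	_Atomic uint64_t head;
	_Atomic uint64_t tail;
};

/*
 * Reserve n slots: advance head.pos by n and head.cnt by 1 with one
 * 64-bit CAS. Returns the first reserved position.
 * Assumption: free-space and head/tail distance checks are omitted.
 */
static uint32_t
sketch_move_head(struct rts_headtail *ht, uint32_t n)
{
	union poscnt oh, nh;

	oh.raw = atomic_load_explicit(&ht->head, memory_order_acquire);
	do {
		nh.val.pos = oh.val.pos + n;
		nh.val.cnt = oh.val.cnt + 1;
	} while (!atomic_compare_exchange_weak_explicit(&ht->head,
			&oh.raw, nh.raw,
			memory_order_acquire, memory_order_acquire));

	return oh.val.pos;
}

/*
 * Finish one enqueue/dequeue: always bump tail.cnt, but move tail.pos
 * only when this caller is the last one outstanding, i.e. when the
 * incremented tail.cnt equals head.cnt.
 */
static void
sketch_update_tail(struct rts_headtail *ht)
{
	union poscnt h, ot, nt;

	ot.raw = atomic_load_explicit(&ht->tail, memory_order_acquire);
	do {
		/* re-read head on every attempt, it may have moved */
		h.raw = atomic_load_explicit(&ht->head, memory_order_relaxed);
		nt.raw = ot.raw;
		if (++nt.val.cnt == h.val.cnt)
			nt.val.pos = h.val.pos;
	} while (!atomic_compare_exchange_weak_explicit(&ht->tail,
			&ot.raw, nt.raw,
			memory_order_release, memory_order_acquire));
}

static uint32_t
sketch_tail_pos(struct rts_headtail *ht)
{
	union poscnt t;

	t.raw = atomic_load_explicit(&ht->tail, memory_order_acquire);
	return t.val.pos;
}

/*
 * Walk-through: callers A, B and C reserve 4, 2 and 3 slots,
 * then finish in the order B, C, A. Returns the final tail position.
 */
uint32_t
sketch_demo(void)
{
	struct rts_headtail ht;
	uint32_t a, b, c;

	atomic_init(&ht.head, 0);
	atomic_init(&ht.tail, 0);

	a = sketch_move_head(&ht, 4); /* A owns slots 0..3 */
	b = sketch_move_head(&ht, 2); /* B owns slots 4..5 */
	c = sketch_move_head(&ht, 3); /* C owns slots 6..8 */
	assert(a == 0 && b == 4 && c == 6);

	sketch_update_tail(&ht);      /* B done: tail.cnt 1, head.cnt 3 */
	assert(sketch_tail_pos(&ht) == 0);
	sketch_update_tail(&ht);      /* C done: tail.cnt 2, still behind */
	assert(sketch_tail_pos(&ht) == 0);
	sketch_update_tail(&ht);      /* A done: counters match, tail moves */

	return sketch_tail_pos(&ht);
}
```

In this walk-through B and C return without waiting for A, and the tail
position jumps over all reserved slots only when the last outstanding caller
completes. That is the behavior the commit message describes as leaving the
tail change to the last thread in the update queue.
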
devtools/libabigail.abignore | 7 + doc/guides/rel_notes/release_20_05.rst | 7 + lib/librte_ring/Makefile | 4 +- lib/librte_ring/meson.build | 7 +- lib/librte_ring/rte_ring.c | 100 +++++- lib/librte_ring/rte_ring.h | 118 +++++-- lib/librte_ring/rte_ring_core.h | 36 +- lib/librte_ring/rte_ring_elem.h | 114 ++++++- lib/librte_ring/rte_ring_rts.h | 439 +++++++++++++++++++++++++ lib/librte_ring/rte_ring_rts_c11_mem.h | 179 ++++++++++ 10 files changed, 963 insertions(+), 48 deletions(-) create mode 100644 lib/librte_ring/rte_ring_rts.h create mode 100644 lib/librte_ring/rte_ring_rts_c11_mem.h diff --git a/devtools/libabigail.abignore b/devtools/libabigail.abignore index a59df8f13..cd86d89ca 100644 --- a/devtools/libabigail.abignore +++ b/devtools/libabigail.abignore @@ -11,3 +11,10 @@ type_kind = enum name = rte_crypto_asym_xform_type changed_enumerators = RTE_CRYPTO_ASYM_XFORM_TYPE_LIST_END +; Ignore updates of ring prod/cons +[suppress_type] + type_kind = struct + name = rte_ring +[suppress_type] + type_kind = struct + name = rte_event_ring diff --git a/doc/guides/rel_notes/release_20_05.rst b/doc/guides/rel_notes/release_20_05.rst index 184967844..eedf960d0 100644 --- a/doc/guides/rel_notes/release_20_05.rst +++ b/doc/guides/rel_notes/release_20_05.rst @@ -81,6 +81,13 @@ New Features by making use of the event device capabilities. The event mode currently supports only inline IPsec protocol offload. +* **New synchronization modes for rte_ring.** + + Introduced new optional MT synchronization mode for rte_ring: + Relaxed Tail Sync (RTS). With this mode selected, rte_ring shows + significant improvements for average enqueue/dequeue times on + overcommitted systems. 
+ Removed Items ------------- diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile index 6572768c9..04e446e37 100644 --- a/lib/librte_ring/Makefile +++ b/lib/librte_ring/Makefile @@ -19,6 +19,8 @@ SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h \ rte_ring_core.h \ rte_ring_elem.h \ rte_ring_generic.h \ - rte_ring_c11_mem.h + rte_ring_c11_mem.h \ + rte_ring_rts.h \ + rte_ring_rts_c11_mem.h include $(RTE_SDK)/mk/rte.lib.mk diff --git a/lib/librte_ring/meson.build b/lib/librte_ring/meson.build index c656781da..a95598032 100644 --- a/lib/librte_ring/meson.build +++ b/lib/librte_ring/meson.build @@ -6,4 +6,9 @@ headers = files('rte_ring.h', 'rte_ring_core.h', 'rte_ring_elem.h', 'rte_ring_c11_mem.h', - 'rte_ring_generic.h') + 'rte_ring_generic.h', + 'rte_ring_rts.h', + 'rte_ring_rts_c11_mem.h') + +# rte_ring_create_elem and rte_ring_get_memsize_elem are experimental +allow_experimental_apis = true diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c index fa5733907..222eec0fb 100644 --- a/lib/librte_ring/rte_ring.c +++ b/lib/librte_ring/rte_ring.c @@ -45,6 +45,9 @@ EAL_REGISTER_TAILQ(rte_ring_tailq) /* true if x is a power of 2 */ #define POWEROF2(x) ((((x)-1) & (x)) == 0) +/* by default set head/tail distance as 1/8 of ring capacity */ +#define HTD_MAX_DEF 8 + /* return the size of memory occupied by a ring */ ssize_t rte_ring_get_memsize_elem(unsigned int esize, unsigned int count) @@ -79,11 +82,84 @@ rte_ring_get_memsize(unsigned int count) return rte_ring_get_memsize_elem(sizeof(void *), count); } +/* + * internal helper function to reset prod/cons head-tail values. 
+ */ +static void +reset_headtail(void *p) +{ + struct rte_ring_headtail *ht; + struct rte_ring_rts_headtail *ht_rts; + + ht = p; + ht_rts = p; + + switch (ht->sync_type) { + case RTE_RING_SYNC_MT: + case RTE_RING_SYNC_ST: + ht->head = 0; + ht->tail = 0; + break; + case RTE_RING_SYNC_MT_RTS: + ht_rts->head.raw = 0; + ht_rts->tail.raw = 0; + break; + default: + /* unknown sync mode */ + RTE_ASSERT(0); + } +} + void rte_ring_reset(struct rte_ring *r) { - r->prod.head = r->cons.head = 0; - r->prod.tail = r->cons.tail = 0; + reset_headtail(&r->prod); + reset_headtail(&r->cons); +} + +/* + * helper function, calculates sync_type values for prod and cons + * based on input flags. Returns zero at success or negative + * errno value otherwise. + */ +static int +get_sync_type(uint32_t flags, enum rte_ring_sync_type *prod_st, + enum rte_ring_sync_type *cons_st) +{ + static const uint32_t prod_st_flags = + (RING_F_SP_ENQ | RING_F_MP_RTS_ENQ); + static const uint32_t cons_st_flags = + (RING_F_SC_DEQ | RING_F_MC_RTS_DEQ); + + switch (flags & prod_st_flags) { + case 0: + *prod_st = RTE_RING_SYNC_MT; + break; + case RING_F_SP_ENQ: + *prod_st = RTE_RING_SYNC_ST; + break; + case RING_F_MP_RTS_ENQ: + *prod_st = RTE_RING_SYNC_MT_RTS; + break; + default: + return -EINVAL; + } + + switch (flags & cons_st_flags) { + case 0: + *cons_st = RTE_RING_SYNC_MT; + break; + case RING_F_SC_DEQ: + *cons_st = RTE_RING_SYNC_ST; + break; + case RING_F_MC_RTS_DEQ: + *cons_st = RTE_RING_SYNC_MT_RTS; + break; + default: + return -EINVAL; + } + + return 0; } int @@ -100,16 +176,20 @@ rte_ring_init(struct rte_ring *r, const char *name, unsigned count, RTE_BUILD_BUG_ON((offsetof(struct rte_ring, prod) & RTE_CACHE_LINE_MASK) != 0); + RTE_BUILD_BUG_ON(offsetof(struct rte_ring_headtail, sync_type) != + offsetof(struct rte_ring_rts_headtail, sync_type)); + RTE_BUILD_BUG_ON(offsetof(struct rte_ring_headtail, tail) != + offsetof(struct rte_ring_rts_headtail, tail.val.pos)); + /* init the ring structure */ 
memset(r, 0, sizeof(*r)); ret = strlcpy(r->name, name, sizeof(r->name)); if (ret < 0 || ret >= (int)sizeof(r->name)) return -ENAMETOOLONG; r->flags = flags; - r->prod.sync_type = (flags & RING_F_SP_ENQ) ? - RTE_RING_SYNC_ST : RTE_RING_SYNC_MT; - r->cons.sync_type = (flags & RING_F_SC_DEQ) ? - RTE_RING_SYNC_ST : RTE_RING_SYNC_MT; + ret = get_sync_type(flags, &r->prod.sync_type, &r->cons.sync_type); + if (ret != 0) + return ret; if (flags & RING_F_EXACT_SZ) { r->size = rte_align32pow2(count + 1); @@ -126,8 +206,12 @@ rte_ring_init(struct rte_ring *r, const char *name, unsigned count, r->mask = count - 1; r->capacity = r->mask; } - r->prod.head = r->cons.head = 0; - r->prod.tail = r->cons.tail = 0; + + /* set default values for head-tail distance */ + if (flags & RING_F_MP_RTS_ENQ) + rte_ring_set_prod_htd_max(r, r->capacity / HTD_MAX_DEF); + if (flags & RING_F_MC_RTS_DEQ) + rte_ring_set_cons_htd_max(r, r->capacity / HTD_MAX_DEF); return 0; } diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h index 35ee4491c..c42e1cfc4 100644 --- a/lib/librte_ring/rte_ring.h +++ b/lib/librte_ring/rte_ring.h @@ -1,6 +1,6 @@ /* SPDX-License-Identifier: BSD-3-Clause * - * Copyright (c) 2010-2017 Intel Corporation + * Copyright (c) 2010-2020 Intel Corporation * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org * All rights reserved. * Derived from FreeBSD's bufring.h @@ -79,12 +79,24 @@ ssize_t rte_ring_get_memsize(unsigned count); * The number of elements in the ring (must be a power of 2). * @param flags * An OR of the following: - * - RING_F_SP_ENQ: If this flag is set, the default behavior when - * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()`` - * is "single-producer". Otherwise, it is "multi-producers". - * - RING_F_SC_DEQ: If this flag is set, the default behavior when - * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()`` - * is "single-consumer". Otherwise, it is "multi-consumers". 
+ * - One of mutually exclusive flags that define producer behavior: + * - RING_F_SP_ENQ: If this flag is set, the default behavior when + * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()`` + * is "single-producer". + * - RING_F_MP_RTS_ENQ: If this flag is set, the default behavior when + * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()`` + * is "multi-producer RTS mode". + * If none of these flags is set, then default "multi-producer" + * behavior is selected. + * - One of mutually exclusive flags that define consumer behavior: + * - RING_F_SC_DEQ: If this flag is set, the default behavior when + * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()`` + * is "single-consumer". Otherwise, it is "multi-consumers". + * - RING_F_MC_RTS_DEQ: If this flag is set, the default behavior when + * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()`` + * is "multi-consumer RTS mode". + * If none of these flags is set, then default "multi-consumer" + * behavior is selected. * @return * 0 on success, or a negative value on error. */ @@ -114,12 +126,24 @@ int rte_ring_init(struct rte_ring *r, const char *name, unsigned count, * constraint for the reserved zone. * @param flags * An OR of the following: - * - RING_F_SP_ENQ: If this flag is set, the default behavior when - * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()`` - * is "single-producer". Otherwise, it is "multi-producers". - * - RING_F_SC_DEQ: If this flag is set, the default behavior when - * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()`` - * is "single-consumer". Otherwise, it is "multi-consumers". + * - One of mutually exclusive flags that define producer behavior: + * - RING_F_SP_ENQ: If this flag is set, the default behavior when + * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()`` + * is "single-producer". 
+ * - RING_F_MP_RTS_ENQ: If this flag is set, the default behavior when + * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()`` + * is "multi-producer RTS mode". + * If none of these flags is set, then default "multi-producer" + * behavior is selected. + * - One of mutually exclusive flags that define consumer behavior: + * - RING_F_SC_DEQ: If this flag is set, the default behavior when + * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()`` + * is "single-consumer". Otherwise, it is "multi-consumers". + * - RING_F_MC_RTS_DEQ: If this flag is set, the default behavior when + * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()`` + * is "multi-consumer RTS mode". + * If none of these flags is set, then default "multi-consumer" + * behavior is selected. * @return * On success, the pointer to the new allocated ring. NULL on error with * rte_errno set appropriately. Possible errno values include: @@ -389,8 +413,21 @@ static __rte_always_inline unsigned int rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table, unsigned int n, unsigned int *free_space) { - return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED, - r->prod.sync_type, free_space); + switch (r->prod.sync_type) { + case RTE_RING_SYNC_MT: + return rte_ring_mp_enqueue_bulk(r, obj_table, n, free_space); + case RTE_RING_SYNC_ST: + return rte_ring_sp_enqueue_bulk(r, obj_table, n, free_space); +#ifdef ALLOW_EXPERIMENTAL_API + case RTE_RING_SYNC_MT_RTS: + return rte_ring_mp_rts_enqueue_bulk(r, obj_table, n, + free_space); +#endif + } + + /* valid ring should never reach this point */ + RTE_ASSERT(0); + return 0; } /** @@ -524,8 +561,20 @@ static __rte_always_inline unsigned int rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n, unsigned int *available) { - return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED, - r->cons.sync_type, available); + switch (r->cons.sync_type) { + case RTE_RING_SYNC_MT: + return 
rte_ring_mc_dequeue_bulk(r, obj_table, n, available); + case RTE_RING_SYNC_ST: + return rte_ring_sc_dequeue_bulk(r, obj_table, n, available); +#ifdef ALLOW_EXPERIMENTAL_API + case RTE_RING_SYNC_MT_RTS: + return rte_ring_mc_rts_dequeue_bulk(r, obj_table, n, available); +#endif + } + + /* valid ring should never reach this point */ + RTE_ASSERT(0); + return 0; } /** @@ -845,8 +894,21 @@ static __rte_always_inline unsigned rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table, unsigned int n, unsigned int *free_space) { - return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE, - r->prod.sync_type, free_space); + switch (r->prod.sync_type) { + case RTE_RING_SYNC_MT: + return rte_ring_mp_enqueue_burst(r, obj_table, n, free_space); + case RTE_RING_SYNC_ST: + return rte_ring_sp_enqueue_burst(r, obj_table, n, free_space); +#ifdef ALLOW_EXPERIMENTAL_API + case RTE_RING_SYNC_MT_RTS: + return rte_ring_mp_rts_enqueue_burst(r, obj_table, n, + free_space); +#endif + } + + /* valid ring should never reach this point */ + RTE_ASSERT(0); + return 0; } /** @@ -925,9 +987,21 @@ static __rte_always_inline unsigned rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned int n, unsigned int *available) { - return __rte_ring_do_dequeue(r, obj_table, n, - RTE_RING_QUEUE_VARIABLE, - r->cons.sync_type, available); + switch (r->cons.sync_type) { + case RTE_RING_SYNC_MT: + return rte_ring_mc_dequeue_burst(r, obj_table, n, available); + case RTE_RING_SYNC_ST: + return rte_ring_sc_dequeue_burst(r, obj_table, n, available); +#ifdef ALLOW_EXPERIMENTAL_API + case RTE_RING_SYNC_MT_RTS: + return rte_ring_mc_rts_dequeue_burst(r, obj_table, n, + available); +#endif + } + + /* valid ring should never reach this point */ + RTE_ASSERT(0); + return 0; } #ifdef __cplusplus diff --git a/lib/librte_ring/rte_ring_core.h b/lib/librte_ring/rte_ring_core.h index d9cef763f..bd21fa535 100644 --- a/lib/librte_ring/rte_ring_core.h +++ b/lib/librte_ring/rte_ring_core.h @@ 
-57,6 +57,9 @@ enum rte_ring_queue_behavior { enum rte_ring_sync_type { RTE_RING_SYNC_MT, /**< multi-thread safe (default mode) */ RTE_RING_SYNC_ST, /**< single thread only */ +#ifdef ALLOW_EXPERIMENTAL_API + RTE_RING_SYNC_MT_RTS, /**< multi-thread relaxed tail sync */ +#endif }; /** @@ -76,6 +79,22 @@ struct rte_ring_headtail { }; }; +union __rte_ring_rts_poscnt { + /** raw 8B value to read/write *cnt* and *pos* as one atomic op */ + uint64_t raw __rte_aligned(8); + struct { + uint32_t cnt; /**< head/tail reference counter */ + uint32_t pos; /**< head/tail position */ + } val; +}; + +struct rte_ring_rts_headtail { + volatile union __rte_ring_rts_poscnt tail; + enum rte_ring_sync_type sync_type; /**< sync type of prod/cons */ + uint32_t htd_max; /**< max allowed distance between head/tail */ + volatile union __rte_ring_rts_poscnt head; +}; + /** * An RTE ring structure. * @@ -104,11 +123,21 @@ struct rte_ring { char pad0 __rte_cache_aligned; /**< empty cache line */ /** Ring producer status. */ - struct rte_ring_headtail prod __rte_cache_aligned; + RTE_STD_C11 + union { + struct rte_ring_headtail prod; + struct rte_ring_rts_headtail rts_prod; + } __rte_cache_aligned; + char pad1 __rte_cache_aligned; /**< empty cache line */ /** Ring consumer status. */ - struct rte_ring_headtail cons __rte_cache_aligned; + RTE_STD_C11 + union { + struct rte_ring_headtail cons; + struct rte_ring_rts_headtail rts_cons; + } __rte_cache_aligned; + char pad2 __rte_cache_aligned; /**< empty cache line */ }; @@ -125,6 +154,9 @@ struct rte_ring { #define RING_F_EXACT_SZ 0x0004 #define RTE_RING_SZ_MASK (0x7fffffffU) /**< Ring size mask */ +#define RING_F_MP_RTS_ENQ 0x0008 /**< The default enqueue is "MP RTS". */ +#define RING_F_MC_RTS_DEQ 0x0010 /**< The default dequeue is "MC RTS". 
*/ + #ifdef __cplusplus } #endif diff --git a/lib/librte_ring/rte_ring_elem.h b/lib/librte_ring/rte_ring_elem.h index 7406c0b0f..4030753b6 100644 --- a/lib/librte_ring/rte_ring_elem.h +++ b/lib/librte_ring/rte_ring_elem.h @@ -74,12 +74,24 @@ ssize_t rte_ring_get_memsize_elem(unsigned int esize, unsigned int count); * constraint for the reserved zone. * @param flags * An OR of the following: - * - RING_F_SP_ENQ: If this flag is set, the default behavior when - * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()`` - * is "single-producer". Otherwise, it is "multi-producers". - * - RING_F_SC_DEQ: If this flag is set, the default behavior when - * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()`` - * is "single-consumer". Otherwise, it is "multi-consumers". + * - One of mutually exclusive flags that define producer behavior: + * - RING_F_SP_ENQ: If this flag is set, the default behavior when + * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()`` + * is "single-producer". + * - RING_F_MP_RTS_ENQ: If this flag is set, the default behavior when + * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()`` + * is "multi-producer RTS mode". + * If none of these flags is set, then default "multi-producer" + * behavior is selected. + * - One of mutually exclusive flags that define consumer behavior: + * - RING_F_SC_DEQ: If this flag is set, the default behavior when + * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()`` + * is "single-consumer". Otherwise, it is "multi-consumers". + * - RING_F_MC_RTS_DEQ: If this flag is set, the default behavior when + * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()`` + * is "multi-consumer RTS mode". + * If none of these flags is set, then default "multi-consumer" + * behavior is selected. * @return * On success, the pointer to the new allocated ring. NULL on error with * rte_errno set appropriately. 
Possible errno values include: @@ -528,6 +540,10 @@ rte_ring_sp_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table, RTE_RING_QUEUE_FIXED, RTE_RING_SYNC_ST, free_space); } +#ifdef ALLOW_EXPERIMENTAL_API +#include +#endif + /** * Enqueue several objects on a ring. * @@ -557,6 +573,26 @@ rte_ring_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table, { return __rte_ring_do_enqueue_elem(r, obj_table, esize, n, RTE_RING_QUEUE_FIXED, r->prod.sync_type, free_space); + + switch (r->prod.sync_type) { + case RTE_RING_SYNC_MT: + return rte_ring_mp_enqueue_bulk_elem(r, obj_table, esize, n, + free_space); + case RTE_RING_SYNC_ST: + return rte_ring_sp_enqueue_bulk_elem(r, obj_table, esize, n, + free_space); +#ifdef ALLOW_EXPERIMENTAL_API + case RTE_RING_SYNC_MT_RTS: + return rte_ring_mp_rts_enqueue_bulk_elem(r, obj_table, esize, n, + free_space); +#endif + } + + /* valid ring should never reach this point */ + RTE_ASSERT(0); + if (free_space != NULL) + *free_space = 0; + return 0; } /** @@ -661,7 +697,7 @@ rte_ring_mc_dequeue_bulk_elem(struct rte_ring *r, void *obj_table, unsigned int esize, unsigned int n, unsigned int *available) { return __rte_ring_do_dequeue_elem(r, obj_table, esize, n, - RTE_RING_QUEUE_FIXED, RTE_RING_SYNC_MT, available); + RTE_RING_QUEUE_FIXED, RTE_RING_SYNC_MT, available); } /** @@ -719,8 +755,25 @@ static __rte_always_inline unsigned int rte_ring_dequeue_bulk_elem(struct rte_ring *r, void *obj_table, unsigned int esize, unsigned int n, unsigned int *available) { - return __rte_ring_do_dequeue_elem(r, obj_table, esize, n, - RTE_RING_QUEUE_FIXED, r->cons.sync_type, available); + switch (r->cons.sync_type) { + case RTE_RING_SYNC_MT: + return rte_ring_mc_dequeue_bulk_elem(r, obj_table, esize, n, + available); + case RTE_RING_SYNC_ST: + return rte_ring_sc_dequeue_bulk_elem(r, obj_table, esize, n, + available); +#ifdef ALLOW_EXPERIMENTAL_API + case RTE_RING_SYNC_MT_RTS: + return rte_ring_mc_rts_dequeue_bulk_elem(r, obj_table, esize, + n, 
available); +#endif + } + + /* valid ring should never reach this point */ + RTE_ASSERT(0); + if (available != NULL) + *available = 0; + return 0; } /** @@ -887,8 +940,25 @@ static __rte_always_inline unsigned rte_ring_enqueue_burst_elem(struct rte_ring *r, const void *obj_table, unsigned int esize, unsigned int n, unsigned int *free_space) { - return __rte_ring_do_enqueue_elem(r, obj_table, esize, n, - RTE_RING_QUEUE_VARIABLE, r->prod.sync_type, free_space); + switch (r->prod.sync_type) { + case RTE_RING_SYNC_MT: + return rte_ring_mp_enqueue_burst_elem(r, obj_table, esize, n, + free_space); + case RTE_RING_SYNC_ST: + return rte_ring_sp_enqueue_burst_elem(r, obj_table, esize, n, + free_space); +#ifdef ALLOW_EXPERIMENTAL_API + case RTE_RING_SYNC_MT_RTS: + return rte_ring_mp_rts_enqueue_burst_elem(r, obj_table, esize, + n, free_space); +#endif + } + + /* valid ring should never reach this point */ + RTE_ASSERT(0); + if (free_space != NULL) + *free_space = 0; + return 0; } /** @@ -979,9 +1049,25 @@ static __rte_always_inline unsigned int rte_ring_dequeue_burst_elem(struct rte_ring *r, void *obj_table, unsigned int esize, unsigned int n, unsigned int *available) { - return __rte_ring_do_dequeue_elem(r, obj_table, esize, n, - RTE_RING_QUEUE_VARIABLE, - r->cons.sync_type, available); + switch (r->cons.sync_type) { + case RTE_RING_SYNC_MT: + return rte_ring_mc_dequeue_burst_elem(r, obj_table, esize, n, + available); + case RTE_RING_SYNC_ST: + return rte_ring_sc_dequeue_burst_elem(r, obj_table, esize, n, + available); +#ifdef ALLOW_EXPERIMENTAL_API + case RTE_RING_SYNC_MT_RTS: + return rte_ring_mc_rts_dequeue_burst_elem(r, obj_table, esize, + n, available); +#endif + } + + /* valid ring should never reach this point */ + RTE_ASSERT(0); + if (available != NULL) + *available = 0; + return 0; } #include diff --git a/lib/librte_ring/rte_ring_rts.h b/lib/librte_ring/rte_ring_rts.h new file mode 100644 index 000000000..8ced07096 --- /dev/null +++ b/lib/librte_ring/rte_ring_rts.h 
@@ -0,0 +1,439 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ *
+ * Copyright (c) 2010-2020 Intel Corporation
+ * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
+ * All rights reserved.
+ * Derived from FreeBSD's bufring.h
+ * Used as BSD-3 Licensed with permission from Kip Macy.
+ */
+
+#ifndef _RTE_RING_RTS_H_
+#define _RTE_RING_RTS_H_
+
+/**
+ * @file rte_ring_rts.h
+ * @b EXPERIMENTAL: this API may change without prior notice
+ * It is not recommended to include this file directly.
+ * Please include instead.
+ *
+ * Contains functions for Relaxed Tail Sync (RTS) ring mode.
+ * The main idea remains the same as for our original MP/MC synchronization
+ * mechanism.
+ * The main difference is that tail value is increased not
+ * by every thread that finished enqueue/dequeue,
+ * but only by the current last one doing enqueue/dequeue.
+ * That allows threads to skip spinning on tail value,
+ * leaving actual tail value change to the last thread at a given instant.
+ * RTS requires 2 64-bit CAS for each enqueue(/dequeue) operation:
+ * one for head update, second for tail update.
+ * As a gain it allows a thread to avoid spinning/waiting on tail value.
+ * In comparison, the original MP/MC algorithm requires one 32-bit CAS
+ * for head update and waiting/spinning on tail value.
+ *
+ * Brief outline:
+ * - introduce update counter (cnt) for both head and tail.
+ * - increment head.cnt for each head.value update
+ * - write head.value and head.cnt atomically (64-bit CAS)
+ * - move tail.value ahead only when tail.cnt + 1 == head.cnt
+ *    (indicating that this is the last thread updating the tail)
+ * - increment tail.cnt when each enqueue/dequeue op finishes
+ *    (no matter if tail.value is going to change or not)
+ * - write tail.value and tail.cnt atomically (64-bit CAS)
+ *
+ * To avoid producer/consumer starvation:
+ * - limit max allowed distance between head and tail value (HTD_MAX).
+ *    I.E.
thread is allowed to proceed with changing head.value, + * only when: head.value - tail.value <= HTD_MAX + * HTD_MAX is an optional parameter. + * With HTD_MAX == 0 we'll have fully serialized ring - + * i.e. only one thread at a time will be able to enqueue/dequeue + * to/from the ring. + * With HTD_MAX >= ring.capacity - no limitation. + * By default HTD_MAX == ring.capacity / 8. + */ + +#ifdef __cplusplus +extern "C" { +#endif + +#include + +/** + * @internal Enqueue several objects on the RTS ring. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of objects. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to add in the ring from the obj_table. + * @param behavior + * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring + * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring + * @param free_space + * returns the amount of space after the enqueue operation has finished + * @return + * Actual number of objects enqueued. + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. + */ +static __rte_always_inline unsigned int +__rte_ring_do_rts_enqueue_elem(struct rte_ring *r, const void *obj_table, + uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior, + uint32_t *free_space) +{ + uint32_t free, head; + + n = __rte_ring_rts_move_prod_head(r, n, behavior, &head, &free); + + if (n != 0) { + __rte_ring_enqueue_elems(r, head, obj_table, esize, n); + __rte_ring_rts_update_tail(&r->rts_prod); + } + + if (free_space != NULL) + *free_space = free - n; + return n; +} + +/** + * @internal Dequeue several objects from the RTS ring. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of objects. + * @param esize + * The size of ring element, in bytes. 
It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to pull from the ring. + * @param behavior + * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring + * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring + * @param available + * returns the number of remaining ring entries after the dequeue has finished + * @return + * - Actual number of objects dequeued. + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. + */ +static __rte_always_inline unsigned int +__rte_ring_do_rts_dequeue_elem(struct rte_ring *r, void *obj_table, + uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior, + uint32_t *available) +{ + uint32_t entries, head; + + n = __rte_ring_rts_move_cons_head(r, n, behavior, &head, &entries); + + if (n != 0) { + __rte_ring_dequeue_elems(r, head, obj_table, esize, n); + __rte_ring_rts_update_tail(&r->rts_cons); + } + + if (available != NULL) + *available = entries - n; + return n; +} + +/** + * Enqueue several objects on the RTS ring (multi-producers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of objects. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. 
+ * @return + * The number of objects enqueued, either 0 or n + */ +__rte_experimental +static __rte_always_inline unsigned int +rte_ring_mp_rts_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table, + unsigned int esize, unsigned int n, unsigned int *free_space) +{ + return __rte_ring_do_rts_enqueue_elem(r, obj_table, esize, n, + RTE_RING_QUEUE_FIXED, free_space); +} + +/** + * Dequeue several objects from an RTS ring (multi-consumers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of objects that will be filled. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. + * @return + * The number of objects dequeued, either 0 or n + */ +__rte_experimental +static __rte_always_inline unsigned int +rte_ring_mc_rts_dequeue_bulk_elem(struct rte_ring *r, void *obj_table, + unsigned int esize, unsigned int n, unsigned int *available) +{ + return __rte_ring_do_rts_dequeue_elem(r, obj_table, esize, n, + RTE_RING_QUEUE_FIXED, available); +} + +/** + * Enqueue several objects on the RTS ring (multi-producers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of objects. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. + * @return + * - n: Actual number of objects enqueued. 
+ */ +__rte_experimental +static __rte_always_inline unsigned +rte_ring_mp_rts_enqueue_burst_elem(struct rte_ring *r, const void *obj_table, + unsigned int esize, unsigned int n, unsigned int *free_space) +{ + return __rte_ring_do_rts_enqueue_elem(r, obj_table, esize, n, + RTE_RING_QUEUE_VARIABLE, free_space); +} + +/** + * Dequeue several objects from an RTS ring (multi-consumers safe). + * When the requested objects are more than the available objects, + * only dequeue the actual number of objects. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of objects that will be filled. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. + * @return + * - n: Actual number of objects dequeued, 0 if ring is empty + */ +__rte_experimental +static __rte_always_inline unsigned +rte_ring_mc_rts_dequeue_burst_elem(struct rte_ring *r, void *obj_table, + unsigned int esize, unsigned int n, unsigned int *available) +{ + return __rte_ring_do_rts_dequeue_elem(r, obj_table, esize, n, + RTE_RING_QUEUE_VARIABLE, available); +} + +/** + * Enqueue several objects on the RTS ring (multi-producers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. 
+ * @return + * The number of objects enqueued, either 0 or n + */ +__rte_experimental +static __rte_always_inline unsigned int +rte_ring_mp_rts_enqueue_bulk(struct rte_ring *r, void * const *obj_table, + unsigned int n, unsigned int *free_space) +{ + return rte_ring_mp_rts_enqueue_bulk_elem(r, obj_table, + sizeof(uintptr_t), n, free_space); +} + +/** + * Dequeue several objects from an RTS ring (multi-consumers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. + * @return + * The number of objects dequeued, either 0 or n + */ +__rte_experimental +static __rte_always_inline unsigned int +rte_ring_mc_rts_dequeue_bulk(struct rte_ring *r, void **obj_table, + unsigned int n, unsigned int *available) +{ + return rte_ring_mc_rts_dequeue_bulk_elem(r, obj_table, + sizeof(uintptr_t), n, available); +} + +/** + * Enqueue several objects on the RTS ring (multi-producers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. + * @return + * - n: Actual number of objects enqueued. + */ +__rte_experimental +static __rte_always_inline unsigned +rte_ring_mp_rts_enqueue_burst(struct rte_ring *r, void * const *obj_table, + unsigned int n, unsigned int *free_space) +{ + return rte_ring_mp_rts_enqueue_burst_elem(r, obj_table, + sizeof(uintptr_t), n, free_space); +} + +/** + * Dequeue several objects from an RTS ring (multi-consumers safe). 
+ * When the requested objects are more than the available objects, + * only dequeue the actual number of objects. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. + * @return + * - n: Actual number of objects dequeued, 0 if ring is empty + */ +__rte_experimental +static __rte_always_inline unsigned +rte_ring_mc_rts_dequeue_burst(struct rte_ring *r, void **obj_table, + unsigned int n, unsigned int *available) +{ + return rte_ring_mc_rts_dequeue_burst_elem(r, obj_table, + sizeof(uintptr_t), n, available); +} + +/** + * Return producer max Head-Tail-Distance (HTD). + * + * @param r + * A pointer to the ring structure. + * @return + * Producer HTD value, if producer is set in appropriate sync mode, + * or UINT32_MAX otherwise. + */ +__rte_experimental +static inline uint32_t +rte_ring_get_prod_htd_max(const struct rte_ring *r) +{ + if (r->prod.sync_type == RTE_RING_SYNC_MT_RTS) + return r->rts_prod.htd_max; + return UINT32_MAX; +} + +/** + * Set producer max Head-Tail-Distance (HTD). + * Note that producer has to use appropriate sync mode (RTS). + * + * @param r + * A pointer to the ring structure. + * @param v + * new HTD value to setup. + * @return + * Zero on success, or negative error code otherwise. + */ +__rte_experimental +static inline int +rte_ring_set_prod_htd_max(struct rte_ring *r, uint32_t v) +{ + if (r->prod.sync_type != RTE_RING_SYNC_MT_RTS) + return -ENOTSUP; + + r->rts_prod.htd_max = v; + return 0; +} + +/** + * Return consumer max Head-Tail-Distance (HTD). + * + * @param r + * A pointer to the ring structure. + * @return + * Consumer HTD value, if consumer is set in appropriate sync mode, + * or UINT32_MAX otherwise. 
+ */ +__rte_experimental +static inline uint32_t +rte_ring_get_cons_htd_max(const struct rte_ring *r) +{ + if (r->cons.sync_type == RTE_RING_SYNC_MT_RTS) + return r->rts_cons.htd_max; + return UINT32_MAX; +} + +/** + * Set consumer max Head-Tail-Distance (HTD). + * Note that consumer has to use appropriate sync mode (RTS). + * + * @param r + * A pointer to the ring structure. + * @param v + * new HTD value to set up. + * @return + * Zero on success, or negative error code otherwise. + */ +__rte_experimental +static inline int +rte_ring_set_cons_htd_max(struct rte_ring *r, uint32_t v) +{ + if (r->cons.sync_type != RTE_RING_SYNC_MT_RTS) + return -ENOTSUP; + + r->rts_cons.htd_max = v; + return 0; +} + +#ifdef __cplusplus +} +#endif + +#endif /* _RTE_RING_RTS_H_ */ diff --git a/lib/librte_ring/rte_ring_rts_c11_mem.h b/lib/librte_ring/rte_ring_rts_c11_mem.h new file mode 100644 index 000000000..327f22796 --- /dev/null +++ b/lib/librte_ring/rte_ring_rts_c11_mem.h @@ -0,0 +1,179 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * + * Copyright (c) 2010-2020 Intel Corporation + * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org + * All rights reserved. + * Derived from FreeBSD's bufring.h + * Used as BSD-3 Licensed with permission from Kip Macy. + */ + +#ifndef _RTE_RING_RTS_C11_MEM_H_ +#define _RTE_RING_RTS_C11_MEM_H_ + +/** + * @file rte_ring_rts_c11_mem.h + * It is not recommended to include this file directly, + * include instead. + * Contains internal helper functions for Relaxed Tail Sync (RTS) ring mode. + * For more information please refer to . + */ + +/** + * @internal This function updates tail values. + */ +static __rte_always_inline void +__rte_ring_rts_update_tail(struct rte_ring_rts_headtail *ht) +{ + union __rte_ring_rts_poscnt h, ot, nt; + + /* + * If there are other enqueues/dequeues in progress that + * might have preceded us, then don't update tail with new value. 
+ */ + + ot.raw = __atomic_load_n(&ht->tail.raw, __ATOMIC_ACQUIRE); + + do { + /* on 32-bit systems we have to do atomic read here */ + h.raw = __atomic_load_n(&ht->head.raw, __ATOMIC_RELAXED); + + nt.raw = ot.raw; + if (++nt.val.cnt == h.val.cnt) + nt.val.pos = h.val.pos; + + } while (__atomic_compare_exchange_n(&ht->tail.raw, &ot.raw, nt.raw, + 0, __ATOMIC_RELEASE, __ATOMIC_ACQUIRE) == 0); +} + +/** + * @internal This function waits till head/tail distance wouldn't + * exceed pre-defined max value. + */ +static __rte_always_inline void +__rte_ring_rts_head_wait(const struct rte_ring_rts_headtail *ht, + union __rte_ring_rts_poscnt *h) +{ + uint32_t max; + + max = ht->htd_max; + + while (h->val.pos - ht->tail.val.pos > max) { + rte_pause(); + h->raw = __atomic_load_n(&ht->head.raw, __ATOMIC_ACQUIRE); + } +} + +/** + * @internal This function updates the producer head for enqueue. + */ +static __rte_always_inline uint32_t +__rte_ring_rts_move_prod_head(struct rte_ring *r, uint32_t num, + enum rte_ring_queue_behavior behavior, uint32_t *old_head, + uint32_t *free_entries) +{ + uint32_t n; + union __rte_ring_rts_poscnt nh, oh; + + const uint32_t capacity = r->capacity; + + oh.raw = __atomic_load_n(&r->rts_prod.head.raw, __ATOMIC_ACQUIRE); + + do { + /* Reset n to the initial burst count */ + n = num; + + /* + * wait for prod head/tail distance, + * make sure that we read prod head *before* + * reading cons tail. + */ + __rte_ring_rts_head_wait(&r->rts_prod, &oh); + + /* + * The subtraction is done between two unsigned 32bits value + * (the result is always modulo 32 bits even if we have + * *old_head > cons_tail). So 'free_entries' is always between 0 + * and capacity (which is < size). + */ + *free_entries = capacity + r->cons.tail - oh.val.pos; + + /* check that we have enough room in ring */ + if (unlikely(n > *free_entries)) + n = (behavior == RTE_RING_QUEUE_FIXED) ? 
+ 0 : *free_entries; + + if (n == 0) + break; + + nh.val.pos = oh.val.pos + n; + nh.val.cnt = oh.val.cnt + 1; + + /* + * this CAS(ACQUIRE, ACQUIRE) serves as a hoist barrier to prevent: + * - OOO reads of cons tail value + * - OOO copy of elems to the ring + */ + } while (__atomic_compare_exchange_n(&r->rts_prod.head.raw, + &oh.raw, nh.raw, + 0, __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE) == 0); + + *old_head = oh.val.pos; + return n; +} + +/** + * @internal This function updates the consumer head for dequeue + */ +static __rte_always_inline unsigned int +__rte_ring_rts_move_cons_head(struct rte_ring *r, uint32_t num, + enum rte_ring_queue_behavior behavior, uint32_t *old_head, + uint32_t *entries) +{ + uint32_t n; + union __rte_ring_rts_poscnt nh, oh; + + oh.raw = __atomic_load_n(&r->rts_cons.head.raw, __ATOMIC_ACQUIRE); + + /* move cons.head atomically */ + do { + /* Restore n as it may change every loop */ + n = num; + + /* + * wait for cons head/tail distance, + * make sure that we read cons head *before* + * reading prod tail. + */ + __rte_ring_rts_head_wait(&r->rts_cons, &oh); + + /* The subtraction is done between two unsigned 32bits value + * (the result is always modulo 32 bits even if we have + * cons_head > prod_tail). So 'entries' is always between 0 + * and size(ring)-1. + */ + *entries = r->prod.tail - oh.val.pos; + + /* Set the actual entries for dequeue */ + if (n > *entries) + n = (behavior == RTE_RING_QUEUE_FIXED) ? 
0 : *entries; + + if (unlikely(n == 0)) + break; + + nh.val.pos = oh.val.pos + n; + nh.val.cnt = oh.val.cnt + 1; + + /* + * this CAS(ACQUIRE, ACQUIRE) serves as a hoist barrier to prevent: + * - OOO reads of prod tail value + * - OOO copy of elems from the ring + */ + } while (__atomic_compare_exchange_n(&r->rts_cons.head.raw, + &oh.raw, nh.raw, + 0, __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE) == 0); + + *old_head = oh.val.pos; + return n; +} + +#endif /* _RTE_RING_RTS_C11_MEM_H_ */ From patchwork Mon Apr 20 12:28:25 2020 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: "Ananyev, Konstantin" X-Patchwork-Id: 68955 X-Patchwork-Delegate: david.marchand@redhat.com Return-Path: X-Original-To: patchwork@inbox.dpdk.org Delivered-To: patchwork@inbox.dpdk.org Received: from dpdk.org (dpdk.org [92.243.14.124]) by inbox.dpdk.org (Postfix) with ESMTP id 068E9A0561; Mon, 20 Apr 2020 14:29:31 +0200 (CEST) Received: from [92.243.14.124] (localhost [127.0.0.1]) by dpdk.org (Postfix) with ESMTP id 8B25B1D5F2; Mon, 20 Apr 2020 14:28:57 +0200 (CEST) Received: from mga17.intel.com (mga17.intel.com [192.55.52.151]) by dpdk.org (Postfix) with ESMTP id 3D0731D5A1 for ; Mon, 20 Apr 2020 14:28:52 +0200 (CEST) IronPort-SDR: Jp3BVSfHBbhRk4z5tE+8BDhB8v5vUpi5GCNYOqmHXDFzXQEyjue8FFRpOivfjAJzUGMas/Q7+G rpV/ahKzBnng== X-Amp-Result: SKIPPED(no attachment in message) X-Amp-File-Uploaded: False Received: from orsmga003.jf.intel.com ([10.7.209.27]) by fmsmga107.fm.intel.com with ESMTP/TLS/ECDHE-RSA-AES256-GCM-SHA384; 20 Apr 2020 05:28:51 -0700 IronPort-SDR: GyrJA6T3n0RX+YLYt+vMu7myF9pwkpScGcKGZYfdWecWdqYAR+ctG4lsqtVj2L7AZjb7TkgRax wiiOxHuMQ2VQ== X-ExtLoop1: 1 X-IronPort-AV: E=Sophos;i="5.72,406,1580803200"; d="scan'208";a="254923889" Received: from sivswdev08.ir.intel.com ([10.237.217.47]) by orsmga003.jf.intel.com with ESMTP; 20 Apr 2020 05:28:50 -0700 From: Konstantin Ananyev To: dev@dpdk.org Cc: honnappa.nagarahalli@arm.com, 
david.marchand@redhat.com, jielong.zjl@antfin.com, Konstantin Ananyev Date: Mon, 20 Apr 2020 13:28:25 +0100 Message-Id: <20200420122831.16973-5-konstantin.ananyev@intel.com> X-Mailer: git-send-email 2.18.0 In-Reply-To: <20200420122831.16973-1-konstantin.ananyev@intel.com> References: <20200420121113.9327-1-konstantin.ananyev@intel.com> <20200420122831.16973-1-konstantin.ananyev@intel.com> Subject: [dpdk-dev] [PATCH v7 04/10] test/ring: add contention stress test for RTS ring X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: DPDK patches and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dev-bounces@dpdk.org Sender: "dev" Introduce new test case to test RTS ring mode under contention. Signed-off-by: Konstantin Ananyev Acked-by: Honnappa Nagarahalli --- app/test/Makefile | 1 + app/test/meson.build | 1 + app/test/test_ring_rts_stress.c | 32 ++++++++++++++++++++++++++++++++ app/test/test_ring_stress.c | 3 +++ app/test/test_ring_stress.h | 1 + 5 files changed, 38 insertions(+) create mode 100644 app/test/test_ring_rts_stress.c diff --git a/app/test/Makefile b/app/test/Makefile index a23a011df..00b74b5c9 100644 --- a/app/test/Makefile +++ b/app/test/Makefile @@ -79,6 +79,7 @@ SRCS-y += test_rand_perf.c SRCS-y += test_ring.c SRCS-y += test_ring_mpmc_stress.c SRCS-y += test_ring_perf.c +SRCS-y += test_ring_rts_stress.c SRCS-y += test_ring_stress.c SRCS-y += test_pmd_perf.c diff --git a/app/test/meson.build b/app/test/meson.build index 8824f366c..97ad822c1 100644 --- a/app/test/meson.build +++ b/app/test/meson.build @@ -102,6 +102,7 @@ test_sources = files('commands.c', 'test_ring.c', 'test_ring_mpmc_stress.c', 'test_ring_perf.c', + 'test_ring_rts_stress.c', 'test_ring_stress.c', 'test_rwlock.c', 'test_sched.c', diff --git a/app/test/test_ring_rts_stress.c b/app/test/test_ring_rts_stress.c new file mode 100644 index 000000000..f5255f24c --- /dev/null +++ b/app/test/test_ring_rts_stress.c @@ -0,0 
+1,32 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * Copyright(c) 2020 Intel Corporation + */ + +#include "test_ring_stress_impl.h" + +static inline uint32_t +_st_ring_dequeue_bulk(struct rte_ring *r, void **obj, uint32_t n, + uint32_t *avail) +{ + return rte_ring_mc_rts_dequeue_bulk(r, obj, n, avail); +} + +static inline uint32_t +_st_ring_enqueue_bulk(struct rte_ring *r, void * const *obj, uint32_t n, + uint32_t *free) +{ + return rte_ring_mp_rts_enqueue_bulk(r, obj, n, free); +} + +static int +_st_ring_init(struct rte_ring *r, const char *name, uint32_t num) +{ + return rte_ring_init(r, name, num, + RING_F_MP_RTS_ENQ | RING_F_MC_RTS_DEQ); +} + +const struct test test_ring_rts_stress = { + .name = "MT_RTS", + .nb_case = RTE_DIM(tests), + .cases = tests, +}; diff --git a/app/test/test_ring_stress.c b/app/test/test_ring_stress.c index 60706f799..eab395e30 100644 --- a/app/test/test_ring_stress.c +++ b/app/test/test_ring_stress.c @@ -40,6 +40,9 @@ test_ring_stress(void) n += test_ring_mpmc_stress.nb_case; k += run_test(&test_ring_mpmc_stress); + n += test_ring_rts_stress.nb_case; + k += run_test(&test_ring_rts_stress); + printf("Number of tests:\t%u\nSuccess:\t%u\nFailed:\t%u\n", n, k, n - k); return (k != n); diff --git a/app/test/test_ring_stress.h b/app/test/test_ring_stress.h index 60eac6216..32aae2072 100644 --- a/app/test/test_ring_stress.h +++ b/app/test/test_ring_stress.h @@ -33,3 +33,4 @@ struct test { }; extern const struct test test_ring_mpmc_stress; +extern const struct test test_ring_rts_stress; From patchwork Mon Apr 20 12:28:26 2020 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: "Ananyev, Konstantin" X-Patchwork-Id: 68956 X-Patchwork-Delegate: david.marchand@redhat.com Return-Path: X-Original-To: patchwork@inbox.dpdk.org Delivered-To: patchwork@inbox.dpdk.org Received: from dpdk.org (dpdk.org [92.243.14.124]) by inbox.dpdk.org (Postfix) with ESMTP id 8A7A3A0561; Mon, 20 Apr 2020 
14:29:48 +0200 (CEST) Received: from [92.243.14.124] (localhost [127.0.0.1]) by dpdk.org (Postfix) with ESMTP id 5C72E1D5F1; Mon, 20 Apr 2020 14:29:10 +0200 (CEST) Received: from mga17.intel.com (mga17.intel.com [192.55.52.151]) by dpdk.org (Postfix) with ESMTP id AE06E1D5E4 for ; Mon, 20 Apr 2020 14:28:55 +0200 (CEST) IronPort-SDR: OXH+ofId6wW9vwhEDoIxDC3/CpQmgYcxeWirnGr/4johPbZ/5/O8GNSKkfSAaPBxbjifOTBDjG yDXe2TwaXIyQ== X-Amp-Result: SKIPPED(no attachment in message) X-Amp-File-Uploaded: False Received: from orsmga003.jf.intel.com ([10.7.209.27]) by fmsmga107.fm.intel.com with ESMTP/TLS/ECDHE-RSA-AES256-GCM-SHA384; 20 Apr 2020 05:28:54 -0700 IronPort-SDR: itW7w6iIieo7Xs/P7mEkBguF5Jphnqy5ml+Z42OAsVrd9UvXXcEmSZk6NlNeXzE2SzgwZuW8vB LtPqZK7ruygw== X-ExtLoop1: 1 X-IronPort-AV: E=Sophos;i="5.72,406,1580803200"; d="scan'208";a="254923901" Received: from sivswdev08.ir.intel.com ([10.237.217.47]) by orsmga003.jf.intel.com with ESMTP; 20 Apr 2020 05:28:52 -0700 From: Konstantin Ananyev To: dev@dpdk.org Cc: honnappa.nagarahalli@arm.com, david.marchand@redhat.com, jielong.zjl@antfin.com, Konstantin Ananyev Date: Mon, 20 Apr 2020 13:28:26 +0100 Message-Id: <20200420122831.16973-6-konstantin.ananyev@intel.com> X-Mailer: git-send-email 2.18.0 In-Reply-To: <20200420122831.16973-1-konstantin.ananyev@intel.com> References: <20200420121113.9327-1-konstantin.ananyev@intel.com> <20200420122831.16973-1-konstantin.ananyev@intel.com> Subject: [dpdk-dev] [PATCH v7 05/10] ring: introduce HTS ring mode X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: DPDK patches and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dev-bounces@dpdk.org Sender: "dev" Introduce head/tail sync mode for MT ring synchronization. In that mode enqueue/dequeue operation is fully serialized: only one thread at a time is allowed to perform given op. 
Intended to reduce stall times when the ring is used on overcommitted CPUs (multiple active threads on the same CPU). Signed-off-by: Konstantin Ananyev Acked-by: Honnappa Nagarahalli --- doc/guides/rel_notes/release_20_05.rst | 8 +- lib/librte_ring/Makefile | 2 + lib/librte_ring/meson.build | 2 + lib/librte_ring/rte_ring.c | 20 +- lib/librte_ring/rte_ring.h | 23 ++ lib/librte_ring/rte_ring_core.h | 20 ++ lib/librte_ring/rte_ring_elem.h | 19 ++ lib/librte_ring/rte_ring_hts.h | 332 +++++++++++++++++++++++++ lib/librte_ring/rte_ring_hts_c11_mem.h | 164 ++++++++++++ 9 files changed, 584 insertions(+), 6 deletions(-) create mode 100644 lib/librte_ring/rte_ring_hts.h create mode 100644 lib/librte_ring/rte_ring_hts_c11_mem.h diff --git a/doc/guides/rel_notes/release_20_05.rst b/doc/guides/rel_notes/release_20_05.rst index eedf960d0..db8a281db 100644 --- a/doc/guides/rel_notes/release_20_05.rst +++ b/doc/guides/rel_notes/release_20_05.rst @@ -83,10 +83,10 @@ New Features * **New synchronization modes for rte_ring.** - Introduced new optional MT synchronization mode for rte_ring: - Relaxed Tail Sync (RTS). With this mode selected, rte_ring shows - significant improvements for average enqueue/dequeue times on - overcommitted systems. + Introduced new optional MT synchronization modes for rte_ring: + Relaxed Tail Sync (RTS) mode and Head/Tail Sync (HTS) mode. + With these modes selected, rte_ring shows significant improvements for + average enqueue/dequeue times on overcommitted systems. 
Removed Items diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile index 04e446e37..f75d8e530 100644 --- a/lib/librte_ring/Makefile +++ b/lib/librte_ring/Makefile @@ -20,6 +20,8 @@ SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h \ rte_ring_elem.h \ rte_ring_generic.h \ rte_ring_c11_mem.h \ + rte_ring_hts.h \ + rte_ring_hts_c11_mem.h \ rte_ring_rts.h \ rte_ring_rts_c11_mem.h diff --git a/lib/librte_ring/meson.build b/lib/librte_ring/meson.build index a95598032..ca37cb8cc 100644 --- a/lib/librte_ring/meson.build +++ b/lib/librte_ring/meson.build @@ -7,6 +7,8 @@ headers = files('rte_ring.h', 'rte_ring_elem.h', 'rte_ring_c11_mem.h', 'rte_ring_generic.h', + 'rte_ring_hts.h', + 'rte_ring_hts_c11_mem.h', 'rte_ring_rts.h', 'rte_ring_rts_c11_mem.h') diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c index 222eec0fb..ebe5ccf0d 100644 --- a/lib/librte_ring/rte_ring.c +++ b/lib/librte_ring/rte_ring.c @@ -89,9 +89,11 @@ static void reset_headtail(void *p) { struct rte_ring_headtail *ht; + struct rte_ring_hts_headtail *ht_hts; struct rte_ring_rts_headtail *ht_rts; ht = p; + ht_hts = p; ht_rts = p; switch (ht->sync_type) { @@ -104,6 +106,9 @@ reset_headtail(void *p) ht_rts->head.raw = 0; ht_rts->tail.raw = 0; break; + case RTE_RING_SYNC_MT_HTS: + ht_hts->ht.raw = 0; + break; default: /* unknown sync mode */ RTE_ASSERT(0); @@ -127,9 +132,9 @@ get_sync_type(uint32_t flags, enum rte_ring_sync_type *prod_st, enum rte_ring_sync_type *cons_st) { static const uint32_t prod_st_flags = - (RING_F_SP_ENQ | RING_F_MP_RTS_ENQ); + (RING_F_SP_ENQ | RING_F_MP_RTS_ENQ | RING_F_MP_HTS_ENQ); static const uint32_t cons_st_flags = - (RING_F_SC_DEQ | RING_F_MC_RTS_DEQ); + (RING_F_SC_DEQ | RING_F_MC_RTS_DEQ | RING_F_MC_HTS_DEQ); switch (flags & prod_st_flags) { case 0: @@ -141,6 +146,9 @@ get_sync_type(uint32_t flags, enum rte_ring_sync_type *prod_st, case RING_F_MP_RTS_ENQ: *prod_st = RTE_RING_SYNC_MT_RTS; break; + case RING_F_MP_HTS_ENQ: + *prod_st = 
RTE_RING_SYNC_MT_HTS; + break; default: return -EINVAL; } @@ -155,6 +163,9 @@ get_sync_type(uint32_t flags, enum rte_ring_sync_type *prod_st, case RING_F_MC_RTS_DEQ: *cons_st = RTE_RING_SYNC_MT_RTS; break; + case RING_F_MC_HTS_DEQ: + *cons_st = RTE_RING_SYNC_MT_HTS; + break; default: return -EINVAL; } @@ -176,6 +187,11 @@ rte_ring_init(struct rte_ring *r, const char *name, unsigned count, RTE_BUILD_BUG_ON((offsetof(struct rte_ring, prod) & RTE_CACHE_LINE_MASK) != 0); + RTE_BUILD_BUG_ON(offsetof(struct rte_ring_headtail, sync_type) != + offsetof(struct rte_ring_hts_headtail, sync_type)); + RTE_BUILD_BUG_ON(offsetof(struct rte_ring_headtail, tail) != + offsetof(struct rte_ring_hts_headtail, ht.pos.tail)); + RTE_BUILD_BUG_ON(offsetof(struct rte_ring_headtail, sync_type) != offsetof(struct rte_ring_rts_headtail, sync_type)); RTE_BUILD_BUG_ON(offsetof(struct rte_ring_headtail, tail) != diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h index c42e1cfc4..7cf046528 100644 --- a/lib/librte_ring/rte_ring.h +++ b/lib/librte_ring/rte_ring.h @@ -86,6 +86,9 @@ ssize_t rte_ring_get_memsize(unsigned count); * - RING_F_MP_RTS_ENQ: If this flag is set, the default behavior when * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()`` * is "multi-producer RTS mode". + * - RING_F_MP_HTS_ENQ: If this flag is set, the default behavior when + * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()`` + * is "multi-producer HTS mode". * If none of these flags is set, then default "multi-producer" * behavior is selected. * - One of mutually exclusive flags that define consumer behavior: @@ -95,6 +98,9 @@ ssize_t rte_ring_get_memsize(unsigned count); * - RING_F_MC_RTS_DEQ: If this flag is set, the default behavior when * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()`` * is "multi-consumer RTS mode". 
+ * - RING_F_MC_HTS_DEQ: If this flag is set, the default behavior when + * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()`` + * is "multi-consumer HTS mode". * If none of these flags is set, then default "multi-consumer" * behavior is selected. * @return @@ -133,6 +139,9 @@ int rte_ring_init(struct rte_ring *r, const char *name, unsigned count, * - RING_F_MP_RTS_ENQ: If this flag is set, the default behavior when * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()`` * is "multi-producer RTS mode". + * - RING_F_MP_HTS_ENQ: If this flag is set, the default behavior when + * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()`` + * is "multi-producer HTS mode". * If none of these flags is set, then default "multi-producer" * behavior is selected. * - One of mutually exclusive flags that define consumer behavior: @@ -142,6 +151,9 @@ int rte_ring_init(struct rte_ring *r, const char *name, unsigned count, * - RING_F_MC_RTS_DEQ: If this flag is set, the default behavior when * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()`` * is "multi-consumer RTS mode". + * - RING_F_MC_HTS_DEQ: If this flag is set, the default behavior when + * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()`` + * is "multi-consumer HTS mode". * If none of these flags is set, then default "multi-consumer" * behavior is selected. 
* @return @@ -422,6 +434,9 @@ rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table, case RTE_RING_SYNC_MT_RTS: return rte_ring_mp_rts_enqueue_bulk(r, obj_table, n, free_space); + case RTE_RING_SYNC_MT_HTS: + return rte_ring_mp_hts_enqueue_bulk(r, obj_table, n, + free_space); #endif } @@ -569,6 +584,8 @@ rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n, #ifdef ALLOW_EXPERIMENTAL_API case RTE_RING_SYNC_MT_RTS: return rte_ring_mc_rts_dequeue_bulk(r, obj_table, n, available); + case RTE_RING_SYNC_MT_HTS: + return rte_ring_mc_hts_dequeue_bulk(r, obj_table, n, available); #endif } @@ -903,6 +920,9 @@ rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table, case RTE_RING_SYNC_MT_RTS: return rte_ring_mp_rts_enqueue_burst(r, obj_table, n, free_space); + case RTE_RING_SYNC_MT_HTS: + return rte_ring_mp_hts_enqueue_burst(r, obj_table, n, + free_space); #endif } @@ -996,6 +1016,9 @@ rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table, case RTE_RING_SYNC_MT_RTS: return rte_ring_mc_rts_dequeue_burst(r, obj_table, n, available); + case RTE_RING_SYNC_MT_HTS: + return rte_ring_mc_hts_dequeue_burst(r, obj_table, n, + available); #endif } diff --git a/lib/librte_ring/rte_ring_core.h b/lib/librte_ring/rte_ring_core.h index bd21fa535..16718ca7f 100644 --- a/lib/librte_ring/rte_ring_core.h +++ b/lib/librte_ring/rte_ring_core.h @@ -59,6 +59,7 @@ enum rte_ring_sync_type { RTE_RING_SYNC_ST, /**< single thread only */ #ifdef ALLOW_EXPERIMENTAL_API RTE_RING_SYNC_MT_RTS, /**< multi-thread relaxed tail sync */ + RTE_RING_SYNC_MT_HTS, /**< multi-thread head/tail sync */ #endif }; @@ -95,6 +96,20 @@ struct rte_ring_rts_headtail { volatile union __rte_ring_rts_poscnt head; }; +union __rte_ring_hts_pos { + /** raw 8B value to read/write *head* and *tail* as one atomic op */ + uint64_t raw __rte_aligned(8); + struct { + uint32_t head; /**< head position */ + uint32_t tail; /**< tail position */ + } pos; +}; + +struct rte_ring_hts_headtail { + 
volatile union __rte_ring_hts_pos ht; + enum rte_ring_sync_type sync_type; /**< sync type of prod/cons */ +}; + /** * An RTE ring structure. * @@ -126,6 +141,7 @@ struct rte_ring { RTE_STD_C11 union { struct rte_ring_headtail prod; + struct rte_ring_hts_headtail hts_prod; struct rte_ring_rts_headtail rts_prod; } __rte_cache_aligned; @@ -135,6 +151,7 @@ struct rte_ring { RTE_STD_C11 union { struct rte_ring_headtail cons; + struct rte_ring_hts_headtail hts_cons; struct rte_ring_rts_headtail rts_cons; } __rte_cache_aligned; @@ -157,6 +174,9 @@ struct rte_ring { #define RING_F_MP_RTS_ENQ 0x0008 /**< The default enqueue is "MP RTS". */ #define RING_F_MC_RTS_DEQ 0x0010 /**< The default dequeue is "MC RTS". */ +#define RING_F_MP_HTS_ENQ 0x0020 /**< The default enqueue is "MP HTS". */ +#define RING_F_MC_HTS_DEQ 0x0040 /**< The default dequeue is "MC HTS". */ + #ifdef __cplusplus } #endif diff --git a/lib/librte_ring/rte_ring_elem.h b/lib/librte_ring/rte_ring_elem.h index 4030753b6..492eef936 100644 --- a/lib/librte_ring/rte_ring_elem.h +++ b/lib/librte_ring/rte_ring_elem.h @@ -81,6 +81,9 @@ ssize_t rte_ring_get_memsize_elem(unsigned int esize, unsigned int count); * - RING_F_MP_RTS_ENQ: If this flag is set, the default behavior when * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()`` * is "multi-producer RTS mode". + * - RING_F_MP_HTS_ENQ: If this flag is set, the default behavior when + * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()`` + * is "multi-producer HTS mode". * If none of these flags is set, then default "multi-producer" * behavior is selected. * - One of mutually exclusive flags that define consumer behavior: @@ -90,6 +93,9 @@ ssize_t rte_ring_get_memsize_elem(unsigned int esize, unsigned int count); * - RING_F_MC_RTS_DEQ: If this flag is set, the default behavior when * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()`` * is "multi-consumer RTS mode". 
+ * - RING_F_MC_HTS_DEQ: If this flag is set, the default behavior when + * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()`` + * is "multi-consumer HTS mode". * If none of these flags is set, then default "multi-consumer" * behavior is selected. * @return @@ -541,6 +547,7 @@ rte_ring_sp_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table, } #ifdef ALLOW_EXPERIMENTAL_API +#include #include #endif @@ -585,6 +592,9 @@ rte_ring_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table, case RTE_RING_SYNC_MT_RTS: return rte_ring_mp_rts_enqueue_bulk_elem(r, obj_table, esize, n, free_space); + case RTE_RING_SYNC_MT_HTS: + return rte_ring_mp_hts_enqueue_bulk_elem(r, obj_table, esize, n, + free_space); #endif } @@ -766,6 +776,9 @@ rte_ring_dequeue_bulk_elem(struct rte_ring *r, void *obj_table, case RTE_RING_SYNC_MT_RTS: return rte_ring_mc_rts_dequeue_bulk_elem(r, obj_table, esize, n, available); + case RTE_RING_SYNC_MT_HTS: + return rte_ring_mc_hts_dequeue_bulk_elem(r, obj_table, esize, + n, available); #endif } @@ -951,6 +964,9 @@ rte_ring_enqueue_burst_elem(struct rte_ring *r, const void *obj_table, case RTE_RING_SYNC_MT_RTS: return rte_ring_mp_rts_enqueue_burst_elem(r, obj_table, esize, n, free_space); + case RTE_RING_SYNC_MT_HTS: + return rte_ring_mp_hts_enqueue_burst_elem(r, obj_table, esize, + n, free_space); #endif } @@ -1060,6 +1076,9 @@ rte_ring_dequeue_burst_elem(struct rte_ring *r, void *obj_table, case RTE_RING_SYNC_MT_RTS: return rte_ring_mc_rts_dequeue_burst_elem(r, obj_table, esize, n, available); + case RTE_RING_SYNC_MT_HTS: + return rte_ring_mc_hts_dequeue_burst_elem(r, obj_table, esize, + n, available); #endif } diff --git a/lib/librte_ring/rte_ring_hts.h b/lib/librte_ring/rte_ring_hts.h new file mode 100644 index 000000000..c7701defc --- /dev/null +++ b/lib/librte_ring/rte_ring_hts.h @@ -0,0 +1,332 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * + * Copyright (c) 2010-2020 Intel Corporation + * Copyright (c) 2007-2009 Kip Macy 
kmacy@freebsd.org + * All rights reserved. + * Derived from FreeBSD's bufring.h + * Used as BSD-3 Licensed with permission from Kip Macy. + */ + +#ifndef _RTE_RING_HTS_H_ +#define _RTE_RING_HTS_H_ + +/** + * @file rte_ring_hts.h + * @b EXPERIMENTAL: this API may change without prior notice + * It is not recommended to include this file directly. + * Please include instead. + * + * Contains functions for serialized, aka Head-Tail Sync (HTS) ring mode. + * In that mode enqueue/dequeue operation is fully serialized: + * at any given moment only one enqueue/dequeue operation can proceed. + * This is achieved by allowing a thread to proceed with changing head.value + * only when head.value == tail.value. + * Both head and tail values are updated atomically (as one 64-bit value). + * To achieve that 64-bit CAS is used by head update routine. + */ + +#ifdef __cplusplus +extern "C" { +#endif + +#include + +/** + * @internal Enqueue several objects on the HTS ring. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of objects. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to add in the ring from the obj_table. + * @param behavior + * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring + * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring + * @param free_space + * returns the amount of space after the enqueue operation has finished + * @return + * Actual number of objects enqueued. + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. 
+ */ +static __rte_always_inline unsigned int +__rte_ring_do_hts_enqueue_elem(struct rte_ring *r, const void *obj_table, + uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior, + uint32_t *free_space) +{ + uint32_t free, head; + + n = __rte_ring_hts_move_prod_head(r, n, behavior, &head, &free); + + if (n != 0) { + __rte_ring_enqueue_elems(r, head, obj_table, esize, n); + __rte_ring_hts_update_tail(&r->hts_prod, head, n, 1); + } + + if (free_space != NULL) + *free_space = free - n; + return n; +} + +/** + * @internal Dequeue several objects from the HTS ring. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of objects. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to pull from the ring. + * @param behavior + * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring + * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring + * @param available + * returns the number of remaining ring entries after the dequeue has finished + * @return + * - Actual number of objects dequeued. + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. + */ +static __rte_always_inline unsigned int +__rte_ring_do_hts_dequeue_elem(struct rte_ring *r, void *obj_table, + uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior, + uint32_t *available) +{ + uint32_t entries, head; + + n = __rte_ring_hts_move_cons_head(r, n, behavior, &head, &entries); + + if (n != 0) { + __rte_ring_dequeue_elems(r, head, obj_table, esize, n); + __rte_ring_hts_update_tail(&r->hts_cons, head, n, 0); + } + + if (available != NULL) + *available = entries - n; + return n; +} + +/** + * Enqueue several objects on the HTS ring (multi-producers safe). + * + * @param r + * A pointer to the ring structure. 
+ * @param obj_table + * A pointer to a table of objects. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. + * @return + * The number of objects enqueued, either 0 or n + */ +__rte_experimental +static __rte_always_inline unsigned int +rte_ring_mp_hts_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table, + unsigned int esize, unsigned int n, unsigned int *free_space) +{ + return __rte_ring_do_hts_enqueue_elem(r, obj_table, esize, n, + RTE_RING_QUEUE_FIXED, free_space); +} + +/** + * Dequeue several objects from an HTS ring (multi-consumers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of objects that will be filled. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. + * @return + * The number of objects dequeued, either 0 or n + */ +__rte_experimental +static __rte_always_inline unsigned int +rte_ring_mc_hts_dequeue_bulk_elem(struct rte_ring *r, void *obj_table, + unsigned int esize, unsigned int n, unsigned int *available) +{ + return __rte_ring_do_hts_dequeue_elem(r, obj_table, esize, n, + RTE_RING_QUEUE_FIXED, available); +} + +/** + * Enqueue several objects on the HTS ring (multi-producers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of objects. 
+ * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. + * @return + * - n: Actual number of objects enqueued. + */ +__rte_experimental +static __rte_always_inline unsigned +rte_ring_mp_hts_enqueue_burst_elem(struct rte_ring *r, const void *obj_table, + unsigned int esize, unsigned int n, unsigned int *free_space) +{ + return __rte_ring_do_hts_enqueue_elem(r, obj_table, esize, n, + RTE_RING_QUEUE_VARIABLE, free_space); +} + +/** + * Dequeue several objects from an HTS ring (multi-consumers safe). + * When the requested objects are more than the available objects, + * only dequeue the actual number of objects. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of objects that will be filled. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. + * @return + * - n: Actual number of objects dequeued, 0 if ring is empty + */ +__rte_experimental +static __rte_always_inline unsigned +rte_ring_mc_hts_dequeue_burst_elem(struct rte_ring *r, void *obj_table, + unsigned int esize, unsigned int n, unsigned int *available) +{ + return __rte_ring_do_hts_dequeue_elem(r, obj_table, esize, n, + RTE_RING_QUEUE_VARIABLE, available); +} + +/** + * Enqueue several objects on the HTS ring (multi-producers safe). + * + * @param r + * A pointer to the ring structure. 
+ * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. + * @return + * The number of objects enqueued, either 0 or n + */ +__rte_experimental +static __rte_always_inline unsigned int +rte_ring_mp_hts_enqueue_bulk(struct rte_ring *r, void * const *obj_table, + unsigned int n, unsigned int *free_space) +{ + return rte_ring_mp_hts_enqueue_bulk_elem(r, obj_table, + sizeof(uintptr_t), n, free_space); +} + +/** + * Dequeue several objects from an HTS ring (multi-consumers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. + * @return + * The number of objects dequeued, either 0 or n + */ +__rte_experimental +static __rte_always_inline unsigned int +rte_ring_mc_hts_dequeue_bulk(struct rte_ring *r, void **obj_table, + unsigned int n, unsigned int *available) +{ + return rte_ring_mc_hts_dequeue_bulk_elem(r, obj_table, + sizeof(uintptr_t), n, available); +} + +/** + * Enqueue several objects on the HTS ring (multi-producers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. + * @return + * - n: Actual number of objects enqueued. 
+ */ +__rte_experimental +static __rte_always_inline unsigned +rte_ring_mp_hts_enqueue_burst(struct rte_ring *r, void * const *obj_table, + unsigned int n, unsigned int *free_space) +{ + return rte_ring_mp_hts_enqueue_burst_elem(r, obj_table, + sizeof(uintptr_t), n, free_space); +} + +/** + * Dequeue several objects from an HTS ring (multi-consumers safe). + * When the requested objects are more than the available objects, + * only dequeue the actual number of objects. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. + * @return + * - n: Actual number of objects dequeued, 0 if ring is empty + */ +__rte_experimental +static __rte_always_inline unsigned +rte_ring_mc_hts_dequeue_burst(struct rte_ring *r, void **obj_table, + unsigned int n, unsigned int *available) +{ + return rte_ring_mc_hts_dequeue_burst_elem(r, obj_table, + sizeof(uintptr_t), n, available); +} + +#ifdef __cplusplus +} +#endif + +#endif /* _RTE_RING_HTS_H_ */ diff --git a/lib/librte_ring/rte_ring_hts_c11_mem.h b/lib/librte_ring/rte_ring_hts_c11_mem.h new file mode 100644 index 000000000..16e54b6ff --- /dev/null +++ b/lib/librte_ring/rte_ring_hts_c11_mem.h @@ -0,0 +1,164 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * + * Copyright (c) 2010-2020 Intel Corporation + * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org + * All rights reserved. + * Derived from FreeBSD's bufring.h + * Used as BSD-3 Licensed with permission from Kip Macy. + */ + +#ifndef _RTE_RING_HTS_C11_MEM_H_ +#define _RTE_RING_HTS_C11_MEM_H_ + +/** + * @file rte_ring_hts_c11_mem.h + * It is not recommended to include this file directly, + * include instead. + * Contains internal helper functions for head/tail sync (HTS) ring mode. 
+ * For more information please refer to . + */ + +/** + * @internal update tail with new value. + */ +static __rte_always_inline void +__rte_ring_hts_update_tail(struct rte_ring_hts_headtail *ht, uint32_t old_tail, + uint32_t num, uint32_t enqueue) +{ + uint32_t tail; + + RTE_SET_USED(enqueue); + + tail = old_tail + num; + __atomic_store_n(&ht->ht.pos.tail, tail, __ATOMIC_RELEASE); +} + +/** + * @internal waits till tail will become equal to head. + * Means no writer/reader is active for that ring. + * Supposed to work as a serialization point. + */ +static __rte_always_inline void +__rte_ring_hts_head_wait(const struct rte_ring_hts_headtail *ht, + union __rte_ring_hts_pos *p) +{ + while (p->pos.head != p->pos.tail) { + rte_pause(); + p->raw = __atomic_load_n(&ht->ht.raw, __ATOMIC_ACQUIRE); + } +} + +/** + * @internal This function updates the producer head for enqueue + */ +static __rte_always_inline unsigned int +__rte_ring_hts_move_prod_head(struct rte_ring *r, unsigned int num, + enum rte_ring_queue_behavior behavior, uint32_t *old_head, + uint32_t *free_entries) +{ + uint32_t n; + union __rte_ring_hts_pos np, op; + + const uint32_t capacity = r->capacity; + + op.raw = __atomic_load_n(&r->hts_prod.ht.raw, __ATOMIC_ACQUIRE); + + do { + /* Reset n to the initial burst count */ + n = num; + + /* + * wait for tail to be equal to head, + * make sure that we read prod head/tail *before* + * reading cons tail. + */ + __rte_ring_hts_head_wait(&r->hts_prod, &op); + + /* + * The subtraction is done between two unsigned 32bits value + * (the result is always modulo 32 bits even if we have + * *old_head > cons_tail). So 'free_entries' is always between 0 + * and capacity (which is < size). + */ + *free_entries = capacity + r->cons.tail - op.pos.head; + + /* check that we have enough room in ring */ + if (unlikely(n > *free_entries)) + n = (behavior == RTE_RING_QUEUE_FIXED) ?
+ 0 : *free_entries; + + if (n == 0) + break; + + np.pos.tail = op.pos.tail; + np.pos.head = op.pos.head + n; + + /* + * this CAS(ACQUIRE, ACQUIRE) serves as a hoist barrier to prevent: + * - OOO reads of cons tail value + * - OOO copy of elems from the ring + */ + } while (__atomic_compare_exchange_n(&r->hts_prod.ht.raw, + &op.raw, np.raw, + 0, __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE) == 0); + + *old_head = op.pos.head; + return n; +} + +/** + * @internal This function updates the consumer head for dequeue + */ +static __rte_always_inline unsigned int +__rte_ring_hts_move_cons_head(struct rte_ring *r, unsigned int num, + enum rte_ring_queue_behavior behavior, uint32_t *old_head, + uint32_t *entries) +{ + uint32_t n; + union __rte_ring_hts_pos np, op; + + op.raw = __atomic_load_n(&r->hts_cons.ht.raw, __ATOMIC_ACQUIRE); + + /* move cons.head atomically */ + do { + /* Restore n as it may change every loop */ + n = num; + + /* + * wait for tail to be equal to head, + * make sure that we read cons head/tail *before* + * reading prod tail. + */ + __rte_ring_hts_head_wait(&r->hts_cons, &op); + + /* The subtraction is done between two unsigned 32bits value + * (the result is always modulo 32 bits even if we have + * cons_head > prod_tail). So 'entries' is always between 0 + * and size(ring)-1. + */ + *entries = r->prod.tail - op.pos.head; + + /* Set the actual entries for dequeue */ + if (n > *entries) + n = (behavior == RTE_RING_QUEUE_FIXED) ? 
0 : *entries; + + if (unlikely(n == 0)) + break; + + np.pos.tail = op.pos.tail; + np.pos.head = op.pos.head + n; + + /* + * this CAS(ACQUIRE, ACQUIRE) serves as a hoist barrier to prevent: + * - OOO reads of prod tail value + * - OOO copy of elems from the ring + */ + } while (__atomic_compare_exchange_n(&r->hts_cons.ht.raw, + &op.raw, np.raw, + 0, __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE) == 0); + + *old_head = op.pos.head; + return n; +} + +#endif /* _RTE_RING_HTS_C11_MEM_H_ */ From patchwork Mon Apr 20 12:28:27 2020 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: "Ananyev, Konstantin" X-Patchwork-Id: 68957 X-Patchwork-Delegate: david.marchand@redhat.com Return-Path: X-Original-To: patchwork@inbox.dpdk.org Delivered-To: patchwork@inbox.dpdk.org Received: from dpdk.org (dpdk.org [92.243.14.124]) by inbox.dpdk.org (Postfix) with ESMTP id DBAF0A0561; Mon, 20 Apr 2020 14:30:00 +0200 (CEST) Received: from [92.243.14.124] (localhost [127.0.0.1]) by dpdk.org (Postfix) with ESMTP id 8C76D1D608; Mon, 20 Apr 2020 14:29:11 +0200 (CEST) Received: from mga17.intel.com (mga17.intel.com [192.55.52.151]) by dpdk.org (Postfix) with ESMTP id 49D7F1D5EF for ; Mon, 20 Apr 2020 14:28:57 +0200 (CEST) IronPort-SDR: NXyLLTiiAo8HoUF+O8IggZ/6l3uASE2aSHBlJ3g3qySs102uQodUFqhCYAX6zritjnpLSMAYaR DK5SQh6ijiBA== X-Amp-Result: SKIPPED(no attachment in message) X-Amp-File-Uploaded: False Received: from orsmga003.jf.intel.com ([10.7.209.27]) by fmsmga107.fm.intel.com with ESMTP/TLS/ECDHE-RSA-AES256-GCM-SHA384; 20 Apr 2020 05:28:56 -0700 IronPort-SDR: 5ZG2lsbKCqfdEQ/zF6G+S08znqCQdub2K9Wt62FdWtCyK+JZZOU7t9avUy8MKVHSE1UCKQF8eS /ypSBFPaHofw== X-ExtLoop1: 1 X-IronPort-AV: E=Sophos;i="5.72,406,1580803200"; d="scan'208";a="254923906" Received: from sivswdev08.ir.intel.com ([10.237.217.47]) by orsmga003.jf.intel.com with ESMTP; 20 Apr 2020 05:28:55 -0700 From: Konstantin Ananyev To: dev@dpdk.org Cc: honnappa.nagarahalli@arm.com, 
david.marchand@redhat.com, jielong.zjl@antfin.com, Konstantin Ananyev Date: Mon, 20 Apr 2020 13:28:27 +0100 Message-Id: <20200420122831.16973-7-konstantin.ananyev@intel.com> X-Mailer: git-send-email 2.18.0 In-Reply-To: <20200420122831.16973-1-konstantin.ananyev@intel.com> References: <20200420121113.9327-1-konstantin.ananyev@intel.com> <20200420122831.16973-1-konstantin.ananyev@intel.com> Subject: [dpdk-dev] [PATCH v7 06/10] test/ring: add contention stress test for HTS ring X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: DPDK patches and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dev-bounces@dpdk.org Sender: "dev" Introduce new test case to test HTS ring mode under contention. Signed-off-by: Konstantin Ananyev Acked-by: Honnappa Nagarahalli --- app/test/Makefile | 1 + app/test/meson.build | 1 + app/test/test_ring_hts_stress.c | 32 ++++++++++++++++++++++++++++++++ app/test/test_ring_stress.c | 3 +++ app/test/test_ring_stress.h | 1 + 5 files changed, 38 insertions(+) create mode 100644 app/test/test_ring_hts_stress.c diff --git a/app/test/Makefile b/app/test/Makefile index 00b74b5c9..28f0b9ac2 100644 --- a/app/test/Makefile +++ b/app/test/Makefile @@ -78,6 +78,7 @@ SRCS-y += test_rand_perf.c SRCS-y += test_ring.c SRCS-y += test_ring_mpmc_stress.c +SRCS-y += test_ring_hts_stress.c SRCS-y += test_ring_perf.c SRCS-y += test_ring_rts_stress.c SRCS-y += test_ring_stress.c diff --git a/app/test/meson.build b/app/test/meson.build index 97ad822c1..20c4978c2 100644 --- a/app/test/meson.build +++ b/app/test/meson.build @@ -101,6 +101,7 @@ test_sources = files('commands.c', 'test_rib6.c', 'test_ring.c', 'test_ring_mpmc_stress.c', + 'test_ring_hts_stress.c', 'test_ring_perf.c', 'test_ring_rts_stress.c', 'test_ring_stress.c', diff --git a/app/test/test_ring_hts_stress.c b/app/test/test_ring_hts_stress.c new file mode 100644 index 000000000..edc9175cb --- /dev/null +++ 
b/app/test/test_ring_hts_stress.c @@ -0,0 +1,32 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * Copyright(c) 2020 Intel Corporation + */ + +#include "test_ring_stress_impl.h" + +static inline uint32_t +_st_ring_dequeue_bulk(struct rte_ring *r, void **obj, uint32_t n, + uint32_t *avail) +{ + return rte_ring_mc_hts_dequeue_bulk(r, obj, n, avail); +} + +static inline uint32_t +_st_ring_enqueue_bulk(struct rte_ring *r, void * const *obj, uint32_t n, + uint32_t *free) +{ + return rte_ring_mp_hts_enqueue_bulk(r, obj, n, free); +} + +static int +_st_ring_init(struct rte_ring *r, const char *name, uint32_t num) +{ + return rte_ring_init(r, name, num, + RING_F_MP_HTS_ENQ | RING_F_MC_HTS_DEQ); +} + +const struct test test_ring_hts_stress = { + .name = "MT_HTS", + .nb_case = RTE_DIM(tests), + .cases = tests, +}; diff --git a/app/test/test_ring_stress.c b/app/test/test_ring_stress.c index eab395e30..29a1368d7 100644 --- a/app/test/test_ring_stress.c +++ b/app/test/test_ring_stress.c @@ -43,6 +43,9 @@ test_ring_stress(void) n += test_ring_rts_stress.nb_case; k += run_test(&test_ring_rts_stress); + n += test_ring_hts_stress.nb_case; + k += run_test(&test_ring_hts_stress); + printf("Number of tests:\t%u\nSuccess:\t%u\nFailed:\t%u\n", n, k, n - k); return (k != n); diff --git a/app/test/test_ring_stress.h b/app/test/test_ring_stress.h index 32aae2072..9a87c7f7b 100644 --- a/app/test/test_ring_stress.h +++ b/app/test/test_ring_stress.h @@ -34,3 +34,4 @@ struct test { extern const struct test test_ring_mpmc_stress; extern const struct test test_ring_rts_stress; +extern const struct test test_ring_hts_stress; From patchwork Mon Apr 20 12:28:28 2020 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: "Ananyev, Konstantin" X-Patchwork-Id: 68958 X-Patchwork-Delegate: david.marchand@redhat.com Return-Path: X-Original-To: patchwork@inbox.dpdk.org Delivered-To: patchwork@inbox.dpdk.org Received: from dpdk.org (dpdk.org 
[92.243.14.124]) by inbox.dpdk.org (Postfix) with ESMTP id 896F5A0561; Mon, 20 Apr 2020 14:30:14 +0200 (CEST) Received: from [92.243.14.124] (localhost [127.0.0.1]) by dpdk.org (Postfix) with ESMTP id C4CA31D612; Mon, 20 Apr 2020 14:29:12 +0200 (CEST) Received: from mga17.intel.com (mga17.intel.com [192.55.52.151]) by dpdk.org (Postfix) with ESMTP id D0F401D5DE for ; Mon, 20 Apr 2020 14:28:59 +0200 (CEST) IronPort-SDR: Izc48dRBWMwUC35oK7otRjdHKpusf8kImXYKAIG07dgmpRSlHLe/mzMdQmoCRTTy2TQ4oP7Zid aZvhWv/U0iYg== X-Amp-Result: SKIPPED(no attachment in message) X-Amp-File-Uploaded: False Received: from orsmga003.jf.intel.com ([10.7.209.27]) by fmsmga107.fm.intel.com with ESMTP/TLS/ECDHE-RSA-AES256-GCM-SHA384; 20 Apr 2020 05:28:59 -0700 IronPort-SDR: cWxqFqZHmly4jPc+k2ucrXHKr0hEB37Q+yt59S2IGd6Ss6MZzsNMgKe4qZiU4BzSRgJaIJfsAj KWDvT1Y5ylsw== X-ExtLoop1: 1 X-IronPort-AV: E=Sophos;i="5.72,406,1580803200"; d="scan'208";a="254923912" Received: from sivswdev08.ir.intel.com ([10.237.217.47]) by orsmga003.jf.intel.com with ESMTP; 20 Apr 2020 05:28:57 -0700 From: Konstantin Ananyev To: dev@dpdk.org Cc: honnappa.nagarahalli@arm.com, david.marchand@redhat.com, jielong.zjl@antfin.com, Konstantin Ananyev Date: Mon, 20 Apr 2020 13:28:28 +0100 Message-Id: <20200420122831.16973-8-konstantin.ananyev@intel.com> X-Mailer: git-send-email 2.18.0 In-Reply-To: <20200420122831.16973-1-konstantin.ananyev@intel.com> References: <20200420121113.9327-1-konstantin.ananyev@intel.com> <20200420122831.16973-1-konstantin.ananyev@intel.com> Subject: [dpdk-dev] [PATCH v7 07/10] ring: introduce peek style API X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: DPDK patches and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dev-bounces@dpdk.org Sender: "dev" For rings with producer/consumer in RTE_RING_SYNC_ST, RTE_RING_SYNC_MT_HTS mode, provide an ability to split enqueue/dequeue operation into two phases: - enqueue/dequeue start - 
enqueue/dequeue finish That allows user to inspect objects in the ring without removing them from it (aka MT safe peek). Signed-off-by: Konstantin Ananyev Acked-by: Honnappa Nagarahalli --- doc/guides/rel_notes/release_20_05.rst | 11 +- lib/librte_ring/Makefile | 2 + lib/librte_ring/meson.build | 2 + lib/librte_ring/rte_ring.h | 3 + lib/librte_ring/rte_ring_elem.h | 4 + lib/librte_ring/rte_ring_peek.h | 444 ++++++++++++++++++++++++ lib/librte_ring/rte_ring_peek_c11_mem.h | 110 ++++++ 7 files changed, 575 insertions(+), 1 deletion(-) create mode 100644 lib/librte_ring/rte_ring_peek.h create mode 100644 lib/librte_ring/rte_ring_peek_c11_mem.h diff --git a/doc/guides/rel_notes/release_20_05.rst b/doc/guides/rel_notes/release_20_05.rst index db8a281db..ec558c64f 100644 --- a/doc/guides/rel_notes/release_20_05.rst +++ b/doc/guides/rel_notes/release_20_05.rst @@ -81,13 +81,22 @@ New Features by making use of the event device capabilities. The event mode currently supports only inline IPsec protocol offload. -* **New synchronization modes for rte_ring.** +* **Added new API for rte_ring.** + + * New synchronization modes for rte_ring. Introduced new optional MT synchronization modes for rte_ring: Relaxed Tail Sync (RTS) mode and Head/Tail Sync (HTS) mode. With these mode selected, rte_ring shows significant improvements for average enqueue/dequeue times on overcommitted systems. + * Added peek style API for rte_ring. + + For rings with producer/consumer in RTE_RING_SYNC_ST, RTE_RING_SYNC_MT_HTS + mode, provide an ability to split enqueue/dequeue operation into two phases + (enqueue/dequeue start; enqueue/dequeue finish). That allows user to inspect + objects in the ring without removing them from it (aka MT safe peek). 
+ Removed Items ------------- diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile index f75d8e530..83a9d0840 100644 --- a/lib/librte_ring/Makefile +++ b/lib/librte_ring/Makefile @@ -22,6 +22,8 @@ SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h \ rte_ring_c11_mem.h \ rte_ring_hts.h \ rte_ring_hts_c11_mem.h \ + rte_ring_peek.h \ + rte_ring_peek_c11_mem.h \ rte_ring_rts.h \ rte_ring_rts_c11_mem.h diff --git a/lib/librte_ring/meson.build b/lib/librte_ring/meson.build index ca37cb8cc..4f77647cd 100644 --- a/lib/librte_ring/meson.build +++ b/lib/librte_ring/meson.build @@ -9,6 +9,8 @@ headers = files('rte_ring.h', 'rte_ring_generic.h', 'rte_ring_hts.h', 'rte_ring_hts_c11_mem.h', + 'rte_ring_peek.h', + 'rte_ring_peek_c11_mem.h', 'rte_ring_rts.h', 'rte_ring_rts_c11_mem.h') diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h index 7cf046528..86faede81 100644 --- a/lib/librte_ring/rte_ring.h +++ b/lib/librte_ring/rte_ring.h @@ -25,6 +25,9 @@ * - Multi- or single-producer enqueue. * - Bulk dequeue. * - Bulk enqueue. + * - Ability to select different sync modes for producer/consumer. + * - Dequeue start/finish (depending on consumer sync modes). + * - Enqueue start/finish (depending on producer sync mode). * * Note: the ring implementation is not preemptible. 
Refer to Programmer's * guide/Environment Abstraction Layer/Multiple pthread/Known Issues/rte_ring diff --git a/lib/librte_ring/rte_ring_elem.h b/lib/librte_ring/rte_ring_elem.h index 492eef936..a5a4c46f9 100644 --- a/lib/librte_ring/rte_ring_elem.h +++ b/lib/librte_ring/rte_ring_elem.h @@ -1089,6 +1089,10 @@ rte_ring_dequeue_burst_elem(struct rte_ring *r, void *obj_table, return 0; } +#ifdef ALLOW_EXPERIMENTAL_API +#include +#endif + #include #ifdef __cplusplus diff --git a/lib/librte_ring/rte_ring_peek.h b/lib/librte_ring/rte_ring_peek.h new file mode 100644 index 000000000..1ad8bba22 --- /dev/null +++ b/lib/librte_ring/rte_ring_peek.h @@ -0,0 +1,444 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * + * Copyright (c) 2010-2020 Intel Corporation + * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org + * All rights reserved. + * Derived from FreeBSD's bufring.h + * Used as BSD-3 Licensed with permission from Kip Macy. + */ + +#ifndef _RTE_RING_PEEK_H_ +#define _RTE_RING_PEEK_H_ + +/** + * @file + * @b EXPERIMENTAL: this API may change without prior notice + * It is not recommended to include this file directly. + * Please include instead. + * + * Ring Peek API + * Introduction of rte_ring with serialized producer/consumer (HTS sync mode) + * makes it possible to split public enqueue/dequeue API into two phases: + * - enqueue/dequeue start + * - enqueue/dequeue finish + * That allows user to inspect objects in the ring without removing them + * from it (aka MT safe peek). + * Note that right now this new API is available only for two sync modes: + * 1) Single Producer/Single Consumer (RTE_RING_SYNC_ST) + * 2) Serialized Producer/Serialized Consumer (RTE_RING_SYNC_MT_HTS). + * It is the user's responsibility to create/init ring with appropriate sync + * modes selected.
+ * As an example: + * // read 1 elem from the ring: + * n = rte_ring_dequeue_bulk_start(ring, &obj, 1, NULL); + * if (n != 0) { + * //examine object + * if (object_examine(obj) == KEEP) + * //decided to keep it in the ring. + * rte_ring_dequeue_finish(ring, 0); + * else + * //decided to remove it from the ring. + * rte_ring_dequeue_finish(ring, n); + * } + * Note that between _start_ and _finish_ no other thread can proceed + * with enqueue(/dequeue) operation till _finish_ completes. + */ + +#ifdef __cplusplus +extern "C" { +#endif + +#include + +/** + * @internal This function moves prod head value. + */ +static __rte_always_inline unsigned int +__rte_ring_do_enqueue_start(struct rte_ring *r, uint32_t n, + enum rte_ring_queue_behavior behavior, uint32_t *free_space) +{ + uint32_t free, head, next; + + switch (r->prod.sync_type) { + case RTE_RING_SYNC_ST: + n = __rte_ring_move_prod_head(r, RTE_RING_SYNC_ST, n, + behavior, &head, &next, &free); + break; + case RTE_RING_SYNC_MT_HTS: + n = __rte_ring_hts_move_prod_head(r, n, behavior, + &head, &free); + break; + default: + /* unsupported mode, shouldn't be here */ + RTE_ASSERT(0); + n = 0; + } + + if (free_space != NULL) + *free_space = free - n; + return n; +} + +/** + * Start to enqueue several objects on the ring. + * Note that no actual objects are put in the queue by this function, + * it just reserves for user such ability. + * User has to call appropriate enqueue_elem_finish() to copy objects into the + * queue and complete given enqueue operation. + * + * @param r + * A pointer to the ring structure. + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished.
+ * @return + * The number of objects that can be enqueued, either 0 or n + */ +__rte_experimental +static __rte_always_inline unsigned int +rte_ring_enqueue_bulk_elem_start(struct rte_ring *r, unsigned int n, + unsigned int *free_space) +{ + return __rte_ring_do_enqueue_start(r, n, RTE_RING_QUEUE_FIXED, + free_space); +} + +/** + * Start to enqueue several objects on the ring. + * Note that no actual objects are put in the queue by this function, + * it just reserves for user such ability. + * User has to call appropriate enqueue_finish() to copy objects into the + * queue and complete given enqueue operation. + * + * @param r + * A pointer to the ring structure. + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. + * @return + * The number of objects that can be enqueued, either 0 or n + */ +__rte_experimental +static __rte_always_inline unsigned int +rte_ring_enqueue_bulk_start(struct rte_ring *r, unsigned int n, + unsigned int *free_space) +{ + return rte_ring_enqueue_bulk_elem_start(r, n, free_space); +} + +/** + * Start to enqueue several objects on the ring. + * Note that no actual objects are put in the queue by this function, + * it just reserves for user such ability. + * User has to call appropriate enqueue_elem_finish() to copy objects into the + * queue and complete given enqueue operation. + * + * @param r + * A pointer to the ring structure. + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. + * @return + * Actual number of objects that can be enqueued. 
+ */ +__rte_experimental +static __rte_always_inline unsigned int +rte_ring_enqueue_burst_elem_start(struct rte_ring *r, unsigned int n, + unsigned int *free_space) +{ + return __rte_ring_do_enqueue_start(r, n, RTE_RING_QUEUE_VARIABLE, + free_space); +} + +/** + * Start to enqueue several objects on the ring. + * Note that no actual objects are put in the queue by this function, + * it just reserves for user such ability. + * User has to call appropriate enqueue_finish() to copy objects into the + * queue and complete given enqueue operation. + * + * @param r + * A pointer to the ring structure. + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. + * @return + * Actual number of objects that can be enqueued. + */ +__rte_experimental +static __rte_always_inline unsigned int +rte_ring_enqueue_burst_start(struct rte_ring *r, unsigned int n, + unsigned int *free_space) +{ + return rte_ring_enqueue_burst_elem_start(r, n, free_space); +} + +/** + * Complete to enqueue several objects on the ring. + * Note that number of objects to enqueue should not exceed previous + * enqueue_start return value. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of objects. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to add to the ring from the obj_table. 
+ */ +__rte_experimental +static __rte_always_inline void +rte_ring_enqueue_elem_finish(struct rte_ring *r, const void *obj_table, + unsigned int esize, unsigned int n) +{ + uint32_t tail; + + switch (r->prod.sync_type) { + case RTE_RING_SYNC_ST: + n = __rte_ring_st_get_tail(&r->prod, &tail, n); + if (n != 0) + __rte_ring_enqueue_elems(r, tail, obj_table, esize, n); + __rte_ring_st_set_head_tail(&r->prod, tail, n, 1); + break; + case RTE_RING_SYNC_MT_HTS: + n = __rte_ring_hts_get_tail(&r->hts_prod, &tail, n); + if (n != 0) + __rte_ring_enqueue_elems(r, tail, obj_table, esize, n); + __rte_ring_hts_set_head_tail(&r->hts_prod, tail, n, 1); + break; + default: + /* unsupported mode, shouldn't be here */ + RTE_ASSERT(0); + } +} + +/** + * Complete to enqueue several objects on the ring. + * Note that number of objects to enqueue should not exceed previous + * enqueue_start return value. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of objects. + * @param n + * The number of objects to add to the ring from the obj_table. + */ +__rte_experimental +static __rte_always_inline void +rte_ring_enqueue_finish(struct rte_ring *r, void * const *obj_table, + unsigned int n) +{ + rte_ring_enqueue_elem_finish(r, obj_table, sizeof(uintptr_t), n); +} + +/** + * @internal This function moves cons head value and copies up to *n* + * objects from the ring to the user provided obj_table. 
+ */ +static __rte_always_inline unsigned int +__rte_ring_do_dequeue_start(struct rte_ring *r, void *obj_table, + uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior, + uint32_t *available) +{ + uint32_t avail, head, next; + + switch (r->cons.sync_type) { + case RTE_RING_SYNC_ST: + n = __rte_ring_move_cons_head(r, RTE_RING_SYNC_ST, n, + behavior, &head, &next, &avail); + break; + case RTE_RING_SYNC_MT_HTS: + n = __rte_ring_hts_move_cons_head(r, n, behavior, + &head, &avail); + break; + default: + /* unsupported mode, shouldn't be here */ + RTE_ASSERT(0); + n = 0; + } + + if (n != 0) + __rte_ring_dequeue_elems(r, head, obj_table, esize, n); + + if (available != NULL) + *available = avail - n; + return n; +} + +/** + * Start to dequeue several objects from the ring. + * Note that user has to call appropriate dequeue_finish() + * to complete given dequeue operation and actually remove objects from the ring. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of objects that will be filled. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. + * @return + * The number of objects dequeued, either 0 or n. + */ +__rte_experimental +static __rte_always_inline unsigned int +rte_ring_dequeue_bulk_elem_start(struct rte_ring *r, void *obj_table, + unsigned int esize, unsigned int n, unsigned int *available) +{ + return __rte_ring_do_dequeue_start(r, obj_table, esize, n, + RTE_RING_QUEUE_FIXED, available); +} + +/** + * Start to dequeue several objects from the ring. 
+ * Note that user has to call appropriate dequeue_finish() + * to complete given dequeue operation and actually remove objects from the ring. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. + * @return + * The number of objects dequeued, either 0 or n. + */ +__rte_experimental +static __rte_always_inline unsigned int +rte_ring_dequeue_bulk_start(struct rte_ring *r, void **obj_table, + unsigned int n, unsigned int *available) +{ + return rte_ring_dequeue_bulk_elem_start(r, obj_table, sizeof(uintptr_t), + n, available); +} + +/** + * Start to dequeue several objects from the ring. + * Note that user has to call appropriate dequeue_finish() + * to complete given dequeue operation and actually remove objects from the ring. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of objects that will be filled. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. + * @return + * The actual number of objects dequeued. + */ +__rte_experimental +static __rte_always_inline unsigned int +rte_ring_dequeue_burst_elem_start(struct rte_ring *r, void *obj_table, + unsigned int esize, unsigned int n, unsigned int *available) +{ + return __rte_ring_do_dequeue_start(r, obj_table, esize, n, + RTE_RING_QUEUE_VARIABLE, available); +} + +/** + * Start to dequeue several objects from the ring. 
+ * Note that user has to call appropriate dequeue_finish() + * to complete given dequeue operation and actually remove objects from the ring. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. + * @return + * The actual number of objects dequeued. + */ +__rte_experimental +static __rte_always_inline unsigned int +rte_ring_dequeue_burst_start(struct rte_ring *r, void **obj_table, + unsigned int n, unsigned int *available) +{ + return rte_ring_dequeue_burst_elem_start(r, obj_table, + sizeof(uintptr_t), n, available); +} + +/** + * Complete to dequeue several objects from the ring. + * Note that number of objects to dequeue should not exceed previous + * dequeue_start return value. + * + * @param r + * A pointer to the ring structure. + * @param n + * The number of objects to remove from the ring. + */ +__rte_experimental +static __rte_always_inline void +rte_ring_dequeue_elem_finish(struct rte_ring *r, unsigned int n) +{ + uint32_t tail; + + switch (r->cons.sync_type) { + case RTE_RING_SYNC_ST: + n = __rte_ring_st_get_tail(&r->cons, &tail, n); + __rte_ring_st_set_head_tail(&r->cons, tail, n, 0); + break; + case RTE_RING_SYNC_MT_HTS: + n = __rte_ring_hts_get_tail(&r->hts_cons, &tail, n); + __rte_ring_hts_set_head_tail(&r->hts_cons, tail, n, 0); + break; + default: + /* unsupported mode, shouldn't be here */ + RTE_ASSERT(0); + } +} + +/** + * Complete to dequeue several objects from the ring. + * Note that number of objects to dequeue should not exceed previous + * dequeue_start return value. + * + * @param r + * A pointer to the ring structure. + * @param n + * The number of objects to remove from the ring. 
+ */ +__rte_experimental +static __rte_always_inline void +rte_ring_dequeue_finish(struct rte_ring *r, unsigned int n) +{ + rte_ring_dequeue_elem_finish(r, n); +} + +#ifdef __cplusplus +} +#endif + +#endif /* _RTE_RING_PEEK_H_ */ diff --git a/lib/librte_ring/rte_ring_peek_c11_mem.h b/lib/librte_ring/rte_ring_peek_c11_mem.h new file mode 100644 index 000000000..99321f124 --- /dev/null +++ b/lib/librte_ring/rte_ring_peek_c11_mem.h @@ -0,0 +1,110 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * + * Copyright (c) 2010-2020 Intel Corporation + * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org + * All rights reserved. + * Derived from FreeBSD's bufring.h + * Used as BSD-3 Licensed with permission from Kip Macy. + */ + +#ifndef _RTE_RING_PEEK_C11_MEM_H_ +#define _RTE_RING_PEEK_C11_MEM_H_ + +/** + * @file rte_ring_peek_c11_mem.h + * It is not recommended to include this file directly, + * include instead. + * Contains internal helper functions for rte_ring peek API. + * For more information please refer to . + */ + +/** + * @internal get current tail value. + * This function should be used only for single thread producer/consumer. + * Check that user didn't request to move tail above the head. + * In that situation: + * - return zero, that will cause abort any pending changes and + * return head to its previous position. + * - throw an assert in debug mode. + */ +static __rte_always_inline uint32_t +__rte_ring_st_get_tail(struct rte_ring_headtail *ht, uint32_t *tail, + uint32_t num) +{ + uint32_t h, n, t; + + h = ht->head; + t = ht->tail; + n = h - t; + + RTE_ASSERT(n >= num); + num = (n >= num) ? num : 0; + + *tail = t; + return num; +} + +/** + * @internal set new values for head and tail. + * This function should be used only for single thread producer/consumer. + * Should be used only in conjunction with __rte_ring_st_get_tail. 
+ */ +static __rte_always_inline void +__rte_ring_st_set_head_tail(struct rte_ring_headtail *ht, uint32_t tail, + uint32_t num, uint32_t enqueue) +{ + uint32_t pos; + + RTE_SET_USED(enqueue); + + pos = tail + num; + ht->head = pos; + __atomic_store_n(&ht->tail, pos, __ATOMIC_RELEASE); +} + +/** + * @internal get current tail value. + * This function should be used only for producer/consumer in MT_HTS mode. + * Check that user didn't request to move tail above the head. + * In that situation: + * - return zero, that will cause abort any pending changes and + * return head to its previous position. + * - throw an assert in debug mode. + */ +static __rte_always_inline uint32_t +__rte_ring_hts_get_tail(struct rte_ring_hts_headtail *ht, uint32_t *tail, + uint32_t num) +{ + uint32_t n; + union __rte_ring_hts_pos p; + + p.raw = __atomic_load_n(&ht->ht.raw, __ATOMIC_RELAXED); + n = p.pos.head - p.pos.tail; + + RTE_ASSERT(n >= num); + num = (n >= num) ? num : 0; + + *tail = p.pos.tail; + return num; +} + +/** + * @internal set new values for head and tail as one atomic 64 bit operation. + * This function should be used only for producer/consumer in MT_HTS mode. + * Should be used only in conjunction with __rte_ring_hts_get_tail. 
+ */ +static __rte_always_inline void +__rte_ring_hts_set_head_tail(struct rte_ring_hts_headtail *ht, uint32_t tail, + uint32_t num, uint32_t enqueue) +{ + union __rte_ring_hts_pos p; + + RTE_SET_USED(enqueue); + + p.pos.head = tail + num; + p.pos.tail = p.pos.head; + + __atomic_store_n(&ht->ht.raw, p.raw, __ATOMIC_RELEASE); +} + +#endif /* _RTE_RING_PEEK_C11_MEM_H_ */ From patchwork Mon Apr 20 12:28:29 2020 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: "Ananyev, Konstantin" X-Patchwork-Id: 68959 X-Patchwork-Delegate: david.marchand@redhat.com Return-Path: X-Original-To: patchwork@inbox.dpdk.org Delivered-To: patchwork@inbox.dpdk.org Received: from dpdk.org (dpdk.org [92.243.14.124]) by inbox.dpdk.org (Postfix) with ESMTP id DD474A0561; Mon, 20 Apr 2020 14:30:29 +0200 (CEST) Received: from [92.243.14.124] (localhost [127.0.0.1]) by dpdk.org (Postfix) with ESMTP id 0FBC01D61A; Mon, 20 Apr 2020 14:29:14 +0200 (CEST) Received: from mga17.intel.com (mga17.intel.com [192.55.52.151]) by dpdk.org (Postfix) with ESMTP id 8981E1D5E7 for ; Mon, 20 Apr 2020 14:29:02 +0200 (CEST) IronPort-SDR: zg8Ptmb86q/P1Gy+1nytbMWPE31xdwzmKvcS4nN19U3OuebzKCAbhDHaxMLnLS1d1H3J902PI6 sG0QuEXOE21A== X-Amp-Result: SKIPPED(no attachment in message) X-Amp-File-Uploaded: False Received: from orsmga003.jf.intel.com ([10.7.209.27]) by fmsmga107.fm.intel.com with ESMTP/TLS/ECDHE-RSA-AES256-GCM-SHA384; 20 Apr 2020 05:29:02 -0700 IronPort-SDR: +6HiY84o6aHEQGzXtlLh+EGFjC0rUc+7p8HWqBwOQZNL224e8+mGihKoabySCkDj554y1OrJEp HW7LWCG5rQqw== X-ExtLoop1: 1 X-IronPort-AV: E=Sophos;i="5.72,406,1580803200"; d="scan'208";a="254923922" Received: from sivswdev08.ir.intel.com ([10.237.217.47]) by orsmga003.jf.intel.com with ESMTP; 20 Apr 2020 05:29:00 -0700 From: Konstantin Ananyev To: dev@dpdk.org Cc: honnappa.nagarahalli@arm.com, david.marchand@redhat.com, jielong.zjl@antfin.com, Konstantin Ananyev Date: Mon, 20 Apr 2020 13:28:29 +0100 Message-Id: 
<20200420122831.16973-9-konstantin.ananyev@intel.com> X-Mailer: git-send-email 2.18.0 In-Reply-To: <20200420122831.16973-1-konstantin.ananyev@intel.com> References: <20200420121113.9327-1-konstantin.ananyev@intel.com> <20200420122831.16973-1-konstantin.ananyev@intel.com> Subject: [dpdk-dev] [PATCH v7 08/10] test/ring: add stress test for MT peek API X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: DPDK patches and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dev-bounces@dpdk.org Sender: "dev" Introduce new test case to test MT peek API. Signed-off-by: Konstantin Ananyev Acked-by: Honnappa Nagarahalli --- app/test/Makefile | 1 + app/test/meson.build | 1 + app/test/test_ring_peek_stress.c | 43 ++++++++++++++++++++++++++++++++ app/test/test_ring_stress.c | 3 +++ app/test/test_ring_stress.h | 1 + 5 files changed, 49 insertions(+) create mode 100644 app/test/test_ring_peek_stress.c diff --git a/app/test/Makefile b/app/test/Makefile index 28f0b9ac2..631a21028 100644 --- a/app/test/Makefile +++ b/app/test/Makefile @@ -80,6 +80,7 @@ SRCS-y += test_ring.c SRCS-y += test_ring_mpmc_stress.c SRCS-y += test_ring_hts_stress.c SRCS-y += test_ring_perf.c +SRCS-y += test_ring_peek_stress.c SRCS-y += test_ring_rts_stress.c SRCS-y += test_ring_stress.c SRCS-y += test_pmd_perf.c diff --git a/app/test/meson.build b/app/test/meson.build index 20c4978c2..d15278cf9 100644 --- a/app/test/meson.build +++ b/app/test/meson.build @@ -102,6 +102,7 @@ test_sources = files('commands.c', 'test_ring.c', 'test_ring_mpmc_stress.c', 'test_ring_hts_stress.c', + 'test_ring_peek_stress.c', 'test_ring_perf.c', 'test_ring_rts_stress.c', 'test_ring_stress.c', diff --git a/app/test/test_ring_peek_stress.c b/app/test/test_ring_peek_stress.c new file mode 100644 index 000000000..cfc82d728 --- /dev/null +++ b/app/test/test_ring_peek_stress.c @@ -0,0 +1,43 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * Copyright(c) 2020 Intel 
Corporation + */ + +#include "test_ring_stress_impl.h" +#include + +static inline uint32_t +_st_ring_dequeue_bulk(struct rte_ring *r, void **obj, uint32_t n, + uint32_t *avail) +{ + uint32_t m; + + m = rte_ring_dequeue_bulk_start(r, obj, n, avail); + n = (m == n) ? n : 0; + rte_ring_dequeue_finish(r, n); + return n; +} + +static inline uint32_t +_st_ring_enqueue_bulk(struct rte_ring *r, void * const *obj, uint32_t n, + uint32_t *free) +{ + uint32_t m; + + m = rte_ring_enqueue_bulk_start(r, n, free); + n = (m == n) ? n : 0; + rte_ring_enqueue_finish(r, obj, n); + return n; +} + +static int +_st_ring_init(struct rte_ring *r, const char *name, uint32_t num) +{ + return rte_ring_init(r, name, num, + RING_F_MP_HTS_ENQ | RING_F_MC_HTS_DEQ); +} + +const struct test test_ring_peek_stress = { + .name = "MT_PEEK", + .nb_case = RTE_DIM(tests), + .cases = tests, +}; diff --git a/app/test/test_ring_stress.c b/app/test/test_ring_stress.c index 29a1368d7..853fcc190 100644 --- a/app/test/test_ring_stress.c +++ b/app/test/test_ring_stress.c @@ -46,6 +46,9 @@ test_ring_stress(void) n += test_ring_hts_stress.nb_case; k += run_test(&test_ring_hts_stress); + n += test_ring_peek_stress.nb_case; + k += run_test(&test_ring_peek_stress); + printf("Number of tests:\t%u\nSuccess:\t%u\nFailed:\t%u\n", n, k, n - k); return (k != n); diff --git a/app/test/test_ring_stress.h b/app/test/test_ring_stress.h index 9a87c7f7b..60953ce47 100644 --- a/app/test/test_ring_stress.h +++ b/app/test/test_ring_stress.h @@ -35,3 +35,4 @@ struct test { extern const struct test test_ring_mpmc_stress; extern const struct test test_ring_rts_stress; extern const struct test test_ring_hts_stress; +extern const struct test test_ring_peek_stress; From patchwork Mon Apr 20 12:28:30 2020 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: "Ananyev, Konstantin" X-Patchwork-Id: 68960 X-Patchwork-Delegate: david.marchand@redhat.com Return-Path: X-Original-To: 
patchwork@inbox.dpdk.org Delivered-To: patchwork@inbox.dpdk.org Received: from dpdk.org (dpdk.org [92.243.14.124]) by inbox.dpdk.org (Postfix) with ESMTP id A8B20A0561; Mon, 20 Apr 2020 14:30:46 +0200 (CEST) Received: from [92.243.14.124] (localhost [127.0.0.1]) by dpdk.org (Postfix) with ESMTP id 687901D624; Mon, 20 Apr 2020 14:29:15 +0200 (CEST) Received: from mga17.intel.com (mga17.intel.com [192.55.52.151]) by dpdk.org (Postfix) with ESMTP id 002991D5E7 for ; Mon, 20 Apr 2020 14:29:04 +0200 (CEST) IronPort-SDR: ENN4Z4ubAND6G3rgf14gkZnKBepqay167mcj4erKSNFsWsbFoMGE/gbnMR7uqPTzr+z2d9JdYi PiaDFZ1Z06Ng== X-Amp-Result: SKIPPED(no attachment in message) X-Amp-File-Uploaded: False Received: from orsmga003.jf.intel.com ([10.7.209.27]) by fmsmga107.fm.intel.com with ESMTP/TLS/ECDHE-RSA-AES256-GCM-SHA384; 20 Apr 2020 05:29:04 -0700 IronPort-SDR: ppxgIPspPvC1seYfUD3514kpdeBp5g3d4TkSH79BT5CUKLEizWMo9Lh1EPhxxj1+viWEELO5Z0 UhKHq+Eu1zWA== X-ExtLoop1: 1 X-IronPort-AV: E=Sophos;i="5.72,406,1580803200"; d="scan'208";a="254923930" Received: from sivswdev08.ir.intel.com ([10.237.217.47]) by orsmga003.jf.intel.com with ESMTP; 20 Apr 2020 05:29:03 -0700 From: Konstantin Ananyev To: dev@dpdk.org Cc: honnappa.nagarahalli@arm.com, david.marchand@redhat.com, jielong.zjl@antfin.com, Konstantin Ananyev Date: Mon, 20 Apr 2020 13:28:30 +0100 Message-Id: <20200420122831.16973-10-konstantin.ananyev@intel.com> X-Mailer: git-send-email 2.18.0 In-Reply-To: <20200420122831.16973-1-konstantin.ananyev@intel.com> References: <20200420121113.9327-1-konstantin.ananyev@intel.com> <20200420122831.16973-1-konstantin.ananyev@intel.com> Subject: [dpdk-dev] [PATCH v7 09/10] test/ring: add functional tests for new sync modes X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: DPDK patches and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dev-bounces@dpdk.org Sender: "dev" Extend test_ring_autotest with new test-cases for RTS/HTS sync 
modes. Signed-off-by: Konstantin Ananyev Acked-by: Honnappa Nagarahalli --- app/test/test_ring.c | 93 ++++++++++++++++++++++++++++++++++---------- 1 file changed, 73 insertions(+), 20 deletions(-) diff --git a/app/test/test_ring.c b/app/test/test_ring.c index fbcd109b1..e21557cd9 100644 --- a/app/test/test_ring.c +++ b/app/test/test_ring.c @@ -203,7 +203,8 @@ test_ring_negative_tests(void) * Random number of elements are enqueued and dequeued. */ static int -test_ring_burst_bulk_tests1(unsigned int api_type) +test_ring_burst_bulk_tests1(unsigned int api_type, unsigned int create_flags, + const char *tname) { struct rte_ring *r; void **src = NULL, **cur_src = NULL, **dst = NULL, **cur_dst = NULL; @@ -213,12 +214,11 @@ test_ring_burst_bulk_tests1(unsigned int api_type) const unsigned int rsz = RING_SIZE - 1; for (i = 0; i < RTE_DIM(esize); i++) { - test_ring_print_test_string("Test standard ring", api_type, - esize[i]); + test_ring_print_test_string(tname, api_type, esize[i]); /* Create the ring */ r = test_ring_create("test_ring_burst_bulk_tests", esize[i], - RING_SIZE, SOCKET_ID_ANY, 0); + RING_SIZE, SOCKET_ID_ANY, create_flags); /* alloc dummy object pointers */ src = test_ring_calloc(RING_SIZE * 2, esize[i]); @@ -294,7 +294,8 @@ test_ring_burst_bulk_tests1(unsigned int api_type) * dequeued data. 
*/ static int -test_ring_burst_bulk_tests2(unsigned int api_type) +test_ring_burst_bulk_tests2(unsigned int api_type, unsigned int create_flags, + const char *tname) { struct rte_ring *r; void **src = NULL, **cur_src = NULL, **dst = NULL, **cur_dst = NULL; @@ -302,12 +303,11 @@ test_ring_burst_bulk_tests2(unsigned int api_type) unsigned int i; for (i = 0; i < RTE_DIM(esize); i++) { - test_ring_print_test_string("Test standard ring", api_type, - esize[i]); + test_ring_print_test_string(tname, api_type, esize[i]); /* Create the ring */ r = test_ring_create("test_ring_burst_bulk_tests", esize[i], - RING_SIZE, SOCKET_ID_ANY, 0); + RING_SIZE, SOCKET_ID_ANY, create_flags); /* alloc dummy object pointers */ src = test_ring_calloc(RING_SIZE * 2, esize[i]); @@ -390,7 +390,8 @@ test_ring_burst_bulk_tests2(unsigned int api_type) * Enqueue and dequeue to cover the entire ring length. */ static int -test_ring_burst_bulk_tests3(unsigned int api_type) +test_ring_burst_bulk_tests3(unsigned int api_type, unsigned int create_flags, + const char *tname) { struct rte_ring *r; void **src = NULL, **cur_src = NULL, **dst = NULL, **cur_dst = NULL; @@ -398,12 +399,11 @@ test_ring_burst_bulk_tests3(unsigned int api_type) unsigned int i, j; for (i = 0; i < RTE_DIM(esize); i++) { - test_ring_print_test_string("Test standard ring", api_type, - esize[i]); + test_ring_print_test_string(tname, api_type, esize[i]); /* Create the ring */ r = test_ring_create("test_ring_burst_bulk_tests", esize[i], - RING_SIZE, SOCKET_ID_ANY, 0); + RING_SIZE, SOCKET_ID_ANY, create_flags); /* alloc dummy object pointers */ src = test_ring_calloc(RING_SIZE * 2, esize[i]); @@ -465,7 +465,8 @@ test_ring_burst_bulk_tests3(unsigned int api_type) * Enqueue till the ring is full and dequeue till the ring becomes empty. 
*/ static int -test_ring_burst_bulk_tests4(unsigned int api_type) +test_ring_burst_bulk_tests4(unsigned int api_type, unsigned int create_flags, + const char *tname) { struct rte_ring *r; void **src = NULL, **cur_src = NULL, **dst = NULL, **cur_dst = NULL; @@ -474,12 +475,11 @@ test_ring_burst_bulk_tests4(unsigned int api_type) unsigned int num_elems; for (i = 0; i < RTE_DIM(esize); i++) { - test_ring_print_test_string("Test standard ring", api_type, - esize[i]); + test_ring_print_test_string(tname, api_type, esize[i]); /* Create the ring */ r = test_ring_create("test_ring_burst_bulk_tests", esize[i], - RING_SIZE, SOCKET_ID_ANY, 0); + RING_SIZE, SOCKET_ID_ANY, create_flags); /* alloc dummy object pointers */ src = test_ring_calloc(RING_SIZE * 2, esize[i]); @@ -815,7 +815,23 @@ test_ring_with_exact_size(void) static int test_ring(void) { + int32_t rc; unsigned int i, j; + const char *tname; + + static const struct { + uint32_t create_flags; + const char *name; + } test_sync_modes[] = { + { + RING_F_MP_RTS_ENQ | RING_F_MC_RTS_DEQ, + "Test MT_RTS ring", + }, + { + RING_F_MP_HTS_ENQ | RING_F_MC_HTS_DEQ, + "Test MT_HTS ring", + }, + }; /* Negative test cases */ if (test_ring_negative_tests() < 0) @@ -832,30 +848,67 @@ test_ring(void) * The test cases are split into smaller test cases to * help clang compile faster. 
*/ + tname = "Test standard ring"; + for (j = TEST_RING_ELEM_BULK; j <= TEST_RING_ELEM_BURST; j <<= 1) for (i = TEST_RING_THREAD_DEF; i <= TEST_RING_THREAD_MPMC; i <<= 1) - if (test_ring_burst_bulk_tests1(i | j) < 0) + if (test_ring_burst_bulk_tests1(i | j, 0, tname) < 0) goto test_fail; for (j = TEST_RING_ELEM_BULK; j <= TEST_RING_ELEM_BURST; j <<= 1) for (i = TEST_RING_THREAD_DEF; i <= TEST_RING_THREAD_MPMC; i <<= 1) - if (test_ring_burst_bulk_tests2(i | j) < 0) + if (test_ring_burst_bulk_tests2(i | j, 0, tname) < 0) goto test_fail; for (j = TEST_RING_ELEM_BULK; j <= TEST_RING_ELEM_BURST; j <<= 1) for (i = TEST_RING_THREAD_DEF; i <= TEST_RING_THREAD_MPMC; i <<= 1) - if (test_ring_burst_bulk_tests3(i | j) < 0) + if (test_ring_burst_bulk_tests3(i | j, 0, tname) < 0) goto test_fail; for (j = TEST_RING_ELEM_BULK; j <= TEST_RING_ELEM_BURST; j <<= 1) for (i = TEST_RING_THREAD_DEF; i <= TEST_RING_THREAD_MPMC; i <<= 1) - if (test_ring_burst_bulk_tests4(i | j) < 0) + if (test_ring_burst_bulk_tests4(i | j, 0, tname) < 0) + goto test_fail; + + /* Burst and bulk operations with MT_RTS and MT_HTS sync modes */ + for (i = 0; i != RTE_DIM(test_sync_modes); i++) { + for (j = TEST_RING_ELEM_BULK; j <= TEST_RING_ELEM_BURST; + j <<= 1) { + + rc = test_ring_burst_bulk_tests1( + TEST_RING_THREAD_DEF | j, + test_sync_modes[i].create_flags, + test_sync_modes[i].name); + if (rc < 0) + goto test_fail; + + rc = test_ring_burst_bulk_tests2( + TEST_RING_THREAD_DEF | j, + test_sync_modes[i].create_flags, + test_sync_modes[i].name); + if (rc < 0) goto test_fail; + rc = test_ring_burst_bulk_tests3( + TEST_RING_THREAD_DEF | j, + test_sync_modes[i].create_flags, + test_sync_modes[i].name); + if (rc < 0) + goto test_fail; + + rc = test_ring_burst_bulk_tests4( + TEST_RING_THREAD_DEF | j, + test_sync_modes[i].create_flags, + test_sync_modes[i].name); + if (rc < 0) + goto test_fail; + } + } + /* dump the ring status */ rte_ring_list_dump(stdout); From patchwork Mon Apr 20 12:28:31 2020 Content-Type: 
text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: "Ananyev, Konstantin" X-Patchwork-Id: 68961 X-Patchwork-Delegate: david.marchand@redhat.com Return-Path: X-Original-To: patchwork@inbox.dpdk.org Delivered-To: patchwork@inbox.dpdk.org Received: from dpdk.org (dpdk.org [92.243.14.124]) by inbox.dpdk.org (Postfix) with ESMTP id 77EB5A0561; Mon, 20 Apr 2020 14:31:05 +0200 (CEST) Received: from [92.243.14.124] (localhost [127.0.0.1]) by dpdk.org (Postfix) with ESMTP id BE4051D631; Mon, 20 Apr 2020 14:29:16 +0200 (CEST) Received: from mga17.intel.com (mga17.intel.com [192.55.52.151]) by dpdk.org (Postfix) with ESMTP id 532481D5E7 for ; Mon, 20 Apr 2020 14:29:07 +0200 (CEST) IronPort-SDR: I5VfeZHP5HOkKzxXnB7utOOxIVOdHinaaX1kNw3ZOunOSEQffvM2aKUUon/3hLnmsWXSY6RvfC NMLMUBcndtOA== X-Amp-Result: SKIPPED(no attachment in message) X-Amp-File-Uploaded: False Received: from orsmga003.jf.intel.com ([10.7.209.27]) by fmsmga107.fm.intel.com with ESMTP/TLS/ECDHE-RSA-AES256-GCM-SHA384; 20 Apr 2020 05:29:06 -0700 IronPort-SDR: +dQbE5PAJBBKWSikhRA9IwrRd26748E5xgZejj0b5aeUyx9VV7LwLQ4BjZGyovEdVlaQNyLzOr 24GCyy/O2PVw== X-ExtLoop1: 1 X-IronPort-AV: E=Sophos;i="5.72,406,1580803200"; d="scan'208";a="254923936" Received: from sivswdev08.ir.intel.com ([10.237.217.47]) by orsmga003.jf.intel.com with ESMTP; 20 Apr 2020 05:29:05 -0700 From: Konstantin Ananyev To: dev@dpdk.org Cc: honnappa.nagarahalli@arm.com, david.marchand@redhat.com, jielong.zjl@antfin.com, Konstantin Ananyev Date: Mon, 20 Apr 2020 13:28:31 +0100 Message-Id: <20200420122831.16973-11-konstantin.ananyev@intel.com> X-Mailer: git-send-email 2.18.0 In-Reply-To: <20200420122831.16973-1-konstantin.ananyev@intel.com> References: <20200420121113.9327-1-konstantin.ananyev@intel.com> <20200420122831.16973-1-konstantin.ananyev@intel.com> Subject: [dpdk-dev] [PATCH v7 10/10] doc: update ring guide X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: DPDK patches and 
discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dev-bounces@dpdk.org Sender: "dev" Changed the rte_ring chapter in programmer's guide to reflect the addition of new sync modes and peek style API. Signed-off-by: Konstantin Ananyev --- doc/guides/prog_guide/ring_lib.rst | 95 ++++++++++++++++++++++++++++++ 1 file changed, 95 insertions(+) diff --git a/doc/guides/prog_guide/ring_lib.rst b/doc/guides/prog_guide/ring_lib.rst index 8cb2b2dd4..22c48e50d 100644 --- a/doc/guides/prog_guide/ring_lib.rst +++ b/doc/guides/prog_guide/ring_lib.rst @@ -349,6 +349,101 @@ even if only the first term of subtraction has overflowed: uint32_t entries = (prod_tail - cons_head); uint32_t free_entries = (mask + cons_tail -prod_head); +Producer/consumer synchronization modes +--------------------------------------- + +rte_ring supports different synchronization modes for producers and consumers. +These modes can be specified at ring creation/init time via the ``flags`` parameter. +That should help users configure the ring in the way most suitable for their +specific usage scenarios. +Currently supported modes: + +MP/MC (default one) +~~~~~~~~~~~~~~~~~~~ + +Multi-producer (/multi-consumer) mode. This is the default enqueue (/dequeue) +mode for the ring. In this mode multiple threads can enqueue (/dequeue) +objects to (/from) the ring. For 'classic' DPDK deployments (with one thread +per core) this is usually the most suitable and fastest synchronization mode. +A well-known limitation is that it can perform quite poorly in some overcommitted +scenarios. + +SP/SC +~~~~~ +Single-producer (/single-consumer) mode. In this mode only one thread at a time +is allowed to enqueue (/dequeue) objects to (/from) the ring. + +MP_RTS/MC_RTS +~~~~~~~~~~~~~ + +Multi-producer (/multi-consumer) with Relaxed Tail Sync (RTS) mode. 
+The main difference from the original MP/MC algorithm is that
+the tail value is increased not by every thread that finishes enqueue/dequeue,
+but only by the last one.
+That allows threads to avoid spinning on the ring tail value,
+leaving the actual tail value change to the last thread at a given instance.
+That technique helps to avoid the Lock-Waiter-Preemption (LWP) problem on tail
+update and improves average enqueue/dequeue times on overcommitted systems.
+To achieve that, RTS requires two 64-bit CAS for each enqueue (/dequeue) operation:
+one for the head update, the second for the tail update.
+In comparison, the original MP/MC algorithm requires one 32-bit CAS
+for the head update and waiting/spinning on the tail value.
+
+MP_HTS/MC_HTS
+~~~~~~~~~~~~~
+
+Multi-producer (/multi-consumer) with Head/Tail Sync (HTS) mode.
+In that mode the enqueue/dequeue operation is fully serialized:
+at any given moment only one enqueue/dequeue operation can proceed.
+This is achieved by allowing a thread to proceed with changing ``head.value``
+only when ``head.value == tail.value``.
+Both head and tail values are updated atomically (as one 64-bit value).
+To achieve that, a 64-bit CAS is used by the head update routine.
+That technique also avoids the Lock-Waiter-Preemption (LWP) problem on tail
+update and helps to improve ring enqueue/dequeue behavior in overcommitted
+scenarios. Another advantage of a fully serialized producer/consumer is that
+it makes it possible to implement an MT safe peek API for rte_ring.
+
+
+Ring Peek API
+-------------
+
+For a ring with a serialized producer/consumer (HTS sync mode) it is possible
+to split the public enqueue/dequeue API into two phases:
+
+* enqueue/dequeue start
+
+* enqueue/dequeue finish
+
+That allows the user to inspect objects in the ring without removing them
+from it (aka MT safe peek) and to reserve space for the objects in the ring
+before the actual enqueue.
+Note that this API is available only for two sync modes:
+
+* Single Producer/Single Consumer (SP/SC)
+
+* Multi-producer/Multi-consumer with Head/Tail Sync (HTS)
+
+It is the user's responsibility to create/init the ring with the appropriate
+sync modes selected. As an example of usage:
+
+.. code-block:: c
+
+    /* read 1 elem from the ring: */
+    uint32_t n = rte_ring_dequeue_bulk_start(ring, &obj, 1, NULL);
+    if (n != 0) {
+        /* examine object */
+        if (object_examine(obj) == KEEP)
+            /* decided to keep it in the ring. */
+            rte_ring_dequeue_finish(ring, 0);
+        else
+            /* decided to remove it from the ring. */
+            rte_ring_dequeue_finish(ring, n);
+    }
+
+Note that between ``_start_`` and ``_finish_`` no other thread can proceed
+with an enqueue (/dequeue) operation until ``_finish_`` completes.
+
 References
 ----------