From patchwork Mon Feb 24 11:35:10 2020 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: "Ananyev, Konstantin" X-Patchwork-Id: 66002 X-Patchwork-Delegate: thomas@monjalon.net Return-Path: X-Original-To: patchwork@inbox.dpdk.org Delivered-To: patchwork@inbox.dpdk.org Received: from dpdk.org (dpdk.org [92.243.14.124]) by inbox.dpdk.org (Postfix) with ESMTP id A4E38A0524; Mon, 24 Feb 2020 12:35:33 +0100 (CET) Received: from [92.243.14.124] (localhost [127.0.0.1]) by dpdk.org (Postfix) with ESMTP id D7C5B1BFAC; Mon, 24 Feb 2020 12:35:25 +0100 (CET) Received: from mga02.intel.com (mga02.intel.com [134.134.136.20]) by dpdk.org (Postfix) with ESMTP id 5F5FA1BFA5 for ; Mon, 24 Feb 2020 12:35:23 +0100 (CET) X-Amp-Result: SKIPPED(no attachment in message) X-Amp-File-Uploaded: False Received: from fmsmga005.fm.intel.com ([10.253.24.32]) by orsmga101.jf.intel.com with ESMTP/TLS/DHE-RSA-AES256-GCM-SHA384; 24 Feb 2020 03:35:22 -0800 X-ExtLoop1: 1 X-IronPort-AV: E=Sophos;i="5.70,480,1574150400"; d="scan'208";a="435878528" Received: from sivswdev08.ir.intel.com ([10.237.217.47]) by fmsmga005.fm.intel.com with ESMTP; 24 Feb 2020 03:35:19 -0800 From: Konstantin Ananyev To: dev@dpdk.org Cc: olivier.matz@6wind.com, Konstantin Ananyev Date: Mon, 24 Feb 2020 11:35:10 +0000 Message-Id: <20200224113515.1744-2-konstantin.ananyev@intel.com> X-Mailer: git-send-email 2.18.0 In-Reply-To: <20200224113515.1744-1-konstantin.ananyev@intel.com> References: <20200224113515.1744-1-konstantin.ananyev@intel.com> Subject: [dpdk-dev] [RFC 1/6] test/ring: add contention stress test X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: DPDK patches and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dev-bounces@dpdk.org Sender: "dev" Introduce a new test case to measure ring performance under contention (multiple producers/consumers). It starts a dequeue/enqueue loop on all available slave lcores. 
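In outline, every worker lcore spins in a loop of roughly the following shape. This is a simplified sketch of the test_worker_prcs()/test_worker_avg() functions added below: return-value checks, object verification and cycle accounting are omitted, and "r" stands for the shared ring (la->rng in the actual code); wrk_cmd, WRK_CMD_RUN and BULK_NUM are the symbols defined in test_ring_stress.h.

    void *obj[2 * BULK_NUM];
    uint32_t num, n;

    while (wrk_cmd == WRK_CMD_RUN) {
            /* num in interval [7/8, 11/8] of BULK_NUM */
            num = 7 * BULK_NUM / 8 + rte_rand() % (BULK_NUM / 2);

            /* pull a bulk of objects from the shared ring ... */
            n = rte_ring_mc_dequeue_bulk(r, obj, num, NULL);

            /* ... and put the same objects back, so all lcores keep
             * contending on the same prod/cons head/tail counters */
            rte_ring_mp_enqueue_bulk(r, obj, n, NULL);
    }
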
Signed-off-by: Konstantin Ananyev --- app/test/Makefile | 1 + app/test/meson.build | 1 + app/test/test_ring_stress.c | 27 ++ app/test/test_ring_stress.h | 477 ++++++++++++++++++++++++++++++++++++ 4 files changed, 506 insertions(+) create mode 100644 app/test/test_ring_stress.c create mode 100644 app/test/test_ring_stress.h diff --git a/app/test/Makefile b/app/test/Makefile index 1f080d162..4f586d95f 100644 --- a/app/test/Makefile +++ b/app/test/Makefile @@ -78,6 +78,7 @@ SRCS-y += test_rand_perf.c SRCS-y += test_ring.c SRCS-y += test_ring_perf.c +SRCS-y += test_ring_stress.c SRCS-y += test_pmd_perf.c ifeq ($(CONFIG_RTE_LIBRTE_TABLE),y) diff --git a/app/test/meson.build b/app/test/meson.build index 0a2ce710f..84dde28ad 100644 --- a/app/test/meson.build +++ b/app/test/meson.build @@ -101,6 +101,7 @@ test_sources = files('commands.c', 'test_rib6.c', 'test_ring.c', 'test_ring_perf.c', + 'test_ring_stress.c', 'test_rwlock.c', 'test_sched.c', 'test_service_cores.c', diff --git a/app/test/test_ring_stress.c b/app/test/test_ring_stress.c new file mode 100644 index 000000000..5689e06c8 --- /dev/null +++ b/app/test/test_ring_stress.c @@ -0,0 +1,27 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * Copyright(c) 2020 Intel Corporation + */ + +#include "test_ring_stress.h" + +static inline uint32_t +_st_ring_dequeue_bulk(struct rte_ring *r, void **obj, uint32_t n, + uint32_t *avail) +{ + return rte_ring_mc_dequeue_bulk(r, obj, n, avail); +} + +static inline uint32_t +_st_ring_enqueue_bulk(struct rte_ring *r, void * const *obj, uint32_t n, + uint32_t *free) +{ + return rte_ring_mp_enqueue_bulk(r, obj, n, free); +} + +static int +_st_ring_init(struct rte_ring *r, const char *name, uint32_t num) +{ + return rte_ring_init(r, name, num, 0); +} + +REGISTER_TEST_COMMAND(ring_stress_autotest, test_ring_stress); diff --git a/app/test/test_ring_stress.h b/app/test/test_ring_stress.h new file mode 100644 index 000000000..c6f0bc9f1 --- /dev/null +++ b/app/test/test_ring_stress.h @@ -0,0 +1,477 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * Copyright(c) 2020 Intel Corporation + */ + + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "test.h" + +/* + * Measures performance of ring enqueue/dequeue under high contention + */ + +#define RING_NAME "RING_STRESS" +#define BULK_NUM 32 +#define RING_SIZE (2 * BULK_NUM * RTE_MAX_LCORE) + +enum { + WRK_CMD_STOP, + WRK_CMD_RUN, +}; + +static volatile uint32_t wrk_cmd __rte_cache_aligned; + +/* test run-time in seconds */ +static const uint32_t run_time = 60; + +struct lcore_stat { + uint64_t nb_cycle; + struct { + uint64_t nb_call; + uint64_t nb_obj; + uint64_t nb_cycle; + uint64_t max_cycle; + uint64_t min_cycle; + } op; +}; + +struct lcore_arg { + struct rte_ring *rng; + struct lcore_stat stats; +} __rte_cache_aligned; + +struct ring_elem { + uint32_t cnt[RTE_CACHE_LINE_SIZE / sizeof(uint32_t)]; +} __rte_cache_aligned; + +/* + * redefinable functions + */ +static uint32_t +_st_ring_dequeue_bulk(struct rte_ring *r, void **obj, uint32_t n, + uint32_t *avail); + +static uint32_t +_st_ring_enqueue_bulk(struct rte_ring *r, void * const *obj, uint32_t n, + uint32_t *free); + +static int +_st_ring_init(struct rte_ring *r, const char *name, uint32_t num); + + +static void +lcore_stat_update(struct lcore_stat *ls, uint64_t call, uint64_t obj, + uint64_t tm, int32_t prcs) +{ + ls->op.nb_call += call; + ls->op.nb_obj += obj; + ls->op.nb_cycle += tm; + if (prcs) { + ls->op.max_cycle = 
RTE_MAX(ls->op.max_cycle, tm); + ls->op.min_cycle = RTE_MIN(ls->op.min_cycle, tm); + } +} + +static void +lcore_op_stat_aggr(struct lcore_stat *ms, const struct lcore_stat *ls) +{ + + ms->op.nb_call += ls->op.nb_call; + ms->op.nb_obj += ls->op.nb_obj; + ms->op.nb_cycle += ls->op.nb_cycle; + ms->op.max_cycle = RTE_MAX(ms->op.max_cycle, ls->op.max_cycle); + ms->op.min_cycle = RTE_MIN(ms->op.min_cycle, ls->op.min_cycle); +} + +static void +lcore_stat_aggr(struct lcore_stat *ms, const struct lcore_stat *ls) +{ + ms->nb_cycle = RTE_MAX(ms->nb_cycle, ls->nb_cycle); + lcore_op_stat_aggr(ms, ls); +} + +static void +lcore_stat_dump(FILE *f, uint32_t lc, const struct lcore_stat *ls) +{ + long double st; + + st = (long double)rte_get_timer_hz() / US_PER_S; + + if (lc == UINT32_MAX) + fprintf(f, "%s(AGGREGATE)={\n", __func__); + else + fprintf(f, "%s(lc=%u)={\n", __func__, lc); + + fprintf(f, "\tnb_cycle=%" PRIu64 "(%.2Lf usec),\n", + ls->nb_cycle, (long double)ls->nb_cycle / st); + + fprintf(f, "\tDEQ+ENQ={\n"); + + fprintf(f, "\t\tnb_call=%" PRIu64 ",\n", ls->op.nb_call); + fprintf(f, "\t\tnb_obj=%" PRIu64 ",\n", ls->op.nb_obj); + fprintf(f, "\t\tnb_cycle=%" PRIu64 ",\n", ls->op.nb_cycle); + fprintf(f, "\t\tobj/call(avg): %.2Lf\n", + (long double)ls->op.nb_obj / ls->op.nb_call); + fprintf(f, "\t\tcycles/obj(avg): %.2Lf\n", + (long double)ls->op.nb_cycle / ls->op.nb_obj); + fprintf(f, "\t\tcycles/call(avg): %.2Lf\n", + (long double)ls->op.nb_cycle / ls->op.nb_call); + + /* if min/max cycles per call stats was collected */ + if (ls->op.min_cycle != UINT64_MAX) { + fprintf(f, "\t\tmax cycles/call=%" PRIu64 "(%.2Lf usec),\n", + ls->op.max_cycle, + (long double)ls->op.max_cycle / st); + fprintf(f, "\t\tmin cycles/call=%" PRIu64 "(%.2Lf usec),\n", + ls->op.min_cycle, + (long double)ls->op.min_cycle / st); + } + + fprintf(f, "\t},\n"); + fprintf(f, "};\n"); +} + +static void +fill_ring_elm(struct ring_elem *elm, uint32_t fill) +{ + uint32_t i; + + for (i = 0; i != RTE_DIM(elm->cnt); i++) + elm->cnt[i] = fill; +} + +static int32_t +check_updt_elem(struct ring_elem *elm[], uint32_t num, + const struct ring_elem *check, const struct ring_elem *fill) +{ + uint32_t i; + + static rte_spinlock_t dump_lock; + + for (i = 0; i != num; i++) { + if (memcmp(check, elm[i], sizeof(*check)) != 0) { + rte_spinlock_lock(&dump_lock); + printf("%s(lc=%u, num=%u) failed at %u-th iter, " + "offending object: %p\n", + __func__, rte_lcore_id(), num, i, elm[i]); + rte_memdump(stdout, "expected", check, sizeof(*check)); + rte_memdump(stdout, "result", elm[i], sizeof(elm[i])); + rte_spinlock_unlock(&dump_lock); + return -EINVAL; + } + memcpy(elm[i], fill, sizeof(*elm[i])); + } + + return 0; +} + +static int +check_ring_op(uint32_t exp, uint32_t res, uint32_t lc, + const char *fname, const char *opname) +{ + if (exp != res) { + printf("%s(lc=%u) failure: %s expected: %u, returned %u\n", + fname, lc, opname, exp, res); + return -ENOSPC; + } + return 0; +} + +static int +test_worker_prcs(void *arg) +{ + int32_t rc; + uint32_t lc, n, num; + uint64_t cl, tm0, tm1; + struct lcore_arg *la; + struct ring_elem def_elm, loc_elm; + struct ring_elem *obj[2 * BULK_NUM]; + + la = arg; + lc = rte_lcore_id(); + + fill_ring_elm(&def_elm, UINT32_MAX); + fill_ring_elm(&loc_elm, lc); + + while (wrk_cmd != WRK_CMD_RUN) { + rte_smp_rmb(); + rte_pause(); + } + + cl = rte_rdtsc_precise(); + + do { + /* num in interval [7/8, 11/8] of BULK_NUM */ + num = 7 * BULK_NUM / 8 + rte_rand() % (BULK_NUM / 2); + + /* reset all pointer values */ + memset(obj, 0, 
sizeof(obj)); + + /* dequeue num elems */ + tm0 = rte_rdtsc_precise(); + n = _st_ring_dequeue_bulk(la->rng, (void **)obj, num, NULL); + tm0 = rte_rdtsc_precise() - tm0; + + /* check return value and objects */ + rc = check_ring_op(num, n, lc, __func__, + RTE_STR(_st_ring_dequeue_bulk)); + if (rc == 0) + rc = check_updt_elem(obj, num, &def_elm, &loc_elm); + if (rc != 0) + break; + + /* enqueue num elems */ + rte_compiler_barrier(); + rc = check_updt_elem(obj, num, &loc_elm, &def_elm); + if (rc != 0) + break; + + tm1 = rte_rdtsc_precise(); + n = _st_ring_enqueue_bulk(la->rng, (void **)obj, num, NULL); + tm1 = rte_rdtsc_precise() - tm1; + + /* check return value */ + rc = check_ring_op(num, n, lc, __func__, + RTE_STR(_st_ring_enqueue_bulk)); + if (rc != 0) + break; + + lcore_stat_update(&la->stats, 1, num, tm0 + tm1, 1); + + } while (wrk_cmd == WRK_CMD_RUN); + + la->stats.nb_cycle = rte_rdtsc_precise() - cl; + return rc; +} + +static int +test_worker_avg(void *arg) +{ + int32_t rc; + uint32_t lc, n, num; + uint64_t cl; + struct lcore_arg *la; + struct ring_elem def_elm, loc_elm; + struct ring_elem *obj[2 * BULK_NUM]; + + la = arg; + lc = rte_lcore_id(); + + fill_ring_elm(&def_elm, UINT32_MAX); + fill_ring_elm(&loc_elm, lc); + + while (wrk_cmd != WRK_CMD_RUN) { + rte_smp_rmb(); + rte_pause(); + } + + cl = rte_rdtsc_precise(); + + do { + /* num in interval [7/8, 11/8] of BULK_NUM */ + num = 7 * BULK_NUM / 8 + rte_rand() % (BULK_NUM / 2); + + /* reset all pointer values */ + memset(obj, 0, sizeof(obj)); + + /* dequeue num elems */ + n = _st_ring_dequeue_bulk(la->rng, (void **)obj, num, NULL); + + /* check return value and objects */ + rc = check_ring_op(num, n, lc, __func__, + RTE_STR(_st_ring_dequeue_bulk)); + if (rc == 0) + rc = check_updt_elem(obj, num, &def_elm, &loc_elm); + if (rc != 0) + break; + + /* enqueue num elems */ + rte_compiler_barrier(); + rc = check_updt_elem(obj, num, &loc_elm, &def_elm); + if (rc != 0) + break; + + n = _st_ring_enqueue_bulk(la->rng, (void **)obj, num, NULL); + + /* check return value */ + rc = check_ring_op(num, n, lc, __func__, + RTE_STR(_st_ring_enqueue_bulk)); + if (rc != 0) + break; + + lcore_stat_update(&la->stats, 1, num, 0, 0); + + } while (wrk_cmd == WRK_CMD_RUN); + + /* final stats update */ + cl = rte_rdtsc_precise() - cl; + lcore_stat_update(&la->stats, 0, 0, cl, 0); + la->stats.nb_cycle = cl; + + return rc; +} + +static void +mt1_fini(struct rte_ring *rng, void *data) +{ + rte_free(rng); + rte_free(data); +} + +static int +mt1_init(struct rte_ring **rng, void **data, uint32_t num) +{ + int32_t rc; + size_t sz; + uint32_t i, nr; + struct rte_ring *r; + struct ring_elem *elm; + void *p; + + *rng = NULL; + *data = NULL; + + sz = num * sizeof(*elm); + elm = rte_zmalloc(NULL, sz, alignof(*elm)); + + /* alloc ring */ + nr = 2 * num; + sz = rte_ring_get_memsize(nr); + r = rte_zmalloc(NULL, sz, alignof(*r)); + if (r == NULL) { + printf("%s: alloca(%zu) for FIFO with %u elems failed", + __func__, sz, nr); + return -ENOMEM; + } + rc = _st_ring_init(r, RING_NAME, nr); + if (rc != 0) { + printf("%s: _st_ring_init(%p, %u) failed, error: %d(%s)\n", + __func__, r, nr, rc, strerror(-rc)); + return rc; + } + + for (i = 0; i != num; i++) { + fill_ring_elm(elm + i, UINT32_MAX); + p = elm + i; + if (_st_ring_enqueue_bulk(r, &p, 1, NULL) != 1) + break; + } + + if (i != num) { + printf("%s: _st_ring_enqueue(%p, %u) returned %u\n", + __func__, r, num, i); + return -ENOSPC; + } + + *rng = r; + *data = elm; + return 0; +} + +static int +test_mt1(int (*test)(void *)) +{ + 
int32_t rc; + uint32_t lc, mc; + struct rte_ring *r; + void *data; + struct lcore_arg arg[RTE_MAX_LCORE]; + + static const struct lcore_stat init_stat = { + .op.min_cycle = UINT64_MAX, + }; + + rc = mt1_init(&r, &data, RING_SIZE); + if (rc != 0) { + mt1_fini(r, data); + return rc; + } + + memset(arg, 0, sizeof(arg)); + + /* launch on all slaves */ + RTE_LCORE_FOREACH_SLAVE(lc) { + arg[lc].rng = r; + arg[lc].stats = init_stat; + rte_eal_remote_launch(test, &arg[lc], lc); + } + + /* signal worker to start test */ + wrk_cmd = WRK_CMD_RUN; + rte_smp_wmb(); + + usleep(run_time * US_PER_S); + + /* signal worker to start test */ + wrk_cmd = WRK_CMD_STOP; + rte_smp_wmb(); + + /* wait for slaves and collect stats. */ + mc = rte_lcore_id(); + arg[mc].stats = init_stat; + + rc = 0; + RTE_LCORE_FOREACH_SLAVE(lc) { + rc |= rte_eal_wait_lcore(lc); + lcore_stat_dump(stdout, lc, &arg[lc].stats); + lcore_stat_aggr(&arg[mc].stats, &arg[lc].stats); + } + + lcore_stat_dump(stdout, UINT32_MAX, &arg[mc].stats); + mt1_fini(r, data); + return rc; +} + +static int +test_ring_stress(void) +{ + int32_t rc; + uint32_t i, k; + + const struct { + const char *name; + int (*func)(int (*)(void *)); + int (*wfunc)(void *arg); + } tests[] = { + { + .name = "MT-WRK_ENQ_DEQ-MST_NONE-PRCS", + .func = test_mt1, + .wfunc = test_worker_prcs, + }, + { + .name = "MT-WRK_ENQ_DEQ-MST_NONE-AVG", + .func = test_mt1, + .wfunc = test_worker_avg, + }, + }; + + for (i = 0, k = 0; i != RTE_DIM(tests); i++) { + + printf("TEST %s START\n", tests[i].name); + + rc = tests[i].func(tests[i].wfunc); + k += (rc == 0); + + if (rc != 0) + printf("TEST-CASE %s FAILED\n", tests[i].name); + else + printf("TEST-CASE %s OK\n", tests[i].name); + } + + printf("Number of tests:\t%u\nSuccess:\t%u\nFailed:\t%u\n", + i, k, i - k); + return (k != i); +} From patchwork Mon Feb 24 11:35:11 2020 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: "Ananyev, Konstantin" X-Patchwork-Id: 66003 X-Patchwork-Delegate: thomas@monjalon.net Return-Path: X-Original-To: patchwork@inbox.dpdk.org Delivered-To: patchwork@inbox.dpdk.org Received: from dpdk.org (dpdk.org [92.243.14.124]) by inbox.dpdk.org (Postfix) with ESMTP id 8AE60A0524; Mon, 24 Feb 2020 12:35:44 +0100 (CET) Received: from [92.243.14.124] (localhost [127.0.0.1]) by dpdk.org (Postfix) with ESMTP id 518F81BFC8; Mon, 24 Feb 2020 12:35:27 +0100 (CET) Received: from mga02.intel.com (mga02.intel.com [134.134.136.20]) by dpdk.org (Postfix) with ESMTP id D40861BFAA for ; Mon, 24 Feb 2020 12:35:23 +0100 (CET) X-Amp-Result: SKIPPED(no attachment in message) X-Amp-File-Uploaded: False Received: from fmsmga005.fm.intel.com ([10.253.24.32]) by orsmga101.jf.intel.com with ESMTP/TLS/DHE-RSA-AES256-GCM-SHA384; 24 Feb 2020 03:35:23 -0800 X-ExtLoop1: 1 X-IronPort-AV: E=Sophos;i="5.70,480,1574150400"; d="scan'208";a="435878533" Received: from sivswdev08.ir.intel.com ([10.237.217.47]) by fmsmga005.fm.intel.com with ESMTP; 24 Feb 2020 03:35:21 -0800 From: Konstantin Ananyev To: dev@dpdk.org Cc: olivier.matz@6wind.com, Konstantin Ananyev Date: Mon, 24 Feb 2020 11:35:11 +0000 Message-Id: <20200224113515.1744-3-konstantin.ananyev@intel.com> X-Mailer: git-send-email 2.18.0 In-Reply-To: <20200224113515.1744-1-konstantin.ananyev@intel.com> References: <20200224113515.1744-1-konstantin.ananyev@intel.com> Subject: [dpdk-dev] [RFC 2/6] ring: rework ring layout to allow new sync schemes X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: DPDK patches and 
discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dev-bounces@dpdk.org Sender: "dev" Change from *single* to *sync_type* to allow different synchronisation schemes to be applied. Change layout to make sure that *sync_type* and *tail* will always reside on same offsets. Signed-off-by: Konstantin Ananyev --- app/test/test_pdump.c | 6 +-- lib/librte_pdump/rte_pdump.c | 2 +- lib/librte_port/rte_port_ring.c | 12 +++--- lib/librte_ring/rte_ring.c | 6 ++- lib/librte_ring/rte_ring.h | 76 ++++++++++++++++++++++++--------- lib/librte_ring/rte_ring_elem.h | 8 ++-- 6 files changed, 71 insertions(+), 39 deletions(-) diff --git a/app/test/test_pdump.c b/app/test/test_pdump.c index ad183184c..6a1180bcb 100644 --- a/app/test/test_pdump.c +++ b/app/test/test_pdump.c @@ -57,8 +57,7 @@ run_pdump_client_tests(void) if (ret < 0) return -1; mp->flags = 0x0000; - ring_client = rte_ring_create("SR0", RING_SIZE, rte_socket_id(), - RING_F_SP_ENQ | RING_F_SC_DEQ); + ring_client = rte_ring_create("SR0", RING_SIZE, rte_socket_id(), 0); if (ring_client == NULL) { printf("rte_ring_create SR0 failed"); return -1; @@ -71,9 +70,6 @@ run_pdump_client_tests(void) } rte_eth_dev_probing_finish(eth_dev); - ring_client->prod.single = 0; - ring_client->cons.single = 0; - printf("\n***** flags = RTE_PDUMP_FLAG_TX *****\n"); for (itr = 0; itr < NUM_ITR; itr++) { diff --git a/lib/librte_pdump/rte_pdump.c b/lib/librte_pdump/rte_pdump.c index 8a01ac510..65364f2c5 100644 --- a/lib/librte_pdump/rte_pdump.c +++ b/lib/librte_pdump/rte_pdump.c @@ -380,7 +380,7 @@ pdump_validate_ring_mp(struct rte_ring *ring, struct rte_mempool *mp) rte_errno = EINVAL; return -1; } - if (ring->prod.single || ring->cons.single) { + if (rte_ring_prod_single(ring) || rte_ring_cons_single(ring)) { PDUMP_LOG(ERR, "ring with either SP or SC settings" " is not valid for pdump, should have MP and MC settings\n"); rte_errno = EINVAL; diff --git a/lib/librte_port/rte_port_ring.c b/lib/librte_port/rte_port_ring.c index 47fcdd06a..2f6c050fa 100644 --- a/lib/librte_port/rte_port_ring.c +++ b/lib/librte_port/rte_port_ring.c @@ -44,8 +44,8 @@ rte_port_ring_reader_create_internal(void *params, int socket_id, /* Check input parameters */ if ((conf == NULL) || (conf->ring == NULL) || - (conf->ring->cons.single && is_multi) || - (!(conf->ring->cons.single) && !is_multi)) { + (rte_ring_cons_single(conf->ring) && is_multi) || + (!rte_ring_cons_single(conf->ring) && !is_multi)) { RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__); return NULL; } @@ -171,8 +171,8 @@ rte_port_ring_writer_create_internal(void *params, int socket_id, /* Check input parameters */ if ((conf == NULL) || (conf->ring == NULL) || - (conf->ring->prod.single && is_multi) || - (!(conf->ring->prod.single) && !is_multi) || + (rte_ring_prod_single(conf->ring) && is_multi) || + (!rte_ring_prod_single(conf->ring) && !is_multi) || (conf->tx_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX)) { RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__); return NULL; @@ -440,8 +440,8 @@ rte_port_ring_writer_nodrop_create_internal(void *params, int socket_id, /* Check input parameters */ if ((conf == NULL) || (conf->ring == NULL) || - (conf->ring->prod.single && is_multi) || - (!(conf->ring->prod.single) && !is_multi) || + (rte_ring_prod_single(conf->ring) && is_multi) || + (!rte_ring_prod_single(conf->ring) && !is_multi) || (conf->tx_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX)) { RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__); return NULL; diff --git 
a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c index 77e5de099..fa5733907 100644 --- a/lib/librte_ring/rte_ring.c +++ b/lib/librte_ring/rte_ring.c @@ -106,8 +106,10 @@ rte_ring_init(struct rte_ring *r, const char *name, unsigned count, if (ret < 0 || ret >= (int)sizeof(r->name)) return -ENAMETOOLONG; r->flags = flags; - r->prod.single = (flags & RING_F_SP_ENQ) ? __IS_SP : __IS_MP; - r->cons.single = (flags & RING_F_SC_DEQ) ? __IS_SC : __IS_MC; + r->prod.sync_type = (flags & RING_F_SP_ENQ) ? + RTE_RING_SYNC_ST : RTE_RING_SYNC_MT; + r->cons.sync_type = (flags & RING_F_SC_DEQ) ? + RTE_RING_SYNC_ST : RTE_RING_SYNC_MT; if (flags & RING_F_EXACT_SZ) { r->size = rte_align32pow2(count + 1); diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h index 18fc5d845..aa8c628eb 100644 --- a/lib/librte_ring/rte_ring.h +++ b/lib/librte_ring/rte_ring.h @@ -61,11 +61,22 @@ enum rte_ring_queue_behavior { #define RTE_RING_NAMESIZE (RTE_MEMZONE_NAMESIZE - \ sizeof(RTE_RING_MZ_PREFIX) + 1) -/* structure to hold a pair of head/tail values and other metadata */ +/** prod/cons sync types */ +enum { + RTE_RING_SYNC_MT, /**< multi-thread safe (default mode) */ + RTE_RING_SYNC_ST, /**< single thread only */ +}; + +/** + * structure to hold a pair of head/tail values and other metadata. + * used by RTE_RING_SYNC_MT, RTE_RING_SYNC_ST sync types. + * Depending on sync_type format of that structure might be different, + * but offset for *sync_type* and *tail* values should remain the same. + */ struct rte_ring_headtail { - volatile uint32_t head; /**< Prod/consumer head. */ - volatile uint32_t tail; /**< Prod/consumer tail. */ - uint32_t single; /**< True if single prod/cons */ + uint32_t sync_type; /**< sync type of prod/cons */ + volatile uint32_t tail __rte_aligned(8); /**< prod/consumer tail. */ + volatile uint32_t head; /**< prod/consumer head. 
*/ }; /** @@ -116,11 +127,10 @@ struct rte_ring { #define RING_F_EXACT_SZ 0x0004 #define RTE_RING_SZ_MASK (0x7fffffffU) /**< Ring size mask */ -/* @internal defines for passing to the enqueue dequeue worker functions */ -#define __IS_SP 1 -#define __IS_MP 0 -#define __IS_SC 1 -#define __IS_MC 0 +#define __IS_SP RTE_RING_SYNC_ST +#define __IS_MP RTE_RING_SYNC_MT +#define __IS_SC RTE_RING_SYNC_ST +#define __IS_MC RTE_RING_SYNC_MT /** * Calculate the memory size needed for a ring @@ -420,7 +430,7 @@ rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, unsigned int n, unsigned int *free_space) { return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED, - __IS_MP, free_space); + RTE_RING_SYNC_MT, free_space); } /** @@ -443,7 +453,7 @@ rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, unsigned int n, unsigned int *free_space) { return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED, - __IS_SP, free_space); + RTE_RING_SYNC_ST, free_space); } /** @@ -470,7 +480,7 @@ rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table, unsigned int n, unsigned int *free_space) { return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED, - r->prod.single, free_space); + r->prod.sync_type, free_space); } /** @@ -554,7 +564,7 @@ rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n, unsigned int *available) { return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED, - __IS_MC, available); + RTE_RING_SYNC_MT, available); } /** @@ -578,7 +588,7 @@ rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n, unsigned int *available) { return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED, - __IS_SC, available); + RTE_RING_SYNC_ST, available); } /** @@ -605,7 +615,7 @@ rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n, unsigned int *available) { return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED, - r->cons.single, available); + r->cons.sync_type, available); } /** @@ -777,6 +787,30 @@ rte_ring_get_capacity(const struct rte_ring *r) return r->capacity; } +static inline uint32_t +rte_ring_get_prod_sync_type(const struct rte_ring *r) +{ + return r->prod.sync_type; +} + +static inline int +rte_ring_prod_single(const struct rte_ring *r) +{ + return (rte_ring_get_prod_sync_type(r) == RTE_RING_SYNC_ST); +} + +static inline uint32_t +rte_ring_get_cons_sync_type(const struct rte_ring *r) +{ + return r->cons.sync_type; +} + +static inline int +rte_ring_cons_single(const struct rte_ring *r) +{ + return (rte_ring_get_cons_sync_type(r) == RTE_RING_SYNC_ST); +} + /** * Dump the status of all rings on the console * @@ -820,7 +854,7 @@ rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table, unsigned int n, unsigned int *free_space) { return __rte_ring_do_enqueue(r, obj_table, n, - RTE_RING_QUEUE_VARIABLE, __IS_MP, free_space); + RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_MT, free_space); } /** @@ -843,7 +877,7 @@ rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table, unsigned int n, unsigned int *free_space) { return __rte_ring_do_enqueue(r, obj_table, n, - RTE_RING_QUEUE_VARIABLE, __IS_SP, free_space); + RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_ST, free_space); } /** @@ -870,7 +904,7 @@ rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table, unsigned int n, unsigned int *free_space) { return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE, - r->prod.single, free_space); + r->prod.sync_type, 
free_space); } /** @@ -898,7 +932,7 @@ rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned int n, unsigned int *available) { return __rte_ring_do_dequeue(r, obj_table, n, - RTE_RING_QUEUE_VARIABLE, __IS_MC, available); + RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_MT, available); } /** @@ -923,7 +957,7 @@ rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned int n, unsigned int *available) { return __rte_ring_do_dequeue(r, obj_table, n, - RTE_RING_QUEUE_VARIABLE, __IS_SC, available); + RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_ST, available); } /** @@ -951,7 +985,7 @@ rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table, { return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE, - r->cons.single, available); + r->cons.sync_type, available); } #ifdef __cplusplus diff --git a/lib/librte_ring/rte_ring_elem.h b/lib/librte_ring/rte_ring_elem.h index 3976757ed..ff7a28ea5 100644 --- a/lib/librte_ring/rte_ring_elem.h +++ b/lib/librte_ring/rte_ring_elem.h @@ -570,7 +570,7 @@ rte_ring_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table, unsigned int esize, unsigned int n, unsigned int *free_space) { return __rte_ring_do_enqueue_elem(r, obj_table, esize, n, - RTE_RING_QUEUE_FIXED, r->prod.single, free_space); + RTE_RING_QUEUE_FIXED, r->prod.sync_type, free_space); } /** @@ -734,7 +734,7 @@ rte_ring_dequeue_bulk_elem(struct rte_ring *r, void *obj_table, unsigned int esize, unsigned int n, unsigned int *available) { return __rte_ring_do_dequeue_elem(r, obj_table, esize, n, - RTE_RING_QUEUE_FIXED, r->cons.single, available); + RTE_RING_QUEUE_FIXED, r->cons.sync_type, available); } /** @@ -902,7 +902,7 @@ rte_ring_enqueue_burst_elem(struct rte_ring *r, const void *obj_table, unsigned int esize, unsigned int n, unsigned int *free_space) { return __rte_ring_do_enqueue_elem(r, obj_table, esize, n, - RTE_RING_QUEUE_VARIABLE, r->prod.single, free_space); + RTE_RING_QUEUE_VARIABLE, r->prod.sync_type, free_space); } /** @@ -995,7 +995,7 @@ rte_ring_dequeue_burst_elem(struct rte_ring *r, void *obj_table, { return __rte_ring_do_dequeue_elem(r, obj_table, esize, n, RTE_RING_QUEUE_VARIABLE, - r->cons.single, available); + r->cons.sync_type, available); } #ifdef __cplusplus From patchwork Mon Feb 24 11:35:12 2020 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: "Ananyev, Konstantin" X-Patchwork-Id: 66004 X-Patchwork-Delegate: thomas@monjalon.net Return-Path: X-Original-To: patchwork@inbox.dpdk.org Delivered-To: patchwork@inbox.dpdk.org Received: from dpdk.org (dpdk.org [92.243.14.124]) by inbox.dpdk.org (Postfix) with ESMTP id 4650EA0524; Mon, 24 Feb 2020 12:35:58 +0100 (CET) Received: from [92.243.14.124] (localhost [127.0.0.1]) by dpdk.org (Postfix) with ESMTP id 39D3D1BFDB; Mon, 24 Feb 2020 12:35:29 +0100 (CET) Received: from mga02.intel.com (mga02.intel.com [134.134.136.20]) by dpdk.org (Postfix) with ESMTP id 398FD1BFB2 for ; Mon, 24 Feb 2020 12:35:26 +0100 (CET) X-Amp-Result: SKIPPED(no attachment in message) X-Amp-File-Uploaded: False Received: from fmsmga005.fm.intel.com ([10.253.24.32]) by orsmga101.jf.intel.com with ESMTP/TLS/DHE-RSA-AES256-GCM-SHA384; 24 Feb 2020 03:35:25 -0800 X-ExtLoop1: 1 X-IronPort-AV: E=Sophos;i="5.70,480,1574150400"; d="scan'208";a="435878536" Received: from sivswdev08.ir.intel.com ([10.237.217.47]) by fmsmga005.fm.intel.com with ESMTP; 24 Feb 2020 03:35:22 -0800 From: Konstantin Ananyev To: dev@dpdk.org Cc: olivier.matz@6wind.com, Konstantin Ananyev Date: 
Mon, 24 Feb 2020 11:35:12 +0000 Message-Id: <20200224113515.1744-4-konstantin.ananyev@intel.com> X-Mailer: git-send-email 2.18.0 In-Reply-To: <20200224113515.1744-1-konstantin.ananyev@intel.com> References: <20200224113515.1744-1-konstantin.ananyev@intel.com> Subject: [dpdk-dev] [RFC 3/6] ring: introduce RTS ring mode X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: DPDK patches and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dev-bounces@dpdk.org Sender: "dev" Introduce relaxed tail sync (RTS) mode for MT ring synchronization. Aim to reduce stall times in case when ring is used on overcommited cpus (multiple active threads on the same cpu). The main difference from original MP/MC algorithm is that tail value is increased not by every thread that finished enqueue/dequeue, but only by the last one. That allows threads to avoid spinning on ring tail value, leaving actual tail value change to the last thread in the update queue. Signed-off-by: Konstantin Ananyev --- lib/librte_ring/Makefile | 3 +- lib/librte_ring/meson.build | 3 +- lib/librte_ring/rte_ring.c | 75 ++++++- lib/librte_ring/rte_ring.h | 300 +++++++++++++++++++++++-- lib/librte_ring/rte_ring_rts_generic.h | 240 ++++++++++++++++++++ 5 files changed, 598 insertions(+), 23 deletions(-) create mode 100644 lib/librte_ring/rte_ring_rts_generic.h diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile index 917c560ad..4f90344f4 100644 --- a/lib/librte_ring/Makefile +++ b/lib/librte_ring/Makefile @@ -18,6 +18,7 @@ SRCS-$(CONFIG_RTE_LIBRTE_RING) := rte_ring.c SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h \ rte_ring_elem.h \ rte_ring_generic.h \ - rte_ring_c11_mem.h + rte_ring_c11_mem.h \ + rte_ring_rts_generic.h include $(RTE_SDK)/mk/rte.lib.mk diff --git a/lib/librte_ring/meson.build b/lib/librte_ring/meson.build index f2f3ccc88..dc8d7dbea 100644 --- a/lib/librte_ring/meson.build +++ b/lib/librte_ring/meson.build @@ -5,7 +5,8 @@ sources = files('rte_ring.c') headers = files('rte_ring.h', 'rte_ring_elem.h', 'rte_ring_c11_mem.h', - 'rte_ring_generic.h') + 'rte_ring_generic.h', + 'rte_ring_rts_generic.h') # rte_ring_create_elem and rte_ring_get_memsize_elem are experimental allow_experimental_apis = true diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c index fa5733907..1ce0af3e5 100644 --- a/lib/librte_ring/rte_ring.c +++ b/lib/librte_ring/rte_ring.c @@ -45,6 +45,9 @@ EAL_REGISTER_TAILQ(rte_ring_tailq) /* true if x is a power of 2 */ #define POWEROF2(x) ((((x)-1) & (x)) == 0) +/* by default set head/tail distance as 1/8 of ring capacity */ +#define HTD_MAX_DEF 8 + /* return the size of memory occupied by a ring */ ssize_t rte_ring_get_memsize_elem(unsigned int esize, unsigned int count) @@ -82,8 +85,56 @@ rte_ring_get_memsize(unsigned int count) void rte_ring_reset(struct rte_ring *r) { - r->prod.head = r->cons.head = 0; - r->prod.tail = r->cons.tail = 0; + memset((void *)(uintptr_t)&r->prod.tail, 0, + offsetof(struct rte_ring, pad1) - + offsetof(struct rte_ring, prod.tail)); + memset((void *)(uintptr_t)&r->cons.tail, 0, + offsetof(struct rte_ring, pad2) - + offsetof(struct rte_ring, cons.tail)); +} + +/* + * helper function, calculates sync_type values for prod and cons + * based on input flags. Returns zero at success or negative + * errno value otherwise. 
+ */ +static int +get_sync_type(uint32_t flags, uint32_t *prod_st, uint32_t *cons_st) +{ + static const uint32_t prod_st_flags = + (RING_F_SP_ENQ | RING_F_MP_RTS_ENQ); + static const uint32_t cons_st_flags = + (RING_F_SC_DEQ | RING_F_MC_RTS_DEQ); + + switch (flags & prod_st_flags) { + case 0: + *prod_st = RTE_RING_SYNC_MT; + break; + case RING_F_SP_ENQ: + *prod_st = RTE_RING_SYNC_ST; + break; + case RING_F_MP_RTS_ENQ: + *prod_st = RTE_RING_SYNC_MT_RTS; + break; + default: + return -EINVAL; + } + + switch (flags & cons_st_flags) { + case 0: + *cons_st = RTE_RING_SYNC_MT; + break; + case RING_F_SC_DEQ: + *cons_st = RTE_RING_SYNC_ST; + break; + case RING_F_MC_RTS_DEQ: + *cons_st = RTE_RING_SYNC_MT_RTS; + break; + default: + return -EINVAL; + } + + return 0; } int @@ -100,16 +151,20 @@ rte_ring_init(struct rte_ring *r, const char *name, unsigned count, RTE_BUILD_BUG_ON((offsetof(struct rte_ring, prod) & RTE_CACHE_LINE_MASK) != 0); + RTE_BUILD_BUG_ON(offsetof(struct rte_ring_headtail, sync_type) != + offsetof(struct rte_ring_rts_headtail, sync_type)); + RTE_BUILD_BUG_ON(offsetof(struct rte_ring_headtail, tail) != + offsetof(struct rte_ring_rts_headtail, tail.val.pos)); + /* init the ring structure */ memset(r, 0, sizeof(*r)); ret = strlcpy(r->name, name, sizeof(r->name)); if (ret < 0 || ret >= (int)sizeof(r->name)) return -ENAMETOOLONG; r->flags = flags; - r->prod.sync_type = (flags & RING_F_SP_ENQ) ? - RTE_RING_SYNC_ST : RTE_RING_SYNC_MT; - r->cons.sync_type = (flags & RING_F_SC_DEQ) ? - RTE_RING_SYNC_ST : RTE_RING_SYNC_MT; + ret = get_sync_type(flags, &r->prod.sync_type, &r->cons.sync_type); + if (ret != 0) + return ret; if (flags & RING_F_EXACT_SZ) { r->size = rte_align32pow2(count + 1); @@ -126,8 +181,12 @@ rte_ring_init(struct rte_ring *r, const char *name, unsigned count, r->mask = count - 1; r->capacity = r->mask; } - r->prod.head = r->cons.head = 0; - r->prod.tail = r->cons.tail = 0; + + /* set default values for head-tail distance */ + if (flags & RING_F_MP_RTS_ENQ) + rte_ring_set_prod_htd_max(r, r->capacity / HTD_MAX_DEF); + if (flags & RING_F_MC_RTS_DEQ) + rte_ring_set_cons_htd_max(r, r->capacity / HTD_MAX_DEF); return 0; } diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h index aa8c628eb..a130aeb9d 100644 --- a/lib/librte_ring/rte_ring.h +++ b/lib/librte_ring/rte_ring.h @@ -65,13 +65,15 @@ enum rte_ring_queue_behavior { enum { RTE_RING_SYNC_MT, /**< multi-thread safe (default mode) */ RTE_RING_SYNC_ST, /**< single thread only */ + RTE_RING_SYNC_MT_RTS, /**< multi-thread relaxed tail sync */ }; /** * structure to hold a pair of head/tail values and other metadata. * used by RTE_RING_SYNC_MT, RTE_RING_SYNC_ST sync types. - * Depending on sync_type format of that structure might be different, - * but offset for *sync_type* and *tail* values should remain the same. + * Depending on sync_type format of that structure might differ + * depending on the sync mechanism selelcted, but offsets for + * *sync_type* and *tail* values should always remain the same. */ struct rte_ring_headtail { uint32_t sync_type; /**< sync type of prod/cons */ @@ -79,6 +81,21 @@ struct rte_ring_headtail { volatile uint32_t head; /**< prod/consumer head. 
*/ }; +union rte_ring_ht_poscnt { + uint64_t raw; + struct { + uint32_t pos; /**< head/tail position */ + uint32_t cnt; /**< head/tail reference counter */ + } val; +}; + +struct rte_ring_rts_headtail { + uint32_t sync_type; /**< sync type of prod/cons */ + uint32_t htd_max; /**< max allowed distance between head/tail */ + volatile union rte_ring_ht_poscnt tail; + volatile union rte_ring_ht_poscnt head; +}; + /** * An RTE ring structure. * @@ -106,11 +123,21 @@ struct rte_ring { char pad0 __rte_cache_aligned; /**< empty cache line */ /** Ring producer status. */ - struct rte_ring_headtail prod __rte_cache_aligned; + RTE_STD_C11 + union { + struct rte_ring_headtail prod; + struct rte_ring_rts_headtail rts_prod; + } __rte_cache_aligned; + char pad1 __rte_cache_aligned; /**< empty cache line */ /** Ring consumer status. */ - struct rte_ring_headtail cons __rte_cache_aligned; + RTE_STD_C11 + union { + struct rte_ring_headtail cons; + struct rte_ring_rts_headtail rts_cons; + } __rte_cache_aligned; + char pad2 __rte_cache_aligned; /**< empty cache line */ }; @@ -127,6 +154,9 @@ struct rte_ring { #define RING_F_EXACT_SZ 0x0004 #define RTE_RING_SZ_MASK (0x7fffffffU) /**< Ring size mask */ +#define RING_F_MP_RTS_ENQ 0x0008 /**< The default enqueue is "MP RTS". */ +#define RING_F_MC_RTS_DEQ 0x0010 /**< The default dequeue is "MC RTS". */ + #define __IS_SP RTE_RING_SYNC_ST #define __IS_MP RTE_RING_SYNC_MT #define __IS_SC RTE_RING_SYNC_ST @@ -407,6 +437,82 @@ __rte_ring_do_dequeue(struct rte_ring *r, void **obj_table, return n; } +#include + +/** + * @internal Enqueue several objects on the RTS ring. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @param behavior + * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring + * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring + * @param free_space + * returns the amount of space after the enqueue operation has finished + * @return + * Actual number of objects enqueued. + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. + */ +static __rte_always_inline unsigned int +__rte_ring_do_rts_enqueue(struct rte_ring *r, void * const *obj_table, + uint32_t n, enum rte_ring_queue_behavior behavior, + uint32_t *free_space) +{ + uint32_t free, head; + + n = __rte_ring_rts_move_prod_head(r, n, behavior, &head, &free); + + if (n != 0) { + ENQUEUE_PTRS(r, &r[1], head, obj_table, n, void *); + __rte_ring_rts_update_tail(&r->rts_prod); + } + + if (free_space != NULL) + *free_space = free - n; + return n; +} + +/** + * @internal Dequeue several objects from the RTS ring. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to pull from the ring. + * @param behavior + * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring + * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring + * @param available + * returns the number of remaining ring entries after the dequeue has finished + * @return + * - Actual number of objects dequeued. + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. 
+ */ +static __rte_always_inline unsigned int +__rte_ring_do_rts_dequeue(struct rte_ring *r, void **obj_table, + unsigned int n, enum rte_ring_queue_behavior behavior, + unsigned int *available) +{ + uint32_t entries, head; + + n = __rte_ring_rts_move_cons_head(r, n, behavior, &head, &entries); + + if (n != 0) { + DEQUEUE_PTRS(r, &r[1], head, obj_table, n, void *); + __rte_ring_rts_update_tail(&r->rts_cons); + } + + if (available != NULL) + *available = entries - n; + return n; +} + /** * Enqueue several objects on the ring (multi-producers safe). * @@ -456,6 +562,29 @@ rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, RTE_RING_SYNC_ST, free_space); } +/** + * Enqueue several objects on the RTS ring (multi-producers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. + * @return + * The number of objects enqueued, either 0 or n + */ +static __rte_always_inline unsigned int +rte_ring_rts_enqueue_bulk(struct rte_ring *r, void * const *obj_table, + unsigned int n, unsigned int *free_space) +{ + return __rte_ring_do_rts_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED, + free_space); +} + /** * Enqueue several objects on a ring. * @@ -479,8 +608,18 @@ static __rte_always_inline unsigned int rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table, unsigned int n, unsigned int *free_space) { - return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED, - r->prod.sync_type, free_space); + switch (r->prod.sync_type) { + case RTE_RING_SYNC_MT: + return rte_ring_mp_enqueue_bulk(r, obj_table, n, free_space); + case RTE_RING_SYNC_ST: + return rte_ring_sp_enqueue_bulk(r, obj_table, n, free_space); + case RTE_RING_SYNC_MT_RTS: + return rte_ring_rts_enqueue_bulk(r, obj_table, n, free_space); + } + + /* valid ring should never reach this point */ + RTE_ASSERT(0); + return 0; } /** @@ -591,6 +730,29 @@ rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table, RTE_RING_SYNC_ST, available); } +/** + * Dequeue several objects from an RTS ring (multi-consumers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. + * @return + * The number of objects dequeued, either 0 or n + */ +static __rte_always_inline unsigned int +rte_ring_rts_dequeue_bulk(struct rte_ring *r, void **obj_table, + unsigned int n, unsigned int *available) +{ + return __rte_ring_do_rts_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED, + available); +} + /** * Dequeue several objects from a ring. 
* @@ -614,8 +776,18 @@ static __rte_always_inline unsigned int rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n, unsigned int *available) { - return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED, - r->cons.sync_type, available); + switch (r->cons.sync_type) { + case RTE_RING_SYNC_MT: + return rte_ring_mc_dequeue_bulk(r, obj_table, n, available); + case RTE_RING_SYNC_ST: + return rte_ring_sc_dequeue_bulk(r, obj_table, n, available); + case RTE_RING_SYNC_MT_RTS: + return rte_ring_rts_dequeue_bulk(r, obj_table, n, available); + } + + /* valid ring should never reach this point */ + RTE_ASSERT(0); + return 0; } /** @@ -811,6 +983,42 @@ rte_ring_cons_single(const struct rte_ring *r) return (rte_ring_get_cons_sync_type(r) == RTE_RING_SYNC_ST); } +static inline uint32_t +rte_ring_get_prod_htd_max(const struct rte_ring *r) +{ + if (r->prod.sync_type == RTE_RING_SYNC_MT_RTS) + return r->rts_prod.htd_max; + return UINT32_MAX; +} + +static inline int +rte_ring_set_prod_htd_max(struct rte_ring *r, uint32_t v) +{ + if (r->prod.sync_type != RTE_RING_SYNC_MT_RTS) + return -ENOTSUP; + + r->rts_prod.htd_max = v; + return 0; +} + +static inline uint32_t +rte_ring_get_cons_htd_max(const struct rte_ring *r) +{ + if (r->cons.sync_type == RTE_RING_SYNC_MT_RTS) + return r->rts_prod.htd_max; + return UINT32_MAX; +} + +static inline int +rte_ring_set_cons_htd_max(struct rte_ring *r, uint32_t v) +{ + if (r->cons.sync_type != RTE_RING_SYNC_MT_RTS) + return -ENOTSUP; + + r->rts_cons.htd_max = v; + return 0; +} + /** * Dump the status of all rings on the console * @@ -880,6 +1088,29 @@ rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table, RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_ST, free_space); } +/** + * Enqueue several objects on the RTS ring (multi-producers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. + * @return + * - n: Actual number of objects enqueued. + */ +static __rte_always_inline unsigned +rte_ring_rts_enqueue_burst(struct rte_ring *r, void * const *obj_table, + unsigned int n, unsigned int *free_space) +{ + return __rte_ring_do_rts_enqueue(r, obj_table, n, + RTE_RING_QUEUE_VARIABLE, free_space); +} + /** * Enqueue several objects on a ring. * @@ -903,8 +1134,18 @@ static __rte_always_inline unsigned rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table, unsigned int n, unsigned int *free_space) { - return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE, - r->prod.sync_type, free_space); + switch (r->prod.sync_type) { + case RTE_RING_SYNC_MT: + return rte_ring_mp_enqueue_burst(r, obj_table, n, free_space); + case RTE_RING_SYNC_ST: + return rte_ring_sp_enqueue_burst(r, obj_table, n, free_space); + case RTE_RING_SYNC_MT_RTS: + return rte_ring_rts_enqueue_burst(r, obj_table, n, free_space); + } + + /* valid ring should never reach this point */ + RTE_ASSERT(0); + return 0; } /** @@ -960,6 +1201,30 @@ rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table, RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_ST, available); } +/** + * Dequeue several objects from an RTS ring (multi-consumers safe). + * When the requested objects are more than the available objects, + * only dequeue the actual number of objects. 
+ * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. + * @return + * - n: Actual number of objects dequeued, 0 if ring is empty + */ +static __rte_always_inline unsigned +rte_ring_rts_dequeue_burst(struct rte_ring *r, void **obj_table, + unsigned int n, unsigned int *available) +{ + return __rte_ring_do_rts_dequeue(r, obj_table, n, + RTE_RING_QUEUE_VARIABLE, available); +} /** * Dequeue multiple objects from a ring up to a maximum number. * @@ -983,9 +1248,18 @@ static __rte_always_inline unsigned rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned int n, unsigned int *available) { - return __rte_ring_do_dequeue(r, obj_table, n, - RTE_RING_QUEUE_VARIABLE, - r->cons.sync_type, available); + switch (r->cons.sync_type) { + case RTE_RING_SYNC_MT: + return rte_ring_mc_dequeue_burst(r, obj_table, n, available); + case RTE_RING_SYNC_ST: + return rte_ring_sc_dequeue_burst(r, obj_table, n, available); + case RTE_RING_SYNC_MT_RTS: + return rte_ring_rts_dequeue_burst(r, obj_table, n, available); + } + + /* valid ring should never reach this point */ + RTE_ASSERT(0); + return 0; } #ifdef __cplusplus diff --git a/lib/librte_ring/rte_ring_rts_generic.h b/lib/librte_ring/rte_ring_rts_generic.h new file mode 100644 index 000000000..add8630b2 --- /dev/null +++ b/lib/librte_ring/rte_ring_rts_generic.h @@ -0,0 +1,240 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * + * Copyright (c) 2010-2017 Intel Corporation + * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org + * All rights reserved. + * Derived from FreeBSD's bufring.h + * Used as BSD-3 Licensed with permission from Kip Macy. + */ + +#ifndef _RTE_RING_RTS_GENERIC_H_ +#define _RTE_RING_RTS_GENERIC_H_ + +/** + * @file rte_ring_rts_generic.h + * It is not recommended to include this file directly, + * include instead. + * Contains internal helper functions for Relaxed Tail Sync (RTS) ring mode. + * The main idea remains the same as for our original MP/MC synchronization + * mechanism. + * The main difference is that tail value is increased not + * by every thread that finished enqueue/dequeue, + * but only by the last one doing enqueue/dequeue. + * That allows threads to skip spinning on tail value, + * leaving actual tail value change to last thread in the update queue. + * RTS requires 2 64-bit CAS for each enqueue(/dequeue) operation: + * one for head update, second for tail update. + * As a gain it allows thread to avoid spinning/waiting on tail value. + * In comparision original MP/MC algorithm requires one 32-bit CAS + * for head update and waiting/spinning on tail value. + * + * Brief outline: + * - introduce refcnt for both head and tail. + * - increment head.refcnt for each head.value update + * - write head:value and head:refcnt atomically (64-bit CAS) + * - move tail.value ahead only when tail.refcnt + 1 == head.refcnt + * - increment tail.refcnt when each enqueue/dequeue op finishes + * (no matter is tail:value going to change or not) + * - write tail.value and tail.recnt atomically (64-bit CAS) + * + * To avoid head/tail starvation: + * - limit max allowed distance between head and tail value (HTD_MAX). + * I.E. 
thread is allowed to proceed with changing head.value, + * only when: head.value - tail.value <= HTD_MAX + * HTD_MAX is an optional parameter. + * With HTD_MAX == 0 we'll have ring with fully synchronized + * head/tail (like HTS). + * With HTD_MAX == UINT32_MAX - no limitation. + * By default HTD_MAX == ring.capacity / 8. + */ + +/** + * @internal This function updates tail values. + */ +static __rte_always_inline void +__rte_ring_rts_update_tail(struct rte_ring_rts_headtail *ht) +{ + union rte_ring_ht_poscnt h, ot, nt; + + /* + * If there are other enqueues/dequeues in progress that + * might preceded us, then don't update tail with new value. + */ + + do { + ot.raw = ht->tail.raw; + rte_smp_rmb(); + + /* on 32-bit systems we have to do atomic read here */ + h.raw = rte_atomic64_read((rte_atomic64_t *) + (uintptr_t)&ht->head.raw); + + nt.raw = ot.raw; + if (++nt.val.cnt == h.val.cnt) + nt.val.pos = h.val.pos; + + } while (rte_atomic64_cmpset(&ht->tail.raw, ot.raw, nt.raw) == 0); +} + +/** + * @internal This function waits till head/tail distance wouldn't + * exceed pre-defined max value. + */ +static __rte_always_inline void +__rte_ring_rts_head_wait(const struct rte_ring_rts_headtail *ht, + union rte_ring_ht_poscnt *h) +{ + uint32_t max; + + max = ht->htd_max; + h->raw = ht->head.raw; + rte_smp_rmb(); + + while (h->val.pos - ht->tail.val.pos > max) { + rte_pause(); + h->raw = ht->head.raw; + rte_smp_rmb(); + } +} + +/** + * @internal This function updates the producer head for enqueue. + * + * @param r + * A pointer to the ring structure + * @param is_sp + * Indicates whether multi-producer path is needed or not + * @param n + * The number of elements we will want to enqueue, i.e. how far should the + * head be moved + * @param behavior + * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring + * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring + * @param old_head + * Returns head value as it was before the move, i.e. where enqueue starts + * @param new_head + * Returns the current/new head value i.e. where enqueue finishes + * @param free_entries + * Returns the amount of free space in the ring BEFORE head was moved + * @return + * Actual number of objects enqueued. + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. + */ +static __rte_always_inline uint32_t +__rte_ring_rts_move_prod_head(struct rte_ring *r, uint32_t num, + enum rte_ring_queue_behavior behavior, uint32_t *old_head, + uint32_t *free_entries) +{ + uint32_t n; + union rte_ring_ht_poscnt nh, oh; + + const uint32_t capacity = r->capacity; + + do { + /* Reset n to the initial burst count */ + n = num; + + /* read prod head (may spin on prod tail) */ + __rte_ring_rts_head_wait(&r->rts_prod, &oh); + + /* add rmb barrier to avoid load/load reorder in weak + * memory model. It is noop on x86 + */ + rte_smp_rmb(); + + /* + * The subtraction is done between two unsigned 32bits value + * (the result is always modulo 32 bits even if we have + * *old_head > cons_tail). So 'free_entries' is always between 0 + * and capacity (which is < size). + */ + *free_entries = capacity + r->cons.tail - oh.val.pos; + + /* check that we have enough room in ring */ + if (unlikely(n > *free_entries)) + n = (behavior == RTE_RING_QUEUE_FIXED) ? 
+ 0 : *free_entries; + + if (n == 0) + return 0; + + nh.val.pos = oh.val.pos + n; + nh.val.cnt = oh.val.cnt + 1; + + } while (rte_atomic64_cmpset(&r->rts_prod.head.raw, + oh.raw, nh.raw) == 0); + + *old_head = oh.val.pos; + return n; +} + +/** + * @internal This function updates the consumer head for dequeue + * + * @param r + * A pointer to the ring structure + * @param is_sc + * Indicates whether multi-consumer path is needed or not + * @param n + * The number of elements we will want to enqueue, i.e. how far should the + * head be moved + * @param behavior + * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring + * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring + * @param old_head + * Returns head value as it was before the move, i.e. where dequeue starts + * @param new_head + * Returns the current/new head value i.e. where dequeue finishes + * @param entries + * Returns the number of entries in the ring BEFORE head was moved + * @return + * - Actual number of objects dequeued. + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. + */ +static __rte_always_inline unsigned int +__rte_ring_rts_move_cons_head(struct rte_ring *r, uint32_t num, + enum rte_ring_queue_behavior behavior, uint32_t *old_head, + uint32_t *entries) +{ + uint32_t n; + union rte_ring_ht_poscnt nh, oh; + + /* move cons.head atomically */ + do { + /* Restore n as it may change every loop */ + n = num; + + /* read cons head (may spin on cons tail) */ + __rte_ring_rts_head_wait(&r->rts_cons, &oh); + + + /* add rmb barrier to avoid load/load reorder in weak + * memory model. It is noop on x86 + */ + rte_smp_rmb(); + + /* The subtraction is done between two unsigned 32bits value + * (the result is always modulo 32 bits even if we have + * cons_head > prod_tail). So 'entries' is always between 0 + * and size(ring)-1. + */ + *entries = r->prod.tail - oh.val.pos; + + /* Set the actual entries for dequeue */ + if (n > *entries) + n = (behavior == RTE_RING_QUEUE_FIXED) ? 
0 : *entries; + + if (unlikely(n == 0)) + return 0; + + nh.val.pos = oh.val.pos + n; + nh.val.cnt = oh.val.cnt + 1; + + } while (rte_atomic64_cmpset(&r->rts_cons.head.raw, + oh.raw, nh.raw) == 0); + + *old_head = oh.val.pos; + return n; +} + +#endif /* _RTE_RING_RTS_GENERIC_H_ */ From patchwork Mon Feb 24 11:35:13 2020 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: "Ananyev, Konstantin" X-Patchwork-Id: 66005 X-Patchwork-Delegate: thomas@monjalon.net Return-Path: X-Original-To: patchwork@inbox.dpdk.org Delivered-To: patchwork@inbox.dpdk.org Received: from dpdk.org (dpdk.org [92.243.14.124]) by inbox.dpdk.org (Postfix) with ESMTP id E5DE4A0524; Mon, 24 Feb 2020 12:36:09 +0100 (CET) Received: from [92.243.14.124] (localhost [127.0.0.1]) by dpdk.org (Postfix) with ESMTP id D87B81BFE2; Mon, 24 Feb 2020 12:35:30 +0100 (CET) Received: from mga02.intel.com (mga02.intel.com [134.134.136.20]) by dpdk.org (Postfix) with ESMTP id 4A3891BFB6 for ; Mon, 24 Feb 2020 12:35:27 +0100 (CET) X-Amp-Result: SKIPPED(no attachment in message) X-Amp-File-Uploaded: False Received: from fmsmga005.fm.intel.com ([10.253.24.32]) by orsmga101.jf.intel.com with ESMTP/TLS/DHE-RSA-AES256-GCM-SHA384; 24 Feb 2020 03:35:26 -0800 X-ExtLoop1: 1 X-IronPort-AV: E=Sophos;i="5.70,480,1574150400"; d="scan'208";a="435878540" Received: from sivswdev08.ir.intel.com ([10.237.217.47]) by fmsmga005.fm.intel.com with ESMTP; 24 Feb 2020 03:35:24 -0800 From: Konstantin Ananyev To: dev@dpdk.org Cc: olivier.matz@6wind.com, Konstantin Ananyev Date: Mon, 24 Feb 2020 11:35:13 +0000 Message-Id: <20200224113515.1744-5-konstantin.ananyev@intel.com> X-Mailer: git-send-email 2.18.0 In-Reply-To: <20200224113515.1744-1-konstantin.ananyev@intel.com> References: <20200224113515.1744-1-konstantin.ananyev@intel.com> Subject: [dpdk-dev] [RFC 4/6] test/ring: add contention stress test for RTS ring X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: DPDK patches and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dev-bounces@dpdk.org Sender: "dev" Introduce new test case to test RTS ring mode under contention. 
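The test reuses the framework from test_ring_stress.h as-is; only _st_ring_init() and the _st_ring_*_bulk() wrappers change, so that the RTS code paths introduced in the previous patch are exercised. In short, a sketch (r, name, ring_size, objs and num stand in for the test's own variables):

    /* make RTS the default sync type for both producer and consumer */
    rte_ring_init(r, name, ring_size, RING_F_MP_RTS_ENQ | RING_F_MC_RTS_DEQ);

    /* generic calls now dispatch to the RTS path via prod/cons sync_type ... */
    n = rte_ring_enqueue_bulk(r, objs, num, NULL);

    /* ... or the RTS flavour can be called directly */
    n = rte_ring_rts_dequeue_bulk(r, objs, num, NULL);
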
Signed-off-by: Konstantin Ananyev --- app/test/Makefile | 1 + app/test/meson.build | 1 + app/test/test_ring_rts_stress.c | 28 ++++++++++++++++++++++++++++ 3 files changed, 30 insertions(+) create mode 100644 app/test/test_ring_rts_stress.c diff --git a/app/test/Makefile b/app/test/Makefile index 4f586d95f..d22b9f702 100644 --- a/app/test/Makefile +++ b/app/test/Makefile @@ -78,6 +78,7 @@ SRCS-y += test_rand_perf.c SRCS-y += test_ring.c SRCS-y += test_ring_perf.c +SRCS-y += test_ring_rts_stress.c SRCS-y += test_ring_stress.c SRCS-y += test_pmd_perf.c diff --git a/app/test/meson.build b/app/test/meson.build index 84dde28ad..fa4fb4b51 100644 --- a/app/test/meson.build +++ b/app/test/meson.build @@ -101,6 +101,7 @@ test_sources = files('commands.c', 'test_rib6.c', 'test_ring.c', 'test_ring_perf.c', + 'test_ring_rts_stress.c', 'test_ring_stress.c', 'test_rwlock.c', 'test_sched.c', diff --git a/app/test/test_ring_rts_stress.c b/app/test/test_ring_rts_stress.c new file mode 100644 index 000000000..525d222b2 --- /dev/null +++ b/app/test/test_ring_rts_stress.c @@ -0,0 +1,28 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * Copyright(c) 2020 Intel Corporation + */ + +#include "test_ring_stress.h" + +static inline uint32_t +_st_ring_dequeue_bulk(struct rte_ring *r, void **obj, uint32_t n, + uint32_t *avail) +{ + return rte_ring_rts_dequeue_bulk(r, obj, n, avail); +} + +static inline uint32_t +_st_ring_enqueue_bulk(struct rte_ring *r, void * const *obj, uint32_t n, + uint32_t *free) +{ + return rte_ring_rts_enqueue_bulk(r, obj, n, free); +} + +static int +_st_ring_init(struct rte_ring *r, const char *name, uint32_t num) +{ + return rte_ring_init(r, name, num, + RING_F_MP_RTS_ENQ | RING_F_MC_RTS_DEQ); +} + +REGISTER_TEST_COMMAND(ring_stress_rts_autotest, test_ring_stress); From patchwork Mon Feb 24 11:35:14 2020 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: "Ananyev, Konstantin" X-Patchwork-Id: 66006 X-Patchwork-Delegate: thomas@monjalon.net Return-Path: X-Original-To: patchwork@inbox.dpdk.org Delivered-To: patchwork@inbox.dpdk.org Received: from dpdk.org (dpdk.org [92.243.14.124]) by inbox.dpdk.org (Postfix) with ESMTP id 36FF0A0524; Mon, 24 Feb 2020 12:36:20 +0100 (CET) Received: from [92.243.14.124] (localhost [127.0.0.1]) by dpdk.org (Postfix) with ESMTP id A4CE31BFCE; Mon, 24 Feb 2020 12:35:32 +0100 (CET) Received: from mga02.intel.com (mga02.intel.com [134.134.136.20]) by dpdk.org (Postfix) with ESMTP id 554991BFCC for ; Mon, 24 Feb 2020 12:35:28 +0100 (CET) X-Amp-Result: SKIPPED(no attachment in message) X-Amp-File-Uploaded: False Received: from fmsmga005.fm.intel.com ([10.253.24.32]) by orsmga101.jf.intel.com with ESMTP/TLS/DHE-RSA-AES256-GCM-SHA384; 24 Feb 2020 03:35:27 -0800 X-ExtLoop1: 1 X-IronPort-AV: E=Sophos;i="5.70,480,1574150400"; d="scan'208";a="435878544" Received: from sivswdev08.ir.intel.com ([10.237.217.47]) by fmsmga005.fm.intel.com with ESMTP; 24 Feb 2020 03:35:25 -0800 From: Konstantin Ananyev To: dev@dpdk.org Cc: olivier.matz@6wind.com, Konstantin Ananyev Date: Mon, 24 Feb 2020 11:35:14 +0000 Message-Id: <20200224113515.1744-6-konstantin.ananyev@intel.com> X-Mailer: git-send-email 2.18.0 In-Reply-To: <20200224113515.1744-1-konstantin.ananyev@intel.com> References: <20200224113515.1744-1-konstantin.ananyev@intel.com> Subject: [dpdk-dev] [RFC 5/6] ring: introduce HTS ring mode X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: DPDK patches and discussions List-Unsubscribe: , 
List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dev-bounces@dpdk.org Sender: "dev" Introduce head/tail sync mode for MT ring synchronization. In that mode enqueue/dequeue operation is fully serialized: only one thread at a time is allowed to perform a given op. This is expected to reduce stall times when the ring is used on overcommitted cpus (multiple active threads on the same cpu). As another enhancement, provide the ability to split an enqueue/dequeue operation into two phases: - enqueue/dequeue start - enqueue/dequeue finish That allows the user to inspect objects in the ring without removing them from it (aka MT safe peek). Signed-off-by: Konstantin Ananyev --- lib/librte_ring/Makefile | 1 + lib/librte_ring/meson.build | 1 + lib/librte_ring/rte_ring.c | 15 +- lib/librte_ring/rte_ring.h | 259 ++++++++++++++++++++++++- lib/librte_ring/rte_ring_hts_generic.h | 228 ++++++++++++++++++++++ 5 files changed, 500 insertions(+), 4 deletions(-) create mode 100644 lib/librte_ring/rte_ring_hts_generic.h diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile index 4f90344f4..0c7f8f918 100644 --- a/lib/librte_ring/Makefile +++ b/lib/librte_ring/Makefile @@ -19,6 +19,7 @@ SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h \ rte_ring_elem.h \ rte_ring_generic.h \ rte_ring_c11_mem.h \ + rte_ring_hts_generic.h \ rte_ring_rts_generic.h include $(RTE_SDK)/mk/rte.lib.mk diff --git a/lib/librte_ring/meson.build b/lib/librte_ring/meson.build index dc8d7dbea..5aa673199 100644 --- a/lib/librte_ring/meson.build +++ b/lib/librte_ring/meson.build @@ -6,6 +6,7 @@ headers = files('rte_ring.h', 'rte_ring_elem.h', 'rte_ring_c11_mem.h', 'rte_ring_generic.h', + 'rte_ring_hts_generic.h', 'rte_ring_rts_generic.h') # rte_ring_create_elem and rte_ring_get_memsize_elem are experimental diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c index 1ce0af3e5..d3b948667 100644 --- a/lib/librte_ring/rte_ring.c +++ b/lib/librte_ring/rte_ring.c @@ -102,9 +102,9 @@ static int get_sync_type(uint32_t flags, uint32_t *prod_st, uint32_t *cons_st) { static const uint32_t prod_st_flags = - (RING_F_SP_ENQ | RING_F_MP_RTS_ENQ); + (RING_F_SP_ENQ | RING_F_MP_RTS_ENQ | RING_F_MP_HTS_ENQ); static const uint32_t cons_st_flags = - (RING_F_SC_DEQ | RING_F_MC_RTS_DEQ); + (RING_F_SC_DEQ | RING_F_MC_RTS_DEQ | RING_F_MC_HTS_DEQ); switch (flags & prod_st_flags) { case 0: @@ -116,6 +116,9 @@ get_sync_type(uint32_t flags, uint32_t *prod_st, uint32_t *cons_st) case RING_F_MP_RTS_ENQ: *prod_st = RTE_RING_SYNC_MT_RTS; break; + case RING_F_MP_HTS_ENQ: + *prod_st = RTE_RING_SYNC_MT_HTS; + break; default: return -EINVAL; } @@ -130,6 +133,9 @@ get_sync_type(uint32_t flags, uint32_t *prod_st, uint32_t *cons_st) case RING_F_MC_RTS_DEQ: *cons_st = RTE_RING_SYNC_MT_RTS; break; + case RING_F_MC_HTS_DEQ: + *cons_st = RTE_RING_SYNC_MT_HTS; + break; default: return -EINVAL; } @@ -151,6 +157,11 @@ rte_ring_init(struct rte_ring *r, const char *name, unsigned count, RTE_BUILD_BUG_ON((offsetof(struct rte_ring, prod) & RTE_CACHE_LINE_MASK) != 0); + RTE_BUILD_BUG_ON(offsetof(struct rte_ring_headtail, sync_type) != + offsetof(struct rte_ring_hts_headtail, sync_type)); + RTE_BUILD_BUG_ON(offsetof(struct rte_ring_headtail, tail) != + offsetof(struct rte_ring_hts_headtail, ht.pos.tail)); + RTE_BUILD_BUG_ON(offsetof(struct rte_ring_headtail, sync_type) != offsetof(struct rte_ring_rts_headtail, sync_type)); RTE_BUILD_BUG_ON(offsetof(struct rte_ring_headtail, tail) != diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h index
a130aeb9d..52edcea11 100644 --- a/lib/librte_ring/rte_ring.h +++ b/lib/librte_ring/rte_ring.h @@ -66,11 +66,11 @@ enum { RTE_RING_SYNC_MT, /**< multi-thread safe (default mode) */ RTE_RING_SYNC_ST, /**< single thread only */ RTE_RING_SYNC_MT_RTS, /**< multi-thread relaxed tail sync */ + RTE_RING_SYNC_MT_HTS, /**< multi-thread head/tail sync */ }; /** - * structure to hold a pair of head/tail values and other metadata. - * used by RTE_RING_SYNC_MT, RTE_RING_SYNC_ST sync types. + * Structure to hold a pair of head/tail values and other metadata. * Depending on sync_type format of that structure might differ * depending on the sync mechanism selelcted, but offsets for * *sync_type* and *tail* values should always remain the same. @@ -96,6 +96,19 @@ struct rte_ring_rts_headtail { volatile union rte_ring_ht_poscnt head; }; +union rte_ring_ht_pos { + uint64_t raw; + struct { + uint32_t tail; /**< tail position */ + uint32_t head; /**< head position */ + } pos; +}; + +struct rte_ring_hts_headtail { + uint32_t sync_type; /**< sync type of prod/cons */ + volatile union rte_ring_ht_pos ht __rte_aligned(8); +}; + /** * An RTE ring structure. * @@ -126,6 +139,7 @@ struct rte_ring { RTE_STD_C11 union { struct rte_ring_headtail prod; + struct rte_ring_hts_headtail hts_prod; struct rte_ring_rts_headtail rts_prod; } __rte_cache_aligned; @@ -135,6 +149,7 @@ struct rte_ring { RTE_STD_C11 union { struct rte_ring_headtail cons; + struct rte_ring_hts_headtail hts_cons; struct rte_ring_rts_headtail rts_cons; } __rte_cache_aligned; @@ -157,6 +172,9 @@ struct rte_ring { #define RING_F_MP_RTS_ENQ 0x0008 /**< The default enqueue is "MP RTS". */ #define RING_F_MC_RTS_DEQ 0x0010 /**< The default dequeue is "MC RTS". */ +#define RING_F_MP_HTS_ENQ 0x0020 /**< The default enqueue is "MP HTS". */ +#define RING_F_MC_HTS_DEQ 0x0040 /**< The default dequeue is "MC HTS". */ + #define __IS_SP RTE_RING_SYNC_ST #define __IS_MP RTE_RING_SYNC_MT #define __IS_SC RTE_RING_SYNC_ST @@ -513,6 +531,82 @@ __rte_ring_do_rts_dequeue(struct rte_ring *r, void **obj_table, return n; } +#include + +/** + * @internal Start to enqueue several objects on the HTS ring. + * Note that user has to call appropriate enqueue_finish() + * to complete given enqueue operation. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @param behavior + * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring + * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring + * @param free_space + * returns the amount of space after the enqueue operation has finished + * @return + * Actual number of objects enqueued. + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. + */ +static __rte_always_inline unsigned int +__rte_ring_do_hts_enqueue_start(struct rte_ring *r, void * const *obj_table, + uint32_t n, enum rte_ring_queue_behavior behavior, + uint32_t *free_space) +{ + uint32_t free, head; + + n = __rte_ring_hts_move_prod_head(r, n, behavior, &head, &free); + + if (n != 0) + ENQUEUE_PTRS(r, &r[1], head, obj_table, n, void *); + + if (free_space != NULL) + *free_space = free - n; + return n; +} + +/** + * @internal Start to dequeue several objects from the HTS ring. + * Note that user has to call appropriate dequeue_finish() + * to complete given dequeue operation. + * + * @param r + * A pointer to the ring structure. 
+ * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to pull from the ring. + * @param behavior + * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring + * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring + * @param available + * returns the number of remaining ring entries after the dequeue has finished + * @return + * - Actual number of objects dequeued. + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. + */ +static __rte_always_inline unsigned int +__rte_ring_do_hts_dequeue_start(struct rte_ring *r, void **obj_table, + unsigned int n, enum rte_ring_queue_behavior behavior, + unsigned int *available) +{ + uint32_t entries, head; + + n = __rte_ring_hts_move_cons_head(r, n, behavior, &head, &entries); + + if (n != 0) + DEQUEUE_PTRS(r, &r[1], head, obj_table, n, void *); + + if (available != NULL) + *available = entries - n; + return n; +} + /** * Enqueue several objects on the ring (multi-producers safe). * @@ -585,6 +679,47 @@ rte_ring_rts_enqueue_bulk(struct rte_ring *r, void * const *obj_table, free_space); } +/** + * Start to enqueue several objects on the HTS ring (multi-producers safe). + * Note that user has to call appropriate dequeue_finish() + * to complete given dequeue operation. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. + * @return + * The number of objects enqueued, either 0 or n + */ +static __rte_always_inline unsigned int +rte_ring_hts_enqueue_bulk_start(struct rte_ring *r, void * const *obj_table, + unsigned int n, unsigned int *free_space) +{ + return __rte_ring_do_hts_enqueue_start(r, obj_table, n, + RTE_RING_QUEUE_FIXED, free_space); +} + +static __rte_always_inline void +rte_ring_hts_enqueue_finish(struct rte_ring *r, unsigned int n) +{ + __rte_ring_hts_update_tail(&r->hts_prod, n, 1); +} + +static __rte_always_inline unsigned int +rte_ring_hts_enqueue_bulk(struct rte_ring *r, void * const *obj_table, + unsigned int n, unsigned int *free_space) +{ + n = rte_ring_hts_enqueue_bulk_start(r, obj_table, n, free_space); + if (n != 0) + rte_ring_hts_enqueue_finish(r, n); + return n; +} + /** * Enqueue several objects on a ring. * @@ -615,6 +750,8 @@ rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table, return rte_ring_sp_enqueue_bulk(r, obj_table, n, free_space); case RTE_RING_SYNC_MT_RTS: return rte_ring_rts_enqueue_bulk(r, obj_table, n, free_space); + case RTE_RING_SYNC_MT_HTS: + return rte_ring_hts_enqueue_bulk(r, obj_table, n, free_space); } /* valid ring should never reach this point */ @@ -753,6 +890,47 @@ rte_ring_rts_dequeue_bulk(struct rte_ring *r, void **obj_table, available); } +/** + * Start to dequeue several objects from an HTS ring (multi-consumers safe). + * Note that user has to call appropriate dequeue_finish() + * to complete given dequeue operation. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. 
+ * @return + * The number of objects dequeued, either 0 or n + */ +static __rte_always_inline unsigned int +rte_ring_hts_dequeue_bulk_start(struct rte_ring *r, void **obj_table, + unsigned int n, unsigned int *available) +{ + return __rte_ring_do_hts_dequeue_start(r, obj_table, n, + RTE_RING_QUEUE_FIXED, available); +} + +static __rte_always_inline void +rte_ring_hts_dequeue_finish(struct rte_ring *r, unsigned int n) +{ + __rte_ring_hts_update_tail(&r->hts_cons, n, 0); +} + +static __rte_always_inline unsigned int +rte_ring_hts_dequeue_bulk(struct rte_ring *r, void **obj_table, + unsigned int n, unsigned int *available) +{ + n = rte_ring_hts_dequeue_bulk_start(r, obj_table, n, available); + if (n != 0) + rte_ring_hts_dequeue_finish(r, n); + return n; +} + /** * Dequeue several objects from a ring. * @@ -783,6 +961,8 @@ rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n, return rte_ring_sc_dequeue_bulk(r, obj_table, n, available); case RTE_RING_SYNC_MT_RTS: return rte_ring_rts_dequeue_bulk(r, obj_table, n, available); + case RTE_RING_SYNC_MT_HTS: + return rte_ring_hts_dequeue_bulk(r, obj_table, n, available); } /* valid ring should never reach this point */ @@ -1111,6 +1291,41 @@ rte_ring_rts_enqueue_burst(struct rte_ring *r, void * const *obj_table, RTE_RING_QUEUE_VARIABLE, free_space); } +/** + * Start to enqueue several objects on the HTS ring (multi-producers safe). + * Note that user has to call appropriate dequeue_finish() + * to complete given dequeue operation. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. + * @return + * The number of objects enqueued, either 0 or n + */ +static __rte_always_inline unsigned int +rte_ring_hts_enqueue_burst_start(struct rte_ring *r, void * const *obj_table, + unsigned int n, unsigned int *free_space) +{ + return __rte_ring_do_hts_enqueue_start(r, obj_table, n, + RTE_RING_QUEUE_VARIABLE, free_space); +} + +static __rte_always_inline unsigned int +rte_ring_hts_enqueue_burst(struct rte_ring *r, void * const *obj_table, + unsigned int n, unsigned int *free_space) +{ + n = rte_ring_hts_enqueue_burst_start(r, obj_table, n, free_space); + if (n != 0) + rte_ring_hts_enqueue_finish(r, n); + return n; +} + /** * Enqueue several objects on a ring. * @@ -1141,6 +1356,8 @@ rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table, return rte_ring_sp_enqueue_burst(r, obj_table, n, free_space); case RTE_RING_SYNC_MT_RTS: return rte_ring_rts_enqueue_burst(r, obj_table, n, free_space); + case RTE_RING_SYNC_MT_HTS: + return rte_ring_hts_enqueue_burst(r, obj_table, n, free_space); } /* valid ring should never reach this point */ @@ -1225,6 +1442,42 @@ rte_ring_rts_dequeue_burst(struct rte_ring *r, void **obj_table, return __rte_ring_do_rts_dequeue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE, available); } + +/** + * Start to dequeue several objects from an HTS ring (multi-consumers safe). + * Note that user has to call appropriate dequeue_finish() + * to complete given dequeue operation. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param n + * The number of objects to dequeue from the ring to the obj_table. 
+ * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. + * @return + * The number of objects dequeued, either 0 or n + */ +static __rte_always_inline unsigned int +rte_ring_hts_dequeue_burst_start(struct rte_ring *r, void **obj_table, + unsigned int n, unsigned int *available) +{ + return __rte_ring_do_hts_dequeue_start(r, obj_table, n, + RTE_RING_QUEUE_VARIABLE, available); +} + +static __rte_always_inline unsigned int +rte_ring_hts_dequeue_burst(struct rte_ring *r, void **obj_table, + unsigned int n, unsigned int *available) +{ + n = rte_ring_hts_dequeue_burst_start(r, obj_table, n, available); + if (n != 0) + rte_ring_hts_dequeue_finish(r, n); + return n; +} + /** * Dequeue multiple objects from a ring up to a maximum number. * @@ -1255,6 +1508,8 @@ rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table, return rte_ring_sc_dequeue_burst(r, obj_table, n, available); case RTE_RING_SYNC_MT_RTS: return rte_ring_rts_dequeue_burst(r, obj_table, n, available); + case RTE_RING_SYNC_MT_HTS: + return rte_ring_hts_dequeue_burst(r, obj_table, n, available); } /* valid ring should never reach this point */ diff --git a/lib/librte_ring/rte_ring_hts_generic.h b/lib/librte_ring/rte_ring_hts_generic.h new file mode 100644 index 000000000..7e447e30b --- /dev/null +++ b/lib/librte_ring/rte_ring_hts_generic.h @@ -0,0 +1,228 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * + * Copyright (c) 2010-2020 Intel Corporation + * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org + * All rights reserved. + * Derived from FreeBSD's bufring.h + * Used as BSD-3 Licensed with permission from Kip Macy. + */ + +#ifndef _RTE_RING_HTS_GENERIC_H_ +#define _RTE_RING_HTS_GENERIC_H_ + +/** + * @file rte_ring_hts_generic.h + * It is not recommended to include this file directly, + * include instead. + * Contains internal helper functions for head/tail sync (HTS) ring mode. + * In that mode enqueue/dequeue operation is fully serialized: + * only one thread at a time is allowed to perform given op. + * This is achieved by allowing a thread to proceed with changing head.value + * only when head.value == tail.value. + * Both head and tail values are updated atomically (as one 64-bit value). + * As another enhancement, it provides the ability to split enqueue/dequeue + * operation into two phases: + * - enqueue/dequeue start + * - enqueue/dequeue finish + * That allows the user to inspect objects in the ring without removing + * them from it (aka MT safe peek). + * As an example: + * // read 1 elem from the ring: + * n = rte_ring_hts_dequeue_bulk_start(ring, &obj, 1, NULL); + * if (n != 0) { + * // examine the object + * if (object_examine(obj) == KEEP) + * // decided to keep it in the ring. + * rte_ring_hts_dequeue_finish(ring, 0); + * else + * // decided to remove it from the ring. + * rte_ring_hts_dequeue_finish(ring, n); + * } + * Note that between _start_ and _finish_ the ring is effectively locked - + * no other thread can proceed with an enqueue(/dequeue) operation till + * _finish_ completes.
+ */ + +static __rte_always_inline void +__rte_ring_hts_update_tail(struct rte_ring_hts_headtail *ht, uint32_t num, + uint32_t enqueue) +{ + uint32_t n; + union rte_ring_ht_pos p; + + if (enqueue) + rte_smp_wmb(); + else + rte_smp_rmb(); + + p.raw = rte_atomic64_read((rte_atomic64_t *)(uintptr_t)&ht->ht.raw); + + n = p.pos.head - p.pos.tail; + RTE_ASSERT(n >= num); + RTE_SET_USED(n); + + p.pos.head = p.pos.tail + num; + p.pos.tail = p.pos.head; + + rte_atomic64_set((rte_atomic64_t *)(uintptr_t)&ht->ht.raw, p.raw); +} + +/** + * @internal Waits till the tail becomes equal to the head. + * That means no writer/reader is active for that ring. + * It is expected to work as a serialization point. + */ +static __rte_always_inline void +__rte_ring_hts_head_wait(const struct rte_ring_hts_headtail *ht, + union rte_ring_ht_pos *p) +{ + p->raw = rte_atomic64_read((rte_atomic64_t *) + (uintptr_t)&ht->ht.raw); + + while (p->pos.head != p->pos.tail) { + rte_pause(); + p->raw = rte_atomic64_read((rte_atomic64_t *) + (uintptr_t)&ht->ht.raw); + } +} + +/** + * @internal This function updates the producer head for enqueue + * + * @param r + * A pointer to the ring structure + * @param is_sp + * Indicates whether multi-producer path is needed or not + * @param n + * The number of elements we will want to enqueue, i.e. how far should the + * head be moved + * @param behavior + * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring + * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring + * @param old_head + * Returns head value as it was before the move, i.e. where enqueue starts + * @param new_head + * Returns the current/new head value i.e. where enqueue finishes + * @param free_entries + * Returns the amount of free space in the ring BEFORE head was moved + * @return + * Actual number of objects enqueued. + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. + */ +static __rte_always_inline unsigned int +__rte_ring_hts_move_prod_head(struct rte_ring *r, unsigned int num, + enum rte_ring_queue_behavior behavior, uint32_t *old_head, + uint32_t *free_entries) +{ + uint32_t n; + union rte_ring_ht_pos np, op; + + const uint32_t capacity = r->capacity; + + do { + /* Reset n to the initial burst count */ + n = num; + + /* wait for tail to be equal to head */ + __rte_ring_hts_head_wait(&r->hts_prod, &op); + + /* add rmb barrier to avoid load/load reorder in weak + * memory model. It is noop on x86 + */ + rte_smp_rmb(); + + /* + * The subtraction is done between two unsigned 32bits value + * (the result is always modulo 32 bits even if we have + * *old_head > cons_tail). So 'free_entries' is always between 0 + * and capacity (which is < size). + */ + *free_entries = capacity + r->cons.tail - op.pos.head; + + /* check that we have enough room in ring */ + if (unlikely(n > *free_entries)) + n = (behavior == RTE_RING_QUEUE_FIXED) ? + 0 : *free_entries; + + if (n == 0) + return 0; + + np.pos.tail = op.pos.tail; + np.pos.head = op.pos.head + n; + + } while (rte_atomic64_cmpset(&r->hts_prod.ht.raw, + op.raw, np.raw) == 0); + + *old_head = op.pos.head; + return n; +} + +/** + * @internal This function updates the consumer head for dequeue + * + * @param r + * A pointer to the ring structure + * @param is_sc + * Indicates whether multi-consumer path is needed or not + * @param n + * The number of elements we will want to dequeue, i.e.
how far should the + * head be moved + * @param behavior + * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring + * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring + * @param old_head + * Returns head value as it was before the move, i.e. where dequeue starts + * @param new_head + * Returns the current/new head value i.e. where dequeue finishes + * @param entries + * Returns the number of entries in the ring BEFORE head was moved + * @return + * - Actual number of objects dequeued. + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. + */ +static __rte_always_inline unsigned int +__rte_ring_hts_move_cons_head(struct rte_ring *r, unsigned int num, + enum rte_ring_queue_behavior behavior, uint32_t *old_head, + uint32_t *entries) +{ + uint32_t n; + union rte_ring_ht_pos np, op; + + /* move cons.head atomically */ + do { + /* Restore n as it may change every loop */ + n = num; + + /* wait for tail to be equal to head */ + __rte_ring_hts_head_wait(&r->hts_cons, &op); + + /* add rmb barrier to avoid load/load reorder in weak + * memory model. It is noop on x86 + */ + rte_smp_rmb(); + + /* The subtraction is done between two unsigned 32bits value + * (the result is always modulo 32 bits even if we have + * cons_head > prod_tail). So 'entries' is always between 0 + * and size(ring)-1. + */ + *entries = r->prod.tail - op.pos.head; + + /* Set the actual entries for dequeue */ + if (n > *entries) + n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries; + + if (unlikely(n == 0)) + return 0; + + np.pos.tail = op.pos.tail; + np.pos.head = op.pos.head + n; + + } while (rte_atomic64_cmpset(&r->hts_cons.ht.raw, + op.raw, np.raw) == 0); + + *old_head = op.pos.head; + return n; +} + +#endif /* _RTE_RING_HTS_GENERIC_H_ */ From patchwork Mon Feb 24 11:35:15 2020 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: "Ananyev, Konstantin" X-Patchwork-Id: 66007 X-Patchwork-Delegate: thomas@monjalon.net Return-Path: X-Original-To: patchwork@inbox.dpdk.org Delivered-To: patchwork@inbox.dpdk.org Received: from dpdk.org (dpdk.org [92.243.14.124]) by inbox.dpdk.org (Postfix) with ESMTP id CE928A0524; Mon, 24 Feb 2020 12:36:28 +0100 (CET) Received: from [92.243.14.124] (localhost [127.0.0.1]) by dpdk.org (Postfix) with ESMTP id DE54F1BFF3; Mon, 24 Feb 2020 12:35:33 +0100 (CET) Received: from mga02.intel.com (mga02.intel.com [134.134.136.20]) by dpdk.org (Postfix) with ESMTP id 9A0621BFE0 for ; Mon, 24 Feb 2020 12:35:29 +0100 (CET) X-Amp-Result: SKIPPED(no attachment in message) X-Amp-File-Uploaded: False Received: from fmsmga005.fm.intel.com ([10.253.24.32]) by orsmga101.jf.intel.com with ESMTP/TLS/DHE-RSA-AES256-GCM-SHA384; 24 Feb 2020 03:35:29 -0800 X-ExtLoop1: 1 X-IronPort-AV: E=Sophos;i="5.70,480,1574150400"; d="scan'208";a="435878550" Received: from sivswdev08.ir.intel.com ([10.237.217.47]) by fmsmga005.fm.intel.com with ESMTP; 24 Feb 2020 03:35:26 -0800 From: Konstantin Ananyev To: dev@dpdk.org Cc: olivier.matz@6wind.com, Konstantin Ananyev Date: Mon, 24 Feb 2020 11:35:15 +0000 Message-Id: <20200224113515.1744-7-konstantin.ananyev@intel.com> X-Mailer: git-send-email 2.18.0 In-Reply-To: <20200224113515.1744-1-konstantin.ananyev@intel.com> References: <20200224113515.1744-1-konstantin.ananyev@intel.com> Subject: [dpdk-dev] [RFC 6/6] test/ring: add contention stress test for HTS ring X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: DPDK patches and discussions List-Unsubscribe: 
, List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dev-bounces@dpdk.org Sender: "dev" Introduce new test case to test HTS ring mode under contention. Signed-off-by: Konstantin Ananyev --- app/test/Makefile | 1 + app/test/meson.build | 1 + app/test/test_ring_hts_stress.c | 28 ++++++++++++++++++++++++++++ 3 files changed, 30 insertions(+) create mode 100644 app/test/test_ring_hts_stress.c diff --git a/app/test/Makefile b/app/test/Makefile index d22b9f702..ff151cd55 100644 --- a/app/test/Makefile +++ b/app/test/Makefile @@ -77,6 +77,7 @@ SRCS-y += test_external_mem.c SRCS-y += test_rand_perf.c SRCS-y += test_ring.c +SRCS-y += test_ring_hts_stress.c SRCS-y += test_ring_perf.c SRCS-y += test_ring_rts_stress.c SRCS-y += test_ring_stress.c diff --git a/app/test/meson.build b/app/test/meson.build index fa4fb4b51..7e58fa999 100644 --- a/app/test/meson.build +++ b/app/test/meson.build @@ -100,6 +100,7 @@ test_sources = files('commands.c', 'test_rib.c', 'test_rib6.c', 'test_ring.c', + 'test_ring_hts_stress.c', 'test_ring_perf.c', 'test_ring_rts_stress.c', 'test_ring_stress.c', diff --git a/app/test/test_ring_hts_stress.c b/app/test/test_ring_hts_stress.c new file mode 100644 index 000000000..b7f2d21fc --- /dev/null +++ b/app/test/test_ring_hts_stress.c @@ -0,0 +1,28 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * Copyright(c) 2020 Intel Corporation + */ + +#include "test_ring_stress.h" + +static inline uint32_t +_st_ring_dequeue_bulk(struct rte_ring *r, void **obj, uint32_t n, + uint32_t *avail) +{ + return rte_ring_hts_dequeue_bulk(r, obj, n, avail); +} + +static inline uint32_t +_st_ring_enqueue_bulk(struct rte_ring *r, void * const *obj, uint32_t n, + uint32_t *free) +{ + return rte_ring_hts_enqueue_bulk(r, obj, n, free); +} + +static int +_st_ring_init(struct rte_ring *r, const char *name, uint32_t num) +{ + return rte_ring_init(r, name, num, + RING_F_MP_HTS_ENQ | RING_F_MC_HTS_DEQ); +} + +REGISTER_TEST_COMMAND(ring_stress_hts_autotest, test_ring_stress);
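To close, a minimal sketch of the "MT safe peek" pattern that the HTS mode above enables (illustrative only, not part of the patch series): reserve the head object, examine it, then decide whether to actually remove it. object_examine() and the KEEP/CONSUME values are placeholder application logic; the ring is assumed to have been created with RING_F_MP_HTS_ENQ | RING_F_MC_HTS_DEQ.

#include <rte_ring.h>

/* placeholder application hook: returns KEEP to leave obj in the ring */
enum { KEEP, CONSUME };

static int
object_examine(void *obj)
{
	return obj == NULL ? KEEP : CONSUME;
}

static void
hts_peek_example(struct rte_ring *r)
{
	void *obj;
	unsigned int n;

	/* reserve one object; other threads are blocked until _finish_ */
	n = rte_ring_hts_dequeue_bulk_start(r, &obj, 1, NULL);
	if (n != 0) {
		if (object_examine(obj) == KEEP)
			/* keep it in the ring: complete with 0 objects */
			rte_ring_hts_dequeue_finish(r, 0);
		else
			/* actually remove it from the ring */
			rte_ring_hts_dequeue_finish(r, n);
	}
}

Because the ring stays serialized between _start_ and _finish_, the examine step should be kept short on the fast path.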