From patchwork Wed Sep 27 15:08:53 2023 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Paul Szczepanek X-Patchwork-Id: 132027 X-Patchwork-Delegate: thomas@monjalon.net Return-Path: X-Original-To: patchwork@inbox.dpdk.org Delivered-To: patchwork@inbox.dpdk.org Received: from mails.dpdk.org (mails.dpdk.org [217.70.189.124]) by inbox.dpdk.org (Postfix) with ESMTP id 758D242653; Wed, 27 Sep 2023 17:09:54 +0200 (CEST) Received: from mails.dpdk.org (localhost [127.0.0.1]) by mails.dpdk.org (Postfix) with ESMTP id 0BFA4406B7; Wed, 27 Sep 2023 17:09:50 +0200 (CEST) Received: from foss.arm.com (foss.arm.com [217.140.110.172]) by mails.dpdk.org (Postfix) with ESMTP id 755F240271 for ; Wed, 27 Sep 2023 17:09:47 +0200 (CEST) Received: from usa-sjc-imap-foss1.foss.arm.com (unknown [10.121.207.14]) by usa-sjc-mx-foss1.foss.arm.com (Postfix) with ESMTP id 192FDDA7; Wed, 27 Sep 2023 08:10:25 -0700 (PDT) Received: from ampere-altra-2-1.usa.Arm.com (ampere-altra-2-1.usa.arm.com [10.118.91.158]) by usa-sjc-imap-foss1.foss.arm.com (Postfix) with ESMTPA id E4FE93F7C5; Wed, 27 Sep 2023 08:09:46 -0700 (PDT) From: Paul Szczepanek To: dev@dpdk.org Cc: Paul Szczepanek , Honnappa Nagarahalli , Kamalakshitha Aligeri Subject: [RFC 1/2] eal: add pointer compression functions Date: Wed, 27 Sep 2023 15:08:53 +0000 Message-Id: <20230927150854.3670391-2-paul.szczepanek@arm.com> X-Mailer: git-send-email 2.25.1 In-Reply-To: <20230927150854.3670391-1-paul.szczepanek@arm.com> References: <20230927150854.3670391-1-paul.szczepanek@arm.com> MIME-Version: 1.0 X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.29 Precedence: list List-Id: DPDK patches and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dev-bounces@dpdk.org Add a new utility header for compressing pointers. Pointers are compressed by taking advantage of their locality. 
Instead of storing the full address only an offset from a known base is stored. The provided functions can store pointers in 32bit offsets. Suggested-by: Honnappa Nagarahalli Signed-off-by: Paul Szczepanek Signed-off-by: Kamalakshitha Aligeri Reviewed-by: Honnappa Nagarahalli --- .mailmap | 1 + lib/eal/include/meson.build | 1 + lib/eal/include/rte_ptr_compress.h | 158 +++++++++++++++++++++++++++++ 3 files changed, 160 insertions(+) create mode 100644 lib/eal/include/rte_ptr_compress.h diff --git a/.mailmap b/.mailmap index 864d33ee46..3f0c9d32f5 100644 --- a/.mailmap +++ b/.mailmap @@ -1058,6 +1058,7 @@ Paul Greenwalt Paulis Gributs Paul Luse Paul M Stillwell Jr +Paul Szczepanek Pavan Kumar Linga Pavan Nikhilesh Pavel Belous diff --git a/lib/eal/include/meson.build b/lib/eal/include/meson.build index a0463efac7..60b056ef96 100644 --- a/lib/eal/include/meson.build +++ b/lib/eal/include/meson.build @@ -35,6 +35,7 @@ headers += files( 'rte_pci_dev_feature_defs.h', 'rte_pci_dev_features.h', 'rte_per_lcore.h', + 'rte_ptr_compress.h', 'rte_pflock.h', 'rte_random.h', 'rte_reciprocal.h', diff --git a/lib/eal/include/rte_ptr_compress.h b/lib/eal/include/rte_ptr_compress.h new file mode 100644 index 0000000000..6498587c0b --- /dev/null +++ b/lib/eal/include/rte_ptr_compress.h @@ -0,0 +1,158 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * Copyright(c) 2023 Arm Limited + */ + +#ifndef _RTE_PTR_COMPRESS_H_ +#define _RTE_PTR_COMPRESS_H_ + +/** + * @file + * RTE pointer compression and decompression. + */ + +#include +#include + +#include +#include +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif + +/** + * Compress pointers into 32 bit offsets from base pointer. + * + * @note Offsets from the base pointer must fit within 32bits. Alignment allows + * us to drop bits from the offsets - this means that for pointers aligned by + * 8 bytes they must be within 32GB of the base pointer. Unaligned pointers + * must be within 4GB. 
+ * + * @param ptr_base + * A pointer used to calculate offsets of pointers in src_table. + * @param src_table + * A pointer to an array of pointers. + * @param dest_table + * A pointer to an array of compressed pointers returned by this function. + * @param n + * The number of objects to compress, must be strictly positive. + * @param bit_shift + * Byte alignment of memory pointed to by the pointers allows for + * bits to be dropped from the offset and hence widen the memory region that + * can be covered. This controls how many bits are right shifted. + **/ +static __rte_always_inline void +rte_ptr_compress_32(void *ptr_base, void **src_table, + uint32_t *dest_table, unsigned int n, unsigned int bit_shift) +{ + unsigned int i = 0; +#if defined RTE_HAS_SVE_ACLE + svuint64_t v_src_table; + svuint64_t v_dest_table; + svbool_t pg = svwhilelt_b64(i, n); + do { + v_src_table = svld1_u64(pg, (uint64_t *)src_table + i); + v_dest_table = svsub_x(pg, v_src_table, (uint64_t)ptr_base); + v_dest_table = svlsr_x(pg, v_dest_table, bit_shift); + svst1w(pg, &dest_table[i], v_dest_table); + i += svcntd(); + pg = svwhilelt_b64(i, n); + } while (svptest_any(svptrue_b64(), pg)); +#elif defined __ARM_NEON + uint64_t ptr_diff; + uint64x2_t v_src_table; + uint64x2_t v_dest_table; + /* right shift is done by left shifting by negative int */ + int64x2_t v_shift = vdupq_n_s64(-bit_shift); + uint64x2_t v_ptr_base = vdupq_n_u64((uint64_t)ptr_base); + for (; i < (n & ~0x1); i += 2) { + v_src_table = vld1q_u64((const uint64_t *)src_table + i); + v_dest_table = vsubq_u64(v_src_table, v_ptr_base); + v_dest_table = vshlq_u64(v_dest_table, v_shift); + vst1_u32(dest_table + i, vqmovn_u64(v_dest_table)); + } + /* process leftover single item in case of odd number of n */ + if (unlikely(n & 0x1)) { + ptr_diff = RTE_PTR_DIFF(src_table[i], ptr_base); + dest_table[i] = (uint32_t) (ptr_diff >> bit_shift); + } +#else + uint64_t ptr_diff; + for (; i < n; i++) { + ptr_diff = RTE_PTR_DIFF(src_table[i], 
ptr_base); + /* save extra bits that are redundant due to alignment */ + ptr_diff = ptr_diff >> bit_shift; + /* make sure no truncation will happen when casting */ + RTE_ASSERT(ptr_diff <= UINT32_MAX); + dest_table[i] = (uint32_t) ptr_diff; + } +#endif +} + +/** + * Decompress pointers from 32 bit offsets from base pointer. + * + * @param ptr_base + * A pointer which was used to calculate offsets in src_table. + * @param src_table + * A pointer to an array of compressed pointers. + * @param dest_table + * A pointer to an array of decompressed pointers returned by this function. + * @param n + * The number of objects to decompress, must be strictly positive. + * @param bit_shift + * Byte alignment of memory pointed to by the pointers allows for + * bits to be dropped from the offset and hence widen the memory region that + * can be covered. This controls how many bits are left shifted when pointers + * are recovered from the offsets. + **/ +static __rte_always_inline void +rte_ptr_decompress_32(void *ptr_base, uint32_t *src_table, + void **dest_table, unsigned int n, unsigned int bit_shift) +{ + unsigned int i = 0; +#if defined RTE_HAS_SVE_ACLE + svuint64_t v_src_table; + svuint64_t v_dest_table; + svbool_t pg = svwhilelt_b64(i, n); + do { + v_src_table = svld1uw_u64(pg, &src_table[i]); + v_src_table = svlsl_x(pg, v_src_table, bit_shift); + v_dest_table = svadd_x(pg, v_src_table, (uint64_t)ptr_base); + svst1(pg, (uint64_t *)dest_table + i, v_dest_table); + i += svcntd(); + pg = svwhilelt_b64(i, n); + } while (svptest_any(svptrue_b64(), pg)); +#elif defined __ARM_NEON + uint64_t ptr_diff; + uint64x2_t v_src_table; + uint64x2_t v_dest_table; + int64x2_t v_shift = vdupq_n_s64(bit_shift); + uint64x2_t v_ptr_base = vdupq_n_u64((uint64_t)ptr_base); + for (; i < (n & ~0x1); i += 2) { + v_src_table = vmovl_u32(vld1_u32(src_table + i)); + v_src_table = vshlq_u64(v_src_table, v_shift); + v_dest_table = vaddq_u64(v_src_table, v_ptr_base); + vst1q_u64((uint64_t *)dest_table + 
i, v_dest_table); + } + /* process leftover single item in case of odd number of n */ + if (unlikely(n & 0x1)) { + ptr_diff = ((uint64_t) src_table[i]) << bit_shift; + dest_table[i] = RTE_PTR_ADD(ptr_base, ptr_diff); + } +#else + uint64_t ptr_diff; + for (; i < n; i++) { + ptr_diff = ((uint64_t) src_table[i]) << bit_shift; + dest_table[i] = RTE_PTR_ADD(ptr_base, ptr_diff); + } +#endif +} + +#ifdef __cplusplus +} +#endif + +#endif /* _RTE_PTR_COMPRESS_H_ */ From patchwork Wed Sep 27 15:08:54 2023 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Paul Szczepanek X-Patchwork-Id: 132028 X-Patchwork-Delegate: thomas@monjalon.net Return-Path: X-Original-To: patchwork@inbox.dpdk.org Delivered-To: patchwork@inbox.dpdk.org Received: from mails.dpdk.org (mails.dpdk.org [217.70.189.124]) by inbox.dpdk.org (Postfix) with ESMTP id A98D342653; Wed, 27 Sep 2023 17:10:00 +0200 (CEST) Received: from mails.dpdk.org (localhost [127.0.0.1]) by mails.dpdk.org (Postfix) with ESMTP id 2AAAA40A77; Wed, 27 Sep 2023 17:09:51 +0200 (CEST) Received: from foss.arm.com (foss.arm.com [217.140.110.172]) by mails.dpdk.org (Postfix) with ESMTP id 89A404029B for ; Wed, 27 Sep 2023 17:09:47 +0200 (CEST) Received: from usa-sjc-imap-foss1.foss.arm.com (unknown [10.121.207.14]) by usa-sjc-mx-foss1.foss.arm.com (Postfix) with ESMTP id 240341042; Wed, 27 Sep 2023 08:10:25 -0700 (PDT) Received: from ampere-altra-2-1.usa.Arm.com (ampere-altra-2-1.usa.arm.com [10.118.91.158]) by usa-sjc-imap-foss1.foss.arm.com (Postfix) with ESMTPA id EF7DC3F59C; Wed, 27 Sep 2023 08:09:46 -0700 (PDT) From: Paul Szczepanek To: dev@dpdk.org Cc: Paul Szczepanek , Honnappa Nagarahalli Subject: [RFC 2/2] test: add pointer compress tests to ring perf test Date: Wed, 27 Sep 2023 15:08:54 +0000 Message-Id: <20230927150854.3670391-3-paul.szczepanek@arm.com> X-Mailer: git-send-email 2.25.1 In-Reply-To: <20230927150854.3670391-1-paul.szczepanek@arm.com> References: 
<20230927150854.3670391-1-paul.szczepanek@arm.com> MIME-Version: 1.0 X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.29 Precedence: list List-Id: DPDK patches and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dev-bounces@dpdk.org Add a test that runs a zero copy burst enqueue and dequeue on a ring of raw pointers and compressed pointers at different burst sizes to showcase performance benefits of newly added pointer compression APIs. To reuse existing code, some refactoring was done to pass more parameters to test threads. Additionally more bulk sizes were added to showcase their effects on compression. To keep runtime reasonable, iterations were adjusted to take into account bulk sizes. Old printfs are adjusted to match new ones which have aligned numbers. Signed-off-by: Paul Szczepanek Reviewed-by: Honnappa Nagarahalli --- app/test/test_ring.h | 59 ++++++- app/test/test_ring_perf.c | 324 +++++++++++++++++++++++--------------- 2 files changed, 259 insertions(+), 124 deletions(-) diff --git a/app/test/test_ring.h b/app/test/test_ring.h index 45c263f3ff..e8b7525c23 100644 --- a/app/test/test_ring.h +++ b/app/test/test_ring.h @@ -1,10 +1,12 @@ /* SPDX-License-Identifier: BSD-3-Clause - * Copyright(c) 2019 Arm Limited + * Copyright(c) 2019-2023 Arm Limited */ #include #include #include +#include +#include /* API type to call * rte_ring__enqueue_ @@ -25,6 +27,9 @@ #define TEST_RING_ELEM_BULK 16 #define TEST_RING_ELEM_BURST 32 +#define TEST_RING_ELEM_BURST_ZC 64 +#define TEST_RING_ELEM_BURST_ZC_COMPRESS_PTR_32 128 + #define TEST_RING_IGNORE_API_TYPE ~0U /* This function is placed here as it is required for both @@ -101,6 +106,9 @@ static inline unsigned int test_ring_enqueue(struct rte_ring *r, void **obj, int esize, unsigned int n, unsigned int api_type) { + unsigned int ret; + struct rte_ring_zc_data zcd = {0}; + /* Legacy queue APIs? 
*/ if (esize == -1) switch (api_type) { @@ -152,6 +160,29 @@ test_ring_enqueue(struct rte_ring *r, void **obj, int esize, unsigned int n, case (TEST_RING_THREAD_MPMC | TEST_RING_ELEM_BURST): return rte_ring_mp_enqueue_burst_elem(r, obj, esize, n, NULL); + case (TEST_RING_ELEM_BURST_ZC): + ret = rte_ring_enqueue_zc_burst_elem_start( + r, esize, n, &zcd, NULL); + if (unlikely(ret == 0)) + return 0; + rte_memcpy(zcd.ptr1, (char *)obj, zcd.n1 * esize); + if (unlikely(zcd.ptr2 != NULL)) + rte_memcpy(zcd.ptr2, + (char *)obj + zcd.n1 * esize, + (ret - zcd.n1) * esize); + rte_ring_enqueue_zc_finish(r, ret); + return ret; + case (TEST_RING_ELEM_BURST_ZC_COMPRESS_PTR_32): + ret = rte_ring_enqueue_zc_burst_elem_start( + r, sizeof(uint32_t), n, &zcd, NULL); + if (unlikely(ret == 0)) + return 0; + rte_ptr_compress_32(0, obj, zcd.ptr1, zcd.n1, 3); + if (unlikely(zcd.ptr2 != NULL)) + rte_ptr_compress_32(0, obj + zcd.n1, + zcd.ptr2, ret - zcd.n1, 3); + rte_ring_enqueue_zc_finish(r, ret); + return ret; default: printf("Invalid API type\n"); return 0; @@ -162,6 +193,9 @@ static inline unsigned int test_ring_dequeue(struct rte_ring *r, void **obj, int esize, unsigned int n, unsigned int api_type) { + unsigned int ret; + struct rte_ring_zc_data zcd = {0}; + /* Legacy queue APIs? 
*/ if (esize == -1) switch (api_type) { @@ -213,6 +247,29 @@ test_ring_dequeue(struct rte_ring *r, void **obj, int esize, unsigned int n, case (TEST_RING_THREAD_MPMC | TEST_RING_ELEM_BURST): return rte_ring_mc_dequeue_burst_elem(r, obj, esize, n, NULL); + case (TEST_RING_ELEM_BURST_ZC): + ret = rte_ring_dequeue_zc_burst_elem_start( + r, esize, n, &zcd, NULL); + if (unlikely(ret == 0)) + return 0; + rte_memcpy((char *)obj, zcd.ptr1, zcd.n1 * esize); + if (unlikely(zcd.ptr2 != NULL)) + rte_memcpy((char *)obj + zcd.n1 * esize, + zcd.ptr2, + (ret - zcd.n1) * esize); + rte_ring_dequeue_zc_finish(r, ret); + return ret; + case (TEST_RING_ELEM_BURST_ZC_COMPRESS_PTR_32): + ret = rte_ring_dequeue_zc_burst_elem_start( + r, sizeof(uint32_t), n, &zcd, NULL); + if (unlikely(ret == 0)) + return 0; + rte_ptr_decompress_32(0, zcd.ptr1, obj, zcd.n1, 3); + if (unlikely(zcd.ptr2 != NULL)) + rte_ptr_decompress_32(0, zcd.ptr2, + obj + zcd.n1, ret - zcd.n1, 3); + rte_ring_dequeue_zc_finish(r, ret); + return ret; default: printf("Invalid API type\n"); return 0; diff --git a/app/test/test_ring_perf.c b/app/test/test_ring_perf.c index d7c5a4c30b..3c9dc24447 100644 --- a/app/test/test_ring_perf.c +++ b/app/test/test_ring_perf.c @@ -1,6 +1,6 @@ /* SPDX-License-Identifier: BSD-3-Clause * Copyright(c) 2010-2014 Intel Corporation - * Copyright(c) 2019 Arm Limited + * Copyright(c) 2019-2023 Arm Limited */ @@ -21,14 +21,15 @@ */ #define RING_NAME "RING_PERF" +#define RING_NAME_COMPRESS "RING_COMP" #define RING_SIZE 4096 -#define MAX_BURST 32 +#define MAX_BURST 128 /* * the sizes to enqueue and dequeue in testing * (marked volatile so they won't be seen as compile-time constants) */ -static const volatile unsigned bulk_sizes[] = { 8, 32 }; +static const volatile unsigned int bulk_sizes[] = { 8, 32, 64, 128 }; struct lcore_pair { unsigned c1, c2; @@ -43,26 +44,29 @@ test_ring_print_test_string(unsigned int api_type, int esize, if (esize == -1) printf("legacy APIs"); else - printf("elem APIs: element 
size %dB", esize); + printf("elem APIs (size:%2dB)", esize); if (api_type == TEST_RING_IGNORE_API_TYPE) return; if ((api_type & TEST_RING_THREAD_DEF) == TEST_RING_THREAD_DEF) - printf(": default enqueue/dequeue: "); + printf(" - default enqueue/dequeue"); else if ((api_type & TEST_RING_THREAD_SPSC) == TEST_RING_THREAD_SPSC) - printf(": SP/SC: "); + printf(" - SP/SC"); else if ((api_type & TEST_RING_THREAD_MPMC) == TEST_RING_THREAD_MPMC) - printf(": MP/MC: "); + printf(" - MP/MC"); if ((api_type & TEST_RING_ELEM_SINGLE) == TEST_RING_ELEM_SINGLE) - printf("single: "); + printf(" - single - "); else if ((api_type & TEST_RING_ELEM_BULK) == TEST_RING_ELEM_BULK) - printf("bulk (size: %u): ", bsz); + printf(" - bulk (n:%-3u) - ", bsz); else if ((api_type & TEST_RING_ELEM_BURST) == TEST_RING_ELEM_BURST) - printf("burst (size: %u): ", bsz); + printf(" - burst (n:%-3u) - ", bsz); + else if ((api_type & (TEST_RING_ELEM_BURST_ZC | + TEST_RING_ELEM_BURST_ZC_COMPRESS_PTR_32)) != 0) + printf(" - burst zero copy (n:%-3u) - ", bsz); - printf("%.2F\n", value); + printf("cycles per elem: %.3F\n", value); } /**** Functions to analyse our core mask to get cores for different tests ***/ @@ -144,28 +148,33 @@ static void test_empty_dequeue(struct rte_ring *r, const int esize, const unsigned int api_type) { - const unsigned int iter_shift = 26; + const unsigned int iter_shift = 30; const unsigned int iterations = 1 << iter_shift; unsigned int i = 0; void *burst[MAX_BURST]; + const unsigned int bulk_iterations = iterations / bulk_sizes[0]; const uint64_t start = rte_rdtsc(); - for (i = 0; i < iterations; i++) + for (i = 0; i < bulk_iterations; i++) test_ring_dequeue(r, burst, esize, bulk_sizes[0], api_type); const uint64_t end = rte_rdtsc(); test_ring_print_test_string(api_type, esize, bulk_sizes[0], - ((double)(end - start)) / iterations); + ((double)end - start) / iterations); } -/* - * for the separate enqueue and dequeue threads they take in one param - * and return two. 
Input = burst size, output = cycle average for sp/sc & mp/mc - */ -struct thread_params { +/* describes the ring used by the enqueue and dequeue thread */ +struct ring_params { struct rte_ring *r; - unsigned size; /* input value, the burst size */ - double spsc, mpmc; /* output value, the single or multi timings */ + unsigned int elem_size; + unsigned int bulk_sizes_i; /* index into bulk_sizes array */ + unsigned int ring_flags; /* flags for test_ring_enqueue/dequeue */ +}; + +/* Used to specify enqueue and dequeue ring operations and their results */ +struct thread_params { + struct ring_params *ring_params; + double *results; /* result array size must be equal to bulk_sizes */ }; /* @@ -174,16 +183,15 @@ struct thread_params { * flag == 1 -> dequeue */ static __rte_always_inline int -enqueue_dequeue_bulk_helper(const unsigned int flag, const int esize, - struct thread_params *p) +enqueue_dequeue_bulk_helper(const unsigned int flag, struct thread_params *p) { int ret; - const unsigned int iter_shift = 15; + const unsigned int iter_shift = 30; const unsigned int iterations = 1 << iter_shift; - struct rte_ring *r = p->r; - unsigned int bsize = p->size; unsigned int i; void *burst = NULL; + unsigned int n_remaining; + const unsigned int bulk_n = bulk_sizes[p->ring_params->bulk_sizes_i]; #ifdef RTE_USE_C11_MEM_MODEL if (__atomic_fetch_add(&lcore_count, 1, __ATOMIC_RELAXED) + 1 != 2) @@ -193,44 +201,38 @@ enqueue_dequeue_bulk_helper(const unsigned int flag, const int esize, while(lcore_count != 2) rte_pause(); - burst = test_ring_calloc(MAX_BURST, esize); + burst = test_ring_calloc(MAX_BURST, p->ring_params->elem_size); if (burst == NULL) return -1; const uint64_t sp_start = rte_rdtsc(); - for (i = 0; i < iterations; i++) + const unsigned int bulk_iterations = iterations / bulk_n; + for (i = 0; i < bulk_iterations; i++) { + n_remaining = bulk_n; do { if (flag == 0) - ret = test_ring_enqueue(r, burst, esize, bsize, - TEST_RING_THREAD_SPSC | - TEST_RING_ELEM_BULK); + ret 
= test_ring_enqueue(p->ring_params->r, + burst, + p->ring_params->elem_size, + n_remaining, + p->ring_params->ring_flags); else if (flag == 1) - ret = test_ring_dequeue(r, burst, esize, bsize, - TEST_RING_THREAD_SPSC | - TEST_RING_ELEM_BULK); + ret = test_ring_dequeue(p->ring_params->r, + burst, + p->ring_params->elem_size, + n_remaining, + p->ring_params->ring_flags); if (ret == 0) rte_pause(); - } while (!ret); + else + n_remaining -= ret; + } while (n_remaining > 0); + } const uint64_t sp_end = rte_rdtsc(); - const uint64_t mp_start = rte_rdtsc(); - for (i = 0; i < iterations; i++) - do { - if (flag == 0) - ret = test_ring_enqueue(r, burst, esize, bsize, - TEST_RING_THREAD_MPMC | - TEST_RING_ELEM_BULK); - else if (flag == 1) - ret = test_ring_dequeue(r, burst, esize, bsize, - TEST_RING_THREAD_MPMC | - TEST_RING_ELEM_BULK); - if (ret == 0) - rte_pause(); - } while (!ret); - const uint64_t mp_end = rte_rdtsc(); + p->results[p->ring_params->bulk_sizes_i] = + ((double)sp_end - sp_start) / iterations; - p->spsc = ((double)(sp_end - sp_start))/(iterations * bsize); - p->mpmc = ((double)(mp_end - mp_start))/(iterations * bsize); return 0; } @@ -243,15 +245,7 @@ enqueue_bulk(void *p) { struct thread_params *params = p; - return enqueue_dequeue_bulk_helper(0, -1, params); -} - -static int -enqueue_bulk_16B(void *p) -{ - struct thread_params *params = p; - - return enqueue_dequeue_bulk_helper(0, 16, params); + return enqueue_dequeue_bulk_helper(0, params); } /* @@ -263,15 +257,7 @@ dequeue_bulk(void *p) { struct thread_params *params = p; - return enqueue_dequeue_bulk_helper(1, -1, params); -} - -static int -dequeue_bulk_16B(void *p) -{ - struct thread_params *params = p; - - return enqueue_dequeue_bulk_helper(1, 16, params); + return enqueue_dequeue_bulk_helper(1, params); } /* @@ -279,42 +265,32 @@ dequeue_bulk_16B(void *p) * used to measure ring perf between hyperthreads, cores and sockets. 
*/ static int -run_on_core_pair(struct lcore_pair *cores, struct rte_ring *r, const int esize) +run_on_core_pair(struct lcore_pair *cores, + struct thread_params *param1, struct thread_params *param2) { - lcore_function_t *f1, *f2; - struct thread_params param1 = {0}, param2 = {0}; unsigned i; - - if (esize == -1) { - f1 = enqueue_bulk; - f2 = dequeue_bulk; - } else { - f1 = enqueue_bulk_16B; - f2 = dequeue_bulk_16B; - } + struct ring_params *ring_params = param1->ring_params; for (i = 0; i < RTE_DIM(bulk_sizes); i++) { lcore_count = 0; - param1.size = param2.size = bulk_sizes[i]; - param1.r = param2.r = r; + ring_params->bulk_sizes_i = i; if (cores->c1 == rte_get_main_lcore()) { - rte_eal_remote_launch(f2, &param2, cores->c2); - f1(&param1); + rte_eal_remote_launch(dequeue_bulk, param2, cores->c2); + enqueue_bulk(param1); rte_eal_wait_lcore(cores->c2); } else { - rte_eal_remote_launch(f1, &param1, cores->c1); - rte_eal_remote_launch(f2, &param2, cores->c2); + rte_eal_remote_launch(enqueue_bulk, param1, cores->c1); + rte_eal_remote_launch(dequeue_bulk, param2, cores->c2); if (rte_eal_wait_lcore(cores->c1) < 0) return -1; if (rte_eal_wait_lcore(cores->c2) < 0) return -1; } test_ring_print_test_string( - TEST_RING_THREAD_SPSC | TEST_RING_ELEM_BULK, - esize, bulk_sizes[i], param1.spsc + param2.spsc); - test_ring_print_test_string( - TEST_RING_THREAD_MPMC | TEST_RING_ELEM_BULK, - esize, bulk_sizes[i], param1.mpmc + param2.mpmc); + ring_params->ring_flags, + ring_params->elem_size, + bulk_sizes[i], + param1->results[i] + param2->results[i]); } return 0; @@ -333,7 +309,7 @@ load_loop_fn_helper(struct thread_params *p, const int esize) uint64_t hz = rte_get_timer_hz(); uint64_t lcount = 0; const unsigned int lcore = rte_lcore_id(); - struct thread_params *params = p; + struct ring_params *ring_params = p->ring_params; void *burst = NULL; burst = test_ring_calloc(MAX_BURST, esize); @@ -346,9 +322,11 @@ load_loop_fn_helper(struct thread_params *p, const int esize) begin = 
rte_get_timer_cycles(); while (time_diff < hz * TIME_MS / 1000) { - test_ring_enqueue(params->r, burst, esize, params->size, + test_ring_enqueue(ring_params->r, burst, esize, + bulk_sizes[ring_params->bulk_sizes_i], TEST_RING_THREAD_MPMC | TEST_RING_ELEM_BULK); - test_ring_dequeue(params->r, burst, esize, params->size, + test_ring_dequeue(ring_params->r, burst, esize, + bulk_sizes[ring_params->bulk_sizes_i], TEST_RING_THREAD_MPMC | TEST_RING_ELEM_BULK); lcount++; time_diff = rte_get_timer_cycles() - begin; @@ -380,7 +358,8 @@ static int run_on_all_cores(struct rte_ring *r, const int esize) { uint64_t total; - struct thread_params param; + struct ring_params ring_params = {0}; + struct thread_params params = { .ring_params = &ring_params }; lcore_function_t *lcore_f; unsigned int i, c; @@ -389,21 +368,20 @@ run_on_all_cores(struct rte_ring *r, const int esize) else lcore_f = load_loop_fn_16B; - memset(&param, 0, sizeof(struct thread_params)); for (i = 0; i < RTE_DIM(bulk_sizes); i++) { total = 0; printf("\nBulk enq/dequeue count on size %u\n", bulk_sizes[i]); - param.size = bulk_sizes[i]; - param.r = r; + params.ring_params->bulk_sizes_i = i; + params.ring_params->r = r; /* clear synchro and start workers */ __atomic_store_n(&synchro, 0, __ATOMIC_RELAXED); - if (rte_eal_mp_remote_launch(lcore_f, &param, SKIP_MAIN) < 0) + if (rte_eal_mp_remote_launch(lcore_f, &params, SKIP_MAIN) < 0) return -1; /* start synchro and launch test on main */ __atomic_store_n(&synchro, 1, __ATOMIC_RELAXED); - lcore_f(&param); + lcore_f(&params); rte_eal_mp_wait_lcore(); @@ -462,9 +440,9 @@ static int test_burst_bulk_enqueue_dequeue(struct rte_ring *r, const int esize, const unsigned int api_type) { - const unsigned int iter_shift = 23; + const unsigned int iter_shift = 27; const unsigned int iterations = 1 << iter_shift; - unsigned int sz, i = 0; + unsigned int sz, i; void **burst = NULL; burst = test_ring_calloc(MAX_BURST, esize); @@ -472,17 +450,18 @@ test_burst_bulk_enqueue_dequeue(struct rte_ring *r, const int esize, return -1; for 
(sz = 0; sz < RTE_DIM(bulk_sizes); sz++) { + const unsigned int n = iterations / bulk_sizes[sz]; const uint64_t start = rte_rdtsc(); - for (i = 0; i < iterations; i++) { + for (i = 0; i < n; i++) { test_ring_enqueue(r, burst, esize, bulk_sizes[sz], - api_type); + api_type); test_ring_dequeue(r, burst, esize, bulk_sizes[sz], - api_type); + api_type); } const uint64_t end = rte_rdtsc(); test_ring_print_test_string(api_type, esize, bulk_sizes[sz], - ((double)(end - start)) / iterations); + ((double)end - start) / iterations); } rte_free(burst); @@ -490,12 +469,43 @@ test_burst_bulk_enqueue_dequeue(struct rte_ring *r, const int esize, return 0; } +static __rte_always_inline int +test_ring_perf_esize_run_on_two_cores( + struct thread_params *param1, struct thread_params *param2) +{ + struct lcore_pair cores; + + if (get_two_hyperthreads(&cores) == 0) { + printf("\n### Testing using two hyperthreads ###\n"); + if (run_on_core_pair(&cores, param1, param2) < 0) + return -1; + } + if (get_two_cores(&cores) == 0) { + printf("\n### Testing using two physical cores ###\n"); + if (run_on_core_pair(&cores, param1, param2) < 0) + return -1; + } + if (get_two_sockets(&cores) == 0) { + printf("\n### Testing using two NUMA nodes ###\n"); + if (run_on_core_pair(&cores, param1, param2) < 0) + return -1; + } + return 0; +} + /* Run all tests for a given element size */ static __rte_always_inline int test_ring_perf_esize(const int esize) { - struct lcore_pair cores; struct rte_ring *r = NULL; + double results_enq[RTE_DIM(bulk_sizes)]; + double results_deq[RTE_DIM(bulk_sizes)]; + struct ring_params ring_params = { + .elem_size = esize, .ring_flags = TEST_RING_ELEM_BULK }; + struct thread_params param1 = { + .ring_params = &ring_params, .results = results_enq }; + struct thread_params param2 = { + .ring_params = &ring_params, .results = results_deq }; /* * Performance test for legacy/_elem APIs @@ -535,22 +545,13 @@ test_ring_perf_esize(const int esize) test_empty_dequeue(r, esize, 
TEST_RING_THREAD_MPMC | TEST_RING_ELEM_BULK); - if (get_two_hyperthreads(&cores) == 0) { - printf("\n### Testing using two hyperthreads ###\n"); - if (run_on_core_pair(&cores, r, esize) < 0) - goto test_fail; - } + ring_params.r = r; - if (get_two_cores(&cores) == 0) { - printf("\n### Testing using two physical cores ###\n"); - if (run_on_core_pair(&cores, r, esize) < 0) - goto test_fail; - } - if (get_two_sockets(&cores) == 0) { - printf("\n### Testing using two NUMA nodes ###\n"); - if (run_on_core_pair(&cores, r, esize) < 0) - goto test_fail; - } + ring_params.ring_flags = TEST_RING_THREAD_SPSC | TEST_RING_ELEM_BULK; + test_ring_perf_esize_run_on_two_cores(&param1, &param2); + + ring_params.ring_flags = TEST_RING_THREAD_MPMC | TEST_RING_ELEM_BULK; + test_ring_perf_esize_run_on_two_cores(&param1, &param2); printf("\n### Testing using all worker nodes ###\n"); if (run_on_all_cores(r, esize) < 0) @@ -566,6 +567,79 @@ test_ring_perf_esize(const int esize) return -1; } + +static __rte_always_inline int +test_ring_perf_compression(void) +{ + double results1[RTE_DIM(bulk_sizes)]; + double results2[RTE_DIM(bulk_sizes)]; + double results1_comp[RTE_DIM(bulk_sizes)]; + double results2_comp[RTE_DIM(bulk_sizes)]; + + struct lcore_pair cores; + int ret = -1; + unsigned int i; + struct ring_params ring_params = { .elem_size = sizeof(void *) }; + struct thread_params param1 = { + .ring_params = &ring_params, .results = results1 }; + struct thread_params param2 = { + .ring_params = &ring_params, .results = results2 }; + + printf("\n### Testing compression gain ###"); + + ring_params.r = rte_ring_create_elem( + RING_NAME, sizeof(void *), + RING_SIZE, rte_socket_id(), + RING_F_SP_ENQ | RING_F_SC_DEQ); + + if (ring_params.r == NULL) + return -1; + + if (get_two_cores(&cores) == 0) { + printf("\n### Testing zero copy ###\n"); + ring_params.ring_flags = TEST_RING_ELEM_BURST_ZC; + ret = run_on_core_pair(&cores, &param1, &param2); + } + + rte_ring_free(ring_params.r); + + if (ret != 0) + return ret; + + 
ring_params.r = rte_ring_create_elem( + RING_NAME_COMPRESS, sizeof(uint32_t), + RING_SIZE, rte_socket_id(), + RING_F_SP_ENQ | RING_F_SC_DEQ); + + if (ring_params.r == NULL) + return -1; + + /* swap out the result arrays for the compression ones */ + param1.results = results1_comp; + param2.results = results2_comp; + + if (get_two_cores(&cores) == 0) { + printf("\n### Testing zero copy with compression ###\n"); + ring_params.ring_flags = + TEST_RING_ELEM_BURST_ZC_COMPRESS_PTR_32; + ret = run_on_core_pair(&cores, &param1, &param2); + } + + rte_ring_free(ring_params.r); + + printf("\n\n### Potential gain from compression ###\n"); + for (i = 0; i < RTE_DIM(bulk_sizes); i++) { + const double result = results1[i] + results2[i]; + const double result_comp = results1_comp[i] + results2_comp[i]; + const double gain = 100 - (result_comp / result) * 100; + + printf("Gain of %5.1F%% for burst of %-3u elems\n", + gain, bulk_sizes[i]); + } + + return ret; +} + static int test_ring_perf(void) { @@ -576,6 +650,10 @@ test_ring_perf(void) if (test_ring_perf_esize(16) == -1) return -1; + /* Test for performance gain of compression */ + if (test_ring_perf_compression() == -1) + return -1; + return 0; }