From patchwork Fri May 13 16:07:15 2022
X-Patchwork-Submitter: Pavan Nikhilesh Bhagavatula
X-Patchwork-Id: 111130
X-Patchwork-Delegate: jerinj@marvell.com
Subject: [PATCH v2 2/6] app/eventdev: clean up worker state before exit
Date: Fri, 13 May 2022 21:37:15 +0530
Message-ID: <20220513160719.10558-2-pbhagavatula@marvell.com>
In-Reply-To: <20220513160719.10558-1-pbhagavatula@marvell.com>
References: <20220426211412.6138-1-pbhagavatula@marvell.com>
 <20220513160719.10558-1-pbhagavatula@marvell.com>
List-Id: DPDK patches and discussions

From: Pavan Nikhilesh

Event ports are configured to implicitly release the scheduler contexts
currently held in the next call to rte_event_dequeue_burst(). However, a
worker core might still hold a scheduling context when it exits, because
that next call to rte_event_dequeue_burst() is never made. Depending on
worker exit timing, this can lead to a deadlock, especially when there
are very few flows.

Add a cleanup function that releases any scheduling contexts still held
by the worker, using RTE_EVENT_OP_RELEASE.
Signed-off-by: Pavan Nikhilesh
Acked-by: Jerin Jacob
---
 app/test-eventdev/test_perf_atq.c        |  31 +++--
 app/test-eventdev/test_perf_common.c     |  17 +++
 app/test-eventdev/test_perf_common.h     |   3 +
 app/test-eventdev/test_perf_queue.c      |  30 +++--
 app/test-eventdev/test_pipeline_atq.c    | 134 ++++++++++++---------
 app/test-eventdev/test_pipeline_common.c |  39 ++++++
 app/test-eventdev/test_pipeline_common.h |  59 ++++++---
 app/test-eventdev/test_pipeline_queue.c  | 145 ++++++++++++++---------
 8 files changed, 304 insertions(+), 154 deletions(-)

diff --git a/app/test-eventdev/test_perf_atq.c b/app/test-eventdev/test_perf_atq.c
index bac3ea602f..5a0b190384 100644
--- a/app/test-eventdev/test_perf_atq.c
+++ b/app/test-eventdev/test_perf_atq.c
@@ -37,13 +37,14 @@ atq_fwd_event(struct rte_event *const ev, uint8_t *const sched_type_list,
 static int
 perf_atq_worker(void *arg, const int enable_fwd_latency)
 {
-	PERF_WORKER_INIT;
+	uint16_t enq = 0, deq = 0;
 	struct rte_event ev;
+	PERF_WORKER_INIT;
 
 	while (t->done == false) {
-		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+		deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
 
-		if (!event) {
+		if (!deq) {
 			rte_pause();
 			continue;
 		}
@@ -78,24 +79,29 @@ perf_atq_worker(void *arg, const int enable_fwd_latency)
 					bufs, sz, cnt);
 		} else {
 			atq_fwd_event(&ev, sched_type_list, nb_stages);
-			while (rte_event_enqueue_burst(dev, port, &ev, 1) != 1)
-				rte_pause();
+			do {
+				enq = rte_event_enqueue_burst(dev, port, &ev,
+							      1);
+			} while (!enq && !t->done);
 		}
 	}
+
+	perf_worker_cleanup(pool, dev, port, &ev, enq, deq);
+
 	return 0;
 }
 
 static int
 perf_atq_worker_burst(void *arg, const int enable_fwd_latency)
 {
-	PERF_WORKER_INIT;
-	uint16_t i;
 	/* +1 to avoid prefetch out of array check */
 	struct rte_event ev[BURST_SIZE + 1];
+	uint16_t enq = 0, nb_rx = 0;
+	PERF_WORKER_INIT;
+	uint16_t i;
 
 	while (t->done == false) {
-		uint16_t const nb_rx = rte_event_dequeue_burst(dev, port, ev,
-				BURST_SIZE, 0);
+		nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
 
 		if (!nb_rx) {
 			rte_pause();
@@ -146,14 +152,15 @@ perf_atq_worker_burst(void *arg, const int enable_fwd_latency)
 			}
 		}
 
-		uint16_t enq;
-
 		enq = rte_event_enqueue_burst(dev, port, ev, nb_rx);
-		while (enq < nb_rx) {
+		while ((enq < nb_rx) && !t->done) {
 			enq += rte_event_enqueue_burst(dev, port,
 						       ev + enq, nb_rx - enq);
 		}
 	}
+
+	perf_worker_cleanup(pool, dev, port, ev, enq, nb_rx);
+
 	return 0;
 }
 
diff --git a/app/test-eventdev/test_perf_common.c b/app/test-eventdev/test_perf_common.c
index 4cf16b4267..b51a100425 100644
--- a/app/test-eventdev/test_perf_common.c
+++ b/app/test-eventdev/test_perf_common.c
@@ -985,6 +985,23 @@ perf_opt_dump(struct evt_options *opt, uint8_t nb_queues)
 	evt_dump("prod_enq_burst_sz", "%d", opt->prod_enq_burst_sz);
 }
 
+void
+perf_worker_cleanup(struct rte_mempool *const pool, uint8_t dev_id,
+		    uint8_t port_id, struct rte_event events[], uint16_t nb_enq,
+		    uint16_t nb_deq)
+{
+	int i;
+
+	if (nb_deq) {
+		for (i = nb_enq; i < nb_deq; i++)
+			rte_mempool_put(pool, events[i].event_ptr);
+
+		for (i = 0; i < nb_deq; i++)
+			events[i].op = RTE_EVENT_OP_RELEASE;
+		rte_event_enqueue_burst(dev_id, port_id, events, nb_deq);
+	}
+}
+
 void
 perf_eventdev_destroy(struct evt_test *test, struct evt_options *opt)
 {
diff --git a/app/test-eventdev/test_perf_common.h b/app/test-eventdev/test_perf_common.h
index e504bb1df9..f6bfc73be0 100644
--- a/app/test-eventdev/test_perf_common.h
+++ b/app/test-eventdev/test_perf_common.h
@@ -184,5 +184,8 @@ void perf_cryptodev_destroy(struct evt_test *test, struct evt_options *opt);
 void perf_ethdev_destroy(struct evt_test *test, struct evt_options *opt);
 void perf_ethdev_rx_stop(struct evt_test *test, struct evt_options *opt);
 void perf_mempool_destroy(struct evt_test *test, struct evt_options *opt);
+void perf_worker_cleanup(struct rte_mempool *const pool, uint8_t dev_id,
+			 uint8_t port_id, struct rte_event events[],
+			 uint16_t nb_enq, uint16_t nb_deq);
 
 #endif /* _TEST_PERF_COMMON_ */
diff --git a/app/test-eventdev/test_perf_queue.c b/app/test-eventdev/test_perf_queue.c
index 108f1742a7..b498cacef6 100644
--- a/app/test-eventdev/test_perf_queue.c
+++ b/app/test-eventdev/test_perf_queue.c
@@ -39,13 +39,14 @@ fwd_event(struct rte_event *const ev, uint8_t *const sched_type_list,
 static int
 perf_queue_worker(void *arg, const int enable_fwd_latency)
 {
-	PERF_WORKER_INIT;
+	uint16_t enq = 0, deq = 0;
 	struct rte_event ev;
+	PERF_WORKER_INIT;
 
 	while (t->done == false) {
-		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+		deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
 
-		if (!event) {
+		if (!deq) {
 			rte_pause();
 			continue;
 		}
@@ -80,24 +81,29 @@ perf_queue_worker(void *arg, const int enable_fwd_latency)
 					&ev, w, bufs, sz, cnt);
 		} else {
 			fwd_event(&ev, sched_type_list, nb_stages);
-			while (rte_event_enqueue_burst(dev, port, &ev, 1) != 1)
-				rte_pause();
+			do {
+				enq = rte_event_enqueue_burst(dev, port, &ev,
+							      1);
+			} while (!enq && !t->done);
 		}
 	}
+
+	perf_worker_cleanup(pool, dev, port, &ev, enq, deq);
+
 	return 0;
 }
 
 static int
 perf_queue_worker_burst(void *arg, const int enable_fwd_latency)
 {
-	PERF_WORKER_INIT;
-	uint16_t i;
 	/* +1 to avoid prefetch out of array check */
 	struct rte_event ev[BURST_SIZE + 1];
+	uint16_t enq = 0, nb_rx = 0;
+	PERF_WORKER_INIT;
+	uint16_t i;
 
 	while (t->done == false) {
-		uint16_t const nb_rx = rte_event_dequeue_burst(dev, port, ev,
-				BURST_SIZE, 0);
+		nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
 
 		if (!nb_rx) {
 			rte_pause();
@@ -147,14 +153,16 @@ perf_queue_worker_burst(void *arg, const int enable_fwd_latency)
 			}
 		}
 
-		uint16_t enq;
 		enq = rte_event_enqueue_burst(dev, port, ev, nb_rx);
-		while (enq < nb_rx) {
+		while (enq < nb_rx && !t->done) {
 			enq += rte_event_enqueue_burst(dev, port,
 						       ev + enq, nb_rx - enq);
 		}
 	}
+
+	perf_worker_cleanup(pool, dev, port, ev, enq, nb_rx);
+
 	return 0;
 }
 
diff --git a/app/test-eventdev/test_pipeline_atq.c b/app/test-eventdev/test_pipeline_atq.c
index 79218502ba..4b10197127 100644
--- a/app/test-eventdev/test_pipeline_atq.c
+++ b/app/test-eventdev/test_pipeline_atq.c
@@ -21,18 +21,20 @@ static __rte_noinline int
 pipeline_atq_worker_single_stage_tx(void *arg)
 {
 	PIPELINE_WORKER_SINGLE_STAGE_INIT;
+	uint8_t enq = 0, deq = 0;
 
 	while (t->done == false) {
-		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+		deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
 
-		if (!event) {
+		if (!deq) {
 			rte_pause();
 			continue;
 		}
 
-		pipeline_event_tx(dev, port, &ev);
+		deq = pipeline_event_tx(dev, port, &ev, t);
 		w->processed_pkts++;
 	}
+	pipeline_worker_cleanup(dev, port, &ev, enq, deq);
 
 	return 0;
 }
@@ -42,20 +44,22 @@ pipeline_atq_worker_single_stage_fwd(void *arg)
 {
 	PIPELINE_WORKER_SINGLE_STAGE_INIT;
 	const uint8_t *tx_queue = t->tx_evqueue_id;
+	uint8_t enq = 0, deq = 0;
 
 	while (t->done == false) {
-		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+		deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
 
-		if (!event) {
+		if (!deq) {
 			rte_pause();
 			continue;
 		}
 
 		ev.queue_id = tx_queue[ev.mbuf->port];
 		pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
-		pipeline_event_enqueue(dev, port, &ev);
+		enq = pipeline_event_enqueue(dev, port, &ev, t);
 		w->processed_pkts++;
 	}
+	pipeline_worker_cleanup(dev, port, &ev, enq, deq);
 
 	return 0;
 }
@@ -64,10 +68,10 @@ static __rte_noinline int
 pipeline_atq_worker_single_stage_burst_tx(void *arg)
 {
 	PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
+	uint16_t nb_rx = 0, nb_tx = 0;
 
 	while (t->done == false) {
-		uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
-				BURST_SIZE, 0);
+		nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
 
 		if (!nb_rx) {
 			rte_pause();
@@ -79,9 +83,10 @@ pipeline_atq_worker_single_stage_burst_tx(void *arg)
 			rte_event_eth_tx_adapter_txq_set(ev[i].mbuf, 0);
 		}
 
-		pipeline_event_tx_burst(dev, port, ev, nb_rx);
-		w->processed_pkts += nb_rx;
+		nb_tx = pipeline_event_tx_burst(dev, port, ev, nb_rx, t);
+		w->processed_pkts += nb_tx;
 	}
+	pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
 
 	return 0;
 }
@@ -91,10 +96,10 @@ pipeline_atq_worker_single_stage_burst_fwd(void *arg)
 {
 	PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
 	const uint8_t *tx_queue = t->tx_evqueue_id;
+	uint16_t nb_rx = 0, nb_tx = 0;
 
 	while (t->done == false) {
-		uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
-				BURST_SIZE, 0);
+		nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
 
 		if (!nb_rx) {
 			rte_pause();
@@ -108,9 +113,10 @@ pipeline_atq_worker_single_stage_burst_fwd(void *arg)
 			pipeline_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC);
 		}
 
-		pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
-		w->processed_pkts += nb_rx;
+		nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
+		w->processed_pkts += nb_tx;
 	}
+	pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
 
 	return 0;
 }
@@ -119,19 +125,21 @@ static __rte_noinline int
 pipeline_atq_worker_single_stage_tx_vector(void *arg)
 {
 	PIPELINE_WORKER_SINGLE_STAGE_INIT;
+	uint8_t enq = 0, deq = 0;
 	uint16_t vector_sz;
 
 	while (!t->done) {
-		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+		deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
 
-		if (!event) {
+		if (!deq) {
 			rte_pause();
 			continue;
 		}
 		vector_sz = ev.vec->nb_elem;
-		pipeline_event_tx_vector(dev, port, &ev);
+		enq = pipeline_event_tx_vector(dev, port, &ev, t);
 		w->processed_pkts += vector_sz;
 	}
+	pipeline_worker_cleanup(dev, port, &ev, enq, deq);
 
 	return 0;
 }
@@ -141,12 +149,13 @@ pipeline_atq_worker_single_stage_fwd_vector(void *arg)
 {
 	PIPELINE_WORKER_SINGLE_STAGE_INIT;
 	const uint8_t *tx_queue = t->tx_evqueue_id;
+	uint8_t enq = 0, deq = 0;
 	uint16_t vector_sz;
 
 	while (!t->done) {
-		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+		deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
 
-		if (!event) {
+		if (!deq) {
 			rte_pause();
 			continue;
 		}
@@ -155,9 +164,10 @@ pipeline_atq_worker_single_stage_fwd_vector(void *arg)
 		ev.queue_id = tx_queue[ev.vec->port];
 		ev.vec->queue = 0;
 		pipeline_fwd_event_vector(&ev, RTE_SCHED_TYPE_ATOMIC);
-		pipeline_event_enqueue(dev, port, &ev);
+		enq = pipeline_event_enqueue(dev, port, &ev, t);
 		w->processed_pkts += vector_sz;
 	}
+	pipeline_worker_cleanup(dev, port, &ev, enq, deq);
 
 	return 0;
 }
@@ -166,11 +176,11 @@ static __rte_noinline int
 pipeline_atq_worker_single_stage_burst_tx_vector(void *arg)
 {
 	PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
+	uint16_t nb_rx = 0, nb_tx = 0;
 	uint16_t vector_sz;
 
 	while (!t->done) {
-		uint16_t nb_rx =
-			rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
+		nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
 
 		if (!nb_rx) {
 			rte_pause();
@@ -182,9 +192,10 @@ pipeline_atq_worker_single_stage_burst_tx_vector(void *arg)
 			ev[i].vec->queue = 0;
 		}
 
-		pipeline_event_tx_burst(dev, port, ev, nb_rx);
+		nb_tx = pipeline_event_tx_burst(dev, port, ev, nb_rx, t);
 		w->processed_pkts += vector_sz;
 	}
+	pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
 
 	return 0;
 }
@@ -194,11 +205,11 @@ pipeline_atq_worker_single_stage_burst_fwd_vector(void *arg)
 {
 	PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
 	const uint8_t *tx_queue = t->tx_evqueue_id;
+	uint16_t nb_rx = 0, nb_tx = 0;
 	uint16_t vector_sz;
 
 	while (!t->done) {
-		uint16_t nb_rx =
-			rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
+		nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
 
 		if (!nb_rx) {
 			rte_pause();
@@ -214,9 +225,10 @@ pipeline_atq_worker_single_stage_burst_fwd_vector(void *arg)
 					RTE_SCHED_TYPE_ATOMIC);
 		}
 
-		pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
+		nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
 		w->processed_pkts += vector_sz;
 	}
+	pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
 
 	return 0;
 }
@@ -225,11 +237,12 @@ static __rte_noinline int
 pipeline_atq_worker_multi_stage_tx(void *arg)
 {
 	PIPELINE_WORKER_MULTI_STAGE_INIT;
+	uint8_t enq = 0, deq = 0;
 
 	while (t->done == false) {
-		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+		deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
 
-		if (!event) {
+		if (!deq) {
 			rte_pause();
 			continue;
 		}
@@ -237,15 +250,16 @@ pipeline_atq_worker_multi_stage_tx(void *arg)
 		cq_id = ev.sub_event_type % nb_stages;
 
 		if (cq_id == last_queue) {
-			pipeline_event_tx(dev, port, &ev);
+			enq = pipeline_event_tx(dev, port, &ev, t);
 			w->processed_pkts++;
 			continue;
 		}
 
 		ev.sub_event_type++;
 		pipeline_fwd_event(&ev, sched_type_list[cq_id]);
-		pipeline_event_enqueue(dev, port, &ev);
+		enq = pipeline_event_enqueue(dev, port, &ev, t);
 	}
+	pipeline_worker_cleanup(dev, port, &ev, enq, deq);
 
 	return 0;
 }
@@ -255,11 +269,12 @@ pipeline_atq_worker_multi_stage_fwd(void *arg)
 {
 	PIPELINE_WORKER_MULTI_STAGE_INIT;
 	const uint8_t *tx_queue = t->tx_evqueue_id;
+	uint8_t enq = 0, deq = 0;
 
 	while (t->done == false) {
-		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+		deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
 
-		if (!event) {
+		if (!deq) {
 			rte_pause();
 			continue;
 		}
@@ -275,8 +290,9 @@ pipeline_atq_worker_multi_stage_fwd(void *arg)
 			pipeline_fwd_event(&ev, sched_type_list[cq_id]);
 		}
 
-		pipeline_event_enqueue(dev, port, &ev);
+		enq = pipeline_event_enqueue(dev, port, &ev, t);
 	}
+	pipeline_worker_cleanup(dev, port, &ev, enq, deq);
 
 	return 0;
 }
@@ -285,10 +301,10 @@ static __rte_noinline int
 pipeline_atq_worker_multi_stage_burst_tx(void *arg)
 {
 	PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
+	uint16_t nb_rx = 0, nb_tx = 0;
 
 	while (t->done == false) {
-		uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
-				BURST_SIZE, 0);
+		nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
 
 		if (!nb_rx) {
 			rte_pause();
@@ -300,7 +316,7 @@ pipeline_atq_worker_multi_stage_burst_tx(void *arg)
 			cq_id = ev[i].sub_event_type % nb_stages;
 
 			if (cq_id == last_queue) {
-				pipeline_event_tx(dev, port, &ev[i]);
+				pipeline_event_tx(dev, port, &ev[i], t);
 				ev[i].op = RTE_EVENT_OP_RELEASE;
 				w->processed_pkts++;
 				continue;
@@ -310,8 +326,9 @@ pipeline_atq_worker_multi_stage_burst_tx(void *arg)
 			pipeline_fwd_event(&ev[i], sched_type_list[cq_id]);
 		}
 
-		pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
+		nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
 	}
+	pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
 
 	return 0;
 }
@@ -321,10 +338,10 @@ pipeline_atq_worker_multi_stage_burst_fwd(void *arg)
 {
 	PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
 	const uint8_t *tx_queue = t->tx_evqueue_id;
+	uint16_t nb_rx = 0, nb_tx = 0;
 
 	while (t->done == false) {
-		uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
-				BURST_SIZE, 0);
+		nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
 
 		if (!nb_rx) {
 			rte_pause();
@@ -347,8 +364,9 @@ pipeline_atq_worker_multi_stage_burst_fwd(void *arg)
 			}
 		}
 
-		pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
+		nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
 	}
+	pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
 
 	return 0;
 }
@@ -357,12 +375,13 @@ static __rte_noinline int
 pipeline_atq_worker_multi_stage_tx_vector(void *arg)
 {
 	PIPELINE_WORKER_MULTI_STAGE_INIT;
+	uint8_t enq = 0, deq = 0;
 	uint16_t vector_sz;
 
 	while (!t->done) {
-		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+		deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
 
-		if (!event) {
+		if (!deq) {
 			rte_pause();
 			continue;
 		}
@@ -371,15 +390,16 @@ pipeline_atq_worker_multi_stage_tx_vector(void *arg)
 
 		if (cq_id == last_queue) {
 			vector_sz = ev.vec->nb_elem;
-			pipeline_event_tx_vector(dev, port, &ev);
+			enq = pipeline_event_tx_vector(dev, port, &ev, t);
 			w->processed_pkts += vector_sz;
 			continue;
 		}
 
 		ev.sub_event_type++;
 		pipeline_fwd_event_vector(&ev, sched_type_list[cq_id]);
-		pipeline_event_enqueue(dev, port, &ev);
+		enq = pipeline_event_enqueue(dev, port, &ev, t);
 	}
+	pipeline_worker_cleanup(dev, port, &ev, enq, deq);
 
 	return 0;
 }
@@ -389,12 +409,13 @@ pipeline_atq_worker_multi_stage_fwd_vector(void *arg)
 {
 	PIPELINE_WORKER_MULTI_STAGE_INIT;
 	const uint8_t *tx_queue = t->tx_evqueue_id;
+	uint8_t enq = 0, deq = 0;
 	uint16_t vector_sz;
 
 	while (!t->done) {
-		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+		deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
 
-		if (!event) {
+		if (!deq) {
 			rte_pause();
 			continue;
 		}
@@ -406,14 +427,15 @@ pipeline_atq_worker_multi_stage_fwd_vector(void *arg)
 			ev.vec->queue = 0;
 			vector_sz = ev.vec->nb_elem;
 			pipeline_fwd_event_vector(&ev, RTE_SCHED_TYPE_ATOMIC);
-			pipeline_event_enqueue(dev, port, &ev);
+			enq = pipeline_event_enqueue(dev, port, &ev, t);
 			w->processed_pkts += vector_sz;
 		} else {
 			ev.sub_event_type++;
 			pipeline_fwd_event_vector(&ev, sched_type_list[cq_id]);
-			pipeline_event_enqueue(dev, port, &ev);
+			enq = pipeline_event_enqueue(dev, port, &ev, t);
 		}
 	}
+	pipeline_worker_cleanup(dev, port, &ev, enq, deq);
 
 	return 0;
 }
@@ -422,11 +444,11 @@ static __rte_noinline int
 pipeline_atq_worker_multi_stage_burst_tx_vector(void *arg)
 {
 	PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
+	uint16_t nb_rx = 0, nb_tx = 0;
 	uint16_t vector_sz;
 
 	while (!t->done) {
-		uint16_t nb_rx =
-			rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
+		nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
 
 		if (!nb_rx) {
 			rte_pause();
@@ -438,7 +460,7 @@ pipeline_atq_worker_multi_stage_burst_tx_vector(void *arg)
 
 			if (cq_id == last_queue) {
 				vector_sz = ev[i].vec->nb_elem;
-				pipeline_event_tx_vector(dev, port, &ev[i]);
+				pipeline_event_tx_vector(dev, port, &ev[i], t);
 				ev[i].op = RTE_EVENT_OP_RELEASE;
 				w->processed_pkts += vector_sz;
 				continue;
@@ -449,8 +471,9 @@ pipeline_atq_worker_multi_stage_burst_tx_vector(void *arg)
 						  sched_type_list[cq_id]);
 		}
 
-		pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
+		nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
 	}
+	pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
 
 	return 0;
 }
@@ -460,11 +483,11 @@ pipeline_atq_worker_multi_stage_burst_fwd_vector(void *arg)
 {
 	PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
 	const uint8_t *tx_queue = t->tx_evqueue_id;
+	uint16_t nb_rx = 0, nb_tx = 0;
 	uint16_t vector_sz;
 
 	while (!t->done) {
-		uint16_t nb_rx =
-			rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
+		nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
 
 		if (!nb_rx) {
 			rte_pause();
@@ -488,8 +511,9 @@ pipeline_atq_worker_multi_stage_burst_fwd_vector(void *arg)
 			}
 		}
 
-		pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
+		nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
 	}
+	pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
 
 	return 0;
 }
diff --git a/app/test-eventdev/test_pipeline_common.c b/app/test-eventdev/test_pipeline_common.c
index 29b64014d7..d8e80903b2 100644
--- a/app/test-eventdev/test_pipeline_common.c
+++ b/app/test-eventdev/test_pipeline_common.c
@@ -505,6 +505,45 @@ pipeline_event_tx_adapter_setup(struct evt_options *opt,
 	return ret;
 }
 
+static void
+pipeline_vector_array_free(struct rte_event events[], uint16_t num)
+{
+	uint16_t i;
+
+	for (i = 0; i < num; i++) {
+		rte_pktmbuf_free_bulk(events[i].vec->mbufs,
+				      events[i].vec->nb_elem);
+		rte_mempool_put(rte_mempool_from_obj(events[i].vec),
+				events[i].vec);
+	}
+}
+
+void
+pipeline_worker_cleanup(uint8_t dev, uint8_t port, struct rte_event ev[],
+			uint16_t enq, uint16_t deq)
+{
+	int i;
+
+	if (!(deq - enq))
+		return;
+
+	if (deq) {
+		for (i = enq; i < deq; i++) {
+			if (ev[i].op == RTE_EVENT_OP_RELEASE)
+				continue;
+			if (ev[i].event_type & RTE_EVENT_TYPE_VECTOR)
+				pipeline_vector_array_free(&ev[i], 1);
+			else
+				rte_pktmbuf_free(ev[i].mbuf);
+		}
+
+		for (i = 0; i < deq; i++)
+			ev[i].op = RTE_EVENT_OP_RELEASE;
+
+		rte_event_enqueue_burst(dev, port, ev, deq);
+	}
+}
+
 void
 pipeline_ethdev_rx_stop(struct evt_test *test, struct evt_options *opt)
 {
diff --git a/app/test-eventdev/test_pipeline_common.h b/app/test-eventdev/test_pipeline_common.h
index c979c33772..a6443faea4 100644
--- a/app/test-eventdev/test_pipeline_common.h
+++ b/app/test-eventdev/test_pipeline_common.h
@@ -109,59 +109,80 @@ pipeline_fwd_event_vector(struct rte_event *ev, uint8_t sched)
 	ev->sched_type = sched;
 }
 
-static __rte_always_inline void
+static __rte_always_inline uint8_t
 pipeline_event_tx(const uint8_t dev, const uint8_t port,
-		  struct rte_event * const ev)
+		  struct rte_event *const ev, struct test_pipeline *t)
 {
+	uint8_t enq;
+
 	rte_event_eth_tx_adapter_txq_set(ev->mbuf, 0);
-	while (!rte_event_eth_tx_adapter_enqueue(dev, port, ev, 1, 0))
-		rte_pause();
+	do {
+		enq = rte_event_eth_tx_adapter_enqueue(dev, port, ev, 1, 0);
+	} while (!enq && !t->done);
+
+	return enq;
 }
 
-static __rte_always_inline void
+static __rte_always_inline uint8_t
 pipeline_event_tx_vector(const uint8_t dev, const uint8_t port,
-			 struct rte_event *const ev)
+			 struct rte_event *const ev, struct test_pipeline *t)
 {
+	uint8_t enq;
+
 	ev->vec->queue = 0;
+	do {
+		enq = rte_event_eth_tx_adapter_enqueue(dev, port, ev, 1, 0);
+	} while (!enq && !t->done);
 
-	while (!rte_event_eth_tx_adapter_enqueue(dev, port, ev, 1, 0))
-		rte_pause();
+	return enq;
 }
 
-static __rte_always_inline void
+static __rte_always_inline uint16_t
 pipeline_event_tx_burst(const uint8_t dev, const uint8_t port,
-			struct rte_event *ev, const uint16_t nb_rx)
+			struct rte_event *ev, const uint16_t nb_rx,
+			struct test_pipeline *t)
 {
 	uint16_t enq;
 
 	enq = rte_event_eth_tx_adapter_enqueue(dev, port, ev, nb_rx, 0);
-	while (enq < nb_rx) {
+	while (enq < nb_rx && !t->done) {
 		enq += rte_event_eth_tx_adapter_enqueue(dev, port,
 				ev + enq, nb_rx - enq, 0);
 	}
+
+	return enq;
 }
 
-static __rte_always_inline void
+static __rte_always_inline uint8_t
 pipeline_event_enqueue(const uint8_t dev, const uint8_t port,
-		       struct rte_event *ev)
+		       struct rte_event *ev, struct test_pipeline *t)
 {
-	while (rte_event_enqueue_burst(dev, port, ev, 1) != 1)
-		rte_pause();
+	uint8_t enq;
+
+	do {
+		enq = rte_event_enqueue_burst(dev, port, ev, 1);
+	} while (!enq && !t->done);
+
+	return enq;
 }
 
-static __rte_always_inline void
+static __rte_always_inline uint16_t
 pipeline_event_enqueue_burst(const uint8_t dev, const uint8_t port,
-			     struct rte_event *ev, const uint16_t nb_rx)
+			     struct rte_event *ev, const uint16_t nb_rx,
+			     struct test_pipeline *t)
 {
 	uint16_t enq;
 
 	enq = rte_event_enqueue_burst(dev, port, ev, nb_rx);
-	while (enq < nb_rx) {
+	while (enq < nb_rx && !t->done) {
 		enq += rte_event_enqueue_burst(dev, port,
 				ev + enq, nb_rx - enq);
 	}
+
+	return enq;
 }
+
 static inline int
 pipeline_nb_event_ports(struct evt_options *opt)
 {
@@ -188,5 +209,7 @@ void pipeline_eventdev_destroy(struct evt_test *test, struct evt_options *opt);
 void pipeline_ethdev_destroy(struct evt_test *test, struct evt_options *opt);
 void pipeline_ethdev_rx_stop(struct evt_test *test, struct evt_options *opt);
 void pipeline_mempool_destroy(struct evt_test *test, struct evt_options *opt);
+void pipeline_worker_cleanup(uint8_t dev, uint8_t port, struct rte_event ev[],
+			     uint16_t enq, uint16_t deq);
 
 #endif /* _TEST_PIPELINE_COMMON_ */
diff --git a/app/test-eventdev/test_pipeline_queue.c b/app/test-eventdev/test_pipeline_queue.c
index 343f8f3b1d..e989396474 100644
--- a/app/test-eventdev/test_pipeline_queue.c
+++ b/app/test-eventdev/test_pipeline_queue.c
@@ -21,24 +21,27 @@ static __rte_noinline int
 pipeline_queue_worker_single_stage_tx(void *arg)
 {
 	PIPELINE_WORKER_SINGLE_STAGE_INIT;
+	uint8_t enq = 0, deq = 0;
 
 	while (t->done == false) {
-		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+		deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
 
-		if (!event) {
+		if (!deq) {
 			rte_pause();
 			continue;
 		}
 
 		if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
-			pipeline_event_tx(dev, port, &ev);
+			enq = pipeline_event_tx(dev, port, &ev, t);
+			ev.op = RTE_EVENT_OP_RELEASE;
 			w->processed_pkts++;
 		} else {
 			ev.queue_id++;
 			pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
-			pipeline_event_enqueue(dev, port, &ev);
+			enq = pipeline_event_enqueue(dev, port, &ev, t);
 		}
 	}
+	pipeline_worker_cleanup(dev, port, &ev, enq, deq);
 
 	return 0;
 }
@@ -48,11 +51,12 @@ pipeline_queue_worker_single_stage_fwd(void *arg)
 {
 	PIPELINE_WORKER_SINGLE_STAGE_INIT;
 	const uint8_t *tx_queue = t->tx_evqueue_id;
+	uint8_t enq = 0, deq = 0;
 
 	while (t->done == false) {
-		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+		deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
 
-		if (!event) {
+		if (!deq) {
 			rte_pause();
 			continue;
 		}
@@ -60,9 +64,10 @@ pipeline_queue_worker_single_stage_fwd(void *arg)
 		ev.queue_id = tx_queue[ev.mbuf->port];
 		rte_event_eth_tx_adapter_txq_set(ev.mbuf, 0);
 		pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
-		pipeline_event_enqueue(dev, port, &ev);
+		enq = pipeline_event_enqueue(dev, port, &ev, t);
 		w->processed_pkts++;
 	}
+	pipeline_worker_cleanup(dev, port, &ev, enq, deq);
 
 	return 0;
 }
@@ -71,10 +76,10 @@ static __rte_noinline int
 pipeline_queue_worker_single_stage_burst_tx(void *arg)
 {
 	PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
+	uint16_t nb_rx = 0, nb_tx = 0;
 
 	while (t->done == false) {
-		uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
-				BURST_SIZE, 0);
+		nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
 
 		if (!nb_rx) {
 			rte_pause();
@@ -84,17 +89,18 @@ pipeline_queue_worker_single_stage_burst_tx(void *arg)
 		for (i = 0; i < nb_rx; i++) {
 			rte_prefetch0(ev[i + 1].mbuf);
 			if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
-				pipeline_event_tx(dev, port, &ev[i]);
+				pipeline_event_tx(dev, port, &ev[i], t);
+				ev[i].op = RTE_EVENT_OP_RELEASE;
 				w->processed_pkts++;
 			} else {
 				ev[i].queue_id++;
 				pipeline_fwd_event(&ev[i],
 						RTE_SCHED_TYPE_ATOMIC);
-				pipeline_event_enqueue_burst(dev, port, ev,
-						nb_rx);
 			}
 		}
+		nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
 	}
+	pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
 
 	return 0;
 }
@@ -104,10 +110,10 @@ pipeline_queue_worker_single_stage_burst_fwd(void *arg)
 {
 	PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
 	const uint8_t *tx_queue = t->tx_evqueue_id;
+	uint16_t nb_rx = 0, nb_tx = 0;
 
 	while (t->done == false) {
-		uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
-				BURST_SIZE, 0);
+		nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
 
 		if (!nb_rx) {
 			rte_pause();
@@ -121,9 +127,10 @@ pipeline_queue_worker_single_stage_burst_fwd(void *arg)
 			pipeline_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC);
 		}
 
-		pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
+		nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
 		w->processed_pkts += nb_rx;
 	}
+	pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
 
 	return 0;
 }
@@ -132,26 +139,29 @@ static __rte_noinline int
 pipeline_queue_worker_single_stage_tx_vector(void *arg)
 {
 	PIPELINE_WORKER_SINGLE_STAGE_INIT;
+	uint8_t enq = 0, deq = 0;
 	uint16_t vector_sz;
 
 	while (!t->done) {
-		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+		deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
 
-		if (!event) {
+		if (!deq) {
 			rte_pause();
 			continue;
 		}
 
 		if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
 			vector_sz = ev.vec->nb_elem;
-			pipeline_event_tx_vector(dev, port, &ev);
+			enq = pipeline_event_tx_vector(dev, port, &ev, t);
+			ev.op = RTE_EVENT_OP_RELEASE;
 			w->processed_pkts += vector_sz;
 		} else {
 			ev.queue_id++;
 			pipeline_fwd_event_vector(&ev, RTE_SCHED_TYPE_ATOMIC);
-			pipeline_event_enqueue(dev, port, &ev);
+			enq = pipeline_event_enqueue(dev, port, &ev, t);
 		}
 	}
+	pipeline_worker_cleanup(dev, port, &ev, enq, deq);
 
 	return 0;
 }
@@ -161,12 +171,13 @@ pipeline_queue_worker_single_stage_fwd_vector(void *arg)
 {
 	PIPELINE_WORKER_SINGLE_STAGE_INIT;
 	const uint8_t *tx_queue = t->tx_evqueue_id;
+	uint8_t enq = 0, deq = 0;
 	uint16_t vector_sz;
 
 	while (!t->done) {
-		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+		deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
 
-		if (!event) {
+		if (!deq) {
 			rte_pause();
 			continue;
 		}
@@ -175,9 +186,10 @@ pipeline_queue_worker_single_stage_fwd_vector(void *arg)
 		ev.vec->queue = 0;
 		vector_sz = ev.vec->nb_elem;
 		pipeline_fwd_event_vector(&ev, RTE_SCHED_TYPE_ATOMIC);
-		pipeline_event_enqueue(dev, port, &ev);
+		enq = pipeline_event_enqueue(dev, port, &ev, t);
 		w->processed_pkts += vector_sz;
 	}
+	pipeline_worker_cleanup(dev, port, &ev, enq, deq);
 
 	return 0;
 }
@@ -186,11 +198,11 @@ static __rte_noinline int
 pipeline_queue_worker_single_stage_burst_tx_vector(void *arg)
 {
 	PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
+	uint16_t nb_rx = 0, nb_tx = 0;
 	uint16_t vector_sz;
 
 	while (!t->done) {
-		uint16_t nb_rx =
-			rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
+		nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
 
 		if (!nb_rx) {
 			rte_pause();
@@ -200,7 +212,7 @@ pipeline_queue_worker_single_stage_burst_tx_vector(void *arg)
 		for (i = 0; i < nb_rx; i++) {
 			if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
 				vector_sz = ev[i].vec->nb_elem;
-				pipeline_event_tx_vector(dev, port, &ev[i]);
+				pipeline_event_tx_vector(dev, port, &ev[i], t);
 				ev[i].op = RTE_EVENT_OP_RELEASE;
 				w->processed_pkts += vector_sz;
 			} else {
@@ -210,8 +222,9 @@ pipeline_queue_worker_single_stage_burst_tx_vector(void *arg)
 			}
 		}
 
-		pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
+		nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
 	}
+	pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
 
 	return 0;
 }
@@ -221,11 +234,11 @@ pipeline_queue_worker_single_stage_burst_fwd_vector(void *arg)
 {
 	PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
 	const uint8_t *tx_queue = t->tx_evqueue_id;
+	uint16_t nb_rx = 0, nb_tx = 0;
 	uint16_t vector_sz;
 
 	while (!t->done) {
-		uint16_t nb_rx =
-			rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
+		nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
 
 		if (!nb_rx) {
 			rte_pause();
@@ -241,9 +254,10 @@ pipeline_queue_worker_single_stage_burst_fwd_vector(void *arg)
 					RTE_SCHED_TYPE_ATOMIC);
 		}
 
-		pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
+		nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
 		w->processed_pkts += vector_sz;
 	}
+	pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
 
 	return 0;
 }
@@ -253,11 +267,12 @@ pipeline_queue_worker_multi_stage_tx(void *arg)
 {
 	PIPELINE_WORKER_MULTI_STAGE_INIT;
 	const uint8_t *tx_queue = t->tx_evqueue_id;
+	uint8_t enq = 0, deq = 0;
 
 	while (t->done == false) {
-		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+		deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
 
-		if (!event) {
+		if (!deq) {
 			rte_pause();
 			continue;
 		}
@@ -265,7 +280,8 @@ pipeline_queue_worker_multi_stage_tx(void *arg)
 		cq_id = ev.queue_id % nb_stages;
 
 		if (ev.queue_id == tx_queue[ev.mbuf->port]) {
-			pipeline_event_tx(dev, port, &ev);
+			enq = pipeline_event_tx(dev, port, &ev, t);
+			ev.op = RTE_EVENT_OP_RELEASE;
 			w->processed_pkts++;
 			continue;
 		}
@@ -274,8 +290,9 @@ pipeline_queue_worker_multi_stage_tx(void *arg)
 		pipeline_fwd_event(&ev, cq_id != last_queue ?
 				sched_type_list[cq_id] :
 				RTE_SCHED_TYPE_ATOMIC);
-		pipeline_event_enqueue(dev, port, &ev);
+		enq = pipeline_event_enqueue(dev, port, &ev, t);
 	}
+	pipeline_worker_cleanup(dev, port, &ev, enq, deq);
 
 	return 0;
 }
@@ -285,11 +302,12 @@ pipeline_queue_worker_multi_stage_fwd(void *arg)
 {
 	PIPELINE_WORKER_MULTI_STAGE_INIT;
 	const uint8_t *tx_queue = t->tx_evqueue_id;
+	uint8_t enq = 0, deq = 0;
 
 	while (t->done == false) {
-		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+		deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
 
-		if (!event) {
+		if (!deq) {
 			rte_pause();
 			continue;
 		}
@@ -300,14 +318,15 @@ pipeline_queue_worker_multi_stage_fwd(void *arg)
 			ev.queue_id = tx_queue[ev.mbuf->port];
 			rte_event_eth_tx_adapter_txq_set(ev.mbuf, 0);
 			pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
-			pipeline_event_enqueue(dev, port, &ev);
+			enq = pipeline_event_enqueue(dev, port, &ev, t);
 			w->processed_pkts++;
 		} else {
 			ev.queue_id++;
 			pipeline_fwd_event(&ev, sched_type_list[cq_id]);
-			pipeline_event_enqueue(dev, port, &ev);
+			enq = pipeline_event_enqueue(dev, port, &ev, t);
 		}
 	}
+	pipeline_worker_cleanup(dev, port, &ev, enq, deq);
 
 	return 0;
 }
@@ -317,10 +336,10 @@ pipeline_queue_worker_multi_stage_burst_tx(void *arg)
 {
 	PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
 	const uint8_t *tx_queue = t->tx_evqueue_id;
+	uint16_t nb_rx = 0, nb_tx = 0;
 
 	while (t->done == false) {
-		uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
-				BURST_SIZE, 0);
+		nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
 
 		if (!nb_rx) {
 			rte_pause();
@@ -332,7 +351,8 @@ pipeline_queue_worker_multi_stage_burst_tx(void *arg)
 			cq_id = ev[i].queue_id % nb_stages;
 
 			if (ev[i].queue_id == tx_queue[ev[i].mbuf->port]) {
-				pipeline_event_tx(dev, port, &ev[i]);
+				pipeline_event_tx(dev, port, &ev[i], t);
+				ev[i].op = RTE_EVENT_OP_RELEASE;
 				w->processed_pkts++;
 				continue;
 			}
@@ -341,9 +361,10 @@ pipeline_queue_worker_multi_stage_burst_tx(void *arg)
 			pipeline_fwd_event(&ev[i], cq_id != last_queue ?
 					sched_type_list[cq_id] :
 					RTE_SCHED_TYPE_ATOMIC);
-			pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
 		}
+		nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
 	}
+	pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
 
 	return 0;
 }
@@ -353,11 +374,11 @@ pipeline_queue_worker_multi_stage_burst_fwd(void *arg)
 {
 	PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
 	const uint8_t *tx_queue = t->tx_evqueue_id;
+	uint16_t nb_rx = 0, nb_tx = 0;
 
 	while (t->done == false) {
 		uint16_t processed_pkts = 0;
-		uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
-				BURST_SIZE, 0);
+		nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
 
 		if (!nb_rx) {
 			rte_pause();
@@ -381,9 +402,10 @@ pipeline_queue_worker_multi_stage_burst_fwd(void *arg)
 			}
 		}
 
-		pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
+		nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
 		w->processed_pkts += processed_pkts;
 	}
+	pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
 
 	return 0;
 }
@@ -393,12 +415,13 @@ pipeline_queue_worker_multi_stage_tx_vector(void *arg)
 {
 	PIPELINE_WORKER_MULTI_STAGE_INIT;
 	const uint8_t *tx_queue = t->tx_evqueue_id;
+	uint8_t enq = 0, deq = 0;
 	uint16_t vector_sz;
 
 	while (!t->done) {
-		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+		deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
 
-		if (!event) {
+		if (!deq) {
 			rte_pause();
 			continue;
 		}
@@ -407,8 +430,9 @@ pipeline_queue_worker_multi_stage_tx_vector(void *arg)
 
 		if (ev.queue_id == tx_queue[ev.vec->port]) {
 			vector_sz = ev.vec->nb_elem;
-			pipeline_event_tx_vector(dev, port, &ev);
+			enq = pipeline_event_tx_vector(dev, port, &ev, t);
 			w->processed_pkts +=
vector_sz; + ev.op = RTE_EVENT_OP_RELEASE; continue; } @@ -416,8 +440,9 @@ pipeline_queue_worker_multi_stage_tx_vector(void *arg) pipeline_fwd_event_vector(&ev, cq_id != last_queue ? sched_type_list[cq_id] : RTE_SCHED_TYPE_ATOMIC); - pipeline_event_enqueue(dev, port, &ev); + enq = pipeline_event_enqueue(dev, port, &ev, t); } + pipeline_worker_cleanup(dev, port, &ev, enq, deq); return 0; } @@ -427,12 +452,13 @@ pipeline_queue_worker_multi_stage_fwd_vector(void *arg) { PIPELINE_WORKER_MULTI_STAGE_INIT; const uint8_t *tx_queue = t->tx_evqueue_id; + uint8_t enq = 0, deq = 0; uint16_t vector_sz; while (!t->done) { - uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0); + deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0); - if (!event) { + if (!deq) { rte_pause(); continue; } @@ -449,8 +475,9 @@ pipeline_queue_worker_multi_stage_fwd_vector(void *arg) pipeline_fwd_event_vector(&ev, sched_type_list[cq_id]); } - pipeline_event_enqueue(dev, port, &ev); + enq = pipeline_event_enqueue(dev, port, &ev, t); } + pipeline_worker_cleanup(dev, port, &ev, enq, deq); return 0; } @@ -460,11 +487,11 @@ pipeline_queue_worker_multi_stage_burst_tx_vector(void *arg) { PIPELINE_WORKER_MULTI_STAGE_BURST_INIT; const uint8_t *tx_queue = t->tx_evqueue_id; + uint16_t nb_rx = 0, nb_tx = 0; uint16_t vector_sz; while (!t->done) { - uint16_t nb_rx = - rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0); + nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0); if (!nb_rx) { rte_pause(); @@ -476,7 +503,7 @@ pipeline_queue_worker_multi_stage_burst_tx_vector(void *arg) if (ev[i].queue_id == tx_queue[ev[i].vec->port]) { vector_sz = ev[i].vec->nb_elem; - pipeline_event_tx_vector(dev, port, &ev[i]); + pipeline_event_tx_vector(dev, port, &ev[i], t); ev[i].op = RTE_EVENT_OP_RELEASE; w->processed_pkts += vector_sz; continue; @@ -489,8 +516,9 @@ pipeline_queue_worker_multi_stage_burst_tx_vector(void *arg) : RTE_SCHED_TYPE_ATOMIC); } - pipeline_event_enqueue_burst(dev, port, ev, nb_rx); + 
nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t); } + pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx); return 0; } @@ -500,11 +528,11 @@ pipeline_queue_worker_multi_stage_burst_fwd_vector(void *arg) { PIPELINE_WORKER_MULTI_STAGE_BURST_INIT; const uint8_t *tx_queue = t->tx_evqueue_id; + uint16_t nb_rx = 0, nb_tx = 0; uint16_t vector_sz; while (!t->done) { - uint16_t nb_rx = - rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0); + nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0); if (!nb_rx) { rte_pause(); @@ -527,8 +555,9 @@ pipeline_queue_worker_multi_stage_burst_fwd_vector(void *arg) } } - pipeline_event_enqueue_burst(dev, port, ev, nb_rx); + nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t); } + pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx); return 0; }