@@ -37,13 +37,14 @@ atq_fwd_event(struct rte_event *const ev, uint8_t *const sched_type_list,
static int
perf_atq_worker(void *arg, const int enable_fwd_latency)
{
- PERF_WORKER_INIT;
+ uint16_t enq = 0, deq = 0;
struct rte_event ev;
+ PERF_WORKER_INIT;
while (t->done == false) {
- uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+ deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
- if (!event) {
+ if (!deq) {
rte_pause();
continue;
}
@@ -78,24 +79,29 @@ perf_atq_worker(void *arg, const int enable_fwd_latency)
bufs, sz, cnt);
} else {
atq_fwd_event(&ev, sched_type_list, nb_stages);
- while (rte_event_enqueue_burst(dev, port, &ev, 1) != 1)
- rte_pause();
+ do {
+ enq = rte_event_enqueue_burst(dev, port, &ev,
+ 1);
+ } while (!enq && !t->done);
}
}
+
+ perf_worker_cleanup(pool, dev, port, &ev, enq, deq);
+
return 0;
}
static int
perf_atq_worker_burst(void *arg, const int enable_fwd_latency)
{
- PERF_WORKER_INIT;
- uint16_t i;
/* +1 to avoid prefetch out of array check */
struct rte_event ev[BURST_SIZE + 1];
+ uint16_t enq = 0, nb_rx = 0;
+ PERF_WORKER_INIT;
+ uint16_t i;
while (t->done == false) {
- uint16_t const nb_rx = rte_event_dequeue_burst(dev, port, ev,
- BURST_SIZE, 0);
+ nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
if (!nb_rx) {
rte_pause();
@@ -146,14 +152,15 @@ perf_atq_worker_burst(void *arg, const int enable_fwd_latency)
}
}
- uint16_t enq;
-
enq = rte_event_enqueue_burst(dev, port, ev, nb_rx);
- while (enq < nb_rx) {
+ while ((enq < nb_rx) && !t->done) {
enq += rte_event_enqueue_burst(dev, port,
ev + enq, nb_rx - enq);
}
}
+
+ perf_worker_cleanup(pool, dev, port, ev, enq, nb_rx);
+
return 0;
}
@@ -985,6 +985,23 @@ perf_opt_dump(struct evt_options *opt, uint8_t nb_queues)
evt_dump("prod_enq_burst_sz", "%d", opt->prod_enq_burst_sz);
}
+void
+perf_worker_cleanup(struct rte_mempool *const pool, uint8_t dev_id,
+ uint8_t port_id, struct rte_event events[], uint16_t nb_enq,
+ uint16_t nb_deq)
+{
+ int i;
+
+ if (nb_deq) {
+ for (i = nb_enq; i < nb_deq; i++)
+ rte_mempool_put(pool, events[i].event_ptr);
+
+ for (i = 0; i < nb_deq; i++)
+ events[i].op = RTE_EVENT_OP_RELEASE;
+ rte_event_enqueue_burst(dev_id, port_id, events, nb_deq);
+ }
+}
+
void
perf_eventdev_destroy(struct evt_test *test, struct evt_options *opt)
{
@@ -184,5 +184,8 @@ void perf_cryptodev_destroy(struct evt_test *test, struct evt_options *opt);
void perf_ethdev_destroy(struct evt_test *test, struct evt_options *opt);
void perf_ethdev_rx_stop(struct evt_test *test, struct evt_options *opt);
void perf_mempool_destroy(struct evt_test *test, struct evt_options *opt);
+void perf_worker_cleanup(struct rte_mempool *const pool, uint8_t dev_id,
+ uint8_t port_id, struct rte_event events[],
+ uint16_t nb_enq, uint16_t nb_deq);
#endif /* _TEST_PERF_COMMON_ */
@@ -39,13 +39,14 @@ fwd_event(struct rte_event *const ev, uint8_t *const sched_type_list,
static int
perf_queue_worker(void *arg, const int enable_fwd_latency)
{
- PERF_WORKER_INIT;
+ uint16_t enq = 0, deq = 0;
struct rte_event ev;
+ PERF_WORKER_INIT;
while (t->done == false) {
- uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+ deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
- if (!event) {
+ if (!deq) {
rte_pause();
continue;
}
@@ -80,24 +81,29 @@ perf_queue_worker(void *arg, const int enable_fwd_latency)
&ev, w, bufs, sz, cnt);
} else {
fwd_event(&ev, sched_type_list, nb_stages);
- while (rte_event_enqueue_burst(dev, port, &ev, 1) != 1)
- rte_pause();
+ do {
+ enq = rte_event_enqueue_burst(dev, port, &ev,
+ 1);
+ } while (!enq && !t->done);
}
}
+
+ perf_worker_cleanup(pool, dev, port, &ev, enq, deq);
+
return 0;
}
static int
perf_queue_worker_burst(void *arg, const int enable_fwd_latency)
{
- PERF_WORKER_INIT;
- uint16_t i;
/* +1 to avoid prefetch out of array check */
struct rte_event ev[BURST_SIZE + 1];
+ uint16_t enq = 0, nb_rx = 0;
+ PERF_WORKER_INIT;
+ uint16_t i;
while (t->done == false) {
- uint16_t const nb_rx = rte_event_dequeue_burst(dev, port, ev,
- BURST_SIZE, 0);
+ nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
if (!nb_rx) {
rte_pause();
@@ -147,14 +153,16 @@ perf_queue_worker_burst(void *arg, const int enable_fwd_latency)
}
}
- uint16_t enq;
enq = rte_event_enqueue_burst(dev, port, ev, nb_rx);
- while (enq < nb_rx) {
+ while (enq < nb_rx && !t->done) {
enq += rte_event_enqueue_burst(dev, port,
ev + enq, nb_rx - enq);
}
}
+
+ perf_worker_cleanup(pool, dev, port, ev, enq, nb_rx);
+
return 0;
}
@@ -21,18 +21,20 @@ static __rte_noinline int
pipeline_atq_worker_single_stage_tx(void *arg)
{
PIPELINE_WORKER_SINGLE_STAGE_INIT;
+ uint8_t enq = 0, deq = 0;
while (t->done == false) {
- uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+ deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
- if (!event) {
+ if (!deq) {
rte_pause();
continue;
}
- pipeline_event_tx(dev, port, &ev);
+ deq = pipeline_event_tx(dev, port, &ev, t);
w->processed_pkts++;
}
+ pipeline_worker_cleanup(dev, port, &ev, enq, deq);
return 0;
}
@@ -42,20 +44,22 @@ pipeline_atq_worker_single_stage_fwd(void *arg)
{
PIPELINE_WORKER_SINGLE_STAGE_INIT;
const uint8_t *tx_queue = t->tx_evqueue_id;
+ uint8_t enq = 0, deq = 0;
while (t->done == false) {
- uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+ deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
- if (!event) {
+ if (!deq) {
rte_pause();
continue;
}
ev.queue_id = tx_queue[ev.mbuf->port];
pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
- pipeline_event_enqueue(dev, port, &ev);
+ enq = pipeline_event_enqueue(dev, port, &ev, t);
w->processed_pkts++;
}
+ pipeline_worker_cleanup(dev, port, &ev, enq, deq);
return 0;
}
@@ -64,10 +68,10 @@ static __rte_noinline int
pipeline_atq_worker_single_stage_burst_tx(void *arg)
{
PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
+ uint16_t nb_rx = 0, nb_tx = 0;
while (t->done == false) {
- uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
- BURST_SIZE, 0);
+ nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
if (!nb_rx) {
rte_pause();
@@ -79,9 +83,10 @@ pipeline_atq_worker_single_stage_burst_tx(void *arg)
rte_event_eth_tx_adapter_txq_set(ev[i].mbuf, 0);
}
- pipeline_event_tx_burst(dev, port, ev, nb_rx);
- w->processed_pkts += nb_rx;
+ nb_tx = pipeline_event_tx_burst(dev, port, ev, nb_rx, t);
+ w->processed_pkts += nb_tx;
}
+ pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
return 0;
}
@@ -91,10 +96,10 @@ pipeline_atq_worker_single_stage_burst_fwd(void *arg)
{
PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
const uint8_t *tx_queue = t->tx_evqueue_id;
+ uint16_t nb_rx = 0, nb_tx = 0;
while (t->done == false) {
- uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
- BURST_SIZE, 0);
+ nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
if (!nb_rx) {
rte_pause();
@@ -108,9 +113,10 @@ pipeline_atq_worker_single_stage_burst_fwd(void *arg)
pipeline_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC);
}
- pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
- w->processed_pkts += nb_rx;
+ nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
+ w->processed_pkts += nb_tx;
}
+ pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
return 0;
}
@@ -119,19 +125,21 @@ static __rte_noinline int
pipeline_atq_worker_single_stage_tx_vector(void *arg)
{
PIPELINE_WORKER_SINGLE_STAGE_INIT;
+ uint8_t enq = 0, deq = 0;
uint16_t vector_sz;
while (!t->done) {
- uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+ deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
- if (!event) {
+ if (!deq) {
rte_pause();
continue;
}
vector_sz = ev.vec->nb_elem;
- pipeline_event_tx_vector(dev, port, &ev);
+ enq = pipeline_event_tx_vector(dev, port, &ev, t);
w->processed_pkts += vector_sz;
}
+ pipeline_worker_cleanup(dev, port, &ev, enq, deq);
return 0;
}
@@ -141,12 +149,13 @@ pipeline_atq_worker_single_stage_fwd_vector(void *arg)
{
PIPELINE_WORKER_SINGLE_STAGE_INIT;
const uint8_t *tx_queue = t->tx_evqueue_id;
+ uint8_t enq = 0, deq = 0;
uint16_t vector_sz;
while (!t->done) {
- uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+ deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
- if (!event) {
+ if (!deq) {
rte_pause();
continue;
}
@@ -155,9 +164,10 @@ pipeline_atq_worker_single_stage_fwd_vector(void *arg)
ev.queue_id = tx_queue[ev.vec->port];
ev.vec->queue = 0;
pipeline_fwd_event_vector(&ev, RTE_SCHED_TYPE_ATOMIC);
- pipeline_event_enqueue(dev, port, &ev);
+ enq = pipeline_event_enqueue(dev, port, &ev, t);
w->processed_pkts += vector_sz;
}
+ pipeline_worker_cleanup(dev, port, &ev, enq, deq);
return 0;
}
@@ -166,11 +176,11 @@ static __rte_noinline int
pipeline_atq_worker_single_stage_burst_tx_vector(void *arg)
{
PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
+ uint16_t nb_rx = 0, nb_tx = 0;
uint16_t vector_sz;
while (!t->done) {
- uint16_t nb_rx =
- rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
+ nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
if (!nb_rx) {
rte_pause();
@@ -182,9 +192,10 @@ pipeline_atq_worker_single_stage_burst_tx_vector(void *arg)
ev[i].vec->queue = 0;
}
- pipeline_event_tx_burst(dev, port, ev, nb_rx);
+ nb_tx = pipeline_event_tx_burst(dev, port, ev, nb_rx, t);
w->processed_pkts += vector_sz;
}
+ pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
return 0;
}
@@ -194,11 +205,11 @@ pipeline_atq_worker_single_stage_burst_fwd_vector(void *arg)
{
PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
const uint8_t *tx_queue = t->tx_evqueue_id;
+ uint16_t nb_rx = 0, nb_tx = 0;
uint16_t vector_sz;
while (!t->done) {
- uint16_t nb_rx =
- rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
+ nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
if (!nb_rx) {
rte_pause();
@@ -214,9 +225,10 @@ pipeline_atq_worker_single_stage_burst_fwd_vector(void *arg)
RTE_SCHED_TYPE_ATOMIC);
}
- pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
+ nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
w->processed_pkts += vector_sz;
}
+ pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
return 0;
}
@@ -225,11 +237,12 @@ static __rte_noinline int
pipeline_atq_worker_multi_stage_tx(void *arg)
{
PIPELINE_WORKER_MULTI_STAGE_INIT;
+ uint8_t enq = 0, deq = 0;
while (t->done == false) {
- uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+ deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
- if (!event) {
+ if (!deq) {
rte_pause();
continue;
}
@@ -237,15 +250,16 @@ pipeline_atq_worker_multi_stage_tx(void *arg)
cq_id = ev.sub_event_type % nb_stages;
if (cq_id == last_queue) {
- pipeline_event_tx(dev, port, &ev);
+ enq = pipeline_event_tx(dev, port, &ev, t);
w->processed_pkts++;
continue;
}
ev.sub_event_type++;
pipeline_fwd_event(&ev, sched_type_list[cq_id]);
- pipeline_event_enqueue(dev, port, &ev);
+ enq = pipeline_event_enqueue(dev, port, &ev, t);
}
+ pipeline_worker_cleanup(dev, port, &ev, enq, deq);
return 0;
}
@@ -255,11 +269,12 @@ pipeline_atq_worker_multi_stage_fwd(void *arg)
{
PIPELINE_WORKER_MULTI_STAGE_INIT;
const uint8_t *tx_queue = t->tx_evqueue_id;
+ uint8_t enq = 0, deq = 0;
while (t->done == false) {
- uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+ deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
- if (!event) {
+ if (!deq) {
rte_pause();
continue;
}
@@ -275,8 +290,9 @@ pipeline_atq_worker_multi_stage_fwd(void *arg)
pipeline_fwd_event(&ev, sched_type_list[cq_id]);
}
- pipeline_event_enqueue(dev, port, &ev);
+ enq = pipeline_event_enqueue(dev, port, &ev, t);
}
+ pipeline_worker_cleanup(dev, port, &ev, enq, deq);
return 0;
}
@@ -285,10 +301,10 @@ static __rte_noinline int
pipeline_atq_worker_multi_stage_burst_tx(void *arg)
{
PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
+ uint16_t nb_rx = 0, nb_tx = 0;
while (t->done == false) {
- uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
- BURST_SIZE, 0);
+ nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
if (!nb_rx) {
rte_pause();
@@ -300,7 +316,7 @@ pipeline_atq_worker_multi_stage_burst_tx(void *arg)
cq_id = ev[i].sub_event_type % nb_stages;
if (cq_id == last_queue) {
- pipeline_event_tx(dev, port, &ev[i]);
+ pipeline_event_tx(dev, port, &ev[i], t);
ev[i].op = RTE_EVENT_OP_RELEASE;
w->processed_pkts++;
continue;
@@ -310,8 +326,9 @@ pipeline_atq_worker_multi_stage_burst_tx(void *arg)
pipeline_fwd_event(&ev[i], sched_type_list[cq_id]);
}
- pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
+ nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
}
+ pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
return 0;
}
@@ -321,10 +338,10 @@ pipeline_atq_worker_multi_stage_burst_fwd(void *arg)
{
PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
const uint8_t *tx_queue = t->tx_evqueue_id;
+ uint16_t nb_rx = 0, nb_tx = 0;
while (t->done == false) {
- uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
- BURST_SIZE, 0);
+ nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
if (!nb_rx) {
rte_pause();
@@ -347,8 +364,9 @@ pipeline_atq_worker_multi_stage_burst_fwd(void *arg)
}
}
- pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
+ nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
}
+ pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
return 0;
}
@@ -357,12 +375,13 @@ static __rte_noinline int
pipeline_atq_worker_multi_stage_tx_vector(void *arg)
{
PIPELINE_WORKER_MULTI_STAGE_INIT;
+ uint8_t enq = 0, deq = 0;
uint16_t vector_sz;
while (!t->done) {
- uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+ deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
- if (!event) {
+ if (!deq) {
rte_pause();
continue;
}
@@ -371,15 +390,16 @@ pipeline_atq_worker_multi_stage_tx_vector(void *arg)
if (cq_id == last_queue) {
vector_sz = ev.vec->nb_elem;
- pipeline_event_tx_vector(dev, port, &ev);
+ enq = pipeline_event_tx_vector(dev, port, &ev, t);
w->processed_pkts += vector_sz;
continue;
}
ev.sub_event_type++;
pipeline_fwd_event_vector(&ev, sched_type_list[cq_id]);
- pipeline_event_enqueue(dev, port, &ev);
+ enq = pipeline_event_enqueue(dev, port, &ev, t);
}
+ pipeline_worker_cleanup(dev, port, &ev, enq, deq);
return 0;
}
@@ -389,12 +409,13 @@ pipeline_atq_worker_multi_stage_fwd_vector(void *arg)
{
PIPELINE_WORKER_MULTI_STAGE_INIT;
const uint8_t *tx_queue = t->tx_evqueue_id;
+ uint8_t enq = 0, deq = 0;
uint16_t vector_sz;
while (!t->done) {
- uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+ deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
- if (!event) {
+ if (!deq) {
rte_pause();
continue;
}
@@ -406,14 +427,15 @@ pipeline_atq_worker_multi_stage_fwd_vector(void *arg)
ev.vec->queue = 0;
vector_sz = ev.vec->nb_elem;
pipeline_fwd_event_vector(&ev, RTE_SCHED_TYPE_ATOMIC);
- pipeline_event_enqueue(dev, port, &ev);
+ enq = pipeline_event_enqueue(dev, port, &ev, t);
w->processed_pkts += vector_sz;
} else {
ev.sub_event_type++;
pipeline_fwd_event_vector(&ev, sched_type_list[cq_id]);
- pipeline_event_enqueue(dev, port, &ev);
+ enq = pipeline_event_enqueue(dev, port, &ev, t);
}
}
+ pipeline_worker_cleanup(dev, port, &ev, enq, deq);
return 0;
}
@@ -422,11 +444,11 @@ static __rte_noinline int
pipeline_atq_worker_multi_stage_burst_tx_vector(void *arg)
{
PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
+ uint16_t nb_rx = 0, nb_tx = 0;
uint16_t vector_sz;
while (!t->done) {
- uint16_t nb_rx =
- rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
+ nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
if (!nb_rx) {
rte_pause();
@@ -438,7 +460,7 @@ pipeline_atq_worker_multi_stage_burst_tx_vector(void *arg)
if (cq_id == last_queue) {
vector_sz = ev[i].vec->nb_elem;
- pipeline_event_tx_vector(dev, port, &ev[i]);
+ pipeline_event_tx_vector(dev, port, &ev[i], t);
ev[i].op = RTE_EVENT_OP_RELEASE;
w->processed_pkts += vector_sz;
continue;
@@ -449,8 +471,9 @@ pipeline_atq_worker_multi_stage_burst_tx_vector(void *arg)
sched_type_list[cq_id]);
}
- pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
+ nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
}
+ pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
return 0;
}
@@ -460,11 +483,11 @@ pipeline_atq_worker_multi_stage_burst_fwd_vector(void *arg)
{
PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
const uint8_t *tx_queue = t->tx_evqueue_id;
+ uint16_t nb_rx = 0, nb_tx = 0;
uint16_t vector_sz;
while (!t->done) {
- uint16_t nb_rx =
- rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
+ nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
if (!nb_rx) {
rte_pause();
@@ -488,8 +511,9 @@ pipeline_atq_worker_multi_stage_burst_fwd_vector(void *arg)
}
}
- pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
+ nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
}
+ pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
return 0;
}
@@ -505,6 +505,45 @@ pipeline_event_tx_adapter_setup(struct evt_options *opt,
return ret;
}
+static void
+pipeline_vector_array_free(struct rte_event events[], uint16_t num)
+{
+ uint16_t i;
+
+ for (i = 0; i < num; i++) {
+ rte_pktmbuf_free_bulk(events[i].vec->mbufs,
+ events[i].vec->nb_elem);
+ rte_mempool_put(rte_mempool_from_obj(events[i].vec),
+ events[i].vec);
+ }
+}
+
+void
+pipeline_worker_cleanup(uint8_t dev, uint8_t port, struct rte_event ev[],
+ uint16_t enq, uint16_t deq)
+{
+ int i;
+
+ if (!(deq - enq))
+ return;
+
+ if (deq) {
+ for (i = enq; i < deq; i++) {
+ if (ev[i].op == RTE_EVENT_OP_RELEASE)
+ continue;
+ if (ev[i].event_type & RTE_EVENT_TYPE_VECTOR)
+ pipeline_vector_array_free(&ev[i], 1);
+ else
+ rte_pktmbuf_free(ev[i].mbuf);
+ }
+
+ for (i = 0; i < deq; i++)
+ ev[i].op = RTE_EVENT_OP_RELEASE;
+
+ rte_event_enqueue_burst(dev, port, ev, deq);
+ }
+}
+
void
pipeline_ethdev_rx_stop(struct evt_test *test, struct evt_options *opt)
{
@@ -109,59 +109,80 @@ pipeline_fwd_event_vector(struct rte_event *ev, uint8_t sched)
ev->sched_type = sched;
}
-static __rte_always_inline void
+static __rte_always_inline uint8_t
pipeline_event_tx(const uint8_t dev, const uint8_t port,
- struct rte_event * const ev)
+ struct rte_event *const ev, struct test_pipeline *t)
{
+ uint8_t enq;
+
rte_event_eth_tx_adapter_txq_set(ev->mbuf, 0);
- while (!rte_event_eth_tx_adapter_enqueue(dev, port, ev, 1, 0))
- rte_pause();
+ do {
+ enq = rte_event_eth_tx_adapter_enqueue(dev, port, ev, 1, 0);
+ } while (!enq && !t->done);
+
+ return enq;
}
-static __rte_always_inline void
+static __rte_always_inline uint8_t
pipeline_event_tx_vector(const uint8_t dev, const uint8_t port,
- struct rte_event *const ev)
+ struct rte_event *const ev, struct test_pipeline *t)
{
+ uint8_t enq;
+
ev->vec->queue = 0;
+ do {
+ enq = rte_event_eth_tx_adapter_enqueue(dev, port, ev, 1, 0);
+ } while (!enq && !t->done);
- while (!rte_event_eth_tx_adapter_enqueue(dev, port, ev, 1, 0))
- rte_pause();
+ return enq;
}
-static __rte_always_inline void
+static __rte_always_inline uint16_t
pipeline_event_tx_burst(const uint8_t dev, const uint8_t port,
- struct rte_event *ev, const uint16_t nb_rx)
+ struct rte_event *ev, const uint16_t nb_rx,
+ struct test_pipeline *t)
{
uint16_t enq;
enq = rte_event_eth_tx_adapter_enqueue(dev, port, ev, nb_rx, 0);
- while (enq < nb_rx) {
+ while (enq < nb_rx && !t->done) {
enq += rte_event_eth_tx_adapter_enqueue(dev, port,
ev + enq, nb_rx - enq, 0);
}
+
+ return enq;
}
-static __rte_always_inline void
+static __rte_always_inline uint8_t
pipeline_event_enqueue(const uint8_t dev, const uint8_t port,
- struct rte_event *ev)
+ struct rte_event *ev, struct test_pipeline *t)
{
- while (rte_event_enqueue_burst(dev, port, ev, 1) != 1)
- rte_pause();
+ uint8_t enq;
+
+ do {
+ enq = rte_event_enqueue_burst(dev, port, ev, 1);
+ } while (!enq && !t->done);
+
+ return enq;
}
-static __rte_always_inline void
+static __rte_always_inline uint16_t
pipeline_event_enqueue_burst(const uint8_t dev, const uint8_t port,
- struct rte_event *ev, const uint16_t nb_rx)
+ struct rte_event *ev, const uint16_t nb_rx,
+ struct test_pipeline *t)
{
uint16_t enq;
enq = rte_event_enqueue_burst(dev, port, ev, nb_rx);
- while (enq < nb_rx) {
+ while (enq < nb_rx && !t->done) {
enq += rte_event_enqueue_burst(dev, port,
ev + enq, nb_rx - enq);
}
+
+ return enq;
}
+
static inline int
pipeline_nb_event_ports(struct evt_options *opt)
{
@@ -188,5 +209,7 @@ void pipeline_eventdev_destroy(struct evt_test *test, struct evt_options *opt);
void pipeline_ethdev_destroy(struct evt_test *test, struct evt_options *opt);
void pipeline_ethdev_rx_stop(struct evt_test *test, struct evt_options *opt);
void pipeline_mempool_destroy(struct evt_test *test, struct evt_options *opt);
+void pipeline_worker_cleanup(uint8_t dev, uint8_t port, struct rte_event ev[],
+ uint16_t enq, uint16_t deq);
#endif /* _TEST_PIPELINE_COMMON_ */
@@ -21,24 +21,27 @@ static __rte_noinline int
pipeline_queue_worker_single_stage_tx(void *arg)
{
PIPELINE_WORKER_SINGLE_STAGE_INIT;
+ uint8_t enq = 0, deq = 0;
while (t->done == false) {
- uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+ deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
- if (!event) {
+ if (!deq) {
rte_pause();
continue;
}
if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
- pipeline_event_tx(dev, port, &ev);
+ enq = pipeline_event_tx(dev, port, &ev, t);
+ ev.op = RTE_EVENT_OP_RELEASE;
w->processed_pkts++;
} else {
ev.queue_id++;
pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
- pipeline_event_enqueue(dev, port, &ev);
+ enq = pipeline_event_enqueue(dev, port, &ev, t);
}
}
+ pipeline_worker_cleanup(dev, port, &ev, enq, deq);
return 0;
}
@@ -48,11 +51,12 @@ pipeline_queue_worker_single_stage_fwd(void *arg)
{
PIPELINE_WORKER_SINGLE_STAGE_INIT;
const uint8_t *tx_queue = t->tx_evqueue_id;
+ uint8_t enq = 0, deq = 0;
while (t->done == false) {
- uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+ deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
- if (!event) {
+ if (!deq) {
rte_pause();
continue;
}
@@ -60,9 +64,10 @@ pipeline_queue_worker_single_stage_fwd(void *arg)
ev.queue_id = tx_queue[ev.mbuf->port];
rte_event_eth_tx_adapter_txq_set(ev.mbuf, 0);
pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
- pipeline_event_enqueue(dev, port, &ev);
+ enq = pipeline_event_enqueue(dev, port, &ev, t);
w->processed_pkts++;
}
+ pipeline_worker_cleanup(dev, port, &ev, enq, deq);
return 0;
}
@@ -71,10 +76,10 @@ static __rte_noinline int
pipeline_queue_worker_single_stage_burst_tx(void *arg)
{
PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
+ uint16_t nb_rx = 0, nb_tx = 0;
while (t->done == false) {
- uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
- BURST_SIZE, 0);
+ nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
if (!nb_rx) {
rte_pause();
@@ -84,17 +89,18 @@ pipeline_queue_worker_single_stage_burst_tx(void *arg)
for (i = 0; i < nb_rx; i++) {
rte_prefetch0(ev[i + 1].mbuf);
if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
- pipeline_event_tx(dev, port, &ev[i]);
+ pipeline_event_tx(dev, port, &ev[i], t);
+ ev[i].op = RTE_EVENT_OP_RELEASE;
w->processed_pkts++;
} else {
ev[i].queue_id++;
pipeline_fwd_event(&ev[i],
RTE_SCHED_TYPE_ATOMIC);
- pipeline_event_enqueue_burst(dev, port, ev,
- nb_rx);
}
}
+ nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
}
+ pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
return 0;
}
@@ -104,10 +110,10 @@ pipeline_queue_worker_single_stage_burst_fwd(void *arg)
{
PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
const uint8_t *tx_queue = t->tx_evqueue_id;
+ uint16_t nb_rx = 0, nb_tx = 0;
while (t->done == false) {
- uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
- BURST_SIZE, 0);
+ nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
if (!nb_rx) {
rte_pause();
@@ -121,9 +127,10 @@ pipeline_queue_worker_single_stage_burst_fwd(void *arg)
pipeline_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC);
}
- pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
+ nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
w->processed_pkts += nb_rx;
}
+ pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
return 0;
}
@@ -132,26 +139,29 @@ static __rte_noinline int
pipeline_queue_worker_single_stage_tx_vector(void *arg)
{
PIPELINE_WORKER_SINGLE_STAGE_INIT;
+ uint8_t enq = 0, deq = 0;
uint16_t vector_sz;
while (!t->done) {
- uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+ deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
- if (!event) {
+ if (!deq) {
rte_pause();
continue;
}
if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
vector_sz = ev.vec->nb_elem;
- pipeline_event_tx_vector(dev, port, &ev);
+ enq = pipeline_event_tx_vector(dev, port, &ev, t);
+ ev.op = RTE_EVENT_OP_RELEASE;
w->processed_pkts += vector_sz;
} else {
ev.queue_id++;
pipeline_fwd_event_vector(&ev, RTE_SCHED_TYPE_ATOMIC);
- pipeline_event_enqueue(dev, port, &ev);
+ enq = pipeline_event_enqueue(dev, port, &ev, t);
}
}
+ pipeline_worker_cleanup(dev, port, &ev, enq, deq);
return 0;
}
@@ -161,12 +171,13 @@ pipeline_queue_worker_single_stage_fwd_vector(void *arg)
{
PIPELINE_WORKER_SINGLE_STAGE_INIT;
const uint8_t *tx_queue = t->tx_evqueue_id;
+ uint8_t enq = 0, deq = 0;
uint16_t vector_sz;
while (!t->done) {
- uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+ deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
- if (!event) {
+ if (!deq) {
rte_pause();
continue;
}
@@ -175,9 +186,10 @@ pipeline_queue_worker_single_stage_fwd_vector(void *arg)
ev.vec->queue = 0;
vector_sz = ev.vec->nb_elem;
pipeline_fwd_event_vector(&ev, RTE_SCHED_TYPE_ATOMIC);
- pipeline_event_enqueue(dev, port, &ev);
+ enq = pipeline_event_enqueue(dev, port, &ev, t);
w->processed_pkts += vector_sz;
}
+ pipeline_worker_cleanup(dev, port, &ev, enq, deq);
return 0;
}
@@ -186,11 +198,11 @@ static __rte_noinline int
pipeline_queue_worker_single_stage_burst_tx_vector(void *arg)
{
PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
+ uint16_t nb_rx = 0, nb_tx = 0;
uint16_t vector_sz;
while (!t->done) {
- uint16_t nb_rx =
- rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
+ nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
if (!nb_rx) {
rte_pause();
@@ -200,7 +212,7 @@ pipeline_queue_worker_single_stage_burst_tx_vector(void *arg)
for (i = 0; i < nb_rx; i++) {
if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
vector_sz = ev[i].vec->nb_elem;
- pipeline_event_tx_vector(dev, port, &ev[i]);
+ pipeline_event_tx_vector(dev, port, &ev[i], t);
ev[i].op = RTE_EVENT_OP_RELEASE;
w->processed_pkts += vector_sz;
} else {
@@ -210,8 +222,9 @@ pipeline_queue_worker_single_stage_burst_tx_vector(void *arg)
}
}
- pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
+ nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
}
+ pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
return 0;
}
@@ -221,11 +234,11 @@ pipeline_queue_worker_single_stage_burst_fwd_vector(void *arg)
{
PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
const uint8_t *tx_queue = t->tx_evqueue_id;
+ uint16_t nb_rx = 0, nb_tx = 0;
uint16_t vector_sz;
while (!t->done) {
- uint16_t nb_rx =
- rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
+ nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
if (!nb_rx) {
rte_pause();
@@ -241,9 +254,10 @@ pipeline_queue_worker_single_stage_burst_fwd_vector(void *arg)
RTE_SCHED_TYPE_ATOMIC);
}
- pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
+ nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
w->processed_pkts += vector_sz;
}
+ pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
return 0;
}
@@ -253,11 +267,12 @@ pipeline_queue_worker_multi_stage_tx(void *arg)
{
PIPELINE_WORKER_MULTI_STAGE_INIT;
const uint8_t *tx_queue = t->tx_evqueue_id;
+ uint8_t enq = 0, deq = 0;
while (t->done == false) {
- uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+ deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
- if (!event) {
+ if (!deq) {
rte_pause();
continue;
}
@@ -265,7 +280,8 @@ pipeline_queue_worker_multi_stage_tx(void *arg)
cq_id = ev.queue_id % nb_stages;
if (ev.queue_id == tx_queue[ev.mbuf->port]) {
- pipeline_event_tx(dev, port, &ev);
+ enq = pipeline_event_tx(dev, port, &ev, t);
+ ev.op = RTE_EVENT_OP_RELEASE;
w->processed_pkts++;
continue;
}
@@ -274,8 +290,9 @@ pipeline_queue_worker_multi_stage_tx(void *arg)
pipeline_fwd_event(&ev, cq_id != last_queue ?
sched_type_list[cq_id] :
RTE_SCHED_TYPE_ATOMIC);
- pipeline_event_enqueue(dev, port, &ev);
+ enq = pipeline_event_enqueue(dev, port, &ev, t);
}
+ pipeline_worker_cleanup(dev, port, &ev, enq, deq);
return 0;
}
@@ -285,11 +302,12 @@ pipeline_queue_worker_multi_stage_fwd(void *arg)
{
PIPELINE_WORKER_MULTI_STAGE_INIT;
const uint8_t *tx_queue = t->tx_evqueue_id;
+ uint8_t enq = 0, deq = 0;
while (t->done == false) {
- uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+ deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
- if (!event) {
+ if (!deq) {
rte_pause();
continue;
}
@@ -300,14 +318,15 @@ pipeline_queue_worker_multi_stage_fwd(void *arg)
ev.queue_id = tx_queue[ev.mbuf->port];
rte_event_eth_tx_adapter_txq_set(ev.mbuf, 0);
pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
- pipeline_event_enqueue(dev, port, &ev);
+ enq = pipeline_event_enqueue(dev, port, &ev, t);
w->processed_pkts++;
} else {
ev.queue_id++;
pipeline_fwd_event(&ev, sched_type_list[cq_id]);
- pipeline_event_enqueue(dev, port, &ev);
+ enq = pipeline_event_enqueue(dev, port, &ev, t);
}
}
+ pipeline_worker_cleanup(dev, port, &ev, enq, deq);
return 0;
}
@@ -317,10 +336,10 @@ pipeline_queue_worker_multi_stage_burst_tx(void *arg)
{
PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
const uint8_t *tx_queue = t->tx_evqueue_id;
+ uint16_t nb_rx = 0, nb_tx = 0;
while (t->done == false) {
- uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
- BURST_SIZE, 0);
+ nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
if (!nb_rx) {
rte_pause();
@@ -332,7 +351,8 @@ pipeline_queue_worker_multi_stage_burst_tx(void *arg)
cq_id = ev[i].queue_id % nb_stages;
if (ev[i].queue_id == tx_queue[ev[i].mbuf->port]) {
- pipeline_event_tx(dev, port, &ev[i]);
+ pipeline_event_tx(dev, port, &ev[i], t);
+ ev[i].op = RTE_EVENT_OP_RELEASE;
w->processed_pkts++;
continue;
}
@@ -341,9 +361,10 @@ pipeline_queue_worker_multi_stage_burst_tx(void *arg)
pipeline_fwd_event(&ev[i], cq_id != last_queue ?
sched_type_list[cq_id] :
RTE_SCHED_TYPE_ATOMIC);
- pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
}
+ nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
}
+ pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
return 0;
}
@@ -353,11 +374,11 @@ pipeline_queue_worker_multi_stage_burst_fwd(void *arg)
{
PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
const uint8_t *tx_queue = t->tx_evqueue_id;
+ uint16_t nb_rx = 0, nb_tx = 0;
while (t->done == false) {
uint16_t processed_pkts = 0;
- uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
- BURST_SIZE, 0);
+ nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
if (!nb_rx) {
rte_pause();
@@ -381,9 +402,10 @@ pipeline_queue_worker_multi_stage_burst_fwd(void *arg)
}
}
- pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
+ nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
w->processed_pkts += processed_pkts;
}
+ pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
return 0;
}
@@ -393,12 +415,13 @@ pipeline_queue_worker_multi_stage_tx_vector(void *arg)
{
PIPELINE_WORKER_MULTI_STAGE_INIT;
const uint8_t *tx_queue = t->tx_evqueue_id;
+ uint8_t enq = 0, deq = 0;
uint16_t vector_sz;
while (!t->done) {
- uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+ deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
- if (!event) {
+ if (!deq) {
rte_pause();
continue;
}
@@ -407,8 +430,9 @@ pipeline_queue_worker_multi_stage_tx_vector(void *arg)
if (ev.queue_id == tx_queue[ev.vec->port]) {
vector_sz = ev.vec->nb_elem;
- pipeline_event_tx_vector(dev, port, &ev);
+ enq = pipeline_event_tx_vector(dev, port, &ev, t);
w->processed_pkts += vector_sz;
+ ev.op = RTE_EVENT_OP_RELEASE;
continue;
}
@@ -416,8 +440,9 @@ pipeline_queue_worker_multi_stage_tx_vector(void *arg)
pipeline_fwd_event_vector(&ev, cq_id != last_queue
? sched_type_list[cq_id]
: RTE_SCHED_TYPE_ATOMIC);
- pipeline_event_enqueue(dev, port, &ev);
+ enq = pipeline_event_enqueue(dev, port, &ev, t);
}
+ pipeline_worker_cleanup(dev, port, &ev, enq, deq);
return 0;
}
@@ -427,12 +452,13 @@ pipeline_queue_worker_multi_stage_fwd_vector(void *arg)
{
PIPELINE_WORKER_MULTI_STAGE_INIT;
const uint8_t *tx_queue = t->tx_evqueue_id;
+ uint8_t enq = 0, deq = 0;
uint16_t vector_sz;
while (!t->done) {
- uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+ deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
- if (!event) {
+ if (!deq) {
rte_pause();
continue;
}
@@ -449,8 +475,9 @@ pipeline_queue_worker_multi_stage_fwd_vector(void *arg)
pipeline_fwd_event_vector(&ev, sched_type_list[cq_id]);
}
- pipeline_event_enqueue(dev, port, &ev);
+ enq = pipeline_event_enqueue(dev, port, &ev, t);
}
+ pipeline_worker_cleanup(dev, port, &ev, enq, deq);
return 0;
}
@@ -460,11 +487,11 @@ pipeline_queue_worker_multi_stage_burst_tx_vector(void *arg)
{
PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
const uint8_t *tx_queue = t->tx_evqueue_id;
+ uint16_t nb_rx = 0, nb_tx = 0;
uint16_t vector_sz;
while (!t->done) {
- uint16_t nb_rx =
- rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
+ nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
if (!nb_rx) {
rte_pause();
@@ -476,7 +503,7 @@ pipeline_queue_worker_multi_stage_burst_tx_vector(void *arg)
if (ev[i].queue_id == tx_queue[ev[i].vec->port]) {
vector_sz = ev[i].vec->nb_elem;
- pipeline_event_tx_vector(dev, port, &ev[i]);
+ pipeline_event_tx_vector(dev, port, &ev[i], t);
ev[i].op = RTE_EVENT_OP_RELEASE;
w->processed_pkts += vector_sz;
continue;
@@ -489,8 +516,9 @@ pipeline_queue_worker_multi_stage_burst_tx_vector(void *arg)
: RTE_SCHED_TYPE_ATOMIC);
}
- pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
+ nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
}
+ pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
return 0;
}
@@ -500,11 +528,11 @@ pipeline_queue_worker_multi_stage_burst_fwd_vector(void *arg)
{
PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
const uint8_t *tx_queue = t->tx_evqueue_id;
+ uint16_t nb_rx = 0, nb_tx = 0;
uint16_t vector_sz;
while (!t->done) {
- uint16_t nb_rx =
- rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
+ nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
if (!nb_rx) {
rte_pause();
@@ -527,8 +555,9 @@ pipeline_queue_worker_multi_stage_burst_fwd_vector(void *arg)
}
}
- pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
+ nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
}
+ pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
return 0;
}