[dpdk-dev,10/13] examples/eventdev: add single stage pipeline worker
Checks
Commit Message
Add an optimized eventdev pipeline worker for the case where the ethdev
supports thread-safe Tx and the number of configured stages is one.
Signed-off-by: Pavan Nikhilesh <pbhagavatula@caviumnetworks.com>
---
.../eventdev_pipeline_sw_pmd/pipeline_worker_tx.c | 109 +++++++++++++++++++--
1 file changed, 101 insertions(+), 8 deletions(-)
Comments
Hi Pavan,
</snip>
> static int
> @@ -265,6 +350,7 @@ worker_do_tx_burst_atq(void *arg)
> worker_fwd_event(&ev[i],
> cdata.queue_type);
> }
> + work(ev[i].mbuf);
Please move this change to the prior patch ("examples/eventdev: add all type queue option")
Thanks,
Gage
@@ -68,6 +68,91 @@ worker_tx_pkt(struct rte_mbuf *mbuf)
rte_pause();
}
+/* Single stage pipeline workers */
+
+static int
+worker_do_tx_single(void *arg) /* single-event worker loop; arg is cast to struct worker_data * */
+{
+ struct worker_data *data = (struct worker_data *)arg;
+ const uint8_t dev = data->dev_id;
+ const uint8_t port = data->port_id;
+ size_t fwd = 0, received = 0, tx = 0; /* per-worker counters, printed on exit */
+ struct rte_event ev;
+
+ while (!fdata->done) { /* loop until fdata->done is set */
+
+ if (!rte_event_dequeue_burst(dev, port, &ev, 1, 0)) { /* nothing dequeued: pause and retry */
+ rte_pause();
+ continue;
+ }
+
+ received++;
+
+ if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) { /* atomic events are transmitted instead of forwarded */
+ worker_tx_pkt(ev.mbuf);
+ tx++;
+ continue; /* transmitted events skip work() and the enqueue below */
+ }
+ work(ev.mbuf); /* per-packet work runs only on events that get forwarded */
+ ev.queue_id++; /* target the next queue */
+ worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC); /* NOTE(review): presumably marks the event as an atomic forward; confirm in worker_fwd_event() */
+ worker_event_enqueue(dev, port, &ev);
+ fwd++;
+ }
+
+ if (!cdata.quiet) /* summary is suppressed when cdata.quiet is set */
+ printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
+ rte_lcore_id(), received, fwd, tx);
+ return 0;
+}
+
+static int
+worker_do_tx_single_burst(void *arg) /* burst worker loop; arg is cast to struct worker_data * */
+{
+ struct rte_event ev[BATCH_SIZE + 1]; /* one spare slot: the ev[i + 1] prefetch below may index one past the dequeued events */
+
+ struct worker_data *data = (struct worker_data *)arg;
+ const uint8_t dev = data->dev_id;
+ const uint8_t port = data->port_id;
+ size_t fwd = 0, received = 0, tx = 0; /* per-worker counters, printed on exit */
+
+ while (!fdata->done) { /* loop until fdata->done is set */
+ uint16_t i;
+ uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
+ BATCH_SIZE, 0);
+
+ if (!nb_rx) { /* nothing dequeued: pause and retry */
+ rte_pause();
+ continue;
+ }
+ received += nb_rx;
+
+ for (i = 0; i < nb_rx; i++) {
+ rte_prefetch0(ev[i + 1].mbuf);
+ if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) { /* atomic events are transmitted instead of forwarded */
+
+ worker_tx_pkt(ev[i].mbuf);
+ ev[i].op = RTE_EVENT_OP_RELEASE; /* the event still goes through the burst enqueue below, as a release */
+ tx++;
+
+ } else {
+ work(ev[i].mbuf); /* only touch mbufs that are forwarded; a transmitted mbuf was already handed to worker_tx_pkt() */
+ ev[i].queue_id++; /* target the next queue */
+ worker_fwd_event(&ev[i],
+ RTE_SCHED_TYPE_ATOMIC);
+ }
+ }
+
+ worker_event_enqueue_burst(dev, port, ev, nb_rx);
+ fwd += nb_rx; /* counts every event passed to the enqueue call, released ones included */
+ }
+
+ if (!cdata.quiet) /* summary is suppressed when cdata.quiet is set */
+ printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
+ rte_lcore_id(), received, fwd, tx);
+ return 0;
+}
+
/* Multi stage Pipeline Workers */
static int
@@ -265,6 +350,7 @@ worker_do_tx_burst_atq(void *arg)
worker_fwd_event(&ev[i],
cdata.queue_type);
}
+ work(ev[i].mbuf);
}
worker_event_enqueue_burst(dev, port, ev, nb_rx);
@@ -610,14 +696,21 @@ set_worker_tx_setup_data(struct setup_data *caps, bool burst)
{
uint8_t atq = cdata.all_type_queues ? 1 : 0;
- if (burst && atq)
- caps->worker_loop = worker_do_tx_burst_atq;
- if (burst && !atq)
- caps->worker_loop = worker_do_tx_burst;
- if (!burst && atq)
- caps->worker_loop = worker_do_tx_atq;
- if (!burst && !atq)
- caps->worker_loop = worker_do_tx;
+ if (cdata.num_stages == 1) {
+ if (burst)
+ caps->worker_loop = worker_do_tx_single_burst;
+ if (!burst)
+ caps->worker_loop = worker_do_tx_single;
+ } else {
+ if (burst && atq)
+ caps->worker_loop = worker_do_tx_burst_atq;
+ if (burst && !atq)
+ caps->worker_loop = worker_do_tx_burst;
+ if (!burst && atq)
+ caps->worker_loop = worker_do_tx_atq;
+ if (!burst && !atq)
+ caps->worker_loop = worker_do_tx;
+ }
caps->opt_check = opt_check;
caps->consumer_loop = NULL;