[dpdk-dev,7/7] examples/eventdev_pipeline: adding example

Message ID 1479319207-130646-8-git-send-email-harry.van.haaren@intel.com (mailing list archive)
State Superseded, archived

Checks

checkpatch/checkpatch: warning (coding style issues)

Commit Message

Van Haaren, Harry Nov. 16, 2016, 6 p.m. UTC
  This patch adds a sample app to the examples/ directory, which can be used
as a reference application and for general testing. The application requires
two ethdev ports and expects traffic to be flowing. The application must be
run with the --vdev flag as follows to indicate to EAL that a virtual
eventdev device called "evdev_sw0" is available to be used:

    ./build/eventdev_pipeline --vdev evdev_sw0

The general flow of the traffic is as follows:

    Rx core -> Atomic Queue => 4 worker cores => TX core

A scheduler core is required to do the packet scheduling, so this
configuration requires 7 cores (Rx, Tx, scheduler, and 4 workers). Finally,
a master core brings the core count to 8 for this configuration. The
application can be configured for various numbers of flows and worker
cores. Run the application with -h for details.
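
For example, an invocation using eight cores (EAL coremask 0xff) and the
default four worker cores could look like:

    ./build/eventdev_pipeline -c ff --vdev evdev_sw0 -- -w4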

Signed-off-by: Gage Eads <gage.eads@intel.com>
Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
Signed-off-by: Harry van Haaren <harry.van.haaren@intel.com>
---
 examples/eventdev_pipeline/Makefile |  49 +++
 examples/eventdev_pipeline/main.c   | 718 ++++++++++++++++++++++++++++++++++++
 2 files changed, 767 insertions(+)
 create mode 100644 examples/eventdev_pipeline/Makefile
 create mode 100644 examples/eventdev_pipeline/main.c
  

Comments

Jerin Jacob Nov. 22, 2016, 6:02 a.m. UTC | #1
On Wed, Nov 16, 2016 at 06:00:07PM +0000, Harry van Haaren wrote:
> This patch adds a sample app to the examples/ directory, which can be used
> as a reference application and for general testing. The application requires
> two ethdev ports and expects traffic to be flowing. The application must be
> run with the --vdev flags as follows to indicate to EAL that a virtual
> eventdev device called "evdev_sw0" is available to be used:
> 
>     ./build/eventdev_pipeline --vdev evdev_sw0
> 
> The general flow of the traffic is as follows:
> 
>     Rx core -> Atomic Queue => 4 worker cores => TX core
> 
> A scheduler core is required to do the packet scheduling, making this
> configuration require 7 cores (Rx, Tx, Scheduler, and 4 workers). Finally
> a master core brings the core count to 8 for this configuration. The

Thanks for the example application. I will try to share my views from an
ethdev integration and usability perspective. Hope we can converge.

Some of the high level details first before getting into exact details.

1) From the HW and ethdev integration perspective, the integrated NIC controllers
do not need producer core(s) to push the events/packets to the event queue. So I
was thinking of using the 6WIND rte_flow spec to create the "ethdev port to event
queue wiring" connection by extending the output ACTION definition, which would
specify the event queue that packets need to be enqueued to for the given ethdev
port (something you are doing in the application).

I guess the producer part of this example can be created as common code,
somewhere in rte_flow/ethdev, to reuse. We would also need this scheme when
we deal with external NICs + HW event manager use cases.
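
For example (very roughly, and assuming a new action type is added to the
spec for this; none of these names exist today), the wiring for one ethdev
port could be sketched as:

	uint8_t port_id = 0;	/* ethdev port to wire into the event queue */
	/* RTE_FLOW_ACTION_TYPE_EVENT_QUEUE and its conf struct are assumed
	 * extensions, not part of the current rte_flow proposal. */
	struct rte_flow_action_event_queue ev_queue = { .queue_id = 0 };
	struct rte_flow_attr attr = { .ingress = 1 };
	struct rte_flow_item pattern[] = {
		{ .type = RTE_FLOW_ITEM_TYPE_END },	/* match all ingress packets */
	};
	struct rte_flow_action actions[] = {
		{ .type = RTE_FLOW_ACTION_TYPE_EVENT_QUEUE, .conf = &ev_queue },
		{ .type = RTE_FLOW_ACTION_TYPE_END },
	};
	struct rte_flow *flow = rte_flow_create(port_id, &attr, pattern,
						actions, NULL);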

The complete event driven model can be verified and exercised without
integrating with the ethdev subsystem. So I think maybe we need to
focus on functional applications without ethdev to verify the eventdev
features (automatic multicore scaling, dynamic load balancing, pipelining,
packet ingress order maintenance and synchronization services) and then
integrate with ethdev.

> +	const unsigned cores_needed = num_workers +
> +			/*main*/1 +
> +			/*sched*/1 +
> +			/*TX*/1 +
> +			/*RX*/1;
> +

2) One of the prime aims of the event driven model is to remove the fixed
function core mappings and enable automatic multicore scaling, dynamic load
balancing etc. I will try to use an example in the review section to show the
method for removing the "consumer core" in this case.

> application can be configured for various numbers of flows and worker
> cores. Run the application with -h for details.
> 
> Signed-off-by: Gage Eads <gage.eads@intel.com>
> Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
> Signed-off-by: Harry van Haaren <harry.van.haaren@intel.com>
> ---
>  examples/eventdev_pipeline/Makefile |  49 +++
>  examples/eventdev_pipeline/main.c   | 718 ++++++++++++++++++++++++++++++++++++
>  2 files changed, 767 insertions(+)
>  create mode 100644 examples/eventdev_pipeline/Makefile
>  create mode 100644 examples/eventdev_pipeline/main.c
> 
> +static int sched_type = RTE_SCHED_TYPE_ATOMIC;

RTE_SCHED_TYPE_ORDERED makes sense as a default. The most common case will
have ORDERED at the first stage so that it can scale.

> +
> +
> +static int
> +worker(void *arg)
> +{
> +	struct rte_event rcv_events[BATCH_SIZE];
> +
> +	struct worker_data *data = (struct worker_data *)arg;
> +	uint8_t event_dev_id = data->event_dev_id;
> +	uint8_t event_port_id = data->event_port_id;
> +	int32_t qid = data->qid;
> +	size_t sent = 0, received = 0;
> +
> +	while (!done) {
> +		uint16_t i;
> +
> +		uint16_t n = rte_event_dequeue_burst(event_dev_id,
> +						     event_port_id,
> +						     rcv_events,
> +						     RTE_DIM(rcv_events),
> +						     false);
> +		if (n == 0){
> +			rte_pause();
> +			/* Flush any buffered events */
> +			rte_event_dequeue(event_dev_id,
> +					  event_port_id,
> +					  NULL,
> +					  false);

The above can be done in the implementation. It may not be a candidate for common code.

> +			continue;
> +		}
> +		received += n;
> +
> +		for (i = 0; i < n; i++) {
> +			struct ether_hdr *eth;
> +			struct ether_addr addr;
> +			struct rte_event *ev = &rcv_events[i];
> +
> +			ev->queue_id = qid;
> +			ev->flow_id = 0;

Another way to deal without the additional consumer core (it creates issues in
scaling and load balancing) is

in worker:
while (1) {

	ev = dequeue(port);

	// stage 1 app processing
	if (ev.event_type == RTE_EVENT_TYPE_ETHDEV) {
		// identify the ethernet port and tx queue the packet needs
		// to go to, and create the flow based on that
		ev.flow_id = flow(port_id, tx_queue_id);
		ev.sched_type = RTE_SCHED_TYPE_ATOMIC;
		ev.operation = RTE_EVENT_OP_FORWARD;
		ev.event_type = RTE_EVENT_TYPE_CORE;
	} else if (ev.event_type == RTE_EVENT_TYPE_CORE) {
		// stage 2 app processing
		port_id = function_of(ev.flow_id);     // see stage 1 processing
		tx_queue_id = function_of(ev.flow_id); // see stage 1 processing
		// remaining ethdev based tx is the same as yours
	}
	enqueue(ev);
}



> +			ev->priority = 0;
> +			ev->sched_type = RTE_SCHED_TYPE_ATOMIC;
> +			ev->operation = RTE_EVENT_OP_FORWARD;
> +
> +			uint64_t now = rte_rdtsc();
> +			while(now + 750 > rte_rdtsc()) {}

Why the delay?

> +
> +			/* change mac addresses on packet */
> +			eth = rte_pktmbuf_mtod(ev->mbuf, struct ether_hdr *);
> +			ether_addr_copy(&eth->d_addr, &addr);
> +			ether_addr_copy(&eth->s_addr, &eth->d_addr);
> +			ether_addr_copy(&addr, &eth->s_addr);
> +		}
> +		int ret = rte_event_enqueue_burst(event_dev_id, event_port_id,
> +					rcv_events, n, false);
> +		if (ret != n)
> +			rte_panic("worker %u thread failed to enqueue event\n",
> +				rte_lcore_id());
> +	}
> +
> +	/* Flush the buffered events */
> +	rte_event_dequeue(event_dev_id, event_port_id, NULL, false);
> +
> +	if (!quiet)
> +		printf("  worker %u thread done. RX=%zu TX=%zu\n",
> +				rte_lcore_id(), received, sent);
> +
> +	return 0;
> +}
> +
> +static int
> +scheduler(void *arg)
> +{

Maybe better to abstract this as a "service core" or something, like I mentioned
earlier, as a HW implementation does not need this.

> +	RTE_SET_USED(arg);
> +	size_t loops = 0;
> +
> +	while (!done) {
> +		/* Assumes an event dev ID of 0 */
> +		rte_event_schedule(0);
> +		loops++;
> +	}
> +
> +	printf("  scheduler thread done. loops=%zu\n", loops);
> +
> +	return 0;
> +}
> +
> +
> +static int
> +producer(void *arg)
> +{
> +
> +	struct prod_data *data = (struct prod_data *)arg;
> +	size_t npackets = num_packets;
> +	unsigned i;
> +	uint64_t mbuf_seqno = 0;
> +	size_t sent = 0;
> +	uint8_t eth_port = 0;
> +	uint8_t event_dev_id = data->event_dev_id;
> +	uint8_t event_port_id = data->event_port_id;
> +	int fid_counter = 0;
> +
> +	while (!done) {
> +		int ret;
> +		unsigned num_ports = data->num_ports;
> +		int32_t qid = data->qid;
> +		struct rte_event events[BATCH_SIZE];
> +		struct rte_mbuf *mbufs[BATCH_SIZE];
> +
> +		uint16_t nb_rx = rte_eth_rx_burst(eth_port, 0, mbufs, BATCH_SIZE);
> +		if (++eth_port == num_ports)
> +			eth_port = 0;
> +		if (nb_rx == 0) {
> +			rte_pause();
> +			/* Flush any buffered events */
> +			rte_event_dequeue(event_dev_id,
> +					  event_port_id,
> +					  NULL,
> +					  false);
> +			continue;
> +		}
> +
> +		for (i = 0; i < nb_rx; i++) {
> +			struct rte_mbuf *m = mbufs[i];
> +			struct rte_event *ev = &events[i];
> +
> +			ev->queue_id = qid;
> +			ev->flow_id = fid_counter++ % 6;

To me, flow_id should be a function of the port_id and rx queue number here,
right?

> +			ev->priority = 0;
> +			m->udata64 = mbuf_seqno++;

Why update mbuf_seqno++ here? Shouldn't that be something inside the
implementation?

> +			ev->mbuf = m;
> +			ev->sched_type = sched_type;
> +			ev->operation = RTE_EVENT_OP_NEW;
> +		}
> +
> +		do {
> +			ret = rte_event_enqueue_burst(event_dev_id,
> +							event_port_id,
> +							events,
> +							nb_rx,
> +							false);
> +		} while (ret == -ENOSPC);

I guess -ENOSPC can be checked inside the implementation. We can pass the
info required at the configuration stage to decide the timeout. It may not
be a candidate for common code.

> +		if (ret != nb_rx)
> +			rte_panic("producer thread failed to enqueue *all* events\n");
> +
> +		sent += nb_rx;
> +
> +		if (num_packets > 0 && npackets > 0) {
> +			npackets -= nb_rx;
> +			if (npackets == 0)
> +				break;
> +		}
> +	}
> +
> +	/* Flush any buffered events */
> +	while (!done)
> +		rte_event_dequeue(event_dev_id, event_port_id, NULL, false);
> +
> +	printf("  prod thread done! TX=%zu across %u flows\n", sent, num_fids);
> +
> +	return 0;
> +}
> +

> +static uint8_t
> +setup_event_dev(struct prod_data *prod_data,
> +		struct cons_data *cons_data,
> +		struct worker_data *worker_data)
> +{
> +	config.nb_events_limit = 256;

In a real application, we may need to pass this as a command line option.

> +	config.dequeue_wait_ns = 0;
> +
> +	ret = rte_event_dev_configure(id, &config);
> +	if (ret)
> +		rte_panic("Failed to configure the event dev\n");
> +
> +	/* Create queues */
> +	queue_config.event_queue_cfg = RTE_EVENT_QUEUE_CFG_ATOMIC_ONLY;
> +	queue_config.priority = 0;
> +
> +	qid0 = 0;
> +	ret = rte_event_queue_setup(id, qid0, &queue_config);
> +	if (ret < 0)
> +		rte_panic("Failed to create the scheduled QID\n");
> +
> +	queue_config.event_queue_cfg = RTE_EVENT_QUEUE_CFG_SINGLE_CONSUMER;
> +	queue_config.priority = 0;
> +
> +	cons_qid = 1;
> +	ret = rte_event_queue_setup(id, cons_qid, &queue_config);
> +	if (ret < 0)
> +		rte_panic("Failed to create the cons directed QID\n");
> +
> +	queue_config.event_queue_cfg = RTE_EVENT_QUEUE_CFG_SINGLE_CONSUMER;

I guess it's more of a RTE_EVENT_QUEUE_CFG_SINGLE_PRODUCER case. Does it
make sense to add RTE_EVENT_QUEUE_CFG_SINGLE_PRODUCER to the spec, if you are
enqueueing only through that port? See next comment.

> +	queue_config.priority = 0;
> +
> +	prod_qid = 2;
> +	ret = rte_event_queue_setup(id, prod_qid, &queue_config);
> +	if (ret < 0)
> +		rte_panic("Failed to create the prod directed QID\n");
> +

Looks like prod_qid is just created as a dummy; the actual producer is
enqueuing on qid0. Something is not adding up.

> +	/* Create ports */
> +#define LB_PORT_DEPTH 16
> +#define DIR_PORT_DEPTH 32
> +	port_config.enqueue_queue_depth = LB_PORT_DEPTH;
> +	port_config.dequeue_queue_depth = LB_PORT_DEPTH;

We need to check the info->max_enqueue_queue_depth.
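
i.e. something along these lines (assuming rte_event_dev_info_get() is the
query call, and that a matching max_dequeue_queue_depth limit exists):

	struct rte_event_dev_info info;
	rte_event_dev_info_get(id, &info);
	/* clamp the requested port depths to what the device supports */
	port_config.enqueue_queue_depth = RTE_MIN(LB_PORT_DEPTH,
						  info.max_enqueue_queue_depth);
	port_config.dequeue_queue_depth = RTE_MIN(LB_PORT_DEPTH,
						  info.max_dequeue_queue_depth);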

Jerin
  
Bruce Richardson Nov. 22, 2016, 2:04 p.m. UTC | #2
> -----Original Message-----
> From: Jerin Jacob [mailto:jerin.jacob@caviumnetworks.com]
> Sent: Tuesday, November 22, 2016 6:02 AM
> To: Van Haaren, Harry <harry.van.haaren@intel.com>
> Cc: dev@dpdk.org; Eads, Gage <gage.eads@intel.com>; Richardson, Bruce
> <bruce.richardson@intel.com>
> Subject: Re: [dpdk-dev] [PATCH 7/7] examples/eventdev_pipeline: adding
> example
> 
> On Wed, Nov 16, 2016 at 06:00:07PM +0000, Harry van Haaren wrote:
> > This patch adds a sample app to the examples/ directory, which can be
> > used as a reference application and for general testing. The
> > application requires two ethdev ports and expects traffic to be
> > flowing. The application must be run with the --vdev flags as follows
> > to indicate to EAL that a virtual eventdev device called "evdev_sw0" is
> available to be used:
> >
> >     ./build/eventdev_pipeline --vdev evdev_sw0
> >
> > The general flow of the traffic is as follows:
> >
> >     Rx core -> Atomic Queue => 4 worker cores => TX core
> >
> > A scheduler core is required to do the packet scheduling, making this
> > configuration require 7 cores (Rx, Tx, Scheduler, and 4 workers).
> > Finally a master core brings the core count to 8 for this
> > configuration. The
> 
> Thanks for the example application.I will try to share my views on ethdev
> integration and usability perspective. Hope we can converge.

Hi Jerin, 

thanks for the feedback. We'll take it on board for a subsequent version
we produce. Additional comments and queries on your feedback inline below.

/Bruce

> 
> Some of the high level details first before getting into exact details.
> 
> 1) From the HW and ethdev integration perspective, The integrated NIC
> controllers does not need producer core(s) to push the event/packets to
> event queue. So, I was thinking to use 6WIND rte_flow spec to create the
> "ethdev port to event queue wiring" connection by extending the output
> ACTION definition, which specifies event queue its need to enqueued to for
> the given ethdev port (something your are doing in application).
> 
> I guess, the producer part of this example can be created as common code,
> somewhere in rte_flow/ethdev to reuse. We would need this scheme also
> where when we deal with external nics + HW event manager use case
> 
Yes. This is something to consider.

For the pure-software model, we also might want to look at the opposite
approach, where we register an ethdev with the scheduler for it to "pull"
new packets from. This would allow it to bypass the port logic for the new
packets. 

An alternative for this is to extend the schedule API to allow a burst of
packets to be passed in to be scheduled immediately as "NEW" packets. The end
results should be the same, saving cycles by bypassing unneeded processing
for the new packets.
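
Very roughly, such an extension might have a shape like the following (an
illustrative sketch only; neither the name nor the signature is part of any
proposal yet):

    /* Hypothetical: inject a burst of fresh mbufs into the scheduler as
     * NEW events on the given queue, bypassing a producer port. */
    uint16_t
    rte_event_schedule_new_burst(uint8_t dev_id, uint8_t queue_id,
                                 struct rte_mbuf **mbufs, uint16_t nb_mbufs);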

> The complete event driven model can be verified and exercised without
> integrating with eventdev subsystem. So I think, may be we need to focus
> on functional applications without ethdev to verify the eventdev features
> like(automatic multicore scaling,  dynamic load balancing, pipelining,
> packet ingress order maintenance and synchronization services) and then
> integrate with ethdev

Yes, comprehensive unit tests will be needed too. But I also think an example
app that pulls packets from an external NIC is needed to get a feel
for the performance of the scheduler with real traffic.

> 
> > +	const unsigned cores_needed = num_workers +
> > +			/*main*/1 +
> > +			/*sched*/1 +
> > +			/*TX*/1 +
> > +			/*RX*/1;
> > +
> 
> 2) One of the prime aims of the event driven model is to remove the fixed
> function core mappings and enable automatic multicore scaling,  dynamic
> load balancing etc.I will try to use an example in review section to show
> the method for removing "consumer core" in this case.

Yes, I agree, but unfortunately, for some tasks, distributing those tasks
across multiple cores can hurt performance overall due to resource contention.
 
> 
> > application can be configured for various numbers of flows and worker
> > cores. Run the application with -h for details.
> >
> > Signed-off-by: Gage Eads <gage.eads@intel.com>
> > Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
> > Signed-off-by: Harry van Haaren <harry.van.haaren@intel.com>
> > ---
> >  examples/eventdev_pipeline/Makefile |  49 +++
> >  examples/eventdev_pipeline/main.c   | 718
> ++++++++++++++++++++++++++++++++++++
> >  2 files changed, 767 insertions(+)
> >  create mode 100644 examples/eventdev_pipeline/Makefile
> >  create mode 100644 examples/eventdev_pipeline/main.c
> >
> > +static int sched_type = RTE_SCHED_TYPE_ATOMIC;
> 
> RTE_SCHED_TYPE_ORDERED makes sense as a default. Most common case will
> have ORDERD at first stage so that it can scale.
> 
> > +
> > +
> > +static int
> > +worker(void *arg)
> > +{
> > +	struct rte_event rcv_events[BATCH_SIZE];
> > +
> > +	struct worker_data *data = (struct worker_data *)arg;
> > +	uint8_t event_dev_id = data->event_dev_id;
> > +	uint8_t event_port_id = data->event_port_id;
> > +	int32_t qid = data->qid;
> > +	size_t sent = 0, received = 0;
> > +
> > +	while (!done) {
> > +		uint16_t i;
> > +
> > +		uint16_t n = rte_event_dequeue_burst(event_dev_id,
> > +						     event_port_id,
> > +						     rcv_events,
> > +						     RTE_DIM(rcv_events),
> > +						     false);
> > +		if (n == 0){
> > +			rte_pause();
> > +			/* Flush any buffered events */
> > +			rte_event_dequeue(event_dev_id,
> > +					  event_port_id,
> > +					  NULL,
> > +					  false);
> 
> The above can be done in implementation. May not be the candidate for
> common code.
> 
> > +			continue;
> > +		}
> > +		received += n;
> > +
> > +		for (i = 0; i < n; i++) {
> > +			struct ether_hdr *eth;
> > +			struct ether_addr addr;
> > +			struct rte_event *ev = &rcv_events[i];
> > +
> > +			ev->queue_id = qid;
> > +			ev->flow_id = 0;
> 
> Another way to deal wit out additional consumer core(it creates issue in
> scaling and load balancing) is
> 
> in worker:
> while(1) {
> 
> 	ev = dequeue(port);
> 
> 	// stage 1 app processing
> 	if (ev.event_type == RTE_EVENT_TYPE_ETHDEV) {
> 		// identify the Ethernet port and tx queue the packet needs to
> go
> 		// create the flow based on that
> 		ev.flow_id = flow(port_id, tx_queue_id);
> 		ev.sched_type = RTE_SCHED_TYPE_ATOMIC;
> 		ev.operation = RTE_EVENT_OP_FORWARD;
> 		ev.event_type = RTE_EVENT_TYPE_CORE;
> 	} // stage 2 app processing
> 	else if (ev.event_type == RTE_EVENT_TYPE_CORE) {
> 		port_id = function_of(ev.flow_id) ;// look stage 1 processing
> 		tx_queue_id = function_of(ev.flow_id) //look stage 1
> processing
> 		remaining ethdev based tx is same as yours
> 	}
> 	enqueue(ev);
> }
>
Yes, but you still need some core to do the work of pushing the packets into
the scheduler from the NIC, if you don't have a hardware path from NIC to 
HW scheduler. [Features like RSS can obviously help here with distributing that
work if needed]

In the case where you do have a HW path - which I assume is the Cavium case - I
assume that the EVENT_TYPE_ETHDEV branch above also needs to take care of desc
to mbuf processing, as is normally done by the PMD?
 
> 
> 
> > +			ev->priority = 0;
> > +			ev->sched_type = RTE_SCHED_TYPE_ATOMIC;
> > +			ev->operation = RTE_EVENT_OP_FORWARD;
> > +
> > +			uint64_t now = rte_rdtsc();
> > +			while(now + 750 > rte_rdtsc()) {}
> 
> Why delay ?

Simulate some work being done by the worker, which makes the app slightly more
realistic and also helps the scheduler as there is not so much contention on the
shared cache lines.

> 
> > +
> > +			/* change mac addresses on packet */
> > +			eth = rte_pktmbuf_mtod(ev->mbuf, struct ether_hdr *);
> > +			ether_addr_copy(&eth->d_addr, &addr);
> > +			ether_addr_copy(&eth->s_addr, &eth->d_addr);
> > +			ether_addr_copy(&addr, &eth->s_addr);
> > +		}
> > +		int ret = rte_event_enqueue_burst(event_dev_id, event_port_id,
> > +					rcv_events, n, false);
> > +		if (ret != n)
> > +			rte_panic("worker %u thread failed to enqueue event\n",
> > +				rte_lcore_id());
> > +	}
> > +
> > +	/* Flush the buffered events */
> > +	rte_event_dequeue(event_dev_id, event_port_id, NULL, false);
> > +
> > +	if (!quiet)
> > +		printf("  worker %u thread done. RX=%zu TX=%zu\n",
> > +				rte_lcore_id(), received, sent);
> > +
> > +	return 0;
> > +}
> > +
> > +static int
> > +scheduler(void *arg)
> > +{
> 
> Maybe better to abstract as "service core" or something like I mentioned
> earlier, as HW implementation does not need this

Sure, we can look at this.

> 
> > +	RTE_SET_USED(arg);
> > +	size_t loops = 0;
> > +
> > +	while (!done) {
> > +		/* Assumes an event dev ID of 0 */
> > +		rte_event_schedule(0);
> > +		loops++;
> > +	}
> > +
> > +	printf("  scheduler thread done. loops=%zu\n", loops);
> > +
> > +	return 0;
> > +}
> > +
> > +
> > +static int
> > +producer(void *arg)
> > +{
> > +
> > +	struct prod_data *data = (struct prod_data *)arg;
> > +	size_t npackets = num_packets;
> > +	unsigned i;
> > +	uint64_t mbuf_seqno = 0;
> > +	size_t sent = 0;
> > +	uint8_t eth_port = 0;
> > +	uint8_t event_dev_id = data->event_dev_id;
> > +	uint8_t event_port_id = data->event_port_id;
> > +	int fid_counter = 0;
> > +
> > +	while (!done) {
> > +		int ret;
> > +		unsigned num_ports = data->num_ports;
> > +		int32_t qid = data->qid;
> > +		struct rte_event events[BATCH_SIZE];
> > +		struct rte_mbuf *mbufs[BATCH_SIZE];
> > +
> > +		uint16_t nb_rx = rte_eth_rx_burst(eth_port, 0, mbufs,
> BATCH_SIZE);
> > +		if (++eth_port == num_ports)
> > +			eth_port = 0;
> > +		if (nb_rx == 0) {
> > +			rte_pause();
> > +			/* Flush any buffered events */
> > +			rte_event_dequeue(event_dev_id,
> > +					  event_port_id,
> > +					  NULL,
> > +					  false);
> > +			continue;
> > +		}
> > +
> > +		for (i = 0; i < nb_rx; i++) {
> > +			struct rte_mbuf *m = mbufs[i];
> > +			struct rte_event *ev = &events[i];
> > +
> > +			ev->queue_id = qid;
> > +			ev->flow_id = fid_counter++ % 6;
> 
> To me, flow_id should be a function of port_id and rx queue number here.
> right?

I'd view it as app dependent. For a test app on IA, I'd expect to use the
NIC RSS value as an initial flow value.
NOTE: this is just a quick test app to demonstrate the concept for the RFC,
so not everything in it is necessarily realistic or what we'd expect in a
final version app.
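
As a rough sketch of the RSS idea (assuming RSS is enabled on the ethdev
ports), the producer loop could simply seed the flow id from the mbuf:

    /* use the NIC-computed RSS hash as the initial flow id */
    ev->flow_id = m->hash.rss;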

> 
> > +			ev->priority = 0;
> > +			m->udata64 = mbuf_seqno++;
> 
> Why update mbuf_seqno++ here. Shouldn't be something inside the
> implementation?

I think this was to help verify the packet ordering. Again, it may not be
in a final version.

> 
> > +			ev->mbuf = m;
> > +			ev->sched_type = sched_type;
> > +			ev->operation = RTE_EVENT_OP_NEW;
> > +		}
> > +
> > +		do {
> > +			ret = rte_event_enqueue_burst(event_dev_id,
> > +							event_port_id,
> > +							events,
> > +							nb_rx,
> > +							false);
> > +		} while (ret == -ENOSPC);
> 
> I guess, -ENOSPC can be checked inside the implementation. I guess, we can
> pass the info required in the configuration stage to decide the timeout.
> May not be the candidate for common code.
> 
> > +		if (ret != nb_rx)
> > +			rte_panic("producer thread failed to enqueue *all*
> events\n");
> > +
> > +		sent += nb_rx;
> > +
> > +		if (num_packets > 0 && npackets > 0) {
> > +			npackets -= nb_rx;
> > +			if (npackets == 0)
> > +				break;
> > +		}
> > +	}
> > +
> > +	/* Flush any buffered events */
> > +	while (!done)
> > +		rte_event_dequeue(event_dev_id, event_port_id, NULL, false);
> > +
> > +	printf("  prod thread done! TX=%zu across %u flows\n", sent,
> > +num_fids);
> > +
> > +	return 0;
> > +}
> > +
> 
> > +static uint8_t
> > +setup_event_dev(struct prod_data *prod_data,
> > +		struct cons_data *cons_data,
> > +		struct worker_data *worker_data)
> > +{
> > +	config.nb_events_limit = 256;
> 
> In real application, we may need to pass as command line
> 
> > +	config.dequeue_wait_ns = 0;
> > +
> > +	ret = rte_event_dev_configure(id, &config);
> > +	if (ret)
> > +		rte_panic("Failed to configure the event dev\n");
> > +
> > +	/* Create queues */
> > +	queue_config.event_queue_cfg = RTE_EVENT_QUEUE_CFG_ATOMIC_ONLY;
> > +	queue_config.priority = 0;
> > +
> > +	qid0 = 0;
> > +	ret = rte_event_queue_setup(id, qid0, &queue_config);
> > +	if (ret < 0)
> > +		rte_panic("Failed to create the scheduled QID\n");
> > +
> > +	queue_config.event_queue_cfg = RTE_EVENT_QUEUE_CFG_SINGLE_CONSUMER;
> > +	queue_config.priority = 0;
> > +
> > +	cons_qid = 1;
> > +	ret = rte_event_queue_setup(id, cons_qid, &queue_config);
> > +	if (ret < 0)
> > +		rte_panic("Failed to create the cons directed QID\n");
> > +
> > +	queue_config.event_queue_cfg = RTE_EVENT_QUEUE_CFG_SINGLE_CONSUMER;
> 
> I guess its more of RTE_EVENT_QUEUE_CFG_SINGLE_PRODUCER case, Does it make
> sense to add RTE_EVENT_QUEUE_CFG_SINGLE_PRODUCER in spec, if you are
> enqueueing only through that port. see next comment.
> 
> > +	queue_config.priority = 0;
> > +
> > +	prod_qid = 2;
> > +	ret = rte_event_queue_setup(id, prod_qid, &queue_config);
> > +	if (ret < 0)
> > +		rte_panic("Failed to create the prod directed QID\n");
> > +
> 
> Looks like prod_qid is just created as a dummy, The actual producer is en-
> queuing on qid0.Something not adding up.

Possibly not. We'll check it out.

> 
> > +	/* Create ports */
> > +#define LB_PORT_DEPTH 16
> > +#define DIR_PORT_DEPTH 32
> > +	port_config.enqueue_queue_depth = LB_PORT_DEPTH;
> > +	port_config.dequeue_queue_depth = LB_PORT_DEPTH;
> 
> We need to check the info->max_enqueue_queue_depth.
> 
> Jerin
  
Jerin Jacob Nov. 23, 2016, 12:30 a.m. UTC | #3
On Tue, Nov 22, 2016 at 02:04:27PM +0000, Richardson, Bruce wrote:
> 
> 
> > -----Original Message-----
> > From: Jerin Jacob [mailto:jerin.jacob@caviumnetworks.com]
> > Sent: Tuesday, November 22, 2016 6:02 AM
> > To: Van Haaren, Harry <harry.van.haaren@intel.com>
> > Cc: dev@dpdk.org; Eads, Gage <gage.eads@intel.com>; Richardson, Bruce
> > <bruce.richardson@intel.com>
> > Subject: Re: [dpdk-dev] [PATCH 7/7] examples/eventdev_pipeline: adding
> > example
> > 
> > On Wed, Nov 16, 2016 at 06:00:07PM +0000, Harry van Haaren wrote:
> > > This patch adds a sample app to the examples/ directory, which can be
> > > used as a reference application and for general testing. The
> > > application requires two ethdev ports and expects traffic to be
> > > flowing. The application must be run with the --vdev flags as follows
> > > to indicate to EAL that a virtual eventdev device called "evdev_sw0" is
> > available to be used:
> > >
> > >     ./build/eventdev_pipeline --vdev evdev_sw0
> > >
> > > The general flow of the traffic is as follows:
> > >
> > >     Rx core -> Atomic Queue => 4 worker cores => TX core
> > >
> > > A scheduler core is required to do the packet scheduling, making this
> > > configuration require 7 cores (Rx, Tx, Scheduler, and 4 workers).
> > > Finally a master core brings the core count to 8 for this
> > > configuration. The
> > 
> > Thanks for the example application.I will try to share my views on ethdev
> > integration and usability perspective. Hope we can converge.
> 
> Hi Jerin, 
> 
> thanks for the feedback. We'll take it on board for a subsequent version
> we produce. Additional comments and queries on your feedback inline below.

Thanks Bruce.

> 
> /Bruce
> 
> > 
> > Some of the high level details first before getting into exact details.
> > 
> > 1) From the HW and ethdev integration perspective, The integrated NIC
> > controllers does not need producer core(s) to push the event/packets to
> > event queue. So, I was thinking to use 6WIND rte_flow spec to create the
> > "ethdev port to event queue wiring" connection by extending the output
> > ACTION definition, which specifies event queue its need to enqueued to for
> > the given ethdev port (something your are doing in application).
> > 
> > I guess, the producer part of this example can be created as common code,
> > somewhere in rte_flow/ethdev to reuse. We would need this scheme also
> > where when we deal with external nics + HW event manager use case
> > 
> Yes. This is something to consider.
> 
> For the pure-software model, we also might want to look at the opposite
> approach, where we register an ethdev with the scheduler for it to "pull"
> new packets from. This would allow it to bypass the port logic for the new
> packets. 

Not sure I understand this completely. How is it different from integrating
with the rte_flow specification?

> 
> An alternative for this is to extend the schedule API to allow a burst of
> packets to be passed in to be scheduled immediately as "NEW" packets. The end
> results should be the same, saving cycles by bypassing unneeded processing
> for the new packets.
> 
> > The complete event driven model can be verified and exercised without
> > integrating with eventdev subsystem. So I think, may be we need to focus
> > on functional applications without ethdev to verify the eventdev features
> > like(automatic multicore scaling,  dynamic load balancing, pipelining,
> > packet ingress order maintenance and synchronization services) and then
> > integrate with ethdev
> 
> Yes, comprehensive unit tests will be needed too. But an example app that
> pulls packets from an external NIC I also think is needed to get a feel
> for the performance of the scheduler with real traffic.

I agree, we need to have an example to showcase this with real traffic.

Please check on the ethdev integration aspects. Cavium has both server
platforms (that are going to use the SW event PMD) and NPU based platforms
(that are going to use the HW event PMD). So we would like to have a common
approach that allows integration of both models without changing the application.

I was thinking more of a "service core" and "rte_flow" based
integration methodology to make that happen.

> 
> > 
> > > +	const unsigned cores_needed = num_workers +
> > > +			/*main*/1 +
> > > +			/*sched*/1 +
> > > +			/*TX*/1 +
> > > +			/*RX*/1;
> > > +
> > 
> > 2) One of the prime aims of the event driven model is to remove the fixed
> > function core mappings and enable automatic multicore scaling,  dynamic
> > load balancing etc.I will try to use an example in review section to show
> > the method for removing "consumer core" in this case.
> 
> Yes, I agree, but unfortunately, for some tasks, distributing those tasks
> across multiple cores can hurt performance overall do to resource contention.

Maybe only in a SW implementation.

>  
> > 
> > > application can be configured for various numbers of flows and worker
> > > cores. Run the application with -h for details.
> > >
> > 
> > Another way to deal wit out additional consumer core(it creates issue in
> > scaling and load balancing) is
> > 
> > in worker:
> > while(1) {
> > 
> > 	ev = dequeue(port);
> > 
> > 	// stage 1 app processing
> > 	if (ev.event_type == RTE_EVENT_TYPE_ETHDEV) {
> > 		// identify the Ethernet port and tx queue the packet needs to
> > go
> > 		// create the flow based on that
> > 		ev.flow_id = flow(port_id, tx_queue_id);
> > 		ev.sched_type = RTE_SCHED_TYPE_ATOMIC;
> > 		ev.operation = RTE_EVENT_OP_FORWARD;
> > 		ev.event_type = RTE_EVENT_TYPE_CORE;
> > 	} // stage 2 app processing
> > 	else if (ev.event_type == RTE_EVENT_TYPE_CORE) {
> > 		port_id = function_of(ev.flow_id) ;// look stage 1 processing
> > 		tx_queue_id = function_of(ev.flow_id) //look stage 1
> > processing
> > 		remaining ethdev based tx is same as yours
> > 	}
> > 	enqueue(ev);
> > }
> >
> Yes, but you still need some core to do the work of pushing the packets into
> the scheduler from the NIC, if you don't have a hardware path from NIC to 
> HW scheduler. [Features like RSS can obviously help here with distributing that
> work if needed]

Yes, it makes sense to have the producer portion of the code as common code.

> 
> In the case you do have a HW path - which I assume is the Cavium case - I assume
> that the EVENT_TYPE_ETHDEV branch above needs also to take care of desc to mbuf
> processing, as is normally done by the PMD?
>  
> > 
> > 
> > > +			ev->priority = 0;
> > > +			ev->sched_type = RTE_SCHED_TYPE_ATOMIC;
> > > +			ev->operation = RTE_EVENT_OP_FORWARD;
> > > +
> > > +			uint64_t now = rte_rdtsc();
> > > +			while(now + 750 > rte_rdtsc()) {}
> > 
> > Why delay ?
> 
> Simulate some work being done by the worker, which makes the app slightly more
> realistic and also helps the scheduler as there is not so much contention on the
> shared cache lines.

Maybe not for performance test-cases.

>
  

Patch

diff --git a/examples/eventdev_pipeline/Makefile b/examples/eventdev_pipeline/Makefile
new file mode 100644
index 0000000..bab8916
--- /dev/null
+++ b/examples/eventdev_pipeline/Makefile
@@ -0,0 +1,49 @@ 
+#   BSD LICENSE
+#
+#   Copyright(c) 2016 Intel Corporation. All rights reserved.
+#
+#   Redistribution and use in source and binary forms, with or without
+#   modification, are permitted provided that the following conditions
+#   are met:
+#
+#     * Redistributions of source code must retain the above copyright
+#       notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above copyright
+#       notice, this list of conditions and the following disclaimer in
+#       the documentation and/or other materials provided with the
+#       distribution.
+#     * Neither the name of Intel Corporation nor the names of its
+#       contributors may be used to endorse or promote products derived
+#       from this software without specific prior written permission.
+#
+#   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+#   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+#   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+#   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+#   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+#   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+#   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+#   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+#   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+#   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+#   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+ifeq ($(RTE_SDK),)
+$(error "Please define RTE_SDK environment variable")
+endif
+
+# Default target, can be overridden by command line or environment
+RTE_TARGET ?= x86_64-native-linuxapp-gcc
+
+include $(RTE_SDK)/mk/rte.vars.mk
+
+# binary name
+APP = eventdev_pipeline
+
+# all source are stored in SRCS-y
+SRCS-y := main.c
+
+CFLAGS += -O3
+CFLAGS += $(WERROR_FLAGS)
+
+include $(RTE_SDK)/mk/rte.extapp.mk
diff --git a/examples/eventdev_pipeline/main.c b/examples/eventdev_pipeline/main.c
new file mode 100644
index 0000000..6a8052c
--- /dev/null
+++ b/examples/eventdev_pipeline/main.c
@@ -0,0 +1,718 @@ 
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright(c) 2016 Intel Corporation. All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of Intel Corporation nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <getopt.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <signal.h>
+
+#include <rte_eal.h>
+#include <rte_mempool.h>
+#include <rte_mbuf.h>
+#include <rte_launch.h>
+#include <rte_malloc.h>
+#include <rte_cycles.h>
+#include <rte_ethdev.h>
+#include <rte_eventdev.h>
+
+#define BATCH_SIZE 32
+
+static unsigned int num_workers = 4;
+static unsigned long num_packets = (1L << 25); /* do ~32M packets */
+static unsigned int num_fids = 16;
+static unsigned int num_priorities = 1;
+static int sched_type = RTE_SCHED_TYPE_ATOMIC;
+
+struct prod_data {
+	uint8_t event_dev_id;
+	uint8_t event_port_id;
+	int32_t qid;
+	unsigned num_ports;
+};
+
+struct cons_data {
+	uint8_t event_dev_id;
+	uint8_t event_port_id;
+};
+
+struct worker_data {
+	uint8_t event_dev_id;
+	int event_port_id;
+	int32_t qid;
+};
+
+static volatile int done = 0;
+static int quiet = 0;
+struct rte_mempool *mp;
+
+static int
+worker(void *arg)
+{
+	struct rte_event rcv_events[BATCH_SIZE];
+
+	struct worker_data *data = (struct worker_data *)arg;
+	uint8_t event_dev_id = data->event_dev_id;
+	uint8_t event_port_id = data->event_port_id;
+	int32_t qid = data->qid;
+	size_t sent = 0, received = 0;
+
+	while (!done) {
+		uint16_t i;
+
+		uint16_t n = rte_event_dequeue_burst(event_dev_id,
+						     event_port_id,
+						     rcv_events,
+						     RTE_DIM(rcv_events),
+						     false);
+		if (n == 0){
+			rte_pause();
+			/* Flush any buffered events */
+			rte_event_dequeue(event_dev_id,
+					  event_port_id,
+					  NULL,
+					  false);
+			continue;
+		}
+		received += n;
+
+		for (i = 0; i < n; i++) {
+			struct ether_hdr *eth;
+			struct ether_addr addr;
+			struct rte_event *ev = &rcv_events[i];
+
+			ev->queue_id = qid;
+			ev->flow_id = 0;
+			ev->priority = 0;
+			ev->sched_type = RTE_SCHED_TYPE_ATOMIC;
+			ev->operation = RTE_EVENT_OP_FORWARD;
+
+			uint64_t now = rte_rdtsc();
+			while(now + 750 > rte_rdtsc()) {}
+
+			/* change mac addresses on packet */
+			eth = rte_pktmbuf_mtod(ev->mbuf, struct ether_hdr *);
+			ether_addr_copy(&eth->d_addr, &addr);
+			ether_addr_copy(&eth->s_addr, &eth->d_addr);
+			ether_addr_copy(&addr, &eth->s_addr);
+		}
+		int ret = rte_event_enqueue_burst(event_dev_id, event_port_id,
+					rcv_events, n, false);
+		if (ret != n)
+			rte_panic("worker %u thread failed to enqueue event\n",
+				rte_lcore_id());
+	}
+
+	/* Flush the buffered events */
+	rte_event_dequeue(event_dev_id, event_port_id, NULL, false);
+
+	if (!quiet)
+		printf("  worker %u thread done. RX=%zu TX=%zu\n",
+				rte_lcore_id(), received, sent);
+
+	return 0;
+}
+
+static int
+scheduler(void *arg)
+{
+	RTE_SET_USED(arg);
+	size_t loops = 0;
+
+	while (!done) {
+		/* Assumes an event dev ID of 0 */
+		rte_event_schedule(0);
+		loops++;
+	}
+
+	printf("  scheduler thread done. loops=%zu\n", loops);
+
+	return 0;
+}
+
+static int
+consumer(void *arg)
+{
+	struct rte_event events[BATCH_SIZE];
+
+	struct cons_data *data = (struct cons_data *)arg;
+	uint8_t event_dev_id = data->event_dev_id;
+	uint8_t event_port_id = data->event_port_id;
+	struct rte_eth_dev_tx_buffer *tx_buf[RTE_MAX_ETHPORTS];
+	size_t npackets = num_packets;
+	size_t received = 0;
+	size_t received_printed = 0; /* tracks when we last printed receive count */
+	uint64_t start_time = 0;
+	uint64_t freq_khz = rte_get_timer_hz() / 1000;
+	uint64_t dropped = 0;
+	unsigned i;
+
+	for (i = 0; i < rte_eth_dev_count(); i++) {
+		tx_buf[i] = rte_malloc(NULL, RTE_ETH_TX_BUFFER_SIZE(32), 0);
+		if (tx_buf[i] == NULL)
+			rte_panic("Out of memory\n");
+		rte_eth_tx_buffer_init(tx_buf[i], 32);
+		rte_eth_tx_buffer_set_err_callback(tx_buf[i],
+				rte_eth_tx_buffer_count_callback, &dropped);
+	}
+
+	while (!done) {
+		uint16_t i;
+		uint16_t n = rte_event_dequeue_burst(event_dev_id,
+						     event_port_id,
+						     events,
+						     RTE_DIM(events),
+						     false);
+
+		if (n == 0){
+			rte_pause();
+			continue;
+		}
+		if (start_time == 0)
+			start_time = rte_get_timer_cycles();
+
+		received += n;
+		for (i = 0; i < n; i++) {
+			uint8_t outport = events[i].mbuf->port;
+			rte_eth_tx_buffer(outport, 0, tx_buf[outport], events[i].mbuf);
+		}
+
+		if (!quiet && received >= received_printed + (1<<22)) {
+			printf("# consumer RX=%zu, time %"PRIu64"ms\n",
+					received,
+					(rte_get_timer_cycles() - start_time) / freq_khz);
+			received_printed = received;
+		}
+
+		if (num_packets > 0 && npackets > 0) {
+			npackets -= n;
+			if (npackets == 0)
+				done = 1;
+		}
+	}
+
+	for (i = 0; i < rte_eth_dev_count(); i++)
+		rte_eth_tx_buffer_flush(i, 0, tx_buf[i]);
+
+	printf("  consumer done! RX=%zu, time %"PRIu64"ms\n",
+			received,
+			(rte_get_timer_cycles() - start_time) / freq_khz);
+
+	return 0;
+}
+
+static int
+producer(void *arg)
+{
+
+	struct prod_data *data = (struct prod_data *)arg;
+	size_t npackets = num_packets;
+	unsigned i;
+	uint64_t mbuf_seqno = 0;
+	size_t sent = 0;
+	uint8_t eth_port = 0;
+	uint8_t event_dev_id = data->event_dev_id;
+	uint8_t event_port_id = data->event_port_id;
+	int fid_counter = 0;
+
+	while (!done) {
+		int ret;
+		unsigned num_ports = data->num_ports;
+		int32_t qid = data->qid;
+		struct rte_event events[BATCH_SIZE];
+		struct rte_mbuf *mbufs[BATCH_SIZE];
+
+		uint16_t nb_rx = rte_eth_rx_burst(eth_port, 0, mbufs, BATCH_SIZE);
+		if (++eth_port == num_ports)
+			eth_port = 0;
+		if (nb_rx == 0) {
+			rte_pause();
+			/* Flush any buffered events */
+			rte_event_dequeue(event_dev_id,
+					  event_port_id,
+					  NULL,
+					  false);
+			continue;
+		}
+
+		for (i = 0; i < nb_rx; i++) {
+			struct rte_mbuf *m = mbufs[i];
+			struct rte_event *ev = &events[i];
+
+			ev->queue_id = qid;
+			ev->flow_id = fid_counter++ % 6;
+			ev->priority = 0;
+			m->udata64 = mbuf_seqno++;
+			ev->mbuf = m;
+			ev->sched_type = sched_type;
+			ev->operation = RTE_EVENT_OP_NEW;
+		}
+
+		do {
+			ret = rte_event_enqueue_burst(event_dev_id,
+							event_port_id,
+							events,
+							nb_rx,
+							false);
+		} while (ret == -ENOSPC);
+		if (ret != nb_rx)
+			rte_panic("producer thread failed to enqueue *all* events\n");
+
+		sent += nb_rx;
+
+		if (num_packets > 0 && npackets > 0) {
+			npackets -= nb_rx;
+			if (npackets == 0)
+				break;
+		}
+	}
+
+	/* Flush any buffered events */
+	while (!done)
+		rte_event_dequeue(event_dev_id, event_port_id, NULL, false);
+
+	printf("  prod thread done! TX=%zu across %u flows\n", sent, num_fids);
+
+	return 0;
+}
+
+static struct option long_options[] = {
+	{"workers", required_argument, 0, 'w'},
+	{"packets", required_argument, 0, 'n'},
+	{"atomic-flows", required_argument, 0, 'f'},
+	{"priority", required_argument, 0, 'p'},
+	{"ordered", no_argument, 0, 'o'},
+	{"quiet", no_argument, 0, 'q'},
+	{0, 0, 0, 0}
+};
+
+static void
+usage(void)
+{
+	const char *usage_str =
+		"  Usage: eventdev_pipeline [options]\n"
+		"  Options:\n"
+		"  -w, --workers=N       Use N workers (default 4)\n"
+		"  -n, --packets=N       Send N packets (default ~32M), 0 implies no limit\n"
+		"  -f, --atomic-flows=N  Use N random flows from 1 to N (default 16)\n"
+		"  -p, --priority=N      Use N number of priorities (default 1)\n"
+		"  -o, --ordered         Use ordered scheduling\n"
+		"  -q, --quiet           Minimize printed output\n"
+		"\n";
+
+	fprintf(stderr, "%s", usage_str);
+	exit(1);
+}
+
+static void
+parse_app_args(int argc, char** argv)
+{
+	/* Parse cli options*/
+	int option_index;
+	int c;
+	opterr = 0;
+
+	for (;;) {
+		c = getopt_long(argc, argv, "w:n:f:p:oq", long_options,
+				&option_index);
+		if (c == -1)
+			break;
+
+		switch (c) {
+			case 'w':
+				num_workers = (unsigned int)atoi(optarg);
+				break;
+			case 'n':
+				num_packets = (unsigned long )atol(optarg);
+				break;
+			case 'f':
+				num_fids = (unsigned int)atoi(optarg);
+				break;
+			case 'p':
+				num_priorities = (unsigned int)atoi(optarg);
+				break;
+			case 'o':
+				sched_type = RTE_SCHED_TYPE_ORDERED;
+				break;
+			case 'q':
+				quiet = 1;
+				break;
+			default:
+				usage();
+		}
+	}
+	if (num_workers == 0)
+		usage();
+}
+
+/*
+ * Initializes a given port using global settings and with the RX buffers
+ * coming from the mbuf_pool passed as a parameter.
+ */
+static inline int
+port_init(uint8_t port, struct rte_mempool *mbuf_pool)
+{
+	static const struct rte_eth_conf port_conf_default = {
+		.rxmode = { .max_rx_pkt_len = ETHER_MAX_LEN }
+	};
+	const uint16_t rx_rings = 1, tx_rings = 1;
+	const uint16_t rx_ring_size = 512, tx_ring_size = 512;
+	struct rte_eth_conf port_conf = port_conf_default;
+	int retval;
+	uint16_t q;
+
+	if (port >= rte_eth_dev_count())
+		return -1;
+
+	/* Configure the Ethernet device. */
+	retval = rte_eth_dev_configure(port, rx_rings, tx_rings, &port_conf);
+	if (retval != 0)
+		return retval;
+
+	/* Allocate and set up 1 RX queue per Ethernet port. */
+	for (q = 0; q < rx_rings; q++) {
+		retval = rte_eth_rx_queue_setup(port, q, rx_ring_size,
+				rte_eth_dev_socket_id(port), NULL, mbuf_pool);
+		if (retval < 0)
+			return retval;
+	}
+
+	/* Allocate and set up 1 TX queue per Ethernet port. */
+	for (q = 0; q < tx_rings; q++) {
+		retval = rte_eth_tx_queue_setup(port, q, tx_ring_size,
+				rte_eth_dev_socket_id(port), NULL);
+		if (retval < 0)
+			return retval;
+	}
+
+	/* Start the Ethernet port. */
+	retval = rte_eth_dev_start(port);
+	if (retval < 0)
+		return retval;
+
+	/* Display the port MAC address. */
+	struct ether_addr addr;
+	rte_eth_macaddr_get(port, &addr);
+	printf("Port %u MAC: %02" PRIx8 " %02" PRIx8 " %02" PRIx8
+			   " %02" PRIx8 " %02" PRIx8 " %02" PRIx8 "\n",
+			(unsigned)port,
+			addr.addr_bytes[0], addr.addr_bytes[1],
+			addr.addr_bytes[2], addr.addr_bytes[3],
+			addr.addr_bytes[4], addr.addr_bytes[5]);
+
+	/* Enable RX in promiscuous mode for the Ethernet device. */
+	rte_eth_promiscuous_enable(port);
+
+	return 0;
+}
+
+static int
+init_ports(unsigned num_ports)
+{
+	uint8_t portid;
+
+	mp = rte_pktmbuf_pool_create("packet_pool",
+			/* mbufs */ 16384 * num_ports,
+			/* cache_size */ 512,
+			/* priv_size*/ 0,
+			/* data_room_size */ RTE_MBUF_DEFAULT_BUF_SIZE,
+			rte_socket_id());
+
+	for (portid = 0; portid < num_ports; portid++)
+		if (port_init(portid, mp) != 0)
+			rte_exit(EXIT_FAILURE, "Cannot init port %"PRIu8 "\n",
+					portid);
+	return 0;
+}
+
+static uint8_t
+setup_event_dev(struct prod_data *prod_data,
+		struct cons_data *cons_data,
+		struct worker_data *worker_data)
+{
+	struct rte_event_dev_config config;
+	struct rte_event_queue_conf queue_config;
+	struct rte_event_port_conf port_config;
+	struct rte_event_queue_link link;
+	int prod_port;
+	int cons_port;
+	int qid0;
+	int cons_qid;
+	int prod_qid;
+	unsigned i;
+	int ret;
+	int8_t id;
+
+	const char *dev_name = "evdev_sw0";
+	id = rte_event_dev_get_dev_id(dev_name);
+	if (id < 0)
+		rte_panic("Failed to get %s device ID\n", dev_name);
+
+	config.nb_event_queues = 3;
+	config.nb_event_ports = num_workers + 2;
+	config.nb_events_limit = 256;
+	config.dequeue_wait_ns = 0;
+
+	ret = rte_event_dev_configure(id, &config);
+	if (ret)
+		rte_panic("Failed to configure the event dev\n");
+
+	/* Create queues */
+	queue_config.event_queue_cfg = RTE_EVENT_QUEUE_CFG_ATOMIC_ONLY;
+	queue_config.priority = 0;
+
+	qid0 = 0;
+	ret = rte_event_queue_setup(id, qid0, &queue_config);
+	if (ret < 0)
+		rte_panic("Failed to create the scheduled QID\n");
+
+	queue_config.event_queue_cfg = RTE_EVENT_QUEUE_CFG_SINGLE_CONSUMER;
+	queue_config.priority = 0;
+
+	cons_qid = 1;
+	ret = rte_event_queue_setup(id, cons_qid, &queue_config);
+	if (ret < 0)
+		rte_panic("Failed to create the cons directed QID\n");
+
+	queue_config.event_queue_cfg = RTE_EVENT_QUEUE_CFG_SINGLE_CONSUMER;
+	queue_config.priority = 0;
+
+	prod_qid = 2;
+	ret = rte_event_queue_setup(id, prod_qid, &queue_config);
+	if (ret < 0)
+		rte_panic("Failed to create the prod directed QID\n");
+
+	/* Create ports */
+#define LB_PORT_DEPTH 16
+#define DIR_PORT_DEPTH 32
+	port_config.enqueue_queue_depth = LB_PORT_DEPTH;
+	port_config.dequeue_queue_depth = LB_PORT_DEPTH;
+	port_config.new_event_threshold = 255;
+
+	prod_port = 0;
+	ret = rte_event_port_setup(id, prod_port, &port_config);
+	if (ret < 0)
+		rte_panic("Failed to create the producer port\n");
+
+	cons_port = 1;
+	port_config.enqueue_queue_depth = DIR_PORT_DEPTH;
+	port_config.dequeue_queue_depth = DIR_PORT_DEPTH;
+	ret = rte_event_port_setup(id, cons_port, &port_config);
+	if (ret < 0)
+		rte_panic("Failed to create the consumer port\n");
+
+	port_config.enqueue_queue_depth = LB_PORT_DEPTH;
+	port_config.dequeue_queue_depth = LB_PORT_DEPTH;
+	for (i = 0; i < num_workers; i++) {
+		worker_data[i].event_port_id = i + 2;
+		ret = rte_event_port_setup(id, worker_data[i].event_port_id, &port_config);
+		if (ret < 0)
+			rte_panic("Failed to create worker port #%d\n", i);
+	}
+
+	/* Map ports/qids */
+	for (i = 0; i < num_workers; i++) {
+		link.queue_id = qid0;
+		link.priority = 0;
+
+		ret = rte_event_port_link(id, worker_data[i].event_port_id, &link, 1);
+		if (ret != 1)
+			rte_panic("Failed to map worker%d port to qid0\n", i);
+	}
+
+	/* Link consumer port to its QID */
+	link.queue_id = cons_qid;
+	link.priority = 0;
+
+	ret = rte_event_port_link(id, cons_port, &link, 1);
+	if (ret != 1)
+		rte_panic("Failed to map consumer port to cons_qid\n");
+
+	/* Link producer port to its QID */
+	link.queue_id = prod_qid;
+	link.priority = 0;
+
+	ret = rte_event_port_link(id, prod_port, &link, 1);
+	if (ret != 1)
+		rte_panic("Failed to map producer port to prod_qid\n");
+
+	/* Dispatch to slaves */
+	*prod_data = (struct prod_data){.event_dev_id = id,
+					.event_port_id = prod_port,
+					.qid = qid0};
+	*cons_data = (struct cons_data){.event_dev_id = id,
+					.event_port_id = cons_port};
+
+	for (i = 0; i < num_workers; i++) {
+		struct worker_data *w = &worker_data[i];
+		w->event_dev_id = id;
+		w->qid = cons_qid;
+	}
+
+	if (rte_event_dev_start(id) < 0) {
+		printf("%d: Error with start call\n", __LINE__);
+		return -1;
+	}
+
+	return (uint8_t) id;
+}
+
+static void sighndlr(int sig)
+{
+	/* Ctlr-Z to dump stats */
+	if(sig == SIGTSTP) {
+		rte_mempool_dump(stdout, mp);
+		rte_event_dev_dump(stdout, 0);
+	}
+	/* Ctlr-C to exit */
+	if(sig == SIGINT)
+		rte_exit(0, "sigint arrived, quitting\n");
+}
+
+int
+main(int argc, char **argv)
+{
+	signal(SIGINT , sighndlr);
+	signal(SIGTSTP, sighndlr);
+
+	struct prod_data prod_data = {0};
+	struct cons_data cons_data = {0};
+	struct worker_data *worker_data;
+	unsigned nworkers = 0;
+	unsigned num_ports;
+	int lcore_id;
+	int err;
+	int has_prod = 0;
+	int has_cons = 0;
+	int has_scheduler = 0;
+
+	err = rte_eal_init(argc, argv);
+	if (err < 0)
+		rte_panic("Invalid EAL arguments\n");
+
+	argc -= err;
+	argv += err;
+
+	/* Parse cli options*/
+	parse_app_args(argc, argv);
+
+	num_ports = rte_eth_dev_count();
+	if (num_ports == 0)
+		rte_panic("No ethernet ports found\n");
+
+	if (!quiet) {
+		printf("  Config:\n");
+		printf("\tports: %u\n", num_ports);
+		printf("\tworkers: %u\n", num_workers);
+		printf("\tpackets: %lu\n", num_packets);
+		printf("\tflows: %u\n", num_fids);
+		printf("\tpriorities: %u\n", num_priorities);
+		if (sched_type == RTE_SCHED_TYPE_ORDERED)
+			printf("\tqid0 type: ordered\n");
+		if (sched_type == RTE_SCHED_TYPE_ATOMIC)
+			printf("\tqid0 type: atomic\n");
+		printf("\n");
+	}
+
+	const unsigned cores_needed = num_workers +
+			/*main*/1 +
+			/*sched*/1 +
+			/*TX*/1 +
+			/*RX*/1;
+
+	if (!quiet) {
+		printf("Number of cores available: %u\n", rte_lcore_count());
+		printf("Number of cores to be used: %u\n", cores_needed);
+	}
+
+	if (rte_lcore_count() < cores_needed)
+		rte_panic("Too few cores\n");
+
+	const uint8_t ndevs = rte_event_dev_count();
+	if (ndevs == 0)
+		rte_panic("No event devs found. Do you need to pass in a --vdev flag?\n");
+	if (ndevs > 1)
+		fprintf(stderr, "Warning: More than one event dev, using idx 0");
+
+	worker_data = rte_calloc(0, num_workers, sizeof(worker_data[0]), 0);
+	if (worker_data == NULL)
+		rte_panic("rte_calloc failed\n");
+
+	uint8_t id = setup_event_dev(&prod_data, &cons_data, worker_data);
+	RTE_SET_USED(id);
+
+	prod_data.num_ports = num_ports;
+	init_ports(num_ports);
+
+	RTE_LCORE_FOREACH_SLAVE(lcore_id) {
+		if (has_prod && has_cons && has_scheduler && nworkers == num_workers)
+			break;
+
+		if (!has_scheduler) {
+			err = rte_eal_remote_launch(scheduler, NULL, lcore_id);
+			if (err)
+				rte_panic("Failed to launch scheduler\n");
+
+			has_scheduler = 1;
+			continue;
+		}
+
+		if (nworkers < num_workers) {
+			err = rte_eal_remote_launch(worker, &worker_data[nworkers], lcore_id);
+			if (err)
+				rte_panic("Failed to launch worker%d\n", nworkers);
+			nworkers++;
+			continue;
+		}
+
+		if (!has_cons) {
+			err = rte_eal_remote_launch(consumer, &cons_data, lcore_id);
+			if (err)
+				rte_panic("Failed to launch consumer\n");
+			has_cons = 1;
+			continue;
+		}
+
+		if (!has_prod) {
+			err = rte_eal_remote_launch(producer, &prod_data, lcore_id);
+			if (err)
+				rte_panic("Failed to launch producer\n");
+			has_prod = 1;
+			continue;
+		}
+	}
+
+	rte_eal_mp_wait_lcore();
+
+	/* Cleanup done automatically by kernel on app exit */
+
+	return 0;
+}