[dpdk-dev,1/2] eventdev: add implicit release disable capability

Message ID 1512015636-31878-2-git-send-email-gage.eads@intel.com
State Superseded, archived
Delegated to: Jerin Jacob
Headers show

Checks

Context Check Description
ci/Intel-compilation success Compilation OK
ci/checkpatch success coding style OK

Commit Message

Eads, Gage Nov. 30, 2017, 4:20 a.m.
This commit introduces a capability for disabling the "implicit" release
functionality for a port, which prevents the eventdev PMD from issuing
outstanding releases for previously dequeued events when dequeuing a new
batch of events.

If a PMD does not support this capability, the application will receive an
error if it attempts to setup a port with implicit releases disabled.
Otherwise, if the port is configured with implicit releases disabled, the
application must release each dequeued event by invoking
rte_event_enqueue_burst() with RTE_EVENT_OP_RELEASE or
RTE_EVENT_OP_FORWARD.

Signed-off-by: Gage Eads <gage.eads@intel.com>
---
 drivers/event/dpaa2/dpaa2_eventdev.c       |  2 ++
 drivers/event/octeontx/ssovf_evdev.c       |  1 +
 drivers/event/skeleton/skeleton_eventdev.c |  1 +
 drivers/event/sw/sw_evdev.c                | 10 +++++++---
 drivers/event/sw/sw_evdev.h                |  1 +
 drivers/event/sw/sw_evdev_worker.c         | 16 ++++++++--------
 examples/eventdev_pipeline_sw_pmd/main.c   | 24 +++++++++++++++++++++++-
 lib/librte_eventdev/rte_eventdev.c         |  9 +++++++++
 lib/librte_eventdev/rte_eventdev.h         | 23 ++++++++++++++++++++---
 test/test/test_eventdev.c                  |  9 +++++++++
 test/test/test_eventdev_sw.c               | 20 ++++++++++++++++++--
 11 files changed, 99 insertions(+), 17 deletions(-)

Comments

Van Haaren, Harry Dec. 11, 2017, 12:36 p.m. | #1
> -----Original Message-----
> From: Eads, Gage
> Sent: Thursday, November 30, 2017 4:21 AM
> To: dev@dpdk.org
> Cc: jerin.jacob@caviumnetworks.com; Van Haaren, Harry
> <harry.van.haaren@intel.com>; Richardson, Bruce
> <bruce.richardson@intel.com>; hemant.agrawal@nxp.com; nipun.gupta@nxp.com;
> santosh.shukla@caviumnetworks.com
> Subject: [PATCH 1/2] eventdev: add implicit release disable capability
> 
> This commit introduces a capability for disabling the "implicit" release
> functionality for a port, which prevents the eventdev PMD from issuing
> outstanding releases for previously dequeued events when dequeuing a new
> batch of events.
> 
> If a PMD does not support this capability, the application will receive an
> error if it attempts to setup a port with implicit releases disabled.
> Otherwise, if the port is configured with implicit releases disabled, the
> application must release each dequeued event by invoking
> rte_event_enqueue_burst() with RTE_EVENT_OP_RELEASE or
> RTE_EVENT_OP_FORWARD.
> 
> Signed-off-by: Gage Eads <gage.eads@intel.com>

Some comments inline. In general, I think this makes sense, and is the easiest solution that I can see.


>  drivers/event/dpaa2/dpaa2_eventdev.c       |  2 ++
>  drivers/event/octeontx/ssovf_evdev.c       |  1 +
>  drivers/event/skeleton/skeleton_eventdev.c |  1 +
>  drivers/event/sw/sw_evdev.c                | 10 +++++++---
>  drivers/event/sw/sw_evdev.h                |  1 +
>  drivers/event/sw/sw_evdev_worker.c         | 16 ++++++++--------
>  examples/eventdev_pipeline_sw_pmd/main.c   | 24 +++++++++++++++++++++++-
>  lib/librte_eventdev/rte_eventdev.c         |  9 +++++++++
>  lib/librte_eventdev/rte_eventdev.h         | 23 ++++++++++++++++++++---
>  test/test/test_eventdev.c                  |  9 +++++++++
>  test/test/test_eventdev_sw.c               | 20 ++++++++++++++++++--
>  11 files changed, 99 insertions(+), 17 deletions(-)
> 
> diff --git a/drivers/event/dpaa2/dpaa2_eventdev.c
> b/drivers/event/dpaa2/dpaa2_eventdev.c
> index eeeb231..236b211 100644
> --- a/drivers/event/dpaa2/dpaa2_eventdev.c
> +++ b/drivers/event/dpaa2/dpaa2_eventdev.c
> @@ -440,6 +440,8 @@ dpaa2_eventdev_port_def_conf(struct rte_eventdev *dev,
> uint8_t port_id,
>  		DPAA2_EVENT_MAX_PORT_DEQUEUE_DEPTH;
>  	port_conf->enqueue_depth =
>  		DPAA2_EVENT_MAX_PORT_ENQUEUE_DEPTH;
> +	port_conf->disable_implicit_release =
> +		0;

Merge "0;" onto previous line?

<snip>

> --- a/drivers/event/skeleton/skeleton_eventdev.c
> +++ b/drivers/event/skeleton/skeleton_eventdev.c
> @@ -237,6 +237,7 @@ skeleton_eventdev_port_def_conf(struct rte_eventdev
> *dev, uint8_t port_id,
>  	port_conf->new_event_threshold = 32 * 1024;
>  	port_conf->dequeue_depth = 16;
>  	port_conf->enqueue_depth = 16;
> +	port_conf->disable_implicit_release = false;

Prefer 0 to false.

<snip>

> diff --git a/examples/eventdev_pipeline_sw_pmd/main.c
> b/examples/eventdev_pipeline_sw_pmd/main.c
> index 5f431d8..3910b53 100644
> --- a/examples/eventdev_pipeline_sw_pmd/main.c
> +++ b/examples/eventdev_pipeline_sw_pmd/main.c
> @@ -62,6 +62,7 @@ struct prod_data {
>  struct cons_data {
>  	uint8_t dev_id;
>  	uint8_t port_id;
> +	bool release;

I'd personally uint8_t this instead of bool, which requires <stdbool.h>. I haven't seen stdbool.h in other DPDK headers, so suggesting stick with the good old byte-sized integers for flags.. 


>  } __rte_cache_aligned;
> 
>  static struct prod_data prod_data;
> @@ -167,6 +168,18 @@ consumer(void)
>  		uint8_t outport = packets[i].mbuf->port;
>  		rte_eth_tx_buffer(outport, 0, fdata->tx_buf[outport],
>  				packets[i].mbuf);
> +
> +		packets[i].op = RTE_EVENT_OP_RELEASE;
> +	}
> +
> +	if (cons_data.release) {
> +		uint16_t nb_tx;
> +
> +		nb_tx = rte_event_enqueue_burst(dev_id, port_id, packets, n);
> +		while (nb_tx < n)
> +			nb_tx += rte_event_enqueue_burst(dev_id, port_id,
> +							 packets + nb_tx,
> +							 n - nb_tx);
>  	}
> 
>  	/* Print out mpps every 1<22 packets */
> @@ -702,6 +715,7 @@ setup_eventdev(struct prod_data *prod_data,
>  	};
> 
>  	struct port_link worker_queues[MAX_NUM_STAGES];
> +	bool disable_implicit_release;

Same uint8_t over stdbool.h comment as above


> @@ -3240,7 +3244,7 @@ test_sw_eventdev(void)
>  	if (rte_lcore_count() >= 3) {
>  		printf("*** Running Worker loopback test...\n");
> -		ret = worker_loopback(t);
> +		ret = worker_loopback(t, 0);
>  		if (ret != 0) {
>  			printf("ERROR - Worker loopback test FAILED.\n");
>  			return ret;
> @@ -3249,6 +3253,18 @@ test_sw_eventdev(void)
>  		printf("### Not enough cores for worker loopback test.\n");
>  		printf("### Need at least 3 cores for test.\n");
>  	}
> +	if (rte_lcore_count() >= 3) {
> +		printf("*** Running Worker loopback test (implicit release
> disabled)...\n");
> +		ret = worker_loopback(t, 1);
> +		if (ret != 0) {
> +			printf("ERROR - Worker loopback test FAILED.\n");
> +			return ret;
> +		}
> +	} else {
> +		printf("### Not enough cores for worker loopback test.\n");
> +		printf("### Need at least 3 cores for test.\n");
> +	}

The double if (count >= 3) here looks like it could be removed and the loopback(t, 1) added to the first if()- but otherwise the logic is fine.


With the above changes, this looks good to me!

Acked-by: Harry van Haaren <harry.van.haaren@intel.com>
Eads, Gage Dec. 11, 2017, 5:39 p.m. | #2
Sure, will make all suggested changes in v2.

> -----Original Message-----
> From: Van Haaren, Harry
> Sent: Monday, December 11, 2017 6:36 AM
> To: Eads, Gage <gage.eads@intel.com>; dev@dpdk.org
> Cc: jerin.jacob@caviumnetworks.com; Richardson, Bruce
> <bruce.richardson@intel.com>; hemant.agrawal@nxp.com;
> nipun.gupta@nxp.com; santosh.shukla@caviumnetworks.com
> Subject: RE: [PATCH 1/2] eventdev: add implicit release disable capability
> 
> > -----Original Message-----
> > From: Eads, Gage
> > Sent: Thursday, November 30, 2017 4:21 AM
> > To: dev@dpdk.org
> > Cc: jerin.jacob@caviumnetworks.com; Van Haaren, Harry
> > <harry.van.haaren@intel.com>; Richardson, Bruce
> > <bruce.richardson@intel.com>; hemant.agrawal@nxp.com;
> > nipun.gupta@nxp.com; santosh.shukla@caviumnetworks.com
> > Subject: [PATCH 1/2] eventdev: add implicit release disable capability
> >
> > This commit introduces a capability for disabling the "implicit"
> > release functionality for a port, which prevents the eventdev PMD from
> > issuing outstanding releases for previously dequeued events when
> > dequeuing a new batch of events.
> >
> > If a PMD does not support this capability, the application will
> > receive an error if it attempts to setup a port with implicit releases disabled.
> > Otherwise, if the port is configured with implicit releases disabled,
> > the application must release each dequeued event by invoking
> > rte_event_enqueue_burst() with RTE_EVENT_OP_RELEASE or
> > RTE_EVENT_OP_FORWARD.
> >
> > Signed-off-by: Gage Eads <gage.eads@intel.com>
> 
> Some comments inline. In general, I think this makes sense, and is the easiest
> solution that I can see.
> 
> 
> >  drivers/event/dpaa2/dpaa2_eventdev.c       |  2 ++
> >  drivers/event/octeontx/ssovf_evdev.c       |  1 +
> >  drivers/event/skeleton/skeleton_eventdev.c |  1 +
> >  drivers/event/sw/sw_evdev.c                | 10 +++++++---
> >  drivers/event/sw/sw_evdev.h                |  1 +
> >  drivers/event/sw/sw_evdev_worker.c         | 16 ++++++++--------
> >  examples/eventdev_pipeline_sw_pmd/main.c   | 24
> +++++++++++++++++++++++-
> >  lib/librte_eventdev/rte_eventdev.c         |  9 +++++++++
> >  lib/librte_eventdev/rte_eventdev.h         | 23 ++++++++++++++++++++---
> >  test/test/test_eventdev.c                  |  9 +++++++++
> >  test/test/test_eventdev_sw.c               | 20 ++++++++++++++++++--
> >  11 files changed, 99 insertions(+), 17 deletions(-)
> >
> > diff --git a/drivers/event/dpaa2/dpaa2_eventdev.c
> > b/drivers/event/dpaa2/dpaa2_eventdev.c
> > index eeeb231..236b211 100644
> > --- a/drivers/event/dpaa2/dpaa2_eventdev.c
> > +++ b/drivers/event/dpaa2/dpaa2_eventdev.c
> > @@ -440,6 +440,8 @@ dpaa2_eventdev_port_def_conf(struct rte_eventdev
> > *dev, uint8_t port_id,
> >  		DPAA2_EVENT_MAX_PORT_DEQUEUE_DEPTH;
> >  	port_conf->enqueue_depth =
> >  		DPAA2_EVENT_MAX_PORT_ENQUEUE_DEPTH;
> > +	port_conf->disable_implicit_release =
> > +		0;
> 
> Merge "0;" onto previous line?
> 
> <snip>
> 
> > --- a/drivers/event/skeleton/skeleton_eventdev.c
> > +++ b/drivers/event/skeleton/skeleton_eventdev.c
> > @@ -237,6 +237,7 @@ skeleton_eventdev_port_def_conf(struct
> > rte_eventdev *dev, uint8_t port_id,
> >  	port_conf->new_event_threshold = 32 * 1024;
> >  	port_conf->dequeue_depth = 16;
> >  	port_conf->enqueue_depth = 16;
> > +	port_conf->disable_implicit_release = false;
> 
> Prefer 0 to false.
> 
> <snip>
> 
> > diff --git a/examples/eventdev_pipeline_sw_pmd/main.c
> > b/examples/eventdev_pipeline_sw_pmd/main.c
> > index 5f431d8..3910b53 100644
> > --- a/examples/eventdev_pipeline_sw_pmd/main.c
> > +++ b/examples/eventdev_pipeline_sw_pmd/main.c
> > @@ -62,6 +62,7 @@ struct prod_data {
> >  struct cons_data {
> >  	uint8_t dev_id;
> >  	uint8_t port_id;
> > +	bool release;
> 
> I'd personally uint8_t this instead of bool, which requires <stdbool.h>. I haven't
> seen stdbool.h in other DPDK headers, so suggesting stick with the good old
> byte-sized integers for flags..
> 
> 
> >  } __rte_cache_aligned;
> >
> >  static struct prod_data prod_data;
> > @@ -167,6 +168,18 @@ consumer(void)
> >  		uint8_t outport = packets[i].mbuf->port;
> >  		rte_eth_tx_buffer(outport, 0, fdata->tx_buf[outport],
> >  				packets[i].mbuf);
> > +
> > +		packets[i].op = RTE_EVENT_OP_RELEASE;
> > +	}
> > +
> > +	if (cons_data.release) {
> > +		uint16_t nb_tx;
> > +
> > +		nb_tx = rte_event_enqueue_burst(dev_id, port_id, packets, n);
> > +		while (nb_tx < n)
> > +			nb_tx += rte_event_enqueue_burst(dev_id, port_id,
> > +							 packets + nb_tx,
> > +							 n - nb_tx);
> >  	}
> >
> >  	/* Print out mpps every 1<22 packets */ @@ -702,6 +715,7 @@
> > setup_eventdev(struct prod_data *prod_data,
> >  	};
> >
> >  	struct port_link worker_queues[MAX_NUM_STAGES];
> > +	bool disable_implicit_release;
> 
> Same uint8_t over stdbool.h comment as above
> 
> 
> > @@ -3240,7 +3244,7 @@ test_sw_eventdev(void)
> >  	if (rte_lcore_count() >= 3) {
> >  		printf("*** Running Worker loopback test...\n");
> > -		ret = worker_loopback(t);
> > +		ret = worker_loopback(t, 0);
> >  		if (ret != 0) {
> >  			printf("ERROR - Worker loopback test FAILED.\n");
> >  			return ret;
> > @@ -3249,6 +3253,18 @@ test_sw_eventdev(void)
> >  		printf("### Not enough cores for worker loopback test.\n");
> >  		printf("### Need at least 3 cores for test.\n");
> >  	}
> > +	if (rte_lcore_count() >= 3) {
> > +		printf("*** Running Worker loopback test (implicit release
> > disabled)...\n");
> > +		ret = worker_loopback(t, 1);
> > +		if (ret != 0) {
> > +			printf("ERROR - Worker loopback test FAILED.\n");
> > +			return ret;
> > +		}
> > +	} else {
> > +		printf("### Not enough cores for worker loopback test.\n");
> > +		printf("### Need at least 3 cores for test.\n");
> > +	}
> 
> The double if (count >= 3) here looks like it could be removed and the
> loopback(t, 1) added to the first if()- but otherwise the logic is fine.
> 
> 
> With the above changes, this looks good to me!
> 
> Acked-by: Harry van Haaren <harry.van.haaren@intel.com>

Patch

diff --git a/drivers/event/dpaa2/dpaa2_eventdev.c b/drivers/event/dpaa2/dpaa2_eventdev.c
index eeeb231..236b211 100644
--- a/drivers/event/dpaa2/dpaa2_eventdev.c
+++ b/drivers/event/dpaa2/dpaa2_eventdev.c
@@ -440,6 +440,8 @@  dpaa2_eventdev_port_def_conf(struct rte_eventdev *dev, uint8_t port_id,
 		DPAA2_EVENT_MAX_PORT_DEQUEUE_DEPTH;
 	port_conf->enqueue_depth =
 		DPAA2_EVENT_MAX_PORT_ENQUEUE_DEPTH;
+	port_conf->disable_implicit_release =
+		0;
 }
 
 static void
diff --git a/drivers/event/octeontx/ssovf_evdev.c b/drivers/event/octeontx/ssovf_evdev.c
index 117b145..b80a6c0 100644
--- a/drivers/event/octeontx/ssovf_evdev.c
+++ b/drivers/event/octeontx/ssovf_evdev.c
@@ -252,6 +252,7 @@  ssovf_port_def_conf(struct rte_eventdev *dev, uint8_t port_id,
 	port_conf->new_event_threshold = edev->max_num_events;
 	port_conf->dequeue_depth = 1;
 	port_conf->enqueue_depth = 1;
+	port_conf->disable_implicit_release = 0;
 }
 
 static void
diff --git a/drivers/event/skeleton/skeleton_eventdev.c b/drivers/event/skeleton/skeleton_eventdev.c
index bb554c3..8b62361 100644
--- a/drivers/event/skeleton/skeleton_eventdev.c
+++ b/drivers/event/skeleton/skeleton_eventdev.c
@@ -237,6 +237,7 @@  skeleton_eventdev_port_def_conf(struct rte_eventdev *dev, uint8_t port_id,
 	port_conf->new_event_threshold = 32 * 1024;
 	port_conf->dequeue_depth = 16;
 	port_conf->enqueue_depth = 16;
+	port_conf->disable_implicit_release = false;
 }
 
 static void
diff --git a/drivers/event/sw/sw_evdev.c b/drivers/event/sw/sw_evdev.c
index fd11079..64b02d1 100644
--- a/drivers/event/sw/sw_evdev.c
+++ b/drivers/event/sw/sw_evdev.c
@@ -181,6 +181,7 @@  sw_port_setup(struct rte_eventdev *dev, uint8_t port_id,
 	}
 
 	p->inflight_max = conf->new_event_threshold;
+	p->implicit_release = !conf->disable_implicit_release;
 
 	/* check if ring exists, same as rx_worker above */
 	snprintf(buf, sizeof(buf), "sw%d_p%u, %s", dev->data->dev_id,
@@ -402,6 +403,7 @@  sw_port_def_conf(struct rte_eventdev *dev, uint8_t port_id,
 	port_conf->new_event_threshold = 1024;
 	port_conf->dequeue_depth = 16;
 	port_conf->enqueue_depth = 16;
+	port_conf->disable_implicit_release = 0;
 }
 
 static int
@@ -450,9 +452,11 @@  sw_info_get(struct rte_eventdev *dev, struct rte_event_dev_info *info)
 			.max_event_port_dequeue_depth = MAX_SW_CONS_Q_DEPTH,
 			.max_event_port_enqueue_depth = MAX_SW_PROD_Q_DEPTH,
 			.max_num_events = SW_INFLIGHT_EVENTS_TOTAL,
-			.event_dev_cap = (RTE_EVENT_DEV_CAP_QUEUE_QOS |
-					RTE_EVENT_DEV_CAP_BURST_MODE |
-					RTE_EVENT_DEV_CAP_EVENT_QOS),
+			.event_dev_cap = (
+				RTE_EVENT_DEV_CAP_QUEUE_QOS |
+				RTE_EVENT_DEV_CAP_BURST_MODE |
+				RTE_EVENT_DEV_CAP_EVENT_QOS |
+				RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE),
 	};
 
 	*info = evdev_sw_info;
diff --git a/drivers/event/sw/sw_evdev.h b/drivers/event/sw/sw_evdev.h
index e0dec91..3b5f64f 100644
--- a/drivers/event/sw/sw_evdev.h
+++ b/drivers/event/sw/sw_evdev.h
@@ -201,6 +201,7 @@  struct sw_port {
 	uint16_t outstanding_releases __rte_cache_aligned;
 	uint16_t inflight_max; /* app requested max inflights for this port */
 	uint16_t inflight_credits; /* num credits this port has right now */
+	uint8_t implicit_release; /* release events before dequeueing */
 
 	uint16_t last_dequeue_burst_sz; /* how big the burst was */
 	uint64_t last_dequeue_ticks; /* used to track burst processing time */
diff --git a/drivers/event/sw/sw_evdev_worker.c b/drivers/event/sw/sw_evdev_worker.c
index b3b3b17..93cd29b 100644
--- a/drivers/event/sw/sw_evdev_worker.c
+++ b/drivers/event/sw/sw_evdev_worker.c
@@ -109,7 +109,7 @@  sw_event_enqueue_burst(void *port, const struct rte_event ev[], uint16_t num)
 			return 0;
 	}
 
-	uint32_t forwards = 0;
+	uint32_t completions = 0;
 	for (i = 0; i < num; i++) {
 		int op = ev[i].op;
 		int outstanding = p->outstanding_releases > 0;
@@ -118,7 +118,6 @@  sw_event_enqueue_burst(void *port, const struct rte_event ev[], uint16_t num)
 		p->inflight_credits -= (op == RTE_EVENT_OP_NEW);
 		p->inflight_credits += (op == RTE_EVENT_OP_RELEASE) *
 					outstanding;
-		forwards += (op == RTE_EVENT_OP_FORWARD);
 
 		new_ops[i] = sw_qe_flag_map[op];
 		new_ops[i] &= ~(invalid_qid << QE_FLAG_VALID_SHIFT);
@@ -127,8 +126,10 @@  sw_event_enqueue_burst(void *port, const struct rte_event ev[], uint16_t num)
 		 * correct usage of the API), providing very high correct
 		 * prediction rate.
 		 */
-		if ((new_ops[i] & QE_FLAG_COMPLETE) && outstanding)
+		if ((new_ops[i] & QE_FLAG_COMPLETE) && outstanding) {
 			p->outstanding_releases--;
+			completions++;
+		}
 
 		/* error case: branch to avoid touching p->stats */
 		if (unlikely(invalid_qid)) {
@@ -137,8 +138,8 @@  sw_event_enqueue_burst(void *port, const struct rte_event ev[], uint16_t num)
 		}
 	}
 
-	/* handle directed port forward credits */
-	p->inflight_credits -= forwards * p->is_directed;
+	/* handle directed port forward and release credits */
+	p->inflight_credits -= completions * p->is_directed;
 
 	/* returns number of events actually enqueued */
 	uint32_t enq = enqueue_burst_with_ops(p->rx_worker_ring, ev, i,
@@ -172,7 +173,7 @@  sw_event_dequeue_burst(void *port, struct rte_event *ev, uint16_t num,
 	uint32_t credit_update_quanta = sw->credit_update_quanta;
 
 	/* check that all previous dequeues have been released */
-	if (!p->is_directed) {
+	if (p->implicit_release && !p->is_directed) {
 		uint16_t out_rels = p->outstanding_releases;
 		uint16_t i;
 		for (i = 0; i < out_rels; i++)
@@ -182,7 +183,6 @@  sw_event_dequeue_burst(void *port, struct rte_event *ev, uint16_t num,
 	/* returns number of events actually dequeued */
 	uint16_t ndeq = rte_event_ring_dequeue_burst(ring, ev, num, NULL);
 	if (unlikely(ndeq == 0)) {
-		p->outstanding_releases = 0;
 		p->zero_polls++;
 		p->total_polls++;
 		goto end;
@@ -190,7 +190,7 @@  sw_event_dequeue_burst(void *port, struct rte_event *ev, uint16_t num,
 
 	/* only add credits for directed ports - LB ports send RELEASEs */
 	p->inflight_credits += ndeq * p->is_directed;
-	p->outstanding_releases = ndeq;
+	p->outstanding_releases += ndeq;
 	p->last_dequeue_burst_sz = ndeq;
 	p->last_dequeue_ticks = rte_get_timer_cycles();
 	p->poll_buckets[(ndeq - 1) >> SW_DEQ_STAT_BUCKET_SHIFT]++;
diff --git a/examples/eventdev_pipeline_sw_pmd/main.c b/examples/eventdev_pipeline_sw_pmd/main.c
index 5f431d8..3910b53 100644
--- a/examples/eventdev_pipeline_sw_pmd/main.c
+++ b/examples/eventdev_pipeline_sw_pmd/main.c
@@ -62,6 +62,7 @@  struct prod_data {
 struct cons_data {
 	uint8_t dev_id;
 	uint8_t port_id;
+	bool release;
 } __rte_cache_aligned;
 
 static struct prod_data prod_data;
@@ -167,6 +168,18 @@  consumer(void)
 		uint8_t outport = packets[i].mbuf->port;
 		rte_eth_tx_buffer(outport, 0, fdata->tx_buf[outport],
 				packets[i].mbuf);
+
+		packets[i].op = RTE_EVENT_OP_RELEASE;
+	}
+
+	if (cons_data.release) {
+		uint16_t nb_tx;
+
+		nb_tx = rte_event_enqueue_burst(dev_id, port_id, packets, n);
+		while (nb_tx < n)
+			nb_tx += rte_event_enqueue_burst(dev_id, port_id,
+							 packets + nb_tx,
+							 n - nb_tx);
 	}
 
 	/* Print out mpps every 1<22 packets */
@@ -702,6 +715,7 @@  setup_eventdev(struct prod_data *prod_data,
 	};
 
 	struct port_link worker_queues[MAX_NUM_STAGES];
+	bool disable_implicit_release;
 	struct port_link tx_queue;
 	unsigned int i;
 
@@ -715,6 +729,12 @@  setup_eventdev(struct prod_data *prod_data,
 	ret = rte_event_dev_info_get(dev_id, &dev_info);
 	printf("\tEventdev %d: %s\n", dev_id, dev_info.driver_name);
 
+	disable_implicit_release = (dev_info.event_dev_cap &
+				    RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE);
+
+	wkr_p_conf.disable_implicit_release = disable_implicit_release;
+	tx_p_conf.disable_implicit_release = disable_implicit_release;
+
 	if (dev_info.max_event_port_dequeue_depth <
 			config.nb_event_port_dequeue_depth)
 		config.nb_event_port_dequeue_depth =
@@ -823,6 +843,7 @@  setup_eventdev(struct prod_data *prod_data,
 			.dequeue_depth = 8,
 			.enqueue_depth = 8,
 			.new_event_threshold = 1200,
+			.disable_implicit_release = disable_implicit_release,
 	};
 
 	if (rx_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth)
@@ -839,7 +860,8 @@  setup_eventdev(struct prod_data *prod_data,
 					.port_id = i + 1,
 					.qid = cdata.qid[0] };
 	*cons_data = (struct cons_data){.dev_id = dev_id,
-					.port_id = i };
+					.port_id = i,
+					.release = disable_implicit_release };
 
 	ret = rte_event_dev_service_id_get(dev_id,
 				&fdata->evdev_service_id);
diff --git a/lib/librte_eventdev/rte_eventdev.c b/lib/librte_eventdev/rte_eventdev.c
index ce6a5dc..eea8b7a 100644
--- a/lib/librte_eventdev/rte_eventdev.c
+++ b/lib/librte_eventdev/rte_eventdev.c
@@ -686,6 +686,15 @@  rte_event_port_setup(uint8_t dev_id, uint8_t port_id,
 		return -EINVAL;
 	}
 
+	if (port_conf && port_conf->disable_implicit_release &&
+	    !(dev->data->event_dev_cap &
+	      RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE)) {
+		RTE_EDEV_LOG_ERR(
+		   "dev%d port%d Implicit release disable not supported",
+			dev_id, port_id);
+		return -EINVAL;
+	}
+
 	if (dev->data->dev_started) {
 		RTE_EDEV_LOG_ERR(
 		    "device %d must be stopped to allow port setup", dev_id);
diff --git a/lib/librte_eventdev/rte_eventdev.h b/lib/librte_eventdev/rte_eventdev.h
index f1949ff..8377b3f 100644
--- a/lib/librte_eventdev/rte_eventdev.h
+++ b/lib/librte_eventdev/rte_eventdev.h
@@ -282,6 +282,16 @@  struct rte_mbuf; /* we just use mbuf pointers; no need to include rte_mbuf.h */
  *
  * @see rte_event_dequeue_burst() rte_event_enqueue_burst()
  */
+#define RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE    (1ULL << 5)
+/**< Event device ports support disabling the implicit release feature, in
+ * which the port will release all unreleased events in its dequeue operation.
+ * If this capability is set and the port is configured with implicit release
+ * disabled, the application is responsible for explicitly releasing events
+ * using either the RTE_EVENT_OP_FORWARD or the RTE_EVENT_OP_RELEASE event
+ * enqueue operations.
+ *
+ * @see rte_event_dequeue_burst() rte_event_enqueue_burst()
+ */
 
 /* Event device priority levels */
 #define RTE_EVENT_DEV_PRIORITY_HIGHEST   0
@@ -686,6 +696,13 @@  struct rte_event_port_conf {
 	 * which previously supplied to rte_event_dev_configure().
 	 * Ignored when device is not RTE_EVENT_DEV_CAP_BURST_MODE capable.
 	 */
+	uint8_t disable_implicit_release;
+	/**< Configure the port not to release outstanding events in
+	 * rte_event_dev_dequeue_burst(). If true, all events received through
+	 * the port must be explicitly released with RTE_EVENT_OP_RELEASE or
+	 * RTE_EVENT_OP_FORWARD. Must be false when the device is not
+	 * RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE capable.
+	 */
 };
 
 /**
@@ -1381,9 +1398,9 @@  rte_event_dequeue_timeout_ticks(uint8_t dev_id, uint64_t ns,
  *
  * The number of events dequeued is the number of scheduler contexts held by
  * this port. These contexts are automatically released in the next
- * rte_event_dequeue_burst() invocation, or invoking rte_event_enqueue_burst()
- * with RTE_EVENT_OP_RELEASE operation can be used to release the
- * contexts early.
+ * rte_event_dequeue_burst() invocation if the port supports implicit
+ * releases, or invoking rte_event_enqueue_burst() with RTE_EVENT_OP_RELEASE
+ * operation can be used to release the contexts early.
  *
  * Event operations RTE_EVENT_OP_FORWARD and RTE_EVENT_OP_RELEASE must only be
  * enqueued to the same port that their associated events were dequeued from.
diff --git a/test/test/test_eventdev.c b/test/test/test_eventdev.c
index ba39cba..85fe631 100644
--- a/test/test/test_eventdev.c
+++ b/test/test/test_eventdev.c
@@ -581,6 +581,15 @@  test_eventdev_port_setup(void)
 	ret = rte_event_port_setup(TEST_DEV_ID, 0, &pconf);
 	TEST_ASSERT(ret == -EINVAL, "Expected -EINVAL, %d", ret);
 
+	if (!(info.event_dev_cap &
+	      RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE)) {
+		pconf.enqueue_depth = info.max_event_port_enqueue_depth;
+		pconf.disable_implicit_release = 1;
+		ret = rte_event_port_setup(TEST_DEV_ID, 0, &pconf);
+		TEST_ASSERT(ret == -EINVAL, "Expected -EINVAL, %d", ret);
+		pconf.disable_implicit_release = 0;
+	}
+
 	ret = rte_event_port_setup(TEST_DEV_ID, info.max_event_ports,
 					&pconf);
 	TEST_ASSERT(ret == -EINVAL, "Expected -EINVAL, %d", ret);
diff --git a/test/test/test_eventdev_sw.c b/test/test/test_eventdev_sw.c
index f524b6f..0f99160 100644
--- a/test/test/test_eventdev_sw.c
+++ b/test/test/test_eventdev_sw.c
@@ -200,6 +200,7 @@  create_ports(struct test *t, int num_ports)
 			.new_event_threshold = 1024,
 			.dequeue_depth = 32,
 			.enqueue_depth = 64,
+			.disable_implicit_release = 0,
 	};
 	if (num_ports > MAX_PORTS)
 		return -1;
@@ -1256,6 +1257,7 @@  port_reconfig_credits(struct test *t)
 				.new_event_threshold = 128,
 				.dequeue_depth = 32,
 				.enqueue_depth = 64,
+				.disable_implicit_release = 0,
 		};
 		if (rte_event_port_setup(evdev, 0, &port_conf) < 0) {
 			printf("%d Error setting up port\n", __LINE__);
@@ -1345,6 +1347,7 @@  port_single_lb_reconfig(struct test *t)
 		.new_event_threshold = 128,
 		.dequeue_depth = 32,
 		.enqueue_depth = 64,
+		.disable_implicit_release = 0,
 	};
 	if (rte_event_port_setup(evdev, 0, &port_conf) < 0) {
 		printf("%d Error setting up port\n", __LINE__);
@@ -2940,7 +2943,7 @@  worker_loopback_producer_fn(void *arg)
 }
 
 static int
-worker_loopback(struct test *t)
+worker_loopback(struct test *t, uint8_t disable_implicit_release)
 {
 	/* use a single producer core, and a worker core to see what happens
 	 * if the worker loops packets back multiple times
@@ -2966,6 +2969,7 @@  worker_loopback(struct test *t)
 	 * only be initialized once - and this needs to be set for multiple runs
 	 */
 	conf.new_event_threshold = 512;
+	conf.disable_implicit_release = disable_implicit_release;
 
 	if (rte_event_port_setup(evdev, 0, &conf) < 0) {
 		printf("Error setting up RX port\n");
@@ -3240,7 +3244,7 @@  test_sw_eventdev(void)
 	}
 	if (rte_lcore_count() >= 3) {
 		printf("*** Running Worker loopback test...\n");
-		ret = worker_loopback(t);
+		ret = worker_loopback(t, 0);
 		if (ret != 0) {
 			printf("ERROR - Worker loopback test FAILED.\n");
 			return ret;
@@ -3249,6 +3253,18 @@  test_sw_eventdev(void)
 		printf("### Not enough cores for worker loopback test.\n");
 		printf("### Need at least 3 cores for test.\n");
 	}
+	if (rte_lcore_count() >= 3) {
+		printf("*** Running Worker loopback test (implicit release disabled)...\n");
+		ret = worker_loopback(t, 1);
+		if (ret != 0) {
+			printf("ERROR - Worker loopback test FAILED.\n");
+			return ret;
+		}
+	} else {
+		printf("### Not enough cores for worker loopback test.\n");
+		printf("### Need at least 3 cores for test.\n");
+	}
+
 	/*
 	 * Free test instance, leaving mempool initialized, and a pointer to it
 	 * in static eventdev_func_mempool, as it is re-used on re-runs