[v5,04/15] distributor: handle worker shutdown in burst mode

Message ID 20201008052323.11547-5-l.wojciechow@partner.samsung.com (mailing list archive)
State Superseded, archived
Delegated to: David Marchand
Series: fix distributor synchronization issues

Checks

Context Check Description
ci/checkpatch success coding style OK

Commit Message

Lukasz Wojciechowski Oct. 8, 2020, 5:23 a.m. UTC
  The burst version of the distributor implementation was missing proper
handling of worker shutdown. A worker processing packets received
from the distributor can call the rte_distributor_return_pkt() function
to inform the distributor that it wants no more packets. Subsequent
calls to rte_distributor_request_pkt() or rte_distributor_get_pkt(),
however, should inform the distributor that new packets are requested
again.

Lack of a proper implementation meant that even after a worker
announced the return of its last packets, new packets were still sent
to it by the distributor, causing deadlocks because nothing on the
worker side could receive them.

This patch adds handling of worker shutdown in the following way (a
flag-level sketch of the worker-side handshake follows the diffstat
below):
1) It fixes the usage of the RTE_DISTRIB_VALID_BUF handshake flag. This
flag was formerly unused in the burst implementation; it is now used
for marking valid packets in retptr64, replacing the invalid use
of the RTE_DISTRIB_RETURN_BUF flag.
2) It uses RTE_DISTRIB_RETURN_BUF as a worker-to-distributor handshake
in retptr64 to indicate that the worker has shut down.
3) A worker that shuts down also blocks its own bufptr with the
RTE_DISTRIB_RETURN_BUF flag, allowing the distributor to retrieve any
in-flight packets.
4) When the distributor receives notice of a worker shutdown, it:
marks the worker as not active; retrieves any in-flight and backlog
packets and redistributes them to other workers; and unlocks bufptr64
by clearing the RTE_DISTRIB_RETURN_BUF flag, allowing future use if
the worker requests new packets again.
5) It no longer sends packets to, or adds packets to the backlog of,
inactive workers. Such workers are also ignored when matched.
6) It adjusts the calls to handle_returns() and the tag matching
procedure to react to possible activation or deactivation of workers.

Fixes: 775003ad2f96 ("distributor: add new burst-capable library")
Cc: david.hunt@intel.com
Cc: stable@dpdk.org

Signed-off-by: Lukasz Wojciechowski <l.wojciechow@partner.samsung.com>
---
 lib/librte_distributor/distributor_private.h |   3 +
 lib/librte_distributor/rte_distributor.c     | 175 +++++++++++++++----
 2 files changed, 146 insertions(+), 32 deletions(-)
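
In flag terms, the worker-to-distributor shutdown handshake described in
points 1-3 above reduces to the sketch below, extracted from the new
rte_distributor_return_pkt() in the patch at the bottom of this page;
the wait loop and full memory-ordering details are in the diff itself:

	/* Mark each returned packet as valid with VALID_BUF (point 1) */
	for (i = num; i-- > 0; )
		buf->retptr64[i] = (((int64_t)(uintptr_t)oldpkt[i]) <<
			RTE_DISTRIB_FLAG_BITS) | RTE_DISTRIB_VALID_BUF;

	/* Block bufptr64 so the distributor can reclaim any in-flight
	 * packets it already sent to this worker (point 3)
	 */
	__atomic_or_fetch(&(buf->bufptr64[0]), RTE_DISTRIB_RETURN_BUF,
		__ATOMIC_ACQ_REL);

	/* Signal shutdown, rather than a new request, on retptr64 (point 2) */
	__atomic_store_n(&(buf->retptr64[0]),
		buf->retptr64[0] | RTE_DISTRIB_RETURN_BUF, __ATOMIC_RELEASE);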
  

Comments

Hunt, David Oct. 8, 2020, 2:26 p.m. UTC | #1
On 8/10/2020 6:23 AM, Lukasz Wojciechowski wrote:
> [commit message snipped]


Hi Lukasz,

    I spent the most time going through this particular patch,
and it looks good to me (even the bit where rte_distributor_process is
called recursively) :)

I'll try and get some time to run through some more testing, but for now:

Acked-by: David Hunt <david.hunt@intel.com>
  
Lukasz Wojciechowski Oct. 8, 2020, 9:07 p.m. UTC | #2
On 08.10.2020 at 16:26, David Hunt wrote:
>
> On 8/10/2020 6:23 AM, Lukasz Wojciechowski wrote:
>> [commit message snipped]
>
>
> Hi Lukasz,
Hi David,
Many thanks for your review.
>
>    I spent the most time going through this particular
> patch, and it looks good to me (even the bit where
> rte_distributor_process is called recursively) :)
That's the same trick that was used in the legacy single version. :)
>
> I'll try and get some time to run through some more testing, but for now:
>
> Acked-by: David Hunt <david.hunt@intel.com>
Thanks, and if you run the tests, please take a look at the
performance. I think it has dropped because of the additional
synchronizations and actions on worker activation/deactivation.

However, the reliability has increased greatly. With the v5 version, I
ran the tests over 100000 times and didn't get a single failure!

Let me know about your results.


Best regards

Lukasz

  
Hunt, David Oct. 9, 2020, 12:13 p.m. UTC | #3
On 8/10/2020 10:07 PM, Lukasz Wojciechowski wrote:
> On 08.10.2020 at 16:26, David Hunt wrote:
>> On 8/10/2020 6:23 AM, Lukasz Wojciechowski wrote:
>>> [commit message snipped]
>>
>> Hi Lukasz,
> Hi David,
> Many thanks for your review.
>>     I spent the most time going through this particular
>> patch, and it looks good to me (even the bit where
>> rte_distributor_process is called recursively) :)
> That's the same trick that was used in the legacy single version. :)
>> I'll try and get some time to run through some more testing, but for now:
>>
>> Acked-by: David Hunt <david.hunt@intel.com>
> Thanks, and if you run the tests, please take a look at the
> performance. I think it has dropped because of the additional
> synchronizations and actions on worker activation/deactivation.
>
> However, the reliability has increased greatly. With the v5 version, I
> ran the tests over 100000 times and didn't get a single failure!
>
> Let me know about your results.
>

Going back through the patch set and running performance tests on each
one, I see a 10% drop in performance at patch 2 in the series, which
adds an extra handle_returns() call in the busy loop. That call avoids
a possible deadlock.

I played around with that patch for a while, only calling
handle_returns() every X times around the loop, but the performance was
worse again, probably because of the extra branch I added.
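
The experiment was roughly of the following shape (a sketch only; X was
an arbitrary tuning constant, and the extra counter and branch are what
cost the performance back):

	/* sketch: poll returns only every X spins of the wait loop in
	 * release(), instead of on every iteration
	 */
	unsigned int spins = 0;
	while (!(__atomic_load_n(&(d->bufs[wkr].bufptr64[0]), __ATOMIC_ACQUIRE)
		& RTE_DISTRIB_GET_BUF)) {
		if (unlikely(++spins % X == 0))	/* the extra branch */
			handle_returns(d, wkr);
		rte_pause();
	}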

However, it's more important to have stable operation than maximum
performance, so it's still a good idea to have that fix applied, IMO.

Maybe we can get back some lost performance in future optimisation patches.

Thanks,
Dave.
  
Lukasz Wojciechowski Oct. 9, 2020, 8:43 p.m. UTC | #4
On 09.10.2020 at 14:13, David Hunt wrote:
>
> On 8/10/2020 10:07 PM, Lukasz Wojciechowski wrote:
>> On 08.10.2020 at 16:26, David Hunt wrote:
>>> On 8/10/2020 6:23 AM, Lukasz Wojciechowski wrote:
>>>> [commit message snipped]
>> [earlier discussion snipped]
>
> Going back through the patch set and running performance tests on each
> one, I see a 10% drop in performance at patch 2 in the series, which
> adds an extra handle_returns() call in the busy loop. That call avoids
> a possible deadlock.
>
> I played around with that patch for a while, only calling
> handle_returns() every X times around the loop, but the performance
> was worse again, probably because of the extra branch I added.
>
> However, it's more important to have stable operation than maximum
> performance, so it's still a good idea to have that fix applied, IMO.
I agree
>
> Maybe we can get back some lost performance in future optimisation 
> patches.
That would be really nice. If I have some time, I would like to try
some of the ideas I came up with while working on the series.
>
> Thanks,
> Dave.
  

Patch

diff --git a/lib/librte_distributor/distributor_private.h b/lib/librte_distributor/distributor_private.h
index 489aef2ac..689fe3e18 100644
--- a/lib/librte_distributor/distributor_private.h
+++ b/lib/librte_distributor/distributor_private.h
@@ -155,6 +155,9 @@  struct rte_distributor {
 	enum rte_distributor_match_function dist_match_fn;
 
 	struct rte_distributor_single *d_single;
+
+	uint8_t active[RTE_DISTRIB_MAX_WORKERS];
+	uint8_t activesum;
 };
 
 void
diff --git a/lib/librte_distributor/rte_distributor.c b/lib/librte_distributor/rte_distributor.c
index b720abe03..115443fc0 100644
--- a/lib/librte_distributor/rte_distributor.c
+++ b/lib/librte_distributor/rte_distributor.c
@@ -51,7 +51,7 @@  rte_distributor_request_pkt(struct rte_distributor *d,
 	 * Sync with worker on GET_BUF flag.
 	 */
 	while (unlikely(__atomic_load_n(retptr64, __ATOMIC_ACQUIRE)
-			& RTE_DISTRIB_GET_BUF)) {
+			& (RTE_DISTRIB_GET_BUF | RTE_DISTRIB_RETURN_BUF))) {
 		rte_pause();
 		uint64_t t = rte_rdtsc()+100;
 
@@ -67,11 +67,11 @@  rte_distributor_request_pkt(struct rte_distributor *d,
 	for (i = count; i < RTE_DIST_BURST_SIZE; i++)
 		buf->retptr64[i] = 0;
 
-	/* Set Return bit for each packet returned */
+	/* Set VALID_BUF bit for each packet returned */
 	for (i = count; i-- > 0; )
 		buf->retptr64[i] =
 			(((int64_t)(uintptr_t)(oldpkt[i])) <<
-			RTE_DISTRIB_FLAG_BITS) | RTE_DISTRIB_RETURN_BUF;
+			RTE_DISTRIB_FLAG_BITS) | RTE_DISTRIB_VALID_BUF;
 
 	/*
 	 * Finally, set the GET_BUF  to signal to distributor that cache
@@ -97,11 +97,13 @@  rte_distributor_poll_pkt(struct rte_distributor *d,
 		return (pkts[0]) ? 1 : 0;
 	}
 
-	/* If bit is set, return
+	/* If any of below bits is set, return.
+	 * GET_BUF is set when distributor hasn't sent any packets yet
+	 * RETURN_BUF is set when distributor must retrieve in-flight packets
 	 * Sync with distributor to acquire bufptrs
 	 */
 	if (__atomic_load_n(&(buf->bufptr64[0]), __ATOMIC_ACQUIRE)
-		& RTE_DISTRIB_GET_BUF)
+		& (RTE_DISTRIB_GET_BUF | RTE_DISTRIB_RETURN_BUF))
 		return -1;
 
 	/* since bufptr64 is signed, this should be an arithmetic shift */
@@ -113,7 +115,7 @@  rte_distributor_poll_pkt(struct rte_distributor *d,
 	}
 
 	/*
-	 * so now we've got the contents of the cacheline into an  array of
+	 * so now we've got the contents of the cacheline into an array of
 	 * mbuf pointers, so toggle the bit so scheduler can start working
 	 * on the next cacheline while we're working.
 	 * Sync with distributor on GET_BUF flag. Release bufptrs.
@@ -175,7 +177,7 @@  rte_distributor_return_pkt(struct rte_distributor *d,
 	 * Sync with worker on GET_BUF flag.
 	 */
 	while (unlikely(__atomic_load_n(retptr64, __ATOMIC_ACQUIRE)
-			& RTE_DISTRIB_GET_BUF)) {
+			& (RTE_DISTRIB_GET_BUF | RTE_DISTRIB_RETURN_BUF))) {
 		rte_pause();
 		uint64_t t = rte_rdtsc()+100;
 
@@ -187,17 +189,25 @@  rte_distributor_return_pkt(struct rte_distributor *d,
 	__atomic_thread_fence(__ATOMIC_ACQUIRE);
 	for (i = 0; i < RTE_DIST_BURST_SIZE; i++)
 		/* Switch off the return bit first */
-		buf->retptr64[i] &= ~RTE_DISTRIB_RETURN_BUF;
+		buf->retptr64[i] = 0;
 
 	for (i = num; i-- > 0; )
 		buf->retptr64[i] = (((int64_t)(uintptr_t)oldpkt[i]) <<
-			RTE_DISTRIB_FLAG_BITS) | RTE_DISTRIB_RETURN_BUF;
+			RTE_DISTRIB_FLAG_BITS) | RTE_DISTRIB_VALID_BUF;
+
+	/* Use RETURN_BUF on bufptr64 to notify distributor that
+	 * we won't read any mbufs from there even if GET_BUF is set.
+	 * This allows distributor to retrieve in-flight already sent packets.
+	 */
+	__atomic_or_fetch(&(buf->bufptr64[0]), RTE_DISTRIB_RETURN_BUF,
+		__ATOMIC_ACQ_REL);
 
-	/* set the GET_BUF but even if we got no returns.
-	 * Sync with distributor on GET_BUF flag. Release retptrs.
+	/* set the RETURN_BUF on retptr64 even if we got no returns.
+	 * Sync with distributor on RETURN_BUF flag. Release retptrs.
+	 * Notify distributor that we don't request more packets any more.
 	 */
 	__atomic_store_n(&(buf->retptr64[0]),
-		buf->retptr64[0] | RTE_DISTRIB_GET_BUF, __ATOMIC_RELEASE);
+		buf->retptr64[0] | RTE_DISTRIB_RETURN_BUF, __ATOMIC_RELEASE);
 
 	return 0;
 }
@@ -267,6 +277,59 @@  find_match_scalar(struct rte_distributor *d,
 	 */
 }
 
+/*
+ * When worker called rte_distributor_return_pkt()
+ * and passed RTE_DISTRIB_RETURN_BUF handshake through retptr64,
+ * distributor must retrieve both inflight and backlog packets assigned
+ * to the worker and reprocess them to another worker.
+ */
+static void
+handle_worker_shutdown(struct rte_distributor *d, unsigned int wkr)
+{
+	struct rte_distributor_buffer *buf = &(d->bufs[wkr]);
+	/* double BURST size for storing both inflights and backlog */
+	struct rte_mbuf *pkts[RTE_DIST_BURST_SIZE * 2];
+	unsigned int pkts_count = 0;
+	unsigned int i;
+
+	/* If GET_BUF is cleared there are in-flight packets sent
+	 * to worker which does not require new packets.
+	 * They must be retrieved and assigned to another worker.
+	 */
+	if (!(__atomic_load_n(&(buf->bufptr64[0]), __ATOMIC_ACQUIRE)
+		& RTE_DISTRIB_GET_BUF))
+		for (i = 0; i < RTE_DIST_BURST_SIZE; i++)
+			if (buf->bufptr64[i] & RTE_DISTRIB_VALID_BUF)
+				pkts[pkts_count++] = (void *)((uintptr_t)
+					(buf->bufptr64[i]
+						>> RTE_DISTRIB_FLAG_BITS));
+
+	/* Make following operations on handshake flags on bufptr64:
+	 * - set GET_BUF to indicate that distributor can overwrite buffer
+	 *     with new packets if worker will make a new request.
+	 * - clear RETURN_BUF to unlock reads on worker side.
+	 */
+	__atomic_store_n(&(buf->bufptr64[0]), RTE_DISTRIB_GET_BUF,
+		__ATOMIC_RELEASE);
+
+	/* Collect backlog packets from worker */
+	for (i = 0; i < d->backlog[wkr].count; i++)
+		pkts[pkts_count++] = (void *)((uintptr_t)
+			(d->backlog[wkr].pkts[i] >> RTE_DISTRIB_FLAG_BITS));
+
+	d->backlog[wkr].count = 0;
+
+	/* Clear both inflight and backlog tags */
+	for (i = 0; i < RTE_DIST_BURST_SIZE; i++) {
+		d->in_flight_tags[wkr][i] = 0;
+		d->backlog[wkr].tags[i] = 0;
+	}
+
+	/* Recursive call */
+	if (pkts_count > 0)
+		rte_distributor_process(d, pkts, pkts_count);
+}
+
 
 /*
  * When the handshake bits indicate that there are packets coming
@@ -285,19 +348,33 @@  handle_returns(struct rte_distributor *d, unsigned int wkr)
 
 	/* Sync on GET_BUF flag. Acquire retptrs. */
 	if (__atomic_load_n(&(buf->retptr64[0]), __ATOMIC_ACQUIRE)
-		& RTE_DISTRIB_GET_BUF) {
+		& (RTE_DISTRIB_GET_BUF | RTE_DISTRIB_RETURN_BUF)) {
 		for (i = 0; i < RTE_DIST_BURST_SIZE; i++) {
-			if (buf->retptr64[i] & RTE_DISTRIB_RETURN_BUF) {
+			if (buf->retptr64[i] & RTE_DISTRIB_VALID_BUF) {
 				oldbuf = ((uintptr_t)(buf->retptr64[i] >>
 					RTE_DISTRIB_FLAG_BITS));
 				/* store returns in a circular buffer */
 				store_return(oldbuf, d, &ret_start, &ret_count);
 				count++;
-				buf->retptr64[i] &= ~RTE_DISTRIB_RETURN_BUF;
+				buf->retptr64[i] &= ~RTE_DISTRIB_VALID_BUF;
 			}
 		}
 		d->returns.start = ret_start;
 		d->returns.count = ret_count;
+
+		/* If worker requested packets with GET_BUF, set it to active
+		 * otherwise (RETURN_BUF), set it to not active.
+		 */
+		d->activesum -= d->active[wkr];
+		d->active[wkr] = !!(buf->retptr64[0] & RTE_DISTRIB_GET_BUF);
+		d->activesum += d->active[wkr];
+
+		/* If worker returned packets without requesting new ones,
+		 * handle all in-flights and backlog packets assigned to it.
+		 */
+		if (unlikely(buf->retptr64[0] & RTE_DISTRIB_RETURN_BUF))
+			handle_worker_shutdown(d, wkr);
+
 		/* Clear for the worker to populate with more returns.
 		 * Sync with distributor on GET_BUF flag. Release retptrs.
 		 */
@@ -322,11 +399,15 @@  release(struct rte_distributor *d, unsigned int wkr)
 	unsigned int i;
 
 	handle_returns(d, wkr);
+	if (unlikely(!d->active[wkr]))
+		return 0;
 
 	/* Sync with worker on GET_BUF flag */
 	while (!(__atomic_load_n(&(d->bufs[wkr].bufptr64[0]), __ATOMIC_ACQUIRE)
 		& RTE_DISTRIB_GET_BUF)) {
 		handle_returns(d, wkr);
+		if (unlikely(!d->active[wkr]))
+			return 0;
 		rte_pause();
 	}
 
@@ -366,7 +447,7 @@  rte_distributor_process(struct rte_distributor *d,
 	int64_t next_value = 0;
 	uint16_t new_tag = 0;
 	uint16_t flows[RTE_DIST_BURST_SIZE] __rte_cache_aligned;
-	unsigned int i, j, w, wid;
+	unsigned int i, j, w, wid, matching_required;
 
 	if (d->alg_type == RTE_DIST_ALG_SINGLE) {
 		/* Call the old API */
@@ -374,11 +455,13 @@  rte_distributor_process(struct rte_distributor *d,
 			mbufs, num_mbufs);
 	}
 
+	for (wid = 0 ; wid < d->num_workers; wid++)
+		handle_returns(d, wid);
+
 	if (unlikely(num_mbufs == 0)) {
 		/* Flush out all non-full cache-lines to workers. */
 		for (wid = 0 ; wid < d->num_workers; wid++) {
 			/* Sync with worker on GET_BUF flag. */
-			handle_returns(d, wid);
 			if (__atomic_load_n(&(d->bufs[wid].bufptr64[0]),
 				__ATOMIC_ACQUIRE) & RTE_DISTRIB_GET_BUF) {
 				release(d, wid);
@@ -388,6 +471,9 @@  rte_distributor_process(struct rte_distributor *d,
 		return 0;
 	}
 
+	if (unlikely(!d->activesum))
+		return 0;
+
 	while (next_idx < num_mbufs) {
 		uint16_t matches[RTE_DIST_BURST_SIZE];
 		unsigned int pkts;
@@ -412,22 +498,30 @@  rte_distributor_process(struct rte_distributor *d,
 		for (; i < RTE_DIST_BURST_SIZE; i++)
 			flows[i] = 0;
 
-		switch (d->dist_match_fn) {
-		case RTE_DIST_MATCH_VECTOR:
-			find_match_vec(d, &flows[0], &matches[0]);
-			break;
-		default:
-			find_match_scalar(d, &flows[0], &matches[0]);
-		}
+		matching_required = 1;
 
+		for (j = 0; j < pkts; j++) {
+			if (unlikely(!d->activesum))
+				return next_idx;
+
+			if (unlikely(matching_required)) {
+				switch (d->dist_match_fn) {
+				case RTE_DIST_MATCH_VECTOR:
+					find_match_vec(d, &flows[0],
+						&matches[0]);
+					break;
+				default:
+					find_match_scalar(d, &flows[0],
+						&matches[0]);
+				}
+				matching_required = 0;
+			}
 		/*
 		 * Matches array now contain the intended worker ID (+1) of
 		 * the incoming packets. Any zeroes need to be assigned
 		 * workers.
 		 */
 
-		for (j = 0; j < pkts; j++) {
-
 			next_mb = mbufs[next_idx++];
 			next_value = (((int64_t)(uintptr_t)next_mb) <<
 					RTE_DISTRIB_FLAG_BITS);
@@ -447,12 +541,18 @@  rte_distributor_process(struct rte_distributor *d,
 			 */
 			/* matches[j] = 0; */
 
-			if (matches[j]) {
+			if (matches[j] && d->active[matches[j]-1]) {
 				struct rte_distributor_backlog *bl =
 						&d->backlog[matches[j]-1];
 				if (unlikely(bl->count ==
 						RTE_DIST_BURST_SIZE)) {
 					release(d, matches[j]-1);
+					if (!d->active[matches[j]-1]) {
+						j--;
+						next_idx--;
+						matching_required = 1;
+						continue;
+					}
 				}
 
 				/* Add to worker that already has flow */
@@ -462,11 +562,21 @@  rte_distributor_process(struct rte_distributor *d,
 				bl->pkts[idx] = next_value;
 
 			} else {
-				struct rte_distributor_backlog *bl =
-						&d->backlog[wkr];
+				struct rte_distributor_backlog *bl;
+
+				while (unlikely(!d->active[wkr]))
+					wkr = (wkr + 1) % d->num_workers;
+				bl = &d->backlog[wkr];
+
 				if (unlikely(bl->count ==
 						RTE_DIST_BURST_SIZE)) {
 					release(d, wkr);
+					if (!d->active[wkr]) {
+						j--;
+						next_idx--;
+						matching_required = 1;
+						continue;
+					}
 				}
 
 				/* Add to current worker worker */
@@ -485,9 +595,7 @@  rte_distributor_process(struct rte_distributor *d,
 						matches[w] = wkr+1;
 			}
 		}
-		wkr++;
-		if (wkr >= d->num_workers)
-			wkr = 0;
+		wkr = (wkr + 1) % d->num_workers;
 	}
 
 	/* Flush out all non-full cache-lines to workers. */
@@ -663,6 +771,9 @@  rte_distributor_create(const char *name,
 	for (i = 0 ; i < num_workers ; i++)
 		d->backlog[i].tags = &d->in_flight_tags[i][RTE_DIST_BURST_SIZE];
 
+	memset(d->active, 0, sizeof(d->active));
+	d->activesum = 0;
+
 	dist_burst_list = RTE_TAILQ_CAST(rte_dist_burst_tailq.head,
 					  rte_dist_burst_list);
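
For reference, here is a minimal sketch of the worker-side shutdown
sequence that this patch makes safe. The quit flag and handle_packet()
are hypothetical application code, not part of the library; error
handling is omitted:

	#include <stdbool.h>
	#include <rte_distributor.h>
	#include <rte_lcore.h>
	#include <rte_mbuf.h>

	static volatile bool quit;	/* hypothetical shutdown flag */

	static void handle_packet(struct rte_mbuf *m); /* hypothetical app code */

	static int
	worker_lcore(void *arg)
	{
		struct rte_distributor *d = arg;
		unsigned int id = rte_lcore_id() - 1; /* assumes worker ids 0..N-1 */
		struct rte_mbuf *pkts[8];	/* burst size for 64B cache lines */
		unsigned int num = 0;
		unsigned int i;

		while (!quit) {
			/* hand back the previous burst and request a new one */
			num = rte_distributor_get_pkt(d, id, pkts, pkts, num);
			for (i = 0; i < num; i++)
				handle_packet(pkts[i]);
		}
		/* Tell the distributor we want no more packets. With this patch
		 * applied, it also reclaims our in-flight and backlog packets
		 * and redistributes them to the remaining active workers.
		 */
		rte_distributor_return_pkt(d, id, pkts, num);
		return 0;
	}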