[v7,02/16] distributor: fix handshake deadlock

Message ID 20201010160508.19709-3-l.wojciechow@partner.samsung.com (mailing list archive)
State Superseded, archived
Delegated to: David Marchand
Headers
Series fix distributor synchronization issues |

Checks

Context Check Description
ci/checkpatch success coding style OK

Commit Message

Lukasz Wojciechowski Oct. 10, 2020, 4:04 p.m. UTC
  Synchronization of data exchange between distributor and worker cores
is based on 2 handshakes: retptr64 for returning mbufs from workers
to distributor and bufptr64 for passing mbufs to workers.

Without proper order of verifying those 2 handshakes a deadlock may
occur. This can happen when worker core wants to return back mbufs
and waits for retptr handshake to be cleared while distributor core
waits for bufptr to send mbufs to worker.

This can happen as worker core first returns mbufs to distributor
and later gets new mbufs, while distributor first releases mbufs
to worker and later handle returning packets.

This patch fixes possibility of the deadlock by always taking care
of returning packets first on the distributor side and handling
packets while waiting to release new.

Fixes: 775003ad2f96 ("distributor: add new burst-capable library")
Cc: david.hunt@intel.com
Cc: stable@dpdk.org

Signed-off-by: Lukasz Wojciechowski <l.wojciechow@partner.samsung.com>
Acked-by: David Hunt <david.hunt@intel.com>
---
 lib/librte_distributor/rte_distributor.c | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)
  

Patch

diff --git a/lib/librte_distributor/rte_distributor.c b/lib/librte_distributor/rte_distributor.c
index 89493c331..12b3db33c 100644
--- a/lib/librte_distributor/rte_distributor.c
+++ b/lib/librte_distributor/rte_distributor.c
@@ -321,12 +321,14 @@  release(struct rte_distributor *d, unsigned int wkr)
 	struct rte_distributor_buffer *buf = &(d->bufs[wkr]);
 	unsigned int i;
 
+	handle_returns(d, wkr);
+
 	/* Sync with worker on GET_BUF flag */
 	while (!(__atomic_load_n(&(d->bufs[wkr].bufptr64[0]), __ATOMIC_ACQUIRE)
-		& RTE_DISTRIB_GET_BUF))
+		& RTE_DISTRIB_GET_BUF)) {
+		handle_returns(d, wkr);
 		rte_pause();
-
-	handle_returns(d, wkr);
+	}
 
 	buf->count = 0;
 
@@ -376,6 +378,7 @@  rte_distributor_process(struct rte_distributor *d,
 		/* Flush out all non-full cache-lines to workers. */
 		for (wid = 0 ; wid < d->num_workers; wid++) {
 			/* Sync with worker on GET_BUF flag. */
+			handle_returns(d, wid);
 			if (__atomic_load_n(&(d->bufs[wid].bufptr64[0]),
 				__ATOMIC_ACQUIRE) & RTE_DISTRIB_GET_BUF) {
 				release(d, wid);