Message ID | 20201017030701.16134-2-l.wojciechow@partner.samsung.com |
---|---|
State | Accepted, archived |
Delegated to: | David Marchand |
Headers | show |
Series | |
Related | show |
Context | Check | Description |
---|---|---|
ci/checkpatch | success | coding style OK |
<snip> > > rte_distributor_return_pkt function which is run on worker cores must wait > for distributor core to clear handshake on retptr64 before using those > buffers. While the handshake is set distributor core controls buffers and any > operations on worker side might overwrite buffers which are unread yet. > Same situation appears in the legacy single distributor. Function > rte_distributor_return_pkt_single shouldn't modify the bufptr64 until > handshake on it is cleared by distributor lcore. > > Fixes: 775003ad2f96 ("distributor: add new burst-capable library") > Cc: david.hunt@intel.com > Cc: stable@dpdk.org > > Signed-off-by: Lukasz Wojciechowski <l.wojciechow@partner.samsung.com> > Acked-by: David Hunt <david.hunt@intel.com> Looks good. Reviewed-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com> > --- > lib/librte_distributor/rte_distributor.c | 12 ++++++++++++ > lib/librte_distributor/rte_distributor_single.c | 4 ++++ > 2 files changed, 16 insertions(+) > > diff --git a/lib/librte_distributor/rte_distributor.c > b/lib/librte_distributor/rte_distributor.c > index 1c047f065..c6b19a388 100644 > --- a/lib/librte_distributor/rte_distributor.c > +++ b/lib/librte_distributor/rte_distributor.c > @@ -169,6 +169,18 @@ rte_distributor_return_pkt(struct rte_distributor *d, > return -EINVAL; > } > > + /* Spin while handshake bits are set (scheduler clears it). > + * Sync with worker on GET_BUF flag. 
> + */ > + while (unlikely(__atomic_load_n(&(buf->retptr64[0]), > __ATOMIC_RELAXED) > + & RTE_DISTRIB_GET_BUF)) { > + rte_pause(); > + uint64_t t = rte_rdtsc()+100; > + > + while (rte_rdtsc() < t) > + rte_pause(); > + } > + > /* Sync with distributor to acquire retptrs */ > __atomic_thread_fence(__ATOMIC_ACQUIRE); > for (i = 0; i < RTE_DIST_BURST_SIZE; i++) diff --git > a/lib/librte_distributor/rte_distributor_single.c > b/lib/librte_distributor/rte_distributor_single.c > index abaf7730c..f4725b1d0 100644 > --- a/lib/librte_distributor/rte_distributor_single.c > +++ b/lib/librte_distributor/rte_distributor_single.c > @@ -74,6 +74,10 @@ rte_distributor_return_pkt_single(struct > rte_distributor_single *d, > union rte_distributor_buffer_single *buf = &d->bufs[worker_id]; > uint64_t req = (((int64_t)(uintptr_t)oldpkt) << > RTE_DISTRIB_FLAG_BITS) > | RTE_DISTRIB_RETURN_BUF; > + while (unlikely(__atomic_load_n(&buf->bufptr64, > __ATOMIC_RELAXED) > + & RTE_DISTRIB_FLAGS_MASK)) > + rte_pause(); > + > /* Sync with distributor on RETURN_BUF flag. */ > __atomic_store_n(&(buf->bufptr64), req, __ATOMIC_RELEASE); > return 0; > -- > 2.17.1
diff --git a/lib/librte_distributor/rte_distributor.c b/lib/librte_distributor/rte_distributor.c index 1c047f065..c6b19a388 100644 --- a/lib/librte_distributor/rte_distributor.c +++ b/lib/librte_distributor/rte_distributor.c @@ -169,6 +169,18 @@ rte_distributor_return_pkt(struct rte_distributor *d, return -EINVAL; } + /* Spin while handshake bits are set (scheduler clears it). + * Sync with worker on GET_BUF flag. + */ + while (unlikely(__atomic_load_n(&(buf->retptr64[0]), __ATOMIC_RELAXED) + & RTE_DISTRIB_GET_BUF)) { + rte_pause(); + uint64_t t = rte_rdtsc()+100; + + while (rte_rdtsc() < t) + rte_pause(); + } + /* Sync with distributor to acquire retptrs */ __atomic_thread_fence(__ATOMIC_ACQUIRE); for (i = 0; i < RTE_DIST_BURST_SIZE; i++) diff --git a/lib/librte_distributor/rte_distributor_single.c b/lib/librte_distributor/rte_distributor_single.c index abaf7730c..f4725b1d0 100644 --- a/lib/librte_distributor/rte_distributor_single.c +++ b/lib/librte_distributor/rte_distributor_single.c @@ -74,6 +74,10 @@ rte_distributor_return_pkt_single(struct rte_distributor_single *d, union rte_distributor_buffer_single *buf = &d->bufs[worker_id]; uint64_t req = (((int64_t)(uintptr_t)oldpkt) << RTE_DISTRIB_FLAG_BITS) | RTE_DISTRIB_RETURN_BUF; + while (unlikely(__atomic_load_n(&buf->bufptr64, __ATOMIC_RELAXED) + & RTE_DISTRIB_FLAGS_MASK)) + rte_pause(); + /* Sync with distributor on RETURN_BUF flag. */ __atomic_store_n(&(buf->bufptr64), req, __ATOMIC_RELEASE); return 0;