[v1] ring: enforce reading the tails before ring operations

Message ID 1551841661-42892-1-git-send-email-gavin.hu@arm.com (mailing list archive)
State Superseded, archived
Delegated to: Thomas Monjalon
Headers
Series [v1] ring: enforce reading the tails before ring operations |

Checks

Context Check Description
ci/checkpatch success coding style OK
ci/mellanox-Performance-Testing success Performance Testing PASS
ci/intel-Performance-Testing success Performance Testing PASS
ci/Intel-compilation success Compilation OK

Commit Message

Gavin Hu March 6, 2019, 3:07 a.m. UTC
  In weak memory models, like arm64, reading the {prod,cons}.tail may get
reordered after reading or writing the ring slots, which corrupts the ring
and stale data is observed.
This issue was reported by NXP on 8-A72 DPAA2 board. The problem is most
likely caused by missing the acquire semantics when reading cons.tail (in
SP enqueue) or prod.tail (in SC dequeue) which makes it possible to read a
stale value from the ring slots.  There must be a read fence before writing
or reading the ring slots, rte_atomic32_cmpset() provides the same ordering
for MP (and MC) case. This patch is to enforce this ordering for SP (and
SC) case.

Signed-off-by: gavin hu <gavin.hu@arm.com>
Reviewed-by: Ola Liljedahl <Ola.Liljedahl@arm.com>
Tested-by: Nipun Gupta <nipun.gupta@nxp.com>
---
 lib/librte_ring/rte_ring_generic.h | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)
  

Comments

Ilya Maximets March 6, 2019, 11:49 a.m. UTC | #1
On 06.03.2019 6:07, gavin hu wrote:
> In weak memory models, like arm64, reading the {prod,cons}.tail may get
> reordered after reading or writing the ring slots, which corrupts the ring
> and stale data is observed.
> This issue was reported by NXP on 8-A72 DPAA2 board. The problem is most
> likely caused by missing the acquire semantics when reading cons.tail (in
> SP enqueue) or prod.tail (in SC dequeue) which makes it possible to read a
> stale value from the ring slots.  There must be a read fence before writing

Sorry, but the phrase "There must be a read fence before writing" makes no sense.
Could you please rephrase or describe in details which reads you're trying to
keep in exact order?

> or reading the ring slots, rte_atomic32_cmpset() provides the same ordering
> for MP (and MC) case. This patch is to enforce this ordering for SP (and
> SC) case.
> 
> Signed-off-by: gavin hu <gavin.hu@arm.com>
> Reviewed-by: Ola Liljedahl <Ola.Liljedahl@arm.com>
> Tested-by: Nipun Gupta <nipun.gupta@nxp.com>
> ---
>  lib/librte_ring/rte_ring_generic.h | 16 ++++++++++------
>  1 file changed, 10 insertions(+), 6 deletions(-)
> 
> diff --git a/lib/librte_ring/rte_ring_generic.h b/lib/librte_ring/rte_ring_generic.h
> index ea7dbe5..1bd3dfd 100644
> --- a/lib/librte_ring/rte_ring_generic.h
> +++ b/lib/librte_ring/rte_ring_generic.h
> @@ -90,9 +90,11 @@ __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
>  			return 0;
>  
>  		*new_head = *old_head + n;
> -		if (is_sp)
> -			r->prod.head = *new_head, success = 1;
> -		else
> +		if (is_sp) {
> +			r->prod.head = *new_head;
> +			rte_smp_rmb();
> +			success = 1;
> +		} else
>  			success = rte_atomic32_cmpset(&r->prod.head,
>  					*old_head, *new_head);
>  	} while (unlikely(success == 0));
> @@ -158,9 +160,11 @@ __rte_ring_move_cons_head(struct rte_ring *r, unsigned int is_sc,
>  			return 0;
>  
>  		*new_head = *old_head + n;
> -		if (is_sc)
> -			r->cons.head = *new_head, success = 1;
> -		else
> +		if (is_sc) {
> +			r->cons.head = *new_head;
> +			rte_smp_rmb();
> +			success = 1;
> +		} else
>  			success = rte_atomic32_cmpset(&r->cons.head, *old_head,
>  					*new_head);
>  	} while (unlikely(success == 0));
>
  
Gavin Hu March 7, 2019, 6:50 a.m. UTC | #2
Hi ilya,

Thanks for your comments, inline comments and new commit message in V2.

/Gavin

> -----Original Message-----
> From: Ilya Maximets <i.maximets@samsung.com>
> Sent: Wednesday, March 6, 2019 7:49 PM
> To: Gavin Hu (Arm Technology China) <Gavin.Hu@arm.com>;
> dev@dpdk.org
> Cc: nd <nd@arm.com>; thomas@monjalon.net; jerinj@marvell.com;
> hemant.agrawal@nxp.com; Nipun.gupta@nxp.com; Honnappa Nagarahalli
> <Honnappa.Nagarahalli@arm.com>; olivier.matz@6wind.com
> Subject: Re: [v1] ring: enforce reading the tails before ring operations
> 
> On 06.03.2019 6:07, gavin hu wrote:
> > In weak memory models, like arm64, reading the {prod,cons}.tail may get
> > reordered after reading or writing the ring slots, which corrupts the ring
> > and stale data is observed.
> > This issue was reported by NXP on 8-A72 DPAA2 board. The problem is
> most
> > likely caused by missing the acquire semantics when reading cons.tail (in
> > SP enqueue) or prod.tail (in SC dequeue) which makes it possible to read
> a
> > stale value from the ring slots.  There must be a read fence before
> writing
> 
> Sorry, but the phrase "There must be a read fence before writing" makes
> no sense.
> Could you please rephrase or describe in details which reads you're trying
> to
> keep in exact order?

This patch is to keep in exact order for reading the tails(to calculate the available
/free slots of ring when moving the heads) before reading or writing the ring slots.

I rephrased the commit message in V2, could you have a look? 

> > or reading the ring slots, rte_atomic32_cmpset() provides the same
> ordering
> > for MP (and MC) case. This patch is to enforce this ordering for SP (and
> > SC) case.
> >
> > Signed-off-by: gavin hu <gavin.hu@arm.com>
> > Reviewed-by: Ola Liljedahl <Ola.Liljedahl@arm.com>
> > Tested-by: Nipun Gupta <nipun.gupta@nxp.com>
> > ---
> >  lib/librte_ring/rte_ring_generic.h | 16 ++++++++++------
> >  1 file changed, 10 insertions(+), 6 deletions(-)
> >
> > diff --git a/lib/librte_ring/rte_ring_generic.h
> b/lib/librte_ring/rte_ring_generic.h
> > index ea7dbe5..1bd3dfd 100644
> > --- a/lib/librte_ring/rte_ring_generic.h
> > +++ b/lib/librte_ring/rte_ring_generic.h
> > @@ -90,9 +90,11 @@ __rte_ring_move_prod_head(struct rte_ring *r,
> unsigned int is_sp,
> >  			return 0;
> >
> >  		*new_head = *old_head + n;
> > -		if (is_sp)
> > -			r->prod.head = *new_head, success = 1;
> > -		else
> > +		if (is_sp) {
> > +			r->prod.head = *new_head;
> > +			rte_smp_rmb();
> > +			success = 1;
> > +		} else
> >  			success = rte_atomic32_cmpset(&r->prod.head,
> >  					*old_head, *new_head);
> >  	} while (unlikely(success == 0));
> > @@ -158,9 +160,11 @@ __rte_ring_move_cons_head(struct rte_ring *r,
> unsigned int is_sc,
> >  			return 0;
> >
> >  		*new_head = *old_head + n;
> > -		if (is_sc)
> > -			r->cons.head = *new_head, success = 1;
> > -		else
> > +		if (is_sc) {
> > +			r->cons.head = *new_head;
> > +			rte_smp_rmb();
> > +			success = 1;
> > +		} else
> >  			success = rte_atomic32_cmpset(&r->cons.head,
> *old_head,
> >  					*new_head);
> >  	} while (unlikely(success == 0));
> >
  

Patch

diff --git a/lib/librte_ring/rte_ring_generic.h b/lib/librte_ring/rte_ring_generic.h
index ea7dbe5..1bd3dfd 100644
--- a/lib/librte_ring/rte_ring_generic.h
+++ b/lib/librte_ring/rte_ring_generic.h
@@ -90,9 +90,11 @@  __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
 			return 0;
 
 		*new_head = *old_head + n;
-		if (is_sp)
-			r->prod.head = *new_head, success = 1;
-		else
+		if (is_sp) {
+			r->prod.head = *new_head;
+			rte_smp_rmb();
+			success = 1;
+		} else
 			success = rte_atomic32_cmpset(&r->prod.head,
 					*old_head, *new_head);
 	} while (unlikely(success == 0));
@@ -158,9 +160,11 @@  __rte_ring_move_cons_head(struct rte_ring *r, unsigned int is_sc,
 			return 0;
 
 		*new_head = *old_head + n;
-		if (is_sc)
-			r->cons.head = *new_head, success = 1;
-		else
+		if (is_sc) {
+			r->cons.head = *new_head;
+			rte_smp_rmb();
+			success = 1;
+		} else
 			success = rte_atomic32_cmpset(&r->cons.head, *old_head,
 					*new_head);
 	} while (unlikely(success == 0));