ring: fix c11 memory ordering issue

Message ID 20180806011805.7857-1-gavin.hu@arm.com (mailing list archive)
State Superseded, archived
Headers
Series ring: fix c11 memory ordering issue |

Checks

Context Check Description
ci/checkpatch success coding style OK
ci/Intel-compilation success Compilation OK

Commit Message

Gavin Hu Aug. 6, 2018, 1:18 a.m. UTC
  1) In update_tail, read ht->tail using __atomic_load_n.
2) In __rte_ring_move_prod_head, move the __atomic_load_n up and out of
   the do {} while loop as upon failure the old_head will be updated,
   another load is not necessary.
3) Synchronize the load-acquires of prod.tail and cons.tail with store-
   releases of update_tail which releases all ring updates up to the
   value of ht->tail.
4) When calling __atomic_compare_exchange_n, use relaxed ordering for
   both success and failure, as multiple threads can work independently
   on the same end of the ring (either enqueue or dequeue) without
   synchronization; this is unlike operations on tail, which must
   complete in order.

Fixes: 39368ebfc6 ("ring: introduce C11 memory model barrier option")
Cc: stable@dpdk.org

Reviewed-by: Honnappa Nagarahalli <Honnappa.Nagarahalli@arm.com>
Reviewed-by: Steve Capper <steve.capper@arm.com>
Reviewed-by: Ola Liljedahl <Ola.Liljedahl@arm.com>
Signed-off-by: Gavin Hu <gavin.hu@arm.com>
---
 lib/librte_ring/rte_ring_c11_mem.h | 38 +++++++++++++++++++++++++-------------
 1 file changed, 25 insertions(+), 13 deletions(-)
  

Comments

Thomas Monjalon Aug. 6, 2018, 9:19 a.m. UTC | #1
Hi,

Please start your patch by explaining the issue
you are solving.
What is the consequence of the bug on the behaviour?
What is the scope of the bug? only PPC?

06/08/2018 03:18, Gavin Hu:
> 1) In update_tail, read ht->tail using __atomic_load.
> 2) In __rte_ring_move_prod_head, move the __atomic_load_n up and out of
>    the do {} while loop as upon failure the old_head will be updated,
>    another load is not necessary.
> 3) Synchronize the load-acquires of prod.tail and cons.tail with store-
>    releases of update_tail which releases all ring updates up to the
>    value of ht->tail.
> 4) When calling __atomic_compare_exchange_n, relaxed ordering for both
>    success and failure, as multiple threads can work independently on
>    the same end of the ring (either enqueue or dequeue) without
>    synchronization, not as operating on tail, which has to be finished
>    in sequence.
> 
> Fixes: 39368ebfc6 ("ring: introduce C11 memory model barrier option")
> Cc: stable@dpdk.org
> 
> Reviewed-by: Honnappa Nagarahalli <Honnappa.Nagarahalli@arm.com>
> Reviewed-by: Steve Capper <steve.capper@arm.com>
> Reviewed-by: Ola Liljedahl <Ola.Liljedahl@arm.com>
> Signed-off-by: Gavin Hu <gavin.hu@arm.com>

Your Signed-off-by should be first in the list.
These tags are added in the chronological order.
  
Gavin Hu Aug. 8, 2018, 1:39 a.m. UTC | #2
Hi Thomas,

I updated the commit message in my v2 patch.
Could you check whether your questions are answered by the new commit message and this mail?

And what is your opinion on splitting the patch into 4 smaller ones (2 bug fixes and 2 optimizations)? I got this suggestion from Jia He, who is the author of this file.

>>What is the consequence of the bug on the behaviour?
The potential effect is that stale values may be read.

>> What is the scope of the bug? only PPC?
The scope is all weakly ordered architectures such as ARM (32- and 64-bit) and POWER.



Patch

diff --git a/lib/librte_ring/rte_ring_c11_mem.h b/lib/librte_ring/rte_ring_c11_mem.h
index 94df3c4a6..cfa3be4a7 100644
--- a/lib/librte_ring/rte_ring_c11_mem.h
+++ b/lib/librte_ring/rte_ring_c11_mem.h
@@ -21,7 +21,8 @@  update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val,
 	 * we need to wait for them to complete
 	 */
 	if (!single)
-		while (unlikely(ht->tail != old_val))
+		while (unlikely(old_val != __atomic_load_n(&ht->tail,
+						__ATOMIC_RELAXED)))
 			rte_pause();
 
 	__atomic_store_n(&ht->tail, new_val, __ATOMIC_RELEASE);
@@ -60,20 +61,24 @@  __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
 	unsigned int max = n;
 	int success;
 
+	*old_head = __atomic_load_n(&r->prod.head, __ATOMIC_RELAXED);
 	do {
 		/* Reset n to the initial burst count */
 		n = max;
 
-		*old_head = __atomic_load_n(&r->prod.head,
-					__ATOMIC_ACQUIRE);
 
-		/*
-		 *  The subtraction is done between two unsigned 32bits value
+		/* load-acquire synchronize with store-release of ht->tail
+		 * in update_tail.
+		 */
+		const uint32_t cons_tail = __atomic_load_n(&r->cons.tail,
+							__ATOMIC_ACQUIRE);
+
+		/* The subtraction is done between two unsigned 32bits value
 		 * (the result is always modulo 32 bits even if we have
 		 * *old_head > cons_tail). So 'free_entries' is always between 0
 		 * and capacity (which is < size).
 		 */
-		*free_entries = (capacity + r->cons.tail - *old_head);
+		*free_entries = (capacity + cons_tail - *old_head);
 
 		/* check that we have enough room in ring */
 		if (unlikely(n > *free_entries))
@@ -87,9 +92,10 @@  __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
 		if (is_sp)
 			r->prod.head = *new_head, success = 1;
 		else
+			/* on failure, *old_head is updated */
 			success = __atomic_compare_exchange_n(&r->prod.head,
 					old_head, *new_head,
-					0, __ATOMIC_ACQUIRE,
+					/*weak=*/0, __ATOMIC_RELAXED,
 					__ATOMIC_RELAXED);
 	} while (unlikely(success == 0));
 	return n;
@@ -128,18 +134,23 @@  __rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
 	int success;
 
 	/* move cons.head atomically */
+	*old_head = __atomic_load_n(&r->cons.head, __ATOMIC_RELAXED);
 	do {
 		/* Restore n as it may change every loop */
 		n = max;
-		*old_head = __atomic_load_n(&r->cons.head,
-					__ATOMIC_ACQUIRE);
+
+		/* this load-acquire synchronize with store-release of ht->tail
+		 * in update_tail.
+		 */
+		const uint32_t prod_tail = __atomic_load_n(&r->prod.tail,
+							__ATOMIC_ACQUIRE);
 
 		/* The subtraction is done between two unsigned 32bits value
 		 * (the result is always modulo 32 bits even if we have
 		 * cons_head > prod_tail). So 'entries' is always between 0
 		 * and size(ring)-1.
 		 */
-		*entries = (r->prod.tail - *old_head);
+		*entries = (prod_tail - *old_head);
 
 		/* Set the actual entries for dequeue */
 		if (n > *entries)
@@ -152,10 +163,11 @@  __rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
 		if (is_sc)
 			r->cons.head = *new_head, success = 1;
 		else
+			/* on failure, *old_head will be updated */
 			success = __atomic_compare_exchange_n(&r->cons.head,
-							old_head, *new_head,
-							0, __ATOMIC_ACQUIRE,
-							__ATOMIC_RELAXED);
+						old_head, *new_head,
+						/*weak=*/0, __ATOMIC_RELAXED,
+						__ATOMIC_RELAXED);
 	} while (unlikely(success == 0));
 	return n;
 }