[v2] ring: fix c11 memory ordering issue

Message ID 20180807031943.5331-1-gavin.hu@arm.com
State Superseded, archived
Delegated to: Thomas Monjalon
Headers show
Series
  • [v2] ring: fix c11 memory ordering issue
Related show

Checks

Context Check Description
ci/Intel-compilation success Compilation OK
ci/checkpatch success coding style OK

Commit Message

Gavin Hu (Arm Technology China) Aug. 7, 2018, 3:19 a.m.
This patch includes two bug fixes(#1 and 2) and two optimisations(#3
and #4).
1) In update_tail, read ht->tail using __atomic_load.Although the
   compiler currently seems to be doing the right thing even without
   _atomic_load, we don't want to give the compiler freedom to optimise
   what should be an atomic load, it should not be arbitarily moved
   around.
2) Synchronize the load-acquire of the tail and the store-release
   within update_tail, the store release ensures all the ring operations,
   engqueu or dequeue are seen by the observers as soon as they see
   the updated tail. The load-acquire is required for correctly compu-
   tate the free_entries or avail_entries, respectively for enqueue and
   dequeue operations, the data dependency is not reliable for ordering
   as the compiler might break it by saving to temporary values to boost
   performance.
3) In __rte_ring_move_prod_head, move the __atomic_load_n up and out of
   the do {} while loop as upon failure the old_head will be updated,
   another load is costy and not necessary.
4) When calling __atomic_compare_exchange_n, relaxed ordering for both
   success and failure, as multiple threads can work independently on
   the same end of the ring (either enqueue or dequeue) without
   synchronization, not as operating on tail, which has to be finished
   in sequence.

The patch was benchmarked with test/ring_perf_autotest and it decreases
the enqueue/dequeue latency by 5% ~ 24.6% with two lcores, the real gains
are dependent on the number of lcores, depth of the ring, SPSC or MPMC.
For 1 lcore, it also improves a little, about 3 ~ 4%.
It is a big improvement, in case of MPMC, with rings size of 32, it saves
latency up to (6.90-5.20)/6.90 = 24.6%.

Test result data:

SP/SC bulk enq/dequeue (size: 8): 13.19
MP/MC bulk enq/dequeue (size: 8): 25.79
SP/SC bulk enq/dequeue (size: 32): 3.85
MP/MC bulk enq/dequeue (size: 32): 6.90

SP/SC bulk enq/dequeue (size: 8): 12.05
MP/MC bulk enq/dequeue (size: 8): 23.06
SP/SC bulk enq/dequeue (size: 32): 3.62
MP/MC bulk enq/dequeue (size: 32): 5.20

Fixes: 39368ebfc6 ("ring: introduce C11 memory model barrier option")
Cc: stable@dpdk.org

Signed-off-by: Gavin Hu <gavin.hu@arm.com>
Reviewed-by: Honnappa Nagarahalli <Honnappa.Nagarahalli@arm.com>
Reviewed-by: Steve Capper <steve.capper@arm.com>
Reviewed-by: Ola Liljedahl <Ola.Liljedahl@arm.com>
---
 lib/librte_ring/rte_ring_c11_mem.h | 38 +++++++++++++++++++++++++-------------
 1 file changed, 25 insertions(+), 13 deletions(-)

Comments

He, Jia Aug. 7, 2018, 5:56 a.m. | #1
Hi Gavin
> -----Original Message-----
> From: Gavin Hu [mailto:gavin.hu@arm.com]
> Sent: 2018年8月7日 11:20
> To: dev@dpdk.org
> Cc: gavin.hu@arm.com; Honnappa.Nagarahalli@arm.com;
> steve.capper@arm.com; Ola.Liljedahl@arm.com;
> jerin.jacob@caviumnetworks.com; hemant.agrawal@nxp.com; He, Jia
> <jia.he@hxt-semitech.com>; stable@dpdk.org
> Subject: [PATCH v2] ring: fix c11 memory ordering issue
> 
> This patch includes two bug fixes(#1 and 2) and two optimisations(#3 and #4).

Maybe you need to split this into small parts.

> 1) In update_tail, read ht->tail using __atomic_load.Although the
>    compiler currently seems to be doing the right thing even without
>    _atomic_load, we don't want to give the compiler freedom to optimise
>    what should be an atomic load, it should not be arbitarily moved
>    around.
> 2) Synchronize the load-acquire of the tail and the store-release
>    within update_tail, the store release ensures all the ring operations,
>    engqueu or dequeue are seen by the observers as soon as they see
>    the updated tail. The load-acquire is required for correctly compu-
>    tate the free_entries or avail_entries, respectively for enqueue and
>    dequeue operations, the data dependency is not reliable for ordering
>    as the compiler might break it by saving to temporary values to boost
>    performance.

Could you describe the race condition in details?
e.g.
cpu 1			cpu2
code1
				code2

Cheers,
Jia
> 3) In __rte_ring_move_prod_head, move the __atomic_load_n up and out of
>    the do {} while loop as upon failure the old_head will be updated,
>    another load is costy and not necessary.
> 4) When calling __atomic_compare_exchange_n, relaxed ordering for both
>    success and failure, as multiple threads can work independently on
>    the same end of the ring (either enqueue or dequeue) without
>    synchronization, not as operating on tail, which has to be finished
>    in sequence.
> 
> The patch was benchmarked with test/ring_perf_autotest and it decreases the
> enqueue/dequeue latency by 5% ~ 24.6% with two lcores, the real gains are
> dependent on the number of lcores, depth of the ring, SPSC or MPMC.
> For 1 lcore, it also improves a little, about 3 ~ 4%.
> It is a big improvement, in case of MPMC, with rings size of 32, it saves latency up
> to (6.90-5.20)/6.90 = 24.6%.
> 
> Test result data:
> 
> SP/SC bulk enq/dequeue (size: 8): 13.19
> MP/MC bulk enq/dequeue (size: 8): 25.79
> SP/SC bulk enq/dequeue (size: 32): 3.85
> MP/MC bulk enq/dequeue (size: 32): 6.90
> 
> SP/SC bulk enq/dequeue (size: 8): 12.05
> MP/MC bulk enq/dequeue (size: 8): 23.06
> SP/SC bulk enq/dequeue (size: 32): 3.62
> MP/MC bulk enq/dequeue (size: 32): 5.20
> 
> Fixes: 39368ebfc6 ("ring: introduce C11 memory model barrier option")
> Cc: stable@dpdk.org
> 
> Signed-off-by: Gavin Hu <gavin.hu@arm.com>
> Reviewed-by: Honnappa Nagarahalli <Honnappa.Nagarahalli@arm.com>
> Reviewed-by: Steve Capper <steve.capper@arm.com>
> Reviewed-by: Ola Liljedahl <Ola.Liljedahl@arm.com>
> ---
>  lib/librte_ring/rte_ring_c11_mem.h | 38
> +++++++++++++++++++++++++-------------
>  1 file changed, 25 insertions(+), 13 deletions(-)
> 
> diff --git a/lib/librte_ring/rte_ring_c11_mem.h
> b/lib/librte_ring/rte_ring_c11_mem.h
> index 94df3c4a6..cfa3be4a7 100644
> --- a/lib/librte_ring/rte_ring_c11_mem.h
> +++ b/lib/librte_ring/rte_ring_c11_mem.h
> @@ -21,7 +21,8 @@ update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
> uint32_t new_val,
>  	 * we need to wait for them to complete
>  	 */
>  	if (!single)
> -		while (unlikely(ht->tail != old_val))
> +		while (unlikely(old_val != __atomic_load_n(&ht->tail,
> +						__ATOMIC_RELAXED)))
>  			rte_pause();
> 
>  	__atomic_store_n(&ht->tail, new_val, __ATOMIC_RELEASE); @@ -60,20
> +61,24 @@ __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
>  	unsigned int max = n;
>  	int success;
> 
> +	*old_head = __atomic_load_n(&r->prod.head, __ATOMIC_RELAXED);
>  	do {
>  		/* Reset n to the initial burst count */
>  		n = max;
> 
> -		*old_head = __atomic_load_n(&r->prod.head,
> -					__ATOMIC_ACQUIRE);
> 
> -		/*
> -		 *  The subtraction is done between two unsigned 32bits value
> +		/* load-acquire synchronize with store-release of ht->tail
> +		 * in update_tail.
> +		 */
> +		const uint32_t cons_tail = __atomic_load_n(&r->cons.tail,
> +							__ATOMIC_ACQUIRE);
> +
> +		/* The subtraction is done between two unsigned 32bits value
>  		 * (the result is always modulo 32 bits even if we have
>  		 * *old_head > cons_tail). So 'free_entries' is always between 0
>  		 * and capacity (which is < size).
>  		 */
> -		*free_entries = (capacity + r->cons.tail - *old_head);
> +		*free_entries = (capacity + cons_tail - *old_head);
> 
>  		/* check that we have enough room in ring */
>  		if (unlikely(n > *free_entries))
> @@ -87,9 +92,10 @@ __rte_ring_move_prod_head(struct rte_ring *r, unsigned
> int is_sp,
>  		if (is_sp)
>  			r->prod.head = *new_head, success = 1;
>  		else
> +			/* on failure, *old_head is updated */
>  			success = __atomic_compare_exchange_n(&r->prod.head,
>  					old_head, *new_head,
> -					0, __ATOMIC_ACQUIRE,
> +					/*weak=*/0, __ATOMIC_RELAXED,
>  					__ATOMIC_RELAXED);
>  	} while (unlikely(success == 0));
>  	return n;
> @@ -128,18 +134,23 @@ __rte_ring_move_cons_head(struct rte_ring *r, int
> is_sc,
>  	int success;
> 
>  	/* move cons.head atomically */
> +	*old_head = __atomic_load_n(&r->cons.head, __ATOMIC_RELAXED);
>  	do {
>  		/* Restore n as it may change every loop */
>  		n = max;
> -		*old_head = __atomic_load_n(&r->cons.head,
> -					__ATOMIC_ACQUIRE);
> +
> +		/* this load-acquire synchronize with store-release of ht->tail
> +		 * in update_tail.
> +		 */
> +		const uint32_t prod_tail = __atomic_load_n(&r->prod.tail,
> +							__ATOMIC_ACQUIRE);
> 
>  		/* The subtraction is done between two unsigned 32bits value
>  		 * (the result is always modulo 32 bits even if we have
>  		 * cons_head > prod_tail). So 'entries' is always between 0
>  		 * and size(ring)-1.
>  		 */
> -		*entries = (r->prod.tail - *old_head);
> +		*entries = (prod_tail - *old_head);
> 
>  		/* Set the actual entries for dequeue */
>  		if (n > *entries)
> @@ -152,10 +163,11 @@ __rte_ring_move_cons_head(struct rte_ring *r, int
> is_sc,
>  		if (is_sc)
>  			r->cons.head = *new_head, success = 1;
>  		else
> +			/* on failure, *old_head will be updated */
>  			success = __atomic_compare_exchange_n(&r->cons.head,
> -							old_head, *new_head,
> -							0, __ATOMIC_ACQUIRE,
> -							__ATOMIC_RELAXED);
> +						old_head, *new_head,
> +						/*weak=*/0, __ATOMIC_RELAXED,
> +						__ATOMIC_RELAXED);
>  	} while (unlikely(success == 0));
>  	return n;
>  }
> --
> 2.11.0
Gavin Hu (Arm Technology China) Aug. 7, 2018, 7:56 a.m. | #2
Hi Jia,

Thanks for your feedback, let's see if there are requests from others to split the fix.

The is a race condition between updating the tail and getting free/avail_entries, which is dependent on the tails.
Which should be synchronized by load-acquire and store-release. In simple words below, step #1 and #5 should be synchronized with each other, mutually, otherwise the free_/avail_entries calculation possibly get wrong.

On each locre, either enque or deque, step #1 and #2 order should be maintained as #2 has dependency on #1,
That's why Acquire ordering is necessary.

Please raise new questions if I don't get across this clearly.

Ring enqueue / lcore #0                       Ring deque / lcore #1
1. load-acquire prod_tail                      1. Load-acquire cons_tail
2. get free_entries                                 2. Get avail_entries
3. move prod_head accordingly          3. Move cons_head accordingly
4. do enqueue operations                    4. Do dequeue operations
5. store-release prod_tail                     5. Store-release cons_tail

Best Regards,
Gavin
-----Original Message-----
From: He, Jia <jia.he@hxt-semitech.com>
Sent: Tuesday, August 7, 2018 1:57 PM
To: Gavin Hu <Gavin.Hu@arm.com>; dev@dpdk.org
Cc: Honnappa Nagarahalli <Honnappa.Nagarahalli@arm.com>; Steve Capper <Steve.Capper@arm.com>; Ola Liljedahl <Ola.Liljedahl@arm.com>; jerin.jacob@caviumnetworks.com; hemant.agrawal@nxp.com; stable@dpdk.org
Subject: RE: [PATCH v2] ring: fix c11 memory ordering issue

Hi Gavin
> -----Original Message-----
> From: Gavin Hu [mailto:gavin.hu@arm.com]
> Sent: 2018年8月7日 11:20
> To: dev@dpdk.org
> Cc: gavin.hu@arm.com; Honnappa.Nagarahalli@arm.com;
> steve.capper@arm.com; Ola.Liljedahl@arm.com;
> jerin.jacob@caviumnetworks.com; hemant.agrawal@nxp.com; He, Jia
> <jia.he@hxt-semitech.com>; stable@dpdk.org
> Subject: [PATCH v2] ring: fix c11 memory ordering issue
>
> This patch includes two bug fixes(#1 and 2) and two optimisations(#3 and #4).

Maybe you need to split this into small parts.

> 1) In update_tail, read ht->tail using __atomic_load.Although the
>    compiler currently seems to be doing the right thing even without
>    _atomic_load, we don't want to give the compiler freedom to optimise
>    what should be an atomic load, it should not be arbitarily moved
>    around.
> 2) Synchronize the load-acquire of the tail and the store-release
>    within update_tail, the store release ensures all the ring operations,
>    engqueu or dequeue are seen by the observers as soon as they see
>    the updated tail. The load-acquire is required for correctly compu-
>    tate the free_entries or avail_entries, respectively for enqueue and
>    dequeue operations, the data dependency is not reliable for ordering
>    as the compiler might break it by saving to temporary values to boost
>    performance.

Could you describe the race condition in details?
e.g.
cpu 1cpu2
code1
code2

Cheers,
Jia
> 3) In __rte_ring_move_prod_head, move the __atomic_load_n up and out of
>    the do {} while loop as upon failure the old_head will be updated,
>    another load is costy and not necessary.
> 4) When calling __atomic_compare_exchange_n, relaxed ordering for both
>    success and failure, as multiple threads can work independently on
>    the same end of the ring (either enqueue or dequeue) without
>    synchronization, not as operating on tail, which has to be finished
>    in sequence.
>
> The patch was benchmarked with test/ring_perf_autotest and it
> decreases the enqueue/dequeue latency by 5% ~ 24.6% with two lcores,
> the real gains are dependent on the number of lcores, depth of the ring, SPSC or MPMC.
> For 1 lcore, it also improves a little, about 3 ~ 4%.
> It is a big improvement, in case of MPMC, with rings size of 32, it
> saves latency up to (6.90-5.20)/6.90 = 24.6%.
>
> Test result data:
>
> SP/SC bulk enq/dequeue (size: 8): 13.19 MP/MC bulk enq/dequeue (size:
> 8): 25.79 SP/SC bulk enq/dequeue (size: 32): 3.85 MP/MC bulk
> enq/dequeue (size: 32): 6.90
>
> SP/SC bulk enq/dequeue (size: 8): 12.05 MP/MC bulk enq/dequeue (size:
> 8): 23.06 SP/SC bulk enq/dequeue (size: 32): 3.62 MP/MC bulk
> enq/dequeue (size: 32): 5.20
>
> Fixes: 39368ebfc6 ("ring: introduce C11 memory model barrier option")
> Cc: stable@dpdk.org
>
> Signed-off-by: Gavin Hu <gavin.hu@arm.com>
> Reviewed-by: Honnappa Nagarahalli <Honnappa.Nagarahalli@arm.com>
> Reviewed-by: Steve Capper <steve.capper@arm.com>
> Reviewed-by: Ola Liljedahl <Ola.Liljedahl@arm.com>
> ---
>  lib/librte_ring/rte_ring_c11_mem.h | 38
> +++++++++++++++++++++++++-------------
>  1 file changed, 25 insertions(+), 13 deletions(-)
>
> diff --git a/lib/librte_ring/rte_ring_c11_mem.h
> b/lib/librte_ring/rte_ring_c11_mem.h
> index 94df3c4a6..cfa3be4a7 100644
> --- a/lib/librte_ring/rte_ring_c11_mem.h
> +++ b/lib/librte_ring/rte_ring_c11_mem.h
> @@ -21,7 +21,8 @@ update_tail(struct rte_ring_headtail *ht, uint32_t
> old_val, uint32_t new_val,
>   * we need to wait for them to complete
>   */
>  if (!single)
> -while (unlikely(ht->tail != old_val))
> +while (unlikely(old_val != __atomic_load_n(&ht->tail,
> +__ATOMIC_RELAXED)))
>  rte_pause();
>
>  __atomic_store_n(&ht->tail, new_val, __ATOMIC_RELEASE); @@ -60,20
> +61,24 @@ __rte_ring_move_prod_head(struct rte_ring *r, unsigned int
> +is_sp,
>  unsigned int max = n;
>  int success;
>
> +*old_head = __atomic_load_n(&r->prod.head, __ATOMIC_RELAXED);
>  do {
>  /* Reset n to the initial burst count */
>  n = max;
>
> -*old_head = __atomic_load_n(&r->prod.head,
> -__ATOMIC_ACQUIRE);
>
> -/*
> - *  The subtraction is done between two unsigned 32bits value
> +/* load-acquire synchronize with store-release of ht->tail
> + * in update_tail.
> + */
> +const uint32_t cons_tail = __atomic_load_n(&r->cons.tail,
> +__ATOMIC_ACQUIRE);
> +
> +/* The subtraction is done between two unsigned 32bits value
>   * (the result is always modulo 32 bits even if we have
>   * *old_head > cons_tail). So 'free_entries' is always between 0
>   * and capacity (which is < size).
>   */
> -*free_entries = (capacity + r->cons.tail - *old_head);
> +*free_entries = (capacity + cons_tail - *old_head);
>
>  /* check that we have enough room in ring */
>  if (unlikely(n > *free_entries))
> @@ -87,9 +92,10 @@ __rte_ring_move_prod_head(struct rte_ring *r,
> unsigned int is_sp,
>  if (is_sp)
>  r->prod.head = *new_head, success = 1;
>  else
> +/* on failure, *old_head is updated */
>  success = __atomic_compare_exchange_n(&r->prod.head,
>  old_head, *new_head,
> -0, __ATOMIC_ACQUIRE,
> +/*weak=*/0, __ATOMIC_RELAXED,
>  __ATOMIC_RELAXED);
>  } while (unlikely(success == 0));
>  return n;
> @@ -128,18 +134,23 @@ __rte_ring_move_cons_head(struct rte_ring *r,
> int is_sc,
>  int success;
>
>  /* move cons.head atomically */
> +*old_head = __atomic_load_n(&r->cons.head, __ATOMIC_RELAXED);
>  do {
>  /* Restore n as it may change every loop */
>  n = max;
> -*old_head = __atomic_load_n(&r->cons.head,
> -__ATOMIC_ACQUIRE);
> +
> +/* this load-acquire synchronize with store-release of ht->tail
> + * in update_tail.
> + */
> +const uint32_t prod_tail = __atomic_load_n(&r->prod.tail,
> +__ATOMIC_ACQUIRE);
>
>  /* The subtraction is done between two unsigned 32bits value
>   * (the result is always modulo 32 bits even if we have
>   * cons_head > prod_tail). So 'entries' is always between 0
>   * and size(ring)-1.
>   */
> -*entries = (r->prod.tail - *old_head);
> +*entries = (prod_tail - *old_head);
>
>  /* Set the actual entries for dequeue */
>  if (n > *entries)
> @@ -152,10 +163,11 @@ __rte_ring_move_cons_head(struct rte_ring *r,
> int is_sc,
>  if (is_sc)
>  r->cons.head = *new_head, success = 1;
>  else
> +/* on failure, *old_head will be updated */
>  success = __atomic_compare_exchange_n(&r->cons.head,
> -old_head, *new_head,
> -0, __ATOMIC_ACQUIRE,
> -__ATOMIC_RELAXED);
> +old_head, *new_head,
> +/*weak=*/0, __ATOMIC_RELAXED,
> +__ATOMIC_RELAXED);
>  } while (unlikely(success == 0));
>  return n;
>  }
> --
> 2.11.0

IMPORTANT NOTICE: The contents of this email and any attachments are confidential and may also be privileged. If you are not the intended recipient, please notify the sender immediately and do not disclose the contents to any other person, use it for any purpose, or store or copy the information in any medium. Thank you.
Jerin Jacob Aug. 8, 2018, 3:07 a.m. | #3
-----Original Message-----
> Date: Tue, 7 Aug 2018 07:56:08 +0000
> From: Gavin Hu <Gavin.Hu@arm.com>
> To: "He, Jia" <jia.he@hxt-semitech.com>, "dev@dpdk.org" <dev@dpdk.org>
> CC: Honnappa Nagarahalli <Honnappa.Nagarahalli@arm.com>, Steve Capper
>  <Steve.Capper@arm.com>, Ola Liljedahl <Ola.Liljedahl@arm.com>,
>  "jerin.jacob@caviumnetworks.com" <jerin.jacob@caviumnetworks.com>,
>  "hemant.agrawal@nxp.com" <hemant.agrawal@nxp.com>, "stable@dpdk.org"
>  <stable@dpdk.org>
> Subject: RE: [PATCH v2] ring: fix c11 memory ordering issue
> 
> 
> Hi Jia,
> 
> Thanks for your feedback, let's see if there are requests from others to split the fix.

+1 to split it as small patches. If possible, 4 patches, 2 for bug fix
and 2 for optimization.

Like you mentioned over all performance improvement data, Please add it
per optimization patch if possible.

> 
> The is a race condition between updating the tail and getting free/avail_entries, which is dependent on the tails.
> Which should be synchronized by load-acquire and store-release. In simple words below, step #1 and #5 should be synchronized with each other, mutually, otherwise the free_/avail_entries calculation possibly get wrong.
> 
> On each locre, either enque or deque, step #1 and #2 order should be maintained as #2 has dependency on #1,
> That's why Acquire ordering is necessary.
> 
> Please raise new questions if I don't get across this clearly.
> 
> Ring enqueue / lcore #0                       Ring deque / lcore #1
> 1. load-acquire prod_tail                      1. Load-acquire cons_tail
> 2. get free_entries                                 2. Get avail_entries
> 3. move prod_head accordingly          3. Move cons_head accordingly
> 4. do enqueue operations                    4. Do dequeue operations
> 5. store-release prod_tail                     5. Store-release cons_tail
> 
> Best Regards,
> Gavin
> -----Original Message-----
> From: He, Jia <jia.he@hxt-semitech.com>
> Sent: Tuesday, August 7, 2018 1:57 PM
> To: Gavin Hu <Gavin.Hu@arm.com>; dev@dpdk.org
> Cc: Honnappa Nagarahalli <Honnappa.Nagarahalli@arm.com>; Steve Capper <Steve.Capper@arm.com>; Ola Liljedahl <Ola.Liljedahl@arm.com>; jerin.jacob@caviumnetworks.com; hemant.agrawal@nxp.com; stable@dpdk.org
> Subject: RE: [PATCH v2] ring: fix c11 memory ordering issue
> 
> Hi Gavin
> > -----Original Message-----
> > From: Gavin Hu [mailto:gavin.hu@arm.com]
> > Sent: 2018年8月7日 11:20
> > To: dev@dpdk.org
> > Cc: gavin.hu@arm.com; Honnappa.Nagarahalli@arm.com;
> > steve.capper@arm.com; Ola.Liljedahl@arm.com;
> > jerin.jacob@caviumnetworks.com; hemant.agrawal@nxp.com; He, Jia
> > <jia.he@hxt-semitech.com>; stable@dpdk.org
> > Subject: [PATCH v2] ring: fix c11 memory ordering issue
> >
> > This patch includes two bug fixes(#1 and 2) and two optimisations(#3 and #4).
> 
> Maybe you need to split this into small parts.
> 
> > 1) In update_tail, read ht->tail using __atomic_load.Although the
> >    compiler currently seems to be doing the right thing even without
> >    _atomic_load, we don't want to give the compiler freedom to optimise
> >    what should be an atomic load, it should not be arbitarily moved
> >    around.
> > 2) Synchronize the load-acquire of the tail and the store-release
> >    within update_tail, the store release ensures all the ring operations,
> >    engqueu or dequeue are seen by the observers as soon as they see
> >    the updated tail. The load-acquire is required for correctly compu-
> >    tate the free_entries or avail_entries, respectively for enqueue and
> >    dequeue operations, the data dependency is not reliable for ordering
> >    as the compiler might break it by saving to temporary values to boost
> >    performance.
> 
> Could you describe the race condition in details?
> e.g.
> cpu 1cpu2
> code1
> code2
> 
> Cheers,
> Jia
> > 3) In __rte_ring_move_prod_head, move the __atomic_load_n up and out of
> >    the do {} while loop as upon failure the old_head will be updated,
> >    another load is costy and not necessary.
> > 4) When calling __atomic_compare_exchange_n, relaxed ordering for both
> >    success and failure, as multiple threads can work independently on
> >    the same end of the ring (either enqueue or dequeue) without
> >    synchronization, not as operating on tail, which has to be finished
> >    in sequence.
> >
> > The patch was benchmarked with test/ring_perf_autotest and it
> > decreases the enqueue/dequeue latency by 5% ~ 24.6% with two lcores,
> > the real gains are dependent on the number of lcores, depth of the ring, SPSC or MPMC.
> > For 1 lcore, it also improves a little, about 3 ~ 4%.
> > It is a big improvement, in case of MPMC, with rings size of 32, it
> > saves latency up to (6.90-5.20)/6.90 = 24.6%.
> >
> > Test result data:
> >
> > SP/SC bulk enq/dequeue (size: 8): 13.19 MP/MC bulk enq/dequeue (size:
> > 8): 25.79 SP/SC bulk enq/dequeue (size: 32): 3.85 MP/MC bulk
> > enq/dequeue (size: 32): 6.90
> >
> > SP/SC bulk enq/dequeue (size: 8): 12.05 MP/MC bulk enq/dequeue (size:
> > 8): 23.06 SP/SC bulk enq/dequeue (size: 32): 3.62 MP/MC bulk
> > enq/dequeue (size: 32): 5.20
> >
> > Fixes: 39368ebfc6 ("ring: introduce C11 memory model barrier option")
> > Cc: stable@dpdk.org
> >
> > Signed-off-by: Gavin Hu <gavin.hu@arm.com>
> > Reviewed-by: Honnappa Nagarahalli <Honnappa.Nagarahalli@arm.com>
> > Reviewed-by: Steve Capper <steve.capper@arm.com>
> > Reviewed-by: Ola Liljedahl <Ola.Liljedahl@arm.com>
> > ---
> >  lib/librte_ring/rte_ring_c11_mem.h | 38
> > +++++++++++++++++++++++++-------------
> >  1 file changed, 25 insertions(+), 13 deletions(-)
> >
> > diff --git a/lib/librte_ring/rte_ring_c11_mem.h
> > b/lib/librte_ring/rte_ring_c11_mem.h
> > index 94df3c4a6..cfa3be4a7 100644
> > --- a/lib/librte_ring/rte_ring_c11_mem.h
> > +++ b/lib/librte_ring/rte_ring_c11_mem.h
> > @@ -21,7 +21,8 @@ update_tail(struct rte_ring_headtail *ht, uint32_t
> > old_val, uint32_t new_val,
> >   * we need to wait for them to complete
> >   */
> >  if (!single)
> > -while (unlikely(ht->tail != old_val))
> > +while (unlikely(old_val != __atomic_load_n(&ht->tail,
> > +__ATOMIC_RELAXED)))
> >  rte_pause();
> >
> >  __atomic_store_n(&ht->tail, new_val, __ATOMIC_RELEASE); @@ -60,20
> > +61,24 @@ __rte_ring_move_prod_head(struct rte_ring *r, unsigned int
> > +is_sp,
> >  unsigned int max = n;
> >  int success;
> >
> > +*old_head = __atomic_load_n(&r->prod.head, __ATOMIC_RELAXED);
> >  do {
> >  /* Reset n to the initial burst count */
> >  n = max;
> >
> > -*old_head = __atomic_load_n(&r->prod.head,
> > -__ATOMIC_ACQUIRE);
> >
> > -/*
> > - *  The subtraction is done between two unsigned 32bits value
> > +/* load-acquire synchronize with store-release of ht->tail
> > + * in update_tail.
> > + */
> > +const uint32_t cons_tail = __atomic_load_n(&r->cons.tail,
> > +__ATOMIC_ACQUIRE);
> > +
> > +/* The subtraction is done between two unsigned 32bits value
> >   * (the result is always modulo 32 bits even if we have
> >   * *old_head > cons_tail). So 'free_entries' is always between 0
> >   * and capacity (which is < size).
> >   */
> > -*free_entries = (capacity + r->cons.tail - *old_head);
> > +*free_entries = (capacity + cons_tail - *old_head);
> >
> >  /* check that we have enough room in ring */
> >  if (unlikely(n > *free_entries))
> > @@ -87,9 +92,10 @@ __rte_ring_move_prod_head(struct rte_ring *r,
> > unsigned int is_sp,
> >  if (is_sp)
> >  r->prod.head = *new_head, success = 1;
> >  else
> > +/* on failure, *old_head is updated */
> >  success = __atomic_compare_exchange_n(&r->prod.head,
> >  old_head, *new_head,
> > -0, __ATOMIC_ACQUIRE,
> > +/*weak=*/0, __ATOMIC_RELAXED,
> >  __ATOMIC_RELAXED);
> >  } while (unlikely(success == 0));
> >  return n;
> > @@ -128,18 +134,23 @@ __rte_ring_move_cons_head(struct rte_ring *r,
> > int is_sc,
> >  int success;
> >
> >  /* move cons.head atomically */
> > +*old_head = __atomic_load_n(&r->cons.head, __ATOMIC_RELAXED);
> >  do {
> >  /* Restore n as it may change every loop */
> >  n = max;
> > -*old_head = __atomic_load_n(&r->cons.head,
> > -__ATOMIC_ACQUIRE);
> > +
> > +/* this load-acquire synchronize with store-release of ht->tail
> > + * in update_tail.
> > + */
> > +const uint32_t prod_tail = __atomic_load_n(&r->prod.tail,
> > +__ATOMIC_ACQUIRE);
> >
> >  /* The subtraction is done between two unsigned 32bits value
> >   * (the result is always modulo 32 bits even if we have
> >   * cons_head > prod_tail). So 'entries' is always between 0
> >   * and size(ring)-1.
> >   */
> > -*entries = (r->prod.tail - *old_head);
> > +*entries = (prod_tail - *old_head);
> >
> >  /* Set the actual entries for dequeue */
> >  if (n > *entries)
> > @@ -152,10 +163,11 @@ __rte_ring_move_cons_head(struct rte_ring *r,
> > int is_sc,
> >  if (is_sc)
> >  r->cons.head = *new_head, success = 1;
> >  else
> > +/* on failure, *old_head will be updated */
> >  success = __atomic_compare_exchange_n(&r->cons.head,
> > -old_head, *new_head,
> > -0, __ATOMIC_ACQUIRE,
> > -__ATOMIC_RELAXED);
> > +old_head, *new_head,
> > +/*weak=*/0, __ATOMIC_RELAXED,
> > +__ATOMIC_RELAXED);
> >  } while (unlikely(success == 0));
> >  return n;
> >  }
> > --
> > 2.11.0
> 
> IMPORTANT NOTICE: The contents of this email and any attachments are confidential and may also be privileged. If you are not the intended recipient, please notify the sender immediately and do not disclose the contents to any other person, use it for any purpose, or store or copy the information in any medium. Thank you.
Thomas Monjalon Aug. 8, 2018, 7:23 a.m. | #4
08/08/2018 05:07, Jerin Jacob:
> From: Gavin Hu <Gavin.Hu@arm.com>
> > 
> > Hi Jia,
> > 
> > Thanks for your feedback, let's see if there are requests from others to split the fix.
> 
> +1 to split it as small patches. If possible, 4 patches, 2 for bug fix
> and 2 for optimization.
> 
> Like you mentioned over all performance improvement data, Please add it
> per optimization patch if possible.

Yes each fix deserves a separate patch with full explanation
of the bug, how it is fixed and how much it is improved.

Patch

diff --git a/lib/librte_ring/rte_ring_c11_mem.h b/lib/librte_ring/rte_ring_c11_mem.h
index 94df3c4a6..cfa3be4a7 100644
--- a/lib/librte_ring/rte_ring_c11_mem.h
+++ b/lib/librte_ring/rte_ring_c11_mem.h
@@ -21,7 +21,8 @@  update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val,
 	 * we need to wait for them to complete
 	 */
 	if (!single)
-		while (unlikely(ht->tail != old_val))
+		while (unlikely(old_val != __atomic_load_n(&ht->tail,
+						__ATOMIC_RELAXED)))
 			rte_pause();
 
 	__atomic_store_n(&ht->tail, new_val, __ATOMIC_RELEASE);
@@ -60,20 +61,24 @@  __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
 	unsigned int max = n;
 	int success;
 
+	*old_head = __atomic_load_n(&r->prod.head, __ATOMIC_RELAXED);
 	do {
 		/* Reset n to the initial burst count */
 		n = max;
 
-		*old_head = __atomic_load_n(&r->prod.head,
-					__ATOMIC_ACQUIRE);
 
-		/*
-		 *  The subtraction is done between two unsigned 32bits value
+		/* load-acquire synchronize with store-release of ht->tail
+		 * in update_tail.
+		 */
+		const uint32_t cons_tail = __atomic_load_n(&r->cons.tail,
+							__ATOMIC_ACQUIRE);
+
+		/* The subtraction is done between two unsigned 32bits value
 		 * (the result is always modulo 32 bits even if we have
 		 * *old_head > cons_tail). So 'free_entries' is always between 0
 		 * and capacity (which is < size).
 		 */
-		*free_entries = (capacity + r->cons.tail - *old_head);
+		*free_entries = (capacity + cons_tail - *old_head);
 
 		/* check that we have enough room in ring */
 		if (unlikely(n > *free_entries))
@@ -87,9 +92,10 @@  __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
 		if (is_sp)
 			r->prod.head = *new_head, success = 1;
 		else
+			/* on failure, *old_head is updated */
 			success = __atomic_compare_exchange_n(&r->prod.head,
 					old_head, *new_head,
-					0, __ATOMIC_ACQUIRE,
+					/*weak=*/0, __ATOMIC_RELAXED,
 					__ATOMIC_RELAXED);
 	} while (unlikely(success == 0));
 	return n;
@@ -128,18 +134,23 @@  __rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
 	int success;
 
 	/* move cons.head atomically */
+	*old_head = __atomic_load_n(&r->cons.head, __ATOMIC_RELAXED);
 	do {
 		/* Restore n as it may change every loop */
 		n = max;
-		*old_head = __atomic_load_n(&r->cons.head,
-					__ATOMIC_ACQUIRE);
+
+		/* this load-acquire synchronize with store-release of ht->tail
+		 * in update_tail.
+		 */
+		const uint32_t prod_tail = __atomic_load_n(&r->prod.tail,
+							__ATOMIC_ACQUIRE);
 
 		/* The subtraction is done between two unsigned 32bits value
 		 * (the result is always modulo 32 bits even if we have
 		 * cons_head > prod_tail). So 'entries' is always between 0
 		 * and size(ring)-1.
 		 */
-		*entries = (r->prod.tail - *old_head);
+		*entries = (prod_tail - *old_head);
 
 		/* Set the actual entries for dequeue */
 		if (n > *entries)
@@ -152,10 +163,11 @@  __rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
 		if (is_sc)
 			r->cons.head = *new_head, success = 1;
 		else
+			/* on failure, *old_head will be updated */
 			success = __atomic_compare_exchange_n(&r->cons.head,
-							old_head, *new_head,
-							0, __ATOMIC_ACQUIRE,
-							__ATOMIC_RELAXED);
+						old_head, *new_head,
+						/*weak=*/0, __ATOMIC_RELAXED,
+						__ATOMIC_RELAXED);
 	} while (unlikely(success == 0));
 	return n;
 }