[1/1] ring: fix off by 1 mistake

Message ID 20220103142201.475552-2-amo@semihalf.com (mailing list archive)
State Superseded, archived
Delegated to: David Marchand
Series Minor mistake in ring (en|de)queueing

Checks

Context                          Check    Description
ci/checkpatch                    success  coding style OK
ci/iol-broadcom-Performance      success  Performance Testing PASS
ci/iol-broadcom-Functional       success  Functional Testing PASS
ci/iol-intel-Functional          success  Functional Testing PASS
ci/iol-intel-Performance         success  Performance Testing PASS
ci/github-robot: build           success  github build: passed
ci/iol-aarch64-unit-testing      success  Testing PASS
ci/iol-aarch64-compile-testing   success  Testing PASS
ci/iol-x86_64-compile-testing    success  Testing PASS
ci/iol-x86_64-unit-testing       success  Testing PASS
ci/iol-abi-testing               success  Testing PASS
ci/Intel-compilation             success  Compilation OK
ci/intel-Testing                 success  Testing PASS

Commit Message

Andrzej Ostruszka Jan. 3, 2022, 2:22 p.m. UTC
  When enqueueing/dequeueing to/from the ring we try to optimize by manual
loop unrolling.  The check for this optimization looks like:

	if (likely(idx + n < size)) {

where 'idx' points to the first usable element (empty slot for enqueue,
data for dequeue).  The correct comparison here should be '<=' instead
of '<'.

This is not a functional error since we fall back to the loop with
correct checks on indexes.  Just a minor suboptimal behaviour for the
case when we want to enqueue/dequeue exactly the number of elements that
we have in the ring before wrapping to its beginning.

Signed-off-by: Andrzej Ostruszka <amo@semihalf.com>
---
 lib/ring/rte_ring_elem_pvt.h | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)
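
To make the boundary condition in the commit message concrete, here is a minimal sketch that compares the old and new checks against a ground truth. It is not DPDK code: the helper names (fits_old, fits_new, fits_exact, count_mismatches) are hypothetical and exist only for this illustration. A burst of n elements starting at idx touches slots idx .. idx + n - 1, so it fits without wrapping exactly when idx + n <= size.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Old check: rejects a burst that ends exactly on the last slot. */
static bool fits_old(uint32_t idx, uint32_t n, uint32_t size)
{
	return idx + n < size;
}

/* New check from the patch. */
static bool fits_new(uint32_t idx, uint32_t n, uint32_t size)
{
	return idx + n <= size;
}

/* Ground truth (illustrative): the highest slot touched, idx + n - 1,
 * must still be a valid index into the ring. */
static bool fits_exact(uint32_t idx, uint32_t n, uint32_t size)
{
	assert(n > 0 && idx < size);
	return idx + n - 1 < size;
}

/* Count the (idx, n) pairs for which a check disagrees with the truth. */
static unsigned int
count_mismatches(bool (*check)(uint32_t, uint32_t, uint32_t), uint32_t size)
{
	unsigned int bad = 0;

	for (uint32_t idx = 0; idx < size; idx++)
		for (uint32_t n = 1; n <= size; n++)
			if (check(idx, n, size) != fits_exact(idx, n, size))
				bad++;
	return bad;
}
```

For a ring of 8 slots, the old check disagrees with the ground truth once per starting index (the single n for which idx + n == size), while the new check never disagrees.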
  

Comments

Morten Brørup Jan. 3, 2022, 2:56 p.m. UTC | #1
+Ring queue maintainers: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>, Konstantin Ananyev <konstantin.ananyev@intel.com>

> From: Andrzej Ostruszka [mailto:amo@semihalf.com]
> Sent: Monday, 3 January 2022 15.22
> 
> When enqueueing/dequeueing to/from the ring we try to optimize by manual
> loop unrolling.  The check for this optimization looks like:
> 
> 	if (likely(idx + n < size)) {
> 
> where 'idx' points to the first usable element (empty slot for enqueue,
> data for dequeue).  The correct comparison here should be '<=' instead
> of '<'.
> 
> This is not a functional error since we fall back to the loop with
> correct checks on indexes.  Just a minor suboptimal behaviour for the
> case when we want to enqueue/dequeue exactly the number of elements that
> we have in the ring before wrapping to its beginning.
> 
> Signed-off-by: Andrzej Ostruszka <amo@semihalf.com>
> ---
>  lib/ring/rte_ring_elem_pvt.h | 12 ++++++------
>  1 file changed, 6 insertions(+), 6 deletions(-)
> 

Well spotted! I took a very good look at it and came to the same conclusion: it is not a functional bug; the only consequence is that the optimized code path may not be taken in a situation where it could be. But it should be fixed as suggested in your patch.

Reviewed-by: Morten Brørup <mb@smartsharesystems.com>
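
The "not a functional bug" conclusion can be checked with a small model. The sketch below is a simplified, non-unrolled stand-in for the enqueue copy, not the actual rte_ring_elem_pvt.h code; RING_SIZE, enqueue_model and same_result are hypothetical names, and the fallback branch follows the "loop with correct checks on indexes" described in the commit message as best understood here.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <string.h>

#define RING_SIZE 8u	/* hypothetical small ring */

/* Copy n objects into the ring starting at idx (n <= RING_SIZE assumed).
 * 'strict' selects the old '<' check; otherwise the patched '<=' is used.
 * Returns true when the no-wrap (optimized) branch was taken. */
static bool
enqueue_model(uint32_t *ring, uint32_t idx, const uint32_t *obj,
	      uint32_t n, bool strict)
{
	uint32_t i;
	bool fast = strict ? (idx + n < RING_SIZE) : (idx + n <= RING_SIZE);

	if (fast) {
		/* Stand-in for the unrolled loop: no index checks needed. */
		for (i = 0; i < n; i++, idx++)
			ring[idx] = obj[i];
	} else {
		/* Fallback: copy up to the end of the ring ... */
		for (i = 0; idx < RING_SIZE; i++, idx++)
			ring[idx] = obj[i];
		/* ... then continue from the beginning. */
		for (idx = 0; i < n; i++, idx++)
			ring[idx] = obj[i];
	}
	return fast;
}

/* Run both variants on identical input and compare the resulting rings. */
static bool same_result(uint32_t idx, uint32_t n)
{
	uint32_t a[RING_SIZE] = {0}, b[RING_SIZE] = {0}, obj[RING_SIZE];

	for (uint32_t i = 0; i < RING_SIZE; i++)
		obj[i] = 100 + i;
	enqueue_model(a, idx, obj, n, true);
	enqueue_model(b, idx, obj, n, false);
	return memcmp(a, b, sizeof(a)) == 0;
}
```

With idx + n == RING_SIZE, the strict variant takes the fallback branch, whose first loop stops exactly at the end of the ring and whose second loop copies nothing, so the ring contents match what the optimized branch produces.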
  
Olivier Matz Jan. 6, 2022, 10:45 a.m. UTC | #2
Hi Andrzej,

On Mon, Jan 03, 2022 at 03:22:01PM +0100, Andrzej Ostruszka wrote:
> ring: fix off by 1 mistake

I suggest something less scary for the title:

  ring: optimize corner case for enqueue/dequeue

> When enqueueing/dequeueing to/from the ring we try to optimize by manual
> loop unrolling.  The check for this optimization looks like:
> 
> 	if (likely(idx + n < size)) {
> 
> where 'idx' points to the first usable element (empty slot for enqueue,
> data for dequeue).  The correct comparison here should be '<=' instead
> of '<'.
> 
> This is not a functional error since we fall back to the loop with
> correct checks on indexes.  Just a minor suboptimal behaviour for the
> case when we want to enqueue/dequeue exactly the number of elements that
> we have in the ring before wrapping to its beginning.
> 
> Signed-off-by: Andrzej Ostruszka <amo@semihalf.com>

Reviewed-by: Olivier Matz <olivier.matz@6wind.com>

I would tend to add:
Fixes: cc4b218790f6 ("ring: support configurable element size")

But the same error was in the ENQUEUE_PTRS()/DEQUEUE_PTRS() macros
since the beginning, so we may also add:
Fixes: 286bd05bf70d ("ring: optimisations")

These macros were removed in commit 2d6ed071a8b9 ("ring: use custom element
for fixed size API")

Thanks!
  
Ananyev, Konstantin Jan. 10, 2022, 3:09 p.m. UTC | #3
> When enqueueing/dequeueing to/from the ring we try to optimize by manual
> loop unrolling.  The check for this optimization looks like:
> 
> 	if (likely(idx + n < size)) {
> 
> where 'idx' points to the first usable element (empty slot for enqueue,
> data for dequeue).  The correct comparison here should be '<=' instead
> of '<'.
> 
> This is not a functional error since we fall back to the loop with
> correct checks on indexes.  Just a minor suboptimal behaviour for the
> case when we want to enqueue/dequeue exactly the number of elements that
> we have in the ring before wrapping to its beginning.
> 
> Signed-off-by: Andrzej Ostruszka <amo@semihalf.com>
> ---
>  lib/ring/rte_ring_elem_pvt.h | 12 ++++++------
>  1 file changed, 6 insertions(+), 6 deletions(-)
> 

Acked-by: Konstantin Ananyev <konstantin.ananyev@intel.com>

  
Andrzej Ostruszka Jan. 11, 2022, 11:49 a.m. UTC | #4
Thank you Morten, Olivier and Konstantin for taking a look at it.
I've just sent another version, with updates in commit message suggested
by Olivier.

With regards
Andrzej Ostruszka
  

Patch

diff --git a/lib/ring/rte_ring_elem_pvt.h b/lib/ring/rte_ring_elem_pvt.h
index 275ec55393..83788c56e6 100644
--- a/lib/ring/rte_ring_elem_pvt.h
+++ b/lib/ring/rte_ring_elem_pvt.h
@@ -17,7 +17,7 @@  __rte_ring_enqueue_elems_32(struct rte_ring *r, const uint32_t size,
 	unsigned int i;
 	uint32_t *ring = (uint32_t *)&r[1];
 	const uint32_t *obj = (const uint32_t *)obj_table;
-	if (likely(idx + n < size)) {
+	if (likely(idx + n <= size)) {
 		for (i = 0; i < (n & ~0x7); i += 8, idx += 8) {
 			ring[idx] = obj[i];
 			ring[idx + 1] = obj[i + 1];
@@ -62,7 +62,7 @@  __rte_ring_enqueue_elems_64(struct rte_ring *r, uint32_t prod_head,
 	uint32_t idx = prod_head & r->mask;
 	uint64_t *ring = (uint64_t *)&r[1];
 	const unaligned_uint64_t *obj = (const unaligned_uint64_t *)obj_table;
-	if (likely(idx + n < size)) {
+	if (likely(idx + n <= size)) {
 		for (i = 0; i < (n & ~0x3); i += 4, idx += 4) {
 			ring[idx] = obj[i];
 			ring[idx + 1] = obj[i + 1];
@@ -95,7 +95,7 @@  __rte_ring_enqueue_elems_128(struct rte_ring *r, uint32_t prod_head,
 	uint32_t idx = prod_head & r->mask;
 	rte_int128_t *ring = (rte_int128_t *)&r[1];
 	const rte_int128_t *obj = (const rte_int128_t *)obj_table;
-	if (likely(idx + n < size)) {
+	if (likely(idx + n <= size)) {
 		for (i = 0; i < (n & ~0x1); i += 2, idx += 2)
 			memcpy((void *)(ring + idx),
 				(const void *)(obj + i), 32);
@@ -151,7 +151,7 @@  __rte_ring_dequeue_elems_32(struct rte_ring *r, const uint32_t size,
 	unsigned int i;
 	uint32_t *ring = (uint32_t *)&r[1];
 	uint32_t *obj = (uint32_t *)obj_table;
-	if (likely(idx + n < size)) {
+	if (likely(idx + n <= size)) {
 		for (i = 0; i < (n & ~0x7); i += 8, idx += 8) {
 			obj[i] = ring[idx];
 			obj[i + 1] = ring[idx + 1];
@@ -196,7 +196,7 @@  __rte_ring_dequeue_elems_64(struct rte_ring *r, uint32_t prod_head,
 	uint32_t idx = prod_head & r->mask;
 	uint64_t *ring = (uint64_t *)&r[1];
 	unaligned_uint64_t *obj = (unaligned_uint64_t *)obj_table;
-	if (likely(idx + n < size)) {
+	if (likely(idx + n <= size)) {
 		for (i = 0; i < (n & ~0x3); i += 4, idx += 4) {
 			obj[i] = ring[idx];
 			obj[i + 1] = ring[idx + 1];
@@ -229,7 +229,7 @@  __rte_ring_dequeue_elems_128(struct rte_ring *r, uint32_t prod_head,
 	uint32_t idx = prod_head & r->mask;
 	rte_int128_t *ring = (rte_int128_t *)&r[1];
 	rte_int128_t *obj = (rte_int128_t *)obj_table;
-	if (likely(idx + n < size)) {
+	if (likely(idx + n <= size)) {
 		for (i = 0; i < (n & ~0x1); i += 2, idx += 2)
 			memcpy((void *)(obj + i), (void *)(ring + idx), 32);
 		switch (n & 0x1) {