
[1/1] ring: fix off by 1 mistake

Message ID 20220103142201.475552-2-amo@semihalf.com (mailing list archive)
State Superseded, archived
Delegated to: David Marchand
Series Minor mistake in ring (en|de)queueing

Checks

Context Check Description
ci/intel-Testing success Testing PASS
ci/Intel-compilation success Compilation OK
ci/iol-abi-testing success Testing PASS
ci/iol-x86_64-unit-testing success Testing PASS
ci/iol-x86_64-compile-testing success Testing PASS
ci/iol-aarch64-compile-testing success Testing PASS
ci/iol-aarch64-unit-testing success Testing PASS
ci/github-robot: build success github build: passed
ci/iol-intel-Performance success Performance Testing PASS
ci/iol-intel-Functional success Functional Testing PASS
ci/iol-broadcom-Functional success Functional Testing PASS
ci/iol-broadcom-Performance success Performance Testing PASS
ci/checkpatch success coding style OK

Commit Message

Andrzej Ostruszka Jan. 3, 2022, 2:22 p.m. UTC
When enqueueing/dequeueing to/from the ring we try to optimize by manual
loop unrolling.  The check for this optimization looks like:

	if (likely(idx + n < size)) {

where 'idx' points to the first usable element (empty slot for enqueue,
data for dequeue).  The correct comparison here should be '<=' instead
of '<'.

This is not a functional error since we fall back to the loop with
correct checks on indexes.  Just a minor suboptimal behaviour for the
case when we want to enqueue/dequeue exactly the number of elements that
we have in the ring before wrapping to its beginning.

Signed-off-by: Andrzej Ostruszka <amo@semihalf.com>
---
 lib/ring/rte_ring_elem_pvt.h | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)
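
To make the corner case concrete, here is a minimal standalone sketch (not part of the patch); the ring size of 8, the head index of 4 and the request of 4 elements are hypothetical values chosen so that the request ends exactly at the wrap point:

	#include <stdio.h>
	#include <stdint.h>

	int main(void)
	{
		const uint32_t size = 8; /* number of slots before the ring wraps */
		uint32_t idx = 4;        /* first usable slot (head & mask)       */
		uint32_t n = 4;          /* elements to enqueue/dequeue           */

		/* Old check: 4 + 4 == 8 is not < 8, so the unrolled copy is
		 * skipped even though slots 4..7 fit without wrapping. */
		printf("'<'  takes the fast path: %s\n", idx + n < size ? "yes" : "no");

		/* Fixed check: 4 + 4 == 8 is <= 8, so the unrolled copy runs
		 * for exactly the elements left before the wrap point. */
		printf("'<=' takes the fast path: %s\n", idx + n <= size ? "yes" : "no");

		return 0;
	}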

Comments

Morten Brørup Jan. 3, 2022, 2:56 p.m. UTC | #1
+Ring queue maintainers: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>, Konstantin Ananyev <konstantin.ananyev@intel.com>

> From: Andrzej Ostruszka [mailto:amo@semihalf.com]
> Sent: Monday, 3 January 2022 15.22
> 
> When enqueueing/dequeueing to/from the ring we try to optimize by manual
> loop unrolling.  The check for this optimization looks like:
> 
> 	if (likely(idx + n < size)) {
> 
> where 'idx' points to the first usable element (empty slot for enqueue,
> data for dequeue).  The correct comparison here should be '<=' instead
> of '<'.
> 
> This is not a functional error since we fall back to the loop with
> correct checks on indexes.  Just a minor suboptimal behaviour for the
> case when we want to enqueue/dequeue exactly the number of elements that
> we have in the ring before wrapping to its beginning.
> 
> Signed-off-by: Andrzej Ostruszka <amo@semihalf.com>
> ---
>  lib/ring/rte_ring_elem_pvt.h | 12 ++++++------
>  1 file changed, 6 insertions(+), 6 deletions(-)
> 
> diff --git a/lib/ring/rte_ring_elem_pvt.h b/lib/ring/rte_ring_elem_pvt.h
> index 275ec55393..83788c56e6 100644
> --- a/lib/ring/rte_ring_elem_pvt.h
> +++ b/lib/ring/rte_ring_elem_pvt.h
> @@ -17,7 +17,7 @@ __rte_ring_enqueue_elems_32(struct rte_ring *r, const uint32_t size,
>  	unsigned int i;
>  	uint32_t *ring = (uint32_t *)&r[1];
>  	const uint32_t *obj = (const uint32_t *)obj_table;
> -	if (likely(idx + n < size)) {
> +	if (likely(idx + n <= size)) {
>  		for (i = 0; i < (n & ~0x7); i += 8, idx += 8) {
>  			ring[idx] = obj[i];
>  			ring[idx + 1] = obj[i + 1];
> @@ -62,7 +62,7 @@ __rte_ring_enqueue_elems_64(struct rte_ring *r, uint32_t prod_head,
>  	uint32_t idx = prod_head & r->mask;
>  	uint64_t *ring = (uint64_t *)&r[1];
>  	const unaligned_uint64_t *obj = (const unaligned_uint64_t *)obj_table;
> -	if (likely(idx + n < size)) {
> +	if (likely(idx + n <= size)) {
>  		for (i = 0; i < (n & ~0x3); i += 4, idx += 4) {
>  			ring[idx] = obj[i];
>  			ring[idx + 1] = obj[i + 1];
> @@ -95,7 +95,7 @@ __rte_ring_enqueue_elems_128(struct rte_ring *r, uint32_t prod_head,
>  	uint32_t idx = prod_head & r->mask;
>  	rte_int128_t *ring = (rte_int128_t *)&r[1];
>  	const rte_int128_t *obj = (const rte_int128_t *)obj_table;
> -	if (likely(idx + n < size)) {
> +	if (likely(idx + n <= size)) {
>  		for (i = 0; i < (n & ~0x1); i += 2, idx += 2)
>  			memcpy((void *)(ring + idx),
>  				(const void *)(obj + i), 32);
> @@ -151,7 +151,7 @@ __rte_ring_dequeue_elems_32(struct rte_ring *r, const uint32_t size,
>  	unsigned int i;
>  	uint32_t *ring = (uint32_t *)&r[1];
>  	uint32_t *obj = (uint32_t *)obj_table;
> -	if (likely(idx + n < size)) {
> +	if (likely(idx + n <= size)) {
>  		for (i = 0; i < (n & ~0x7); i += 8, idx += 8) {
>  			obj[i] = ring[idx];
>  			obj[i + 1] = ring[idx + 1];
> @@ -196,7 +196,7 @@ __rte_ring_dequeue_elems_64(struct rte_ring *r, uint32_t prod_head,
>  	uint32_t idx = prod_head & r->mask;
>  	uint64_t *ring = (uint64_t *)&r[1];
>  	unaligned_uint64_t *obj = (unaligned_uint64_t *)obj_table;
> -	if (likely(idx + n < size)) {
> +	if (likely(idx + n <= size)) {
>  		for (i = 0; i < (n & ~0x3); i += 4, idx += 4) {
>  			obj[i] = ring[idx];
>  			obj[i + 1] = ring[idx + 1];
> @@ -229,7 +229,7 @@ __rte_ring_dequeue_elems_128(struct rte_ring *r, uint32_t prod_head,
>  	uint32_t idx = prod_head & r->mask;
>  	rte_int128_t *ring = (rte_int128_t *)&r[1];
>  	rte_int128_t *obj = (rte_int128_t *)obj_table;
> -	if (likely(idx + n < size)) {
> +	if (likely(idx + n <= size)) {
>  		for (i = 0; i < (n & ~0x1); i += 2, idx += 2)
>  			memcpy((void *)(obj + i), (void *)(ring + idx), 32);
>  		switch (n & 0x1) {
> --
> 2.34.1.448.ga2b2bfdf31-goog
> 

Well spotted! I took a very good look at it and came to the same conclusion: it is not a functional bug; the only consequence is that the optimized code path may not be taken in a situation where it could be. But it should be fixed as suggested in your patch.
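
When the fast path is skipped, the code falls back to a per-element copy that handles the wrap-around. A simplified paraphrase of that fallback for the 32-bit element case (a sketch of the logic in rte_ring_elem_pvt.h, not the exact DPDK source) could look like this:

	#include <stdint.h>

	/* Sketch of the non-unrolled fallback: copy one element at a time and
	 * wrap the ring index back to 0 once it reaches 'size'. */
	void
	enqueue_fallback_32(uint32_t *ring, uint32_t size, uint32_t idx,
			    const uint32_t *obj, uint32_t n)
	{
		uint32_t i;

		for (i = 0; i < n && idx < size; i++, idx++)
			ring[idx] = obj[i];	/* copy up to the wrap point */
		for (idx = 0; i < n; i++, idx++)
			ring[idx] = obj[i];	/* copy the rest from index 0 */
	}

The helper name enqueue_fallback_32 is made up for illustration; in DPDK the equivalent logic lives in the fallback branch of __rte_ring_enqueue_elems_32().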

Reviewed-by: Morten Brørup <mb@smartsharesystems.com>
Olivier Matz Jan. 6, 2022, 10:45 a.m. UTC | #2
Hi Andrzej,

On Mon, Jan 03, 2022 at 03:22:01PM +0100, Andrzej Ostruszka wrote:
> ring: fix off by 1 mistake

I suggest something less scary for the title:

  ring: optimize corner case for enqueue/dequeue

> When enqueueing/dequeueing to/from the ring we try to optimize by manual
> loop unrolling.  The check for this optimization looks like:
> 
> 	if (likely(idx + n < size)) {
> 
> where 'idx' points to the first usable element (empty slot for enqueue,
> data for dequeue).  The correct comparison here should be '<=' instead
> of '<'.
> 
> This is not a functional error since we fall back to the loop with
> correct checks on indexes.  Just a minor suboptimal behaviour for the
> case when we want to enqueue/dequeue exactly the number of elements that
> we have in the ring before wrapping to its beginning.
> 
> Signed-off-by: Andrzej Ostruszka <amo@semihalf.com>

Reviewed-by: Olivier Matz <olivier.matz@6wind.com>

I would tend to add:
Fixes: cc4b218790f6 ("ring: support configurable element size")

But the same error was in the ENQUEUE_PTRS()/DEQUEUE_PTRS() macros
since the beginning, so we may also add:
Fixes: 286bd05bf70d ("ring: optimisations")

These macros were removed in commit 2d6ed071a8b9 ("ring: use custom element for fixed size API").

Thanks!
Ananyev, Konstantin Jan. 10, 2022, 3:09 p.m. UTC | #3
> When enqueueing/dequeueing to/from the ring we try to optimize by manual
> loop unrolling.  The check for this optimization looks like:
> 
> 	if (likely(idx + n < size)) {
> 
> where 'idx' points to the first usable element (empty slot for enqueue,
> data for dequeue).  The correct comparison here should be '<=' instead
> of '<'.
> 
> This is not a functional error since we fall back to the loop with
> correct checks on indexes.  Just a minor suboptimal behaviour for the
> case when we want to enqueue/dequeue exactly the number of elements that
> we have in the ring before wrapping to its beginning.
> 
> Signed-off-by: Andrzej Ostruszka <amo@semihalf.com>
> ---
>  lib/ring/rte_ring_elem_pvt.h | 12 ++++++------
>  1 file changed, 6 insertions(+), 6 deletions(-)
> 
> diff --git a/lib/ring/rte_ring_elem_pvt.h b/lib/ring/rte_ring_elem_pvt.h
> index 275ec55393..83788c56e6 100644
> --- a/lib/ring/rte_ring_elem_pvt.h
> +++ b/lib/ring/rte_ring_elem_pvt.h
> @@ -17,7 +17,7 @@ __rte_ring_enqueue_elems_32(struct rte_ring *r, const uint32_t size,
>  	unsigned int i;
>  	uint32_t *ring = (uint32_t *)&r[1];
>  	const uint32_t *obj = (const uint32_t *)obj_table;
> -	if (likely(idx + n < size)) {
> +	if (likely(idx + n <= size)) {
>  		for (i = 0; i < (n & ~0x7); i += 8, idx += 8) {
>  			ring[idx] = obj[i];
>  			ring[idx + 1] = obj[i + 1];
> @@ -62,7 +62,7 @@ __rte_ring_enqueue_elems_64(struct rte_ring *r, uint32_t prod_head,
>  	uint32_t idx = prod_head & r->mask;
>  	uint64_t *ring = (uint64_t *)&r[1];
>  	const unaligned_uint64_t *obj = (const unaligned_uint64_t *)obj_table;
> -	if (likely(idx + n < size)) {
> +	if (likely(idx + n <= size)) {
>  		for (i = 0; i < (n & ~0x3); i += 4, idx += 4) {
>  			ring[idx] = obj[i];
>  			ring[idx + 1] = obj[i + 1];
> @@ -95,7 +95,7 @@ __rte_ring_enqueue_elems_128(struct rte_ring *r, uint32_t prod_head,
>  	uint32_t idx = prod_head & r->mask;
>  	rte_int128_t *ring = (rte_int128_t *)&r[1];
>  	const rte_int128_t *obj = (const rte_int128_t *)obj_table;
> -	if (likely(idx + n < size)) {
> +	if (likely(idx + n <= size)) {
>  		for (i = 0; i < (n & ~0x1); i += 2, idx += 2)
>  			memcpy((void *)(ring + idx),
>  				(const void *)(obj + i), 32);
> @@ -151,7 +151,7 @@ __rte_ring_dequeue_elems_32(struct rte_ring *r, const uint32_t size,
>  	unsigned int i;
>  	uint32_t *ring = (uint32_t *)&r[1];
>  	uint32_t *obj = (uint32_t *)obj_table;
> -	if (likely(idx + n < size)) {
> +	if (likely(idx + n <= size)) {
>  		for (i = 0; i < (n & ~0x7); i += 8, idx += 8) {
>  			obj[i] = ring[idx];
>  			obj[i + 1] = ring[idx + 1];
> @@ -196,7 +196,7 @@ __rte_ring_dequeue_elems_64(struct rte_ring *r, uint32_t prod_head,
>  	uint32_t idx = prod_head & r->mask;
>  	uint64_t *ring = (uint64_t *)&r[1];
>  	unaligned_uint64_t *obj = (unaligned_uint64_t *)obj_table;
> -	if (likely(idx + n < size)) {
> +	if (likely(idx + n <= size)) {
>  		for (i = 0; i < (n & ~0x3); i += 4, idx += 4) {
>  			obj[i] = ring[idx];
>  			obj[i + 1] = ring[idx + 1];
> @@ -229,7 +229,7 @@ __rte_ring_dequeue_elems_128(struct rte_ring *r, uint32_t prod_head,
>  	uint32_t idx = prod_head & r->mask;
>  	rte_int128_t *ring = (rte_int128_t *)&r[1];
>  	rte_int128_t *obj = (rte_int128_t *)obj_table;
> -	if (likely(idx + n < size)) {
> +	if (likely(idx + n <= size)) {
>  		for (i = 0; i < (n & ~0x1); i += 2, idx += 2)
>  			memcpy((void *)(obj + i), (void *)(ring + idx), 32);
>  		switch (n & 0x1) {
> --

Acked-by: Konstantin Ananyev <konstantin.ananyev@intel.com>

> 2.34.1.448.ga2b2bfdf31-goog
Andrzej Ostruszka Jan. 11, 2022, 11:49 a.m. UTC | #4
Thank you Morten, Olivier and Konstantin for taking a look at it.
I've just sent another version, with the updates to the commit message
suggested by Olivier.

With regards
Andrzej Ostruszka

Patch

diff --git a/lib/ring/rte_ring_elem_pvt.h b/lib/ring/rte_ring_elem_pvt.h
index 275ec55393..83788c56e6 100644
--- a/lib/ring/rte_ring_elem_pvt.h
+++ b/lib/ring/rte_ring_elem_pvt.h
@@ -17,7 +17,7 @@  __rte_ring_enqueue_elems_32(struct rte_ring *r, const uint32_t size,
 	unsigned int i;
 	uint32_t *ring = (uint32_t *)&r[1];
 	const uint32_t *obj = (const uint32_t *)obj_table;
-	if (likely(idx + n < size)) {
+	if (likely(idx + n <= size)) {
 		for (i = 0; i < (n & ~0x7); i += 8, idx += 8) {
 			ring[idx] = obj[i];
 			ring[idx + 1] = obj[i + 1];
@@ -62,7 +62,7 @@  __rte_ring_enqueue_elems_64(struct rte_ring *r, uint32_t prod_head,
 	uint32_t idx = prod_head & r->mask;
 	uint64_t *ring = (uint64_t *)&r[1];
 	const unaligned_uint64_t *obj = (const unaligned_uint64_t *)obj_table;
-	if (likely(idx + n < size)) {
+	if (likely(idx + n <= size)) {
 		for (i = 0; i < (n & ~0x3); i += 4, idx += 4) {
 			ring[idx] = obj[i];
 			ring[idx + 1] = obj[i + 1];
@@ -95,7 +95,7 @@  __rte_ring_enqueue_elems_128(struct rte_ring *r, uint32_t prod_head,
 	uint32_t idx = prod_head & r->mask;
 	rte_int128_t *ring = (rte_int128_t *)&r[1];
 	const rte_int128_t *obj = (const rte_int128_t *)obj_table;
-	if (likely(idx + n < size)) {
+	if (likely(idx + n <= size)) {
 		for (i = 0; i < (n & ~0x1); i += 2, idx += 2)
 			memcpy((void *)(ring + idx),
 				(const void *)(obj + i), 32);
@@ -151,7 +151,7 @@  __rte_ring_dequeue_elems_32(struct rte_ring *r, const uint32_t size,
 	unsigned int i;
 	uint32_t *ring = (uint32_t *)&r[1];
 	uint32_t *obj = (uint32_t *)obj_table;
-	if (likely(idx + n < size)) {
+	if (likely(idx + n <= size)) {
 		for (i = 0; i < (n & ~0x7); i += 8, idx += 8) {
 			obj[i] = ring[idx];
 			obj[i + 1] = ring[idx + 1];
@@ -196,7 +196,7 @@  __rte_ring_dequeue_elems_64(struct rte_ring *r, uint32_t prod_head,
 	uint32_t idx = prod_head & r->mask;
 	uint64_t *ring = (uint64_t *)&r[1];
 	unaligned_uint64_t *obj = (unaligned_uint64_t *)obj_table;
-	if (likely(idx + n < size)) {
+	if (likely(idx + n <= size)) {
 		for (i = 0; i < (n & ~0x3); i += 4, idx += 4) {
 			obj[i] = ring[idx];
 			obj[i + 1] = ring[idx + 1];
@@ -229,7 +229,7 @@  __rte_ring_dequeue_elems_128(struct rte_ring *r, uint32_t prod_head,
 	uint32_t idx = prod_head & r->mask;
 	rte_int128_t *ring = (rte_int128_t *)&r[1];
 	rte_int128_t *obj = (rte_int128_t *)obj_table;
-	if (likely(idx + n < size)) {
+	if (likely(idx + n <= size)) {
 		for (i = 0; i < (n & ~0x1); i += 2, idx += 2)
 			memcpy((void *)(obj + i), (void *)(ring + idx), 32);
 		switch (n & 0x1) {