Message ID | 20220103142201.475552-2-amo@semihalf.com (mailing list archive) |
---|---|
State | Superseded, archived |
Delegated to: | David Marchand |
Series | Minor mistake in ring (en\|de)queueing |
Context | Check | Description |
---|---|---|
ci/intel-Testing | success | Testing PASS |
ci/Intel-compilation | success | Compilation OK |
ci/iol-abi-testing | success | Testing PASS |
ci/iol-x86_64-unit-testing | success | Testing PASS |
ci/iol-x86_64-compile-testing | success | Testing PASS |
ci/iol-aarch64-compile-testing | success | Testing PASS |
ci/iol-aarch64-unit-testing | success | Testing PASS |
ci/github-robot: build | success | github build: passed |
ci/iol-intel-Performance | success | Performance Testing PASS |
ci/iol-intel-Functional | success | Functional Testing PASS |
ci/iol-broadcom-Functional | success | Functional Testing PASS |
ci/iol-broadcom-Performance | success | Performance Testing PASS |
ci/checkpatch | success | coding style OK |
+Ring queue maintainers: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>, Konstantin Ananyev <konstantin.ananyev@intel.com>

> From: Andrzej Ostruszka [mailto:amo@semihalf.com]
> Sent: Monday, 3 January 2022 15.22
>
> When enqueueing/dequeueing to/from the ring we try to optimize by manual
> loop unrolling. The check for this optimization looks like:
>
> 	if (likely(idx + n < size)) {
>
> where 'idx' points to the first usable element (empty slot for enqueue,
> data for dequeue). The correct comparison here should be '<=' instead
> of '<'.
>
> This is not a functional error since we fall back to the loop with
> correct checks on indexes. Just a minor suboptimal behaviour for the
> case when we want to enqueue/dequeue exactly the number of elements that
> we have in the ring before wrapping to its beginning.
>
> Signed-off-by: Andrzej Ostruszka <amo@semihalf.com>
> ---
>  lib/ring/rte_ring_elem_pvt.h | 12 ++++++------
>  1 file changed, 6 insertions(+), 6 deletions(-)
>
> diff --git a/lib/ring/rte_ring_elem_pvt.h b/lib/ring/rte_ring_elem_pvt.h
> index 275ec55393..83788c56e6 100644
> --- a/lib/ring/rte_ring_elem_pvt.h
> +++ b/lib/ring/rte_ring_elem_pvt.h
> @@ -17,7 +17,7 @@ __rte_ring_enqueue_elems_32(struct rte_ring *r, const uint32_t size,
>  	unsigned int i;
>  	uint32_t *ring = (uint32_t *)&r[1];
>  	const uint32_t *obj = (const uint32_t *)obj_table;
> -	if (likely(idx + n < size)) {
> +	if (likely(idx + n <= size)) {
>  		for (i = 0; i < (n & ~0x7); i += 8, idx += 8) {
>  			ring[idx] = obj[i];
>  			ring[idx + 1] = obj[i + 1];
> @@ -62,7 +62,7 @@ __rte_ring_enqueue_elems_64(struct rte_ring *r, uint32_t prod_head,
>  	uint32_t idx = prod_head & r->mask;
>  	uint64_t *ring = (uint64_t *)&r[1];
>  	const unaligned_uint64_t *obj = (const unaligned_uint64_t *)obj_table;
> -	if (likely(idx + n < size)) {
> +	if (likely(idx + n <= size)) {
>  		for (i = 0; i < (n & ~0x3); i += 4, idx += 4) {
>  			ring[idx] = obj[i];
>  			ring[idx + 1] = obj[i + 1];
> @@ -95,7 +95,7 @@ __rte_ring_enqueue_elems_128(struct rte_ring *r, uint32_t prod_head,
>  	uint32_t idx = prod_head & r->mask;
>  	rte_int128_t *ring = (rte_int128_t *)&r[1];
>  	const rte_int128_t *obj = (const rte_int128_t *)obj_table;
> -	if (likely(idx + n < size)) {
> +	if (likely(idx + n <= size)) {
>  		for (i = 0; i < (n & ~0x1); i += 2, idx += 2)
>  			memcpy((void *)(ring + idx),
>  				(const void *)(obj + i), 32);
> @@ -151,7 +151,7 @@ __rte_ring_dequeue_elems_32(struct rte_ring *r, const uint32_t size,
>  	unsigned int i;
>  	uint32_t *ring = (uint32_t *)&r[1];
>  	uint32_t *obj = (uint32_t *)obj_table;
> -	if (likely(idx + n < size)) {
> +	if (likely(idx + n <= size)) {
>  		for (i = 0; i < (n & ~0x7); i += 8, idx += 8) {
>  			obj[i] = ring[idx];
>  			obj[i + 1] = ring[idx + 1];
> @@ -196,7 +196,7 @@ __rte_ring_dequeue_elems_64(struct rte_ring *r, uint32_t prod_head,
>  	uint32_t idx = prod_head & r->mask;
>  	uint64_t *ring = (uint64_t *)&r[1];
>  	unaligned_uint64_t *obj = (unaligned_uint64_t *)obj_table;
> -	if (likely(idx + n < size)) {
> +	if (likely(idx + n <= size)) {
>  		for (i = 0; i < (n & ~0x3); i += 4, idx += 4) {
>  			obj[i] = ring[idx];
>  			obj[i + 1] = ring[idx + 1];
> @@ -229,7 +229,7 @@ __rte_ring_dequeue_elems_128(struct rte_ring *r, uint32_t prod_head,
>  	uint32_t idx = prod_head & r->mask;
>  	rte_int128_t *ring = (rte_int128_t *)&r[1];
>  	rte_int128_t *obj = (rte_int128_t *)obj_table;
> -	if (likely(idx + n < size)) {
> +	if (likely(idx + n <= size)) {
>  		for (i = 0; i < (n & ~0x1); i += 2, idx += 2)
>  			memcpy((void *)(obj + i), (void *)(ring + idx), 32);
>  		switch (n & 0x1) {
> --
> 2.34.1.448.ga2b2bfdf31-goog
>

Well spotted!

I took a very good look at it, and came to the same conclusion: It is not a functional bug; the only consequence is that the optimized code path may not be taken in a situation where it could be taken. But it should be fixed as suggested in your patch.

Reviewed-by: Morten Brørup <mb@smartsharesystems.com>
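The claim that this is not a functional bug can be checked with a small model. The sketch below is not the DPDK code: `demo_enqueue()`, `demo_checks_agree()` and `DEMO_SIZE` are hypothetical names, the unrolled body is replaced by a plain loop, and the fallback is assumed to be a two-segment copy (up to the end of the ring, then from slot 0). Under those assumptions it compares the ring contents produced with '<' and with '<=' for every start index and element count.

```c
#include <stdbool.h>
#include <stdint.h>
#include <string.h>

#define DEMO_SIZE 8u	/* hypothetical ring size used only by this demo */

/*
 * Simplified stand-in for the 32-bit enqueue copy. The fast path is a plain
 * loop here (the real code unrolls it); the fallback is an assumed
 * two-segment copy that wraps to slot 0 at the end of the ring.
 * Requires idx < size and n <= size.
 */
static inline void demo_enqueue(uint32_t *ring, uint32_t size, uint32_t idx,
				const uint32_t *obj, uint32_t n, bool use_le)
{
	uint32_t i;
	bool fast = use_le ? (idx + n <= size) : (idx + n < size);

	if (fast) {
		/* No wrap needed: slots idx .. idx + n - 1 are all valid. */
		for (i = 0; i < n; i++, idx++)
			ring[idx] = obj[i];
	} else {
		/* Copy up to the end of the ring ... */
		for (i = 0; idx < size; i++, idx++)
			ring[idx] = obj[i];
		/* ... then continue from the beginning. */
		for (idx = 0; i < n; i++, idx++)
			ring[idx] = obj[i];
	}
}

/* Returns true if '<' and '<=' leave identical ring contents for every
 * start index 0..DEMO_SIZE-1 and every count 0..DEMO_SIZE. */
static inline bool demo_checks_agree(void)
{
	uint32_t obj[DEMO_SIZE], a[DEMO_SIZE], b[DEMO_SIZE];
	uint32_t idx, n, i;

	for (i = 0; i < DEMO_SIZE; i++)
		obj[i] = 100 + i;

	for (idx = 0; idx < DEMO_SIZE; idx++) {
		for (n = 0; n <= DEMO_SIZE; n++) {
			memset(a, 0, sizeof(a));
			memset(b, 0, sizeof(b));
			demo_enqueue(a, DEMO_SIZE, idx, obj, n, false);
			demo_enqueue(b, DEMO_SIZE, idx, obj, n, true);
			if (memcmp(a, b, sizeof(a)) != 0)
				return false;
		}
	}
	return true;
}
```

Calling `demo_checks_agree()` from a test or a `main()` exercises every combination. In this model the two comparisons differ only in which branch handles the case `idx + n == size`, where the fallback's second loop has nothing left to copy.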
Hi Andrzej,

On Mon, Jan 03, 2022 at 03:22:01PM +0100, Andrzej Ostruszka wrote:
> ring: fix off by 1 mistake

I suggest something less scary for the title:

  ring: optimize corner case for enqueue/dequeue

> When enqueueing/dequeueing to/from the ring we try to optimize by manual
> loop unrolling. The check for this optimization looks like:
>
> 	if (likely(idx + n < size)) {
>
> where 'idx' points to the first usable element (empty slot for enqueue,
> data for dequeue). The correct comparison here should be '<=' instead
> of '<'.
>
> This is not a functional error since we fall back to the loop with
> correct checks on indexes. Just a minor suboptimal behaviour for the
> case when we want to enqueue/dequeue exactly the number of elements that
> we have in the ring before wrapping to its beginning.
>
> Signed-off-by: Andrzej Ostruszka <amo@semihalf.com>

Reviewed-by: Olivier Matz <olivier.matz@6wind.com>

I'll tend to add:

  Fixes: cc4b218790f6 ("ring: support configurable element size")

But the same error was in the ENQUEUE_PTRS()/DEQUEUE_PTRS() macros since
the beginning, so we may also add:

  Fixes: 286bd05bf70d ("ring: optimisations")

This macro was removed in commit 2d6ed071a8b9 ("ring: use custom element
for fixed size API")

Thanks!
> When enqueueing/dequeueing to/from the ring we try to optimize by manual
> loop unrolling. The check for this optimization looks like:
>
> 	if (likely(idx + n < size)) {
>
> where 'idx' points to the first usable element (empty slot for enqueue,
> data for dequeue). The correct comparison here should be '<=' instead
> of '<'.
>
> This is not a functional error since we fall back to the loop with
> correct checks on indexes. Just a minor suboptimal behaviour for the
> case when we want to enqueue/dequeue exactly the number of elements that
> we have in the ring before wrapping to its beginning.
>
> Signed-off-by: Andrzej Ostruszka <amo@semihalf.com>
> ---
>  lib/ring/rte_ring_elem_pvt.h | 12 ++++++------
>  1 file changed, 6 insertions(+), 6 deletions(-)
>
> diff --git a/lib/ring/rte_ring_elem_pvt.h b/lib/ring/rte_ring_elem_pvt.h
> index 275ec55393..83788c56e6 100644
> --- a/lib/ring/rte_ring_elem_pvt.h
> +++ b/lib/ring/rte_ring_elem_pvt.h
> @@ -17,7 +17,7 @@ __rte_ring_enqueue_elems_32(struct rte_ring *r, const uint32_t size,
>  	unsigned int i;
>  	uint32_t *ring = (uint32_t *)&r[1];
>  	const uint32_t *obj = (const uint32_t *)obj_table;
> -	if (likely(idx + n < size)) {
> +	if (likely(idx + n <= size)) {
>  		for (i = 0; i < (n & ~0x7); i += 8, idx += 8) {
>  			ring[idx] = obj[i];
>  			ring[idx + 1] = obj[i + 1];
> @@ -62,7 +62,7 @@ __rte_ring_enqueue_elems_64(struct rte_ring *r, uint32_t prod_head,
>  	uint32_t idx = prod_head & r->mask;
>  	uint64_t *ring = (uint64_t *)&r[1];
>  	const unaligned_uint64_t *obj = (const unaligned_uint64_t *)obj_table;
> -	if (likely(idx + n < size)) {
> +	if (likely(idx + n <= size)) {
>  		for (i = 0; i < (n & ~0x3); i += 4, idx += 4) {
>  			ring[idx] = obj[i];
>  			ring[idx + 1] = obj[i + 1];
> @@ -95,7 +95,7 @@ __rte_ring_enqueue_elems_128(struct rte_ring *r, uint32_t prod_head,
>  	uint32_t idx = prod_head & r->mask;
>  	rte_int128_t *ring = (rte_int128_t *)&r[1];
>  	const rte_int128_t *obj = (const rte_int128_t *)obj_table;
> -	if (likely(idx + n < size)) {
> +	if (likely(idx + n <= size)) {
>  		for (i = 0; i < (n & ~0x1); i += 2, idx += 2)
>  			memcpy((void *)(ring + idx),
>  				(const void *)(obj + i), 32);
> @@ -151,7 +151,7 @@ __rte_ring_dequeue_elems_32(struct rte_ring *r, const uint32_t size,
>  	unsigned int i;
>  	uint32_t *ring = (uint32_t *)&r[1];
>  	uint32_t *obj = (uint32_t *)obj_table;
> -	if (likely(idx + n < size)) {
> +	if (likely(idx + n <= size)) {
>  		for (i = 0; i < (n & ~0x7); i += 8, idx += 8) {
>  			obj[i] = ring[idx];
>  			obj[i + 1] = ring[idx + 1];
> @@ -196,7 +196,7 @@ __rte_ring_dequeue_elems_64(struct rte_ring *r, uint32_t prod_head,
>  	uint32_t idx = prod_head & r->mask;
>  	uint64_t *ring = (uint64_t *)&r[1];
>  	unaligned_uint64_t *obj = (unaligned_uint64_t *)obj_table;
> -	if (likely(idx + n < size)) {
> +	if (likely(idx + n <= size)) {
>  		for (i = 0; i < (n & ~0x3); i += 4, idx += 4) {
>  			obj[i] = ring[idx];
>  			obj[i + 1] = ring[idx + 1];
> @@ -229,7 +229,7 @@ __rte_ring_dequeue_elems_128(struct rte_ring *r, uint32_t prod_head,
>  	uint32_t idx = prod_head & r->mask;
>  	rte_int128_t *ring = (rte_int128_t *)&r[1];
>  	rte_int128_t *obj = (rte_int128_t *)obj_table;
> -	if (likely(idx + n < size)) {
> +	if (likely(idx + n <= size)) {
>  		for (i = 0; i < (n & ~0x1); i += 2, idx += 2)
>  			memcpy((void *)(obj + i), (void *)(ring + idx), 32);
>  		switch (n & 0x1) {
> --

Acked-by: Konstantin Ananyev <konstantin.ananyev@intel.com>

> 2.34.1.448.ga2b2bfdf31-goog
Thank you Morten, Olivier and Konstantin for taking a look at it.

I've just sent another version, with the commit message updates suggested by Olivier.

With regards
Andrzej Ostruszka
diff --git a/lib/ring/rte_ring_elem_pvt.h b/lib/ring/rte_ring_elem_pvt.h
index 275ec55393..83788c56e6 100644
--- a/lib/ring/rte_ring_elem_pvt.h
+++ b/lib/ring/rte_ring_elem_pvt.h
@@ -17,7 +17,7 @@ __rte_ring_enqueue_elems_32(struct rte_ring *r, const uint32_t size,
 	unsigned int i;
 	uint32_t *ring = (uint32_t *)&r[1];
 	const uint32_t *obj = (const uint32_t *)obj_table;
-	if (likely(idx + n < size)) {
+	if (likely(idx + n <= size)) {
 		for (i = 0; i < (n & ~0x7); i += 8, idx += 8) {
 			ring[idx] = obj[i];
 			ring[idx + 1] = obj[i + 1];
@@ -62,7 +62,7 @@ __rte_ring_enqueue_elems_64(struct rte_ring *r, uint32_t prod_head,
 	uint32_t idx = prod_head & r->mask;
 	uint64_t *ring = (uint64_t *)&r[1];
 	const unaligned_uint64_t *obj = (const unaligned_uint64_t *)obj_table;
-	if (likely(idx + n < size)) {
+	if (likely(idx + n <= size)) {
 		for (i = 0; i < (n & ~0x3); i += 4, idx += 4) {
 			ring[idx] = obj[i];
 			ring[idx + 1] = obj[i + 1];
@@ -95,7 +95,7 @@ __rte_ring_enqueue_elems_128(struct rte_ring *r, uint32_t prod_head,
 	uint32_t idx = prod_head & r->mask;
 	rte_int128_t *ring = (rte_int128_t *)&r[1];
 	const rte_int128_t *obj = (const rte_int128_t *)obj_table;
-	if (likely(idx + n < size)) {
+	if (likely(idx + n <= size)) {
 		for (i = 0; i < (n & ~0x1); i += 2, idx += 2)
 			memcpy((void *)(ring + idx),
 				(const void *)(obj + i), 32);
@@ -151,7 +151,7 @@ __rte_ring_dequeue_elems_32(struct rte_ring *r, const uint32_t size,
 	unsigned int i;
 	uint32_t *ring = (uint32_t *)&r[1];
 	uint32_t *obj = (uint32_t *)obj_table;
-	if (likely(idx + n < size)) {
+	if (likely(idx + n <= size)) {
 		for (i = 0; i < (n & ~0x7); i += 8, idx += 8) {
 			obj[i] = ring[idx];
 			obj[i + 1] = ring[idx + 1];
@@ -196,7 +196,7 @@ __rte_ring_dequeue_elems_64(struct rte_ring *r, uint32_t prod_head,
 	uint32_t idx = prod_head & r->mask;
 	uint64_t *ring = (uint64_t *)&r[1];
 	unaligned_uint64_t *obj = (unaligned_uint64_t *)obj_table;
-	if (likely(idx + n < size)) {
+	if (likely(idx + n <= size)) {
 		for (i = 0; i < (n & ~0x3); i += 4, idx += 4) {
 			obj[i] = ring[idx];
 			obj[i + 1] = ring[idx + 1];
@@ -229,7 +229,7 @@ __rte_ring_dequeue_elems_128(struct rte_ring *r, uint32_t prod_head,
 	uint32_t idx = prod_head & r->mask;
 	rte_int128_t *ring = (rte_int128_t *)&r[1];
 	rte_int128_t *obj = (rte_int128_t *)obj_table;
-	if (likely(idx + n < size)) {
+	if (likely(idx + n <= size)) {
 		for (i = 0; i < (n & ~0x1); i += 2, idx += 2)
 			memcpy((void *)(obj + i), (void *)(ring + idx), 32);
 		switch (n & 0x1) {
When enqueueing/dequeueing to/from the ring we try to optimize by manual
loop unrolling. The check for this optimization looks like:

	if (likely(idx + n < size)) {

where 'idx' points to the first usable element (empty slot for enqueue,
data for dequeue). The correct comparison here should be '<=' instead
of '<'.

This is not a functional error since we fall back to the loop with
correct checks on indexes. Just a minor suboptimal behaviour for the
case when we want to enqueue/dequeue exactly the number of elements that
we have in the ring before wrapping to its beginning.

Signed-off-by: Andrzej Ostruszka <amo@semihalf.com>
---
 lib/ring/rte_ring_elem_pvt.h | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)
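
The boundary arithmetic behind the commit message can be spelled out in a few lines of C. This is only an illustrative sketch: `last_index_touched()`, `fast_path_old()`, `fast_path_new()` and `count_newly_fast()` are hypothetical helpers, not DPDK functions. It assumes `idx < size` and `1 <= n <= size`, and shows that when `idx + n == size` the last slot written is `size - 1`, so a non-wrapping copy stays in bounds.

```c
#include <stdbool.h>
#include <stdint.h>

/* Highest slot index touched when copying n elements (n > 0) starting at
 * idx without wrapping. Hypothetical helper for illustration only. */
static inline uint32_t last_index_touched(uint32_t idx, uint32_t n)
{
	return idx + n - 1;
}

/* Comparison before the patch. */
static inline bool fast_path_old(uint32_t idx, uint32_t n, uint32_t size)
{
	return idx + n < size;
}

/* Comparison after the patch. */
static inline bool fast_path_new(uint32_t idx, uint32_t n, uint32_t size)
{
	return idx + n <= size;
}

/*
 * Counts the (idx, n) pairs with idx < size and 1 <= n <= size for which
 * only the new comparison selects the non-wrapping path. Returns -1 if the
 * new comparison would ever allow a copy past slot size - 1.
 */
static inline int count_newly_fast(uint32_t size)
{
	uint32_t idx, n;
	int count = 0;

	for (idx = 0; idx < size; idx++) {
		for (n = 1; n <= size; n++) {
			if (fast_path_new(idx, n, size) &&
			    last_index_touched(idx, n) >= size)
				return -1;
			if (fast_path_new(idx, n, size) !=
			    fast_path_old(idx, n, size))
				count++;
		}
	}
	return count;
}
```

For example, with `size = 8`, `idx = 5` and `n = 3`, `last_index_touched(5, 3)` is 7, the old comparison rejects the fast path and the new one accepts it. The affected pairs are exactly those with `idx + n == size`, one per start index.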