ring: optimize corner case for enqueue/dequeue
Checks
Commit Message
When enqueueing/dequeueing to/from the ring we try to optimize by manual
loop unrolling. The check for this optimization looks like:
if (likely(idx + n < size)) {
where 'idx' points to the first usable element (empty slot for enqueue,
data for dequeue). The correct comparison here should be '<=' instead
of '<'.
This is not a functional error since we fall back to the loop with
correct checks on indexes. Just a minor suboptimal behaviour for the
case when we want to enqueue/dequeue exactly the number of elements that
we have in the ring before wrapping to its beginning.
Fixes: cc4b218790f6 ("ring: support configurable element size")
Fixes: 286bd05bf70d ("ring: optimisations")
Signed-off-by: Andrzej Ostruszka <amo@semihalf.com>
Reviewed-by: Olivier Matz <olivier.matz@6wind.com>
Acked-by: Konstantin Ananyev <konstantin.ananyev@intel.com>
---
lib/ring/rte_ring_elem_pvt.h | 12 ++++++------
1 file changed, 6 insertions(+), 6 deletions(-)
Comments
> From: Andrzej Ostruszka [mailto:amo@semihalf.com]
> Sent: Tuesday, 11 January 2022 12.38
>
> When enqueueing/dequeueing to/from the ring we try to optimize by
> manual
> loop unrolling. The check for this optimization looks like:
>
> if (likely(idx + n < size)) {
>
> where 'idx' points to the first usable element (empty slot for enqueue,
> data for dequeue). The correct comparison here should be '<=' instead
> of '<'.
>
> This is not a functional error since we fall back to the loop with
> correct checks on indexes. Just a minor suboptimal behaviour for the
> case when we want to enqueue/dequeue exactly the number of elements
> that
> we have in the ring before wrapping to its beginning.
>
> Fixes: cc4b218790f6 ("ring: support configurable element size")
> Fixes: 286bd05bf70d ("ring: optimisations")
>
> Signed-off-by: Andrzej Ostruszka <amo@semihalf.com>
> Reviewed-by: Olivier Matz <olivier.matz@6wind.com>
> Acked-by: Konstantin Ananyev <konstantin.ananyev@intel.com>
> ---
> lib/ring/rte_ring_elem_pvt.h | 12 ++++++------
> 1 file changed, 6 insertions(+), 6 deletions(-)
>
> diff --git a/lib/ring/rte_ring_elem_pvt.h
> b/lib/ring/rte_ring_elem_pvt.h
> index 275ec55393..83788c56e6 100644
> --- a/lib/ring/rte_ring_elem_pvt.h
> +++ b/lib/ring/rte_ring_elem_pvt.h
> @@ -17,7 +17,7 @@ __rte_ring_enqueue_elems_32(struct rte_ring *r, const
> uint32_t size,
> unsigned int i;
> uint32_t *ring = (uint32_t *)&r[1];
> const uint32_t *obj = (const uint32_t *)obj_table;
> - if (likely(idx + n < size)) {
> + if (likely(idx + n <= size)) {
> for (i = 0; i < (n & ~0x7); i += 8, idx += 8) {
> ring[idx] = obj[i];
> ring[idx + 1] = obj[i + 1];
> @@ -62,7 +62,7 @@ __rte_ring_enqueue_elems_64(struct rte_ring *r,
> uint32_t prod_head,
> uint32_t idx = prod_head & r->mask;
> uint64_t *ring = (uint64_t *)&r[1];
> const unaligned_uint64_t *obj = (const unaligned_uint64_t
> *)obj_table;
> - if (likely(idx + n < size)) {
> + if (likely(idx + n <= size)) {
> for (i = 0; i < (n & ~0x3); i += 4, idx += 4) {
> ring[idx] = obj[i];
> ring[idx + 1] = obj[i + 1];
> @@ -95,7 +95,7 @@ __rte_ring_enqueue_elems_128(struct rte_ring *r,
> uint32_t prod_head,
> uint32_t idx = prod_head & r->mask;
> rte_int128_t *ring = (rte_int128_t *)&r[1];
> const rte_int128_t *obj = (const rte_int128_t *)obj_table;
> - if (likely(idx + n < size)) {
> + if (likely(idx + n <= size)) {
> for (i = 0; i < (n & ~0x1); i += 2, idx += 2)
> memcpy((void *)(ring + idx),
> (const void *)(obj + i), 32);
> @@ -151,7 +151,7 @@ __rte_ring_dequeue_elems_32(struct rte_ring *r,
> const uint32_t size,
> unsigned int i;
> uint32_t *ring = (uint32_t *)&r[1];
> uint32_t *obj = (uint32_t *)obj_table;
> - if (likely(idx + n < size)) {
> + if (likely(idx + n <= size)) {
> for (i = 0; i < (n & ~0x7); i += 8, idx += 8) {
> obj[i] = ring[idx];
> obj[i + 1] = ring[idx + 1];
> @@ -196,7 +196,7 @@ __rte_ring_dequeue_elems_64(struct rte_ring *r,
> uint32_t prod_head,
> uint32_t idx = prod_head & r->mask;
> uint64_t *ring = (uint64_t *)&r[1];
> unaligned_uint64_t *obj = (unaligned_uint64_t *)obj_table;
> - if (likely(idx + n < size)) {
> + if (likely(idx + n <= size)) {
> for (i = 0; i < (n & ~0x3); i += 4, idx += 4) {
> obj[i] = ring[idx];
> obj[i + 1] = ring[idx + 1];
> @@ -229,7 +229,7 @@ __rte_ring_dequeue_elems_128(struct rte_ring *r,
> uint32_t prod_head,
> uint32_t idx = prod_head & r->mask;
> rte_int128_t *ring = (rte_int128_t *)&r[1];
> rte_int128_t *obj = (rte_int128_t *)obj_table;
> - if (likely(idx + n < size)) {
> + if (likely(idx + n <= size)) {
> for (i = 0; i < (n & ~0x1); i += 2, idx += 2)
> memcpy((void *)(obj + i), (void *)(ring + idx), 32);
> switch (n & 0x1) {
> --
> 2.34.1.575.g55b058a8bb-goog
>
Also this version of the patch:
Reviewed-by: Morten Brørup <mb@smartsharesystems.com>
On Tue, Jan 11, 2022 at 01:00:25PM +0100, Morten Brørup wrote:
[...]
> Also this version of the patch:
>
> Reviewed-by: Morten Brørup <mb@smartsharesystems.com>
My apologies Morten, don't know how I missed your tag.
With regards
Andrzej Ostruszka
> From: Andrzej Ostruszka [mailto:amo@semihalf.com]
> Sent: Tuesday, 11 January 2022 14.46
>
> On Tue, Jan 11, 2022 at 01:00:25PM +0100, Morten Brørup wrote:
> [...]
> > Also this version of the patch:
> >
> > Reviewed-by: Morten Brørup <mb@smartsharesystems.com>
>
> My apologies Morten, don't know how I missed your tag.
>
> With regards
> Andrzej Ostruszka
No worries. :-)
I added the tag again, for Patchwork to show a sufficient level of acceptance when the maintainer considers the patch for merging.
-Morten
On Tue, Jan 11, 2022 at 12:38 PM Andrzej Ostruszka <amo@semihalf.com> wrote:
> When enqueueing/dequeueing to/from the ring we try to optimize by manual
> loop unrolling. The check for this optimization looks like:
>
> if (likely(idx + n < size)) {
>
> where 'idx' points to the first usable element (empty slot for enqueue,
> data for dequeue). The correct comparison here should be '<=' instead
> of '<'.
>
> This is not a functional error since we fall back to the loop with
> correct checks on indexes. Just a minor suboptimal behaviour for the
> case when we want to enqueue/dequeue exactly the number of elements that
> we have in the ring before wrapping to its beginning.
>
> Fixes: cc4b218790f6 ("ring: support configurable element size")
> Fixes: 286bd05bf70d ("ring: optimisations")
>
> Signed-off-by: Andrzej Ostruszka <amo@semihalf.com>
> Reviewed-by: Olivier Matz <olivier.matz@6wind.com>
> Acked-by: Konstantin Ananyev <konstantin.ananyev@intel.com>
Reviewed-by: Morten Brørup <mb@smartsharesystems.com>
Applied, thanks.
@@ -17,7 +17,7 @@ __rte_ring_enqueue_elems_32(struct rte_ring *r, const uint32_t size,
unsigned int i;
uint32_t *ring = (uint32_t *)&r[1];
const uint32_t *obj = (const uint32_t *)obj_table;
- if (likely(idx + n < size)) {
+ if (likely(idx + n <= size)) {
for (i = 0; i < (n & ~0x7); i += 8, idx += 8) {
ring[idx] = obj[i];
ring[idx + 1] = obj[i + 1];
@@ -62,7 +62,7 @@ __rte_ring_enqueue_elems_64(struct rte_ring *r, uint32_t prod_head,
uint32_t idx = prod_head & r->mask;
uint64_t *ring = (uint64_t *)&r[1];
const unaligned_uint64_t *obj = (const unaligned_uint64_t *)obj_table;
- if (likely(idx + n < size)) {
+ if (likely(idx + n <= size)) {
for (i = 0; i < (n & ~0x3); i += 4, idx += 4) {
ring[idx] = obj[i];
ring[idx + 1] = obj[i + 1];
@@ -95,7 +95,7 @@ __rte_ring_enqueue_elems_128(struct rte_ring *r, uint32_t prod_head,
uint32_t idx = prod_head & r->mask;
rte_int128_t *ring = (rte_int128_t *)&r[1];
const rte_int128_t *obj = (const rte_int128_t *)obj_table;
- if (likely(idx + n < size)) {
+ if (likely(idx + n <= size)) {
for (i = 0; i < (n & ~0x1); i += 2, idx += 2)
memcpy((void *)(ring + idx),
(const void *)(obj + i), 32);
@@ -151,7 +151,7 @@ __rte_ring_dequeue_elems_32(struct rte_ring *r, const uint32_t size,
unsigned int i;
uint32_t *ring = (uint32_t *)&r[1];
uint32_t *obj = (uint32_t *)obj_table;
- if (likely(idx + n < size)) {
+ if (likely(idx + n <= size)) {
for (i = 0; i < (n & ~0x7); i += 8, idx += 8) {
obj[i] = ring[idx];
obj[i + 1] = ring[idx + 1];
@@ -196,7 +196,7 @@ __rte_ring_dequeue_elems_64(struct rte_ring *r, uint32_t prod_head,
uint32_t idx = prod_head & r->mask;
uint64_t *ring = (uint64_t *)&r[1];
unaligned_uint64_t *obj = (unaligned_uint64_t *)obj_table;
- if (likely(idx + n < size)) {
+ if (likely(idx + n <= size)) {
for (i = 0; i < (n & ~0x3); i += 4, idx += 4) {
obj[i] = ring[idx];
obj[i + 1] = ring[idx + 1];
@@ -229,7 +229,7 @@ __rte_ring_dequeue_elems_128(struct rte_ring *r, uint32_t prod_head,
uint32_t idx = prod_head & r->mask;
rte_int128_t *ring = (rte_int128_t *)&r[1];
rte_int128_t *obj = (rte_int128_t *)obj_table;
- if (likely(idx + n < size)) {
+ if (likely(idx + n <= size)) {
for (i = 0; i < (n & ~0x1); i += 2, idx += 2)
memcpy((void *)(obj + i), (void *)(ring + idx), 32);
switch (n & 0x1) {