[v1,1/4] ring: introduce extra run-time checks

Message ID 20250521111432.207936-2-konstantin.ananyev@huawei.com (mailing list archive)
State New
Delegated to: Thomas Monjalon
Headers
Series ring: some fixes and improvements |

Checks

Context Check Description
ci/checkpatch success coding style OK

Commit Message

Konstantin Ananyev May 21, 2025, 11:14 a.m. UTC
Add RTE_ASSERT() to check that different move_tail() flavors
return meaningful  *entries value.
It also helps to ensure that inside move_tail(), it uses correct
head/tail values.

Signed-off-by: Konstantin Ananyev <konstantin.ananyev@huawei.com>
---
 lib/ring/rte_ring_c11_pvt.h      | 2 +-
 lib/ring/rte_ring_elem_pvt.h     | 8 ++++++--
 lib/ring/rte_ring_hts_elem_pvt.h | 8 ++++++--
 lib/ring/rte_ring_rts_elem_pvt.h | 8 ++++++--
 lib/ring/soring.c                | 2 ++
 5 files changed, 21 insertions(+), 7 deletions(-)
  

Comments

Morten Brørup May 21, 2025, 12:14 p.m. UTC | #1
> From: Konstantin Ananyev [mailto:konstantin.ananyev@huawei.com]
> Sent: Wednesday, 21 May 2025 13.14
> 
> Add RTE_ASSERT() to check that different move_tail() flavors
> return meaningful  *entries value.
> It also helps to ensure that inside move_tail(), it uses correct
> head/tail values.
> 
> Signed-off-by: Konstantin Ananyev <konstantin.ananyev@huawei.com>
> ---
>  lib/ring/rte_ring_c11_pvt.h      | 2 +-
>  lib/ring/rte_ring_elem_pvt.h     | 8 ++++++--
>  lib/ring/rte_ring_hts_elem_pvt.h | 8 ++++++--
>  lib/ring/rte_ring_rts_elem_pvt.h | 8 ++++++--
>  lib/ring/soring.c                | 2 ++
>  5 files changed, 21 insertions(+), 7 deletions(-)
> 
> diff --git a/lib/ring/rte_ring_c11_pvt.h b/lib/ring/rte_ring_c11_pvt.h
> index b9388af0da..0845cd6dcf 100644
> --- a/lib/ring/rte_ring_c11_pvt.h
> +++ b/lib/ring/rte_ring_c11_pvt.h
> @@ -104,10 +104,10 @@ __rte_ring_headtail_move_head(struct
> rte_ring_headtail *d,
>  			n = (behavior == RTE_RING_QUEUE_FIXED) ?
>  					0 : *entries;
> 
> +		*new_head = *old_head + n;
>  		if (n == 0)
>  			return 0;
> 
> -		*new_head = *old_head + n;
>  		if (is_st) {
>  			d->head = *new_head;
>  			success = 1;

Is there a need to assign a value to *new_head if n==0?

I don't think your suggestion is multi-thread safe.
If d->head moves, the value in *new_head will be incorrect.

Instead, suggest:

-		if (n == 0)
-			return 0;

		*new_head = *old_head + n;
		if (is_st) {
			d->head = *new_head;
			success = 1;
		} else
			/* on failure, *old_head is updated */
			success = rte_atomic_compare_exchange_strong_explicit(
					&d->head, old_head, *new_head,
					rte_memory_order_relaxed,
					rte_memory_order_relaxed);
	} while (unlikely(success == 0));
  
Konstantin Ananyev May 21, 2025, 12:34 p.m. UTC | #2
> > From: Konstantin Ananyev [mailto:konstantin.ananyev@huawei.com]
> > Sent: Wednesday, 21 May 2025 13.14
> >
> > Add RTE_ASSERT() to check that different move_tail() flavors
> > return meaningful  *entries value.
> > It also helps to ensure that inside move_tail(), it uses correct
> > head/tail values.
> >
> > Signed-off-by: Konstantin Ananyev <konstantin.ananyev@huawei.com>
> > ---
> >  lib/ring/rte_ring_c11_pvt.h      | 2 +-
> >  lib/ring/rte_ring_elem_pvt.h     | 8 ++++++--
> >  lib/ring/rte_ring_hts_elem_pvt.h | 8 ++++++--
> >  lib/ring/rte_ring_rts_elem_pvt.h | 8 ++++++--
> >  lib/ring/soring.c                | 2 ++
> >  5 files changed, 21 insertions(+), 7 deletions(-)
> >
> > diff --git a/lib/ring/rte_ring_c11_pvt.h b/lib/ring/rte_ring_c11_pvt.h
> > index b9388af0da..0845cd6dcf 100644
> > --- a/lib/ring/rte_ring_c11_pvt.h
> > +++ b/lib/ring/rte_ring_c11_pvt.h
> > @@ -104,10 +104,10 @@ __rte_ring_headtail_move_head(struct
> > rte_ring_headtail *d,
> >  			n = (behavior == RTE_RING_QUEUE_FIXED) ?
> >  					0 : *entries;
> >
> > +		*new_head = *old_head + n;
> >  		if (n == 0)
> >  			return 0;
> >
> > -		*new_head = *old_head + n;
> >  		if (is_st) {
> >  			d->head = *new_head;
> >  			success = 1;
> 
> Is there a need to assign a value to *new_head if n==0?

Not really, main reason I just moved this line up - to keep compiler happy.
Otherwise it complained that *new_head might be left uninitialized.
 
> I don't think your suggestion is multi-thread safe.
> If d->head moves, the value in *new_head will be incorrect.

If d->head moves, then *old_head will also be incorrect.
For that case we do have CAS() below, it will return zero if (d->head != *old_head)
and we shall go to the next iteration (attempt).
Basically - if n == 0, your *old_head and *new_head might be invalid and should not be used
(and they are not used).  

> Instead, suggest:
> 
> -		if (n == 0)
> -			return 0;
> 
> 		*new_head = *old_head + n;
> 		if (is_st) {
> 			d->head = *new_head;
> 			success = 1;
> 		} else
> 			/* on failure, *old_head is updated */
> 			success = rte_atomic_compare_exchange_strong_explicit(
> 					&d->head, old_head, *new_head,
> 					rte_memory_order_relaxed,
> 					rte_memory_order_relaxed);
> 	} while (unlikely(success == 0));

That's possible, but if (n ==0) we probably don't want to update the head -
as we are not moving head - it is pointless, while still expensive.
  
Morten Brørup May 21, 2025, 6:36 p.m. UTC | #3
> From: Konstantin Ananyev [mailto:konstantin.ananyev@huawei.com]
> Sent: Wednesday, 21 May 2025 14.35
> 
> > > From: Konstantin Ananyev [mailto:konstantin.ananyev@huawei.com]
> > > Sent: Wednesday, 21 May 2025 13.14
> > >
> > > Add RTE_ASSERT() to check that different move_tail() flavors
> > > return meaningful  *entries value.
> > > It also helps to ensure that inside move_tail(), it uses correct
> > > head/tail values.
> > >
> > > Signed-off-by: Konstantin Ananyev <konstantin.ananyev@huawei.com>
> > > ---
> > >  lib/ring/rte_ring_c11_pvt.h      | 2 +-
> > >  lib/ring/rte_ring_elem_pvt.h     | 8 ++++++--
> > >  lib/ring/rte_ring_hts_elem_pvt.h | 8 ++++++--
> > >  lib/ring/rte_ring_rts_elem_pvt.h | 8 ++++++--
> > >  lib/ring/soring.c                | 2 ++
> > >  5 files changed, 21 insertions(+), 7 deletions(-)
> > >
> > > diff --git a/lib/ring/rte_ring_c11_pvt.h
> b/lib/ring/rte_ring_c11_pvt.h
> > > index b9388af0da..0845cd6dcf 100644
> > > --- a/lib/ring/rte_ring_c11_pvt.h
> > > +++ b/lib/ring/rte_ring_c11_pvt.h
> > > @@ -104,10 +104,10 @@ __rte_ring_headtail_move_head(struct
> > > rte_ring_headtail *d,
> > >  			n = (behavior == RTE_RING_QUEUE_FIXED) ?
> > >  					0 : *entries;
> > >
> > > +		*new_head = *old_head + n;
> > >  		if (n == 0)
> > >  			return 0;
> > >
> > > -		*new_head = *old_head + n;
> > >  		if (is_st) {
> > >  			d->head = *new_head;
> > >  			success = 1;
> >
> > Is there a need to assign a value to *new_head if n==0?
> 
> Not really, main reason I just moved this line up - to keep compiler
> happy.
> Otherwise it complained that *new_head might be left uninitialized.

Your change might give the impression that *new_head is used by a caller. (Like I asked about.)
To please the compiler, you could mark new_head __rte_unused, or:

-		if (n == 0)
+		if (n == 0) {
+			RTE_SET_USED(new_head);
			return 0;
+		}

> 
> > I don't think your suggestion is multi-thread safe.
> > If d->head moves, the value in *new_head will be incorrect.
> 
> If d->head moves, then *old_head will also be incorrect.
> For that case we do have CAS() below, it will return zero if (d->head
> != *old_head)
> and we shall go to the next iteration (attempt).

Exactly.
And with my suggestion the same will happen if n==0, and the next attempt will update them both, until they are both correct.

> Basically - if n == 0, your *old_head and *new_head might be invalid
> and should not be used
> (and they are not used).
> 
> > Instead, suggest:
> >
> > -		if (n == 0)
> > -			return 0;
> >
> > 		*new_head = *old_head + n;
> > 		if (is_st) {
> > 			d->head = *new_head;
> > 			success = 1;
> > 		} else
> > 			/* on failure, *old_head is updated */
> > 			success =
> rte_atomic_compare_exchange_strong_explicit(
> > 					&d->head, old_head, *new_head,
> > 					rte_memory_order_relaxed,
> > 					rte_memory_order_relaxed);
> > 	} while (unlikely(success == 0));
> 
> That's possible, but if (n ==0) we probably don't want to update the
> head -
> as we are not moving head - it is pointless, while still expensive.

Agree. Let's avoid unnecessary cost!
My suggestion was only relevant if *new_head needed to be updated when n==0.
  
Konstantin Ananyev May 21, 2025, 7:38 p.m. UTC | #4
> > From: Konstantin Ananyev [mailto:konstantin.ananyev@huawei.com]
> > Sent: Wednesday, 21 May 2025 14.35
> >
> > > > From: Konstantin Ananyev [mailto:konstantin.ananyev@huawei.com]
> > > > Sent: Wednesday, 21 May 2025 13.14
> > > >
> > > > Add RTE_ASSERT() to check that different move_tail() flavors
> > > > return meaningful  *entries value.
> > > > It also helps to ensure that inside move_tail(), it uses correct
> > > > head/tail values.
> > > >
> > > > Signed-off-by: Konstantin Ananyev <konstantin.ananyev@huawei.com>
> > > > ---
> > > >  lib/ring/rte_ring_c11_pvt.h      | 2 +-
> > > >  lib/ring/rte_ring_elem_pvt.h     | 8 ++++++--
> > > >  lib/ring/rte_ring_hts_elem_pvt.h | 8 ++++++--
> > > >  lib/ring/rte_ring_rts_elem_pvt.h | 8 ++++++--
> > > >  lib/ring/soring.c                | 2 ++
> > > >  5 files changed, 21 insertions(+), 7 deletions(-)
> > > >
> > > > diff --git a/lib/ring/rte_ring_c11_pvt.h
> > b/lib/ring/rte_ring_c11_pvt.h
> > > > index b9388af0da..0845cd6dcf 100644
> > > > --- a/lib/ring/rte_ring_c11_pvt.h
> > > > +++ b/lib/ring/rte_ring_c11_pvt.h
> > > > @@ -104,10 +104,10 @@ __rte_ring_headtail_move_head(struct
> > > > rte_ring_headtail *d,
> > > >  			n = (behavior == RTE_RING_QUEUE_FIXED) ?
> > > >  					0 : *entries;
> > > >
> > > > +		*new_head = *old_head + n;
> > > >  		if (n == 0)
> > > >  			return 0;
> > > >
> > > > -		*new_head = *old_head + n;
> > > >  		if (is_st) {
> > > >  			d->head = *new_head;
> > > >  			success = 1;
> > >
> > > Is there a need to assign a value to *new_head if n==0?
> >
> > Not really, main reason I just moved this line up - to keep compiler
> > happy.
> > Otherwise it complained that *new_head might be left uninitialized.
> 
> Your change might give the impression that *new_head is used by a caller. (Like I asked about.)
> To please the compiler, you could mark new_head __rte_unused, or:
> 
> -		if (n == 0)
> +		if (n == 0) {
> +			RTE_SET_USED(new_head);
> 			return 0;
> +		}
> 
> >

Makes sense, will re-spin.
Do you have any comments for other patches in the series?
Thanks
Konstantin 


> > > I don't think your suggestion is multi-thread safe.
> > > If d->head moves, the value in *new_head will be incorrect.
> >
> > If d->head moves, then *old_head will also be incorrect.
> > For that case we do have CAS() below, it will return zero if (d->head
> > != *old_head)
> > and we shall go to the next iteration (attempt).
> 
> Exactly.
> And with my suggestion the same will happen if n==0, and the next attempt will update them both, until they are both correct.
> 
> > Basically - if n == 0, your *old_head and *new_head might be invalid
> > and should not be used
> > (and they are not used).
> >
> > > Instead, suggest:
> > >
> > > -		if (n == 0)
> > > -			return 0;
> > >
> > > 		*new_head = *old_head + n;
> > > 		if (is_st) {
> > > 			d->head = *new_head;
> > > 			success = 1;
> > > 		} else
> > > 			/* on failure, *old_head is updated */
> > > 			success =
> > rte_atomic_compare_exchange_strong_explicit(
> > > 					&d->head, old_head, *new_head,
> > > 					rte_memory_order_relaxed,
> > > 					rte_memory_order_relaxed);
> > > 	} while (unlikely(success == 0));
> >
> > That's possible, but if (n ==0) we probably don't want to update the
> > head -
> > as we are not moving head - it is pointless, while still expensive.
> 
> Agree. Let's avoid unnecessary cost!
> My suggestion was only relevant if *new_head needed to be updated when n==0.
>
  
Morten Brørup May 21, 2025, 10:02 p.m. UTC | #5
> From: Konstantin Ananyev [mailto:konstantin.ananyev@huawei.com]
> Sent: Wednesday, 21 May 2025 21.39
> 
> > > From: Konstantin Ananyev [mailto:konstantin.ananyev@huawei.com]
> > > Sent: Wednesday, 21 May 2025 14.35
> > >
> > > > > From: Konstantin Ananyev [mailto:konstantin.ananyev@huawei.com]
> > > > > Sent: Wednesday, 21 May 2025 13.14
> > > > >
> > > > > Add RTE_ASSERT() to check that different move_tail() flavors
> > > > > return meaningful  *entries value.
> > > > > It also helps to ensure that inside move_tail(), it uses
> correct
> > > > > head/tail values.
> > > > >
> > > > > Signed-off-by: Konstantin Ananyev
> <konstantin.ananyev@huawei.com>
> > > > > ---
> > > > >  lib/ring/rte_ring_c11_pvt.h      | 2 +-
> > > > >  lib/ring/rte_ring_elem_pvt.h     | 8 ++++++--
> > > > >  lib/ring/rte_ring_hts_elem_pvt.h | 8 ++++++--
> > > > >  lib/ring/rte_ring_rts_elem_pvt.h | 8 ++++++--
> > > > >  lib/ring/soring.c                | 2 ++
> > > > >  5 files changed, 21 insertions(+), 7 deletions(-)
> > > > >
> > > > > diff --git a/lib/ring/rte_ring_c11_pvt.h
> > > b/lib/ring/rte_ring_c11_pvt.h
> > > > > index b9388af0da..0845cd6dcf 100644
> > > > > --- a/lib/ring/rte_ring_c11_pvt.h
> > > > > +++ b/lib/ring/rte_ring_c11_pvt.h
> > > > > @@ -104,10 +104,10 @@ __rte_ring_headtail_move_head(struct
> > > > > rte_ring_headtail *d,
> > > > >  			n = (behavior == RTE_RING_QUEUE_FIXED) ?
> > > > >  					0 : *entries;
> > > > >
> > > > > +		*new_head = *old_head + n;
> > > > >  		if (n == 0)
> > > > >  			return 0;
> > > > >
> > > > > -		*new_head = *old_head + n;
> > > > >  		if (is_st) {
> > > > >  			d->head = *new_head;
> > > > >  			success = 1;
> > > >
> > > > Is there a need to assign a value to *new_head if n==0?
> > >
> > > Not really, main reason I just moved this line up - to keep
> compiler
> > > happy.
> > > Otherwise it complained that *new_head might be left uninitialized.
> >
> > Your change might give the impression that *new_head is used by a
> caller. (Like I asked about.)
> > To please the compiler, you could mark new_head __rte_unused, or:
> >
> > -		if (n == 0)
> > +		if (n == 0) {
> > +			RTE_SET_USED(new_head);
> > 			return 0;
> > +		}
> >
> > >
> 
> Makes sense, will re-spin.

I'm having second thoughts about treating this compiler warning as a false positive!

Are you 100 % sure that no caller uses *new_head?
I suppose you are, because it wasn't set before this patch either, so the existing code would have a bug if some caller uses it.

But the documentation does not mention that *new_head is not set if the function returns 0.
So, some future caller might use *new_head, thus introducing a bug when n==0.

But most importantly for the performance discussion, the costly CAS is only pointless when n==0.
So, if n==0 is very unlikely, we could accept this pointless cost.
And it would save us the cost of "if (n==0) return 0;" in the hot code path.

And, as a consequence, some of the callers of this function could also remove their special handing of __rte_ring_headtail_move_head() returning 0. (Likewise, only if a return value of 0 is unlikely, and the special handling has a cost in the hot cod path for non-zero return values.)

> Do you have any comments for other patches in the series?
> Thanks
> Konstantin
> 
> 
> > > > I don't think your suggestion is multi-thread safe.
> > > > If d->head moves, the value in *new_head will be incorrect.
> > >
> > > If d->head moves, then *old_head will also be incorrect.
> > > For that case we do have CAS() below, it will return zero if (d-
> >head
> > > != *old_head)
> > > and we shall go to the next iteration (attempt).
> >
> > Exactly.
> > And with my suggestion the same will happen if n==0, and the next
> attempt will update them both, until they are both correct.
> >
> > > Basically - if n == 0, your *old_head and *new_head might be
> invalid
> > > and should not be used
> > > (and they are not used).
> > >
> > > > Instead, suggest:
> > > >
> > > > -		if (n == 0)
> > > > -			return 0;
> > > >
> > > > 		*new_head = *old_head + n;
> > > > 		if (is_st) {
> > > > 			d->head = *new_head;
> > > > 			success = 1;
> > > > 		} else
> > > > 			/* on failure, *old_head is updated */
> > > > 			success =
> > > rte_atomic_compare_exchange_strong_explicit(
> > > > 					&d->head, old_head, *new_head,
> > > > 					rte_memory_order_relaxed,
> > > > 					rte_memory_order_relaxed);
> > > > 	} while (unlikely(success == 0));
> > >
> > > That's possible, but if (n ==0) we probably don't want to update
> the
> > > head -
> > > as we are not moving head - it is pointless, while still expensive.
> >
> > Agree. Let's avoid unnecessary cost!
> > My suggestion was only relevant if *new_head needed to be updated
> when n==0.
> >
  
Konstantin Ananyev May 26, 2025, 8:39 a.m. UTC | #6
> > > > From: Konstantin Ananyev [mailto:konstantin.ananyev@huawei.com]
> > > > Sent: Wednesday, 21 May 2025 14.35
> > > >
> > > > > > From: Konstantin Ananyev [mailto:konstantin.ananyev@huawei.com]
> > > > > > Sent: Wednesday, 21 May 2025 13.14
> > > > > >
> > > > > > Add RTE_ASSERT() to check that different move_tail() flavors
> > > > > > return meaningful  *entries value.
> > > > > > It also helps to ensure that inside move_tail(), it uses
> > correct
> > > > > > head/tail values.
> > > > > >
> > > > > > Signed-off-by: Konstantin Ananyev
> > <konstantin.ananyev@huawei.com>
> > > > > > ---
> > > > > >  lib/ring/rte_ring_c11_pvt.h      | 2 +-
> > > > > >  lib/ring/rte_ring_elem_pvt.h     | 8 ++++++--
> > > > > >  lib/ring/rte_ring_hts_elem_pvt.h | 8 ++++++--
> > > > > >  lib/ring/rte_ring_rts_elem_pvt.h | 8 ++++++--
> > > > > >  lib/ring/soring.c                | 2 ++
> > > > > >  5 files changed, 21 insertions(+), 7 deletions(-)
> > > > > >
> > > > > > diff --git a/lib/ring/rte_ring_c11_pvt.h
> > > > b/lib/ring/rte_ring_c11_pvt.h
> > > > > > index b9388af0da..0845cd6dcf 100644
> > > > > > --- a/lib/ring/rte_ring_c11_pvt.h
> > > > > > +++ b/lib/ring/rte_ring_c11_pvt.h
> > > > > > @@ -104,10 +104,10 @@ __rte_ring_headtail_move_head(struct
> > > > > > rte_ring_headtail *d,
> > > > > >  			n = (behavior == RTE_RING_QUEUE_FIXED) ?
> > > > > >  					0 : *entries;
> > > > > >
> > > > > > +		*new_head = *old_head + n;
> > > > > >  		if (n == 0)
> > > > > >  			return 0;
> > > > > >
> > > > > > -		*new_head = *old_head + n;
> > > > > >  		if (is_st) {
> > > > > >  			d->head = *new_head;
> > > > > >  			success = 1;
> > > > >
> > > > > Is there a need to assign a value to *new_head if n==0?
> > > >
> > > > Not really, main reason I just moved this line up - to keep
> > compiler
> > > > happy.
> > > > Otherwise it complained that *new_head might be left uninitialized.
> > >
> > > Your change might give the impression that *new_head is used by a
> > caller. (Like I asked about.)
> > > To please the compiler, you could mark new_head __rte_unused, or:
> > >
> > > -		if (n == 0)
> > > +		if (n == 0) {
> > > +			RTE_SET_USED(new_head);
> > > 			return 0;
> > > +		}

Actually, that wouldn't help.
By some reason, after introducing RTE_ASSERT()  gcc13 believes that now cons_next can
be used (stored) unfinalized here:

        n = __rte_ring_move_cons_head(r, (int)is_sc, n, behavior,
                        &cons_head, &cons_next, &entries);
        if (n == 0)
                goto end;

        __rte_ring_dequeue_elems(r, cons_head, obj_table, esize, n);

        __rte_ring_update_tail(&r->cons, cons_head, cons_next, is_sc, 0);

end:
   ...

For me it is a false positive, somehow it missed that if (n==0) then update_table()
wouldn't be called  at all. Full error message below.
So making new_head always initialized, even if we are not going to use, seems
like the simplest and cleanest way to fix it.

est-pipeline_runtime.c.o -c ../app/test-pipeline/runtime.c
In file included from ../lib/eal/include/rte_bitops.h:24,
                 from ../lib/eal/include/rte_memory.h:18,
                 from ../app/test-pipeline/runtime.c:19:
In function '__rte_ring_update_tail',
    inlined from '__rte_ring_do_dequeue_elem' at ../lib/ring/rte_ring_elem_pvt.h:472:2,
    inlined from 'rte_ring_sc_dequeue_bulk_elem' at ../lib/ring/rte_ring_elem.h:344:9,
    inlined from 'rte_ring_sc_dequeue_bulk' at ../lib/ring/rte_ring.h:402:9,
    inlined from 'app_main_loop_worker' at ../app/test-pipeline/runtime.c:91:10:
../lib/eal/include/rte_stdatomic.h:139:9: error: 'cons_next' may be used uninitialized [-Werror=maybe-uninitialized]
  139 |         __atomic_store_n(ptr, val, memorder)
      |         ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
../lib/ring/rte_ring_c11_pvt.h:39:9: note: in expansion of macro 'rte_atomic_store_explicit'
   39 |         rte_atomic_store_explicit(&ht->tail, new_val, rte_memory_order_release);
      |         ^~~~~~~~~~~~~~~~~~~~~~~~~
In file included from ../lib/ring/rte_ring_elem.h:20,
                 from ../lib/ring/rte_ring.h:38,
                 from ../lib/mempool/rte_mempool.h:49,
                 from ../lib/mbuf/rte_mbuf.h:38,
                 from ../lib/net/rte_ether.h:20,
                 from ../app/test-pipeline/runtime.c:31:
../lib/ring/rte_ring_elem_pvt.h: In function 'app_main_loop_worker':
../lib/ring/rte_ring_elem_pvt.h:462:29: note: 'cons_next' was declared here
  462 |         uint32_t cons_head, cons_next;
      |                             ^~~~~~~~~
In function '__rte_ring_update_tail',
    inlined from '__rte_ring_do_enqueue_elem' at ../lib/ring/rte_ring_elem_pvt.h:425:2,
    inlined from 'rte_ring_sp_enqueue_bulk_elem' at ../lib/ring/rte_ring_elem.h:159:9,
    inlined from 'rte_ring_sp_enqueue_bulk' at ../lib/ring/rte_ring.h:267:9,
    inlined from 'app_main_loop_worker' at ../app/test-pipeline/runtime.c:101:11 

> > >
> > > >
> >
> > Makes sense, will re-spin.


> I'm having second thoughts about treating this compiler warning as a false positive!
> 
> Are you 100 % sure that no caller uses *new_head?
 
Yes, I believe so. If not, then we do have a severe bug in our rt_ring,

> I suppose you are, because it wasn't set before this patch either, so the existing code would have a bug if some caller uses it.
> But the documentation does not mention that *new_head is not set if the function returns 0.
> So, some future caller might use *new_head, thus introducing a bug when n==0.
> 
> But most importantly for the performance discussion, the costly CAS is only pointless when n==0.
> So, if n==0 is very unlikely, we could accept this pointless cost.
> And it would save us the cost of "if (n==0) return 0;" in the hot code path.
> 
> And, as a consequence, some of the callers of this function could also remove their special handing of
> __rte_ring_headtail_move_head() returning 0. (Likewise, only if a return value of 0 is unlikely, and the special handling has a cost in
> the hot cod path for non-zero return values.)

I don't think it is a good idea.
First of all about the cost - I suppose that situation when 'n==0' is not so uncommon:
There is a contention on the ring (many threads try to enqueue/dequeue) to/from it.
Doing unnecessary CAS() (when n==0) we introduce extra cache-snooping to already busy memory sub-system,
i.e. we slowing down not only current threads, but all other producers/consumers of the ring.
About removing extra branches: I don't think there would be many to remove.
Usually after move_head() finishes we have to do two operations:
__rte_ring_dequeue_elems() ;
__rte_ring_update_tail();
Both have to be performed only when 'n != 0'.
Regarding the doc for the move_head() function - sure it can be updated to note explicitly
that both *old_head and *new_head will contain up-to-date values only when return value
is not zero. 

> > > > > I don't think your suggestion is multi-thread safe.
> > > > > If d->head moves, the value in *new_head will be incorrect.
> > > >
> > > > If d->head moves, then *old_head will also be incorrect.
> > > > For that case we do have CAS() below, it will return zero if (d-
> > >head
> > > > != *old_head)
> > > > and we shall go to the next iteration (attempt).
> > >
> > > Exactly.
> > > And with my suggestion the same will happen if n==0, and the next
> > attempt will update them both, until they are both correct.
> > >
> > > > Basically - if n == 0, your *old_head and *new_head might be
> > invalid
> > > > and should not be used
> > > > (and they are not used).
> > > >
> > > > > Instead, suggest:
> > > > >
> > > > > -		if (n == 0)
> > > > > -			return 0;
> > > > >
> > > > > 		*new_head = *old_head + n;
> > > > > 		if (is_st) {
> > > > > 			d->head = *new_head;
> > > > > 			success = 1;
> > > > > 		} else
> > > > > 			/* on failure, *old_head is updated */
> > > > > 			success =
> > > > rte_atomic_compare_exchange_strong_explicit(
> > > > > 					&d->head, old_head, *new_head,
> > > > > 					rte_memory_order_relaxed,
> > > > > 					rte_memory_order_relaxed);
> > > > > 	} while (unlikely(success == 0));
> > > >
> > > > That's possible, but if (n ==0) we probably don't want to update
> > the
> > > > head -
> > > > as we are not moving head - it is pointless, while still expensive.
> > >
> > > Agree. Let's avoid unnecessary cost!
> > > My suggestion was only relevant if *new_head needed to be updated
> > when n==0.
> > >
>
  
Morten Brørup May 26, 2025, 9:57 a.m. UTC | #7
> From: Konstantin Ananyev [mailto:konstantin.ananyev@huawei.com]
> Sent: Monday, 26 May 2025 10.39
> 
> > > > > From: Konstantin Ananyev [mailto:konstantin.ananyev@huawei.com]
> > > > > Sent: Wednesday, 21 May 2025 14.35
> > > > >
> > > > > > > From: Konstantin Ananyev
> [mailto:konstantin.ananyev@huawei.com]
> > > > > > > Sent: Wednesday, 21 May 2025 13.14
> > > > > > >
> > > > > > > Add RTE_ASSERT() to check that different move_tail()
> flavors
> > > > > > > return meaningful  *entries value.
> > > > > > > It also helps to ensure that inside move_tail(), it uses
> > > correct
> > > > > > > head/tail values.
> > > > > > >
> > > > > > > Signed-off-by: Konstantin Ananyev
> > > <konstantin.ananyev@huawei.com>
> > > > > > > ---
> > > > > > >  lib/ring/rte_ring_c11_pvt.h      | 2 +-
> > > > > > >  lib/ring/rte_ring_elem_pvt.h     | 8 ++++++--
> > > > > > >  lib/ring/rte_ring_hts_elem_pvt.h | 8 ++++++--
> > > > > > >  lib/ring/rte_ring_rts_elem_pvt.h | 8 ++++++--
> > > > > > >  lib/ring/soring.c                | 2 ++
> > > > > > >  5 files changed, 21 insertions(+), 7 deletions(-)
> > > > > > >
> > > > > > > diff --git a/lib/ring/rte_ring_c11_pvt.h
> > > > > b/lib/ring/rte_ring_c11_pvt.h
> > > > > > > index b9388af0da..0845cd6dcf 100644
> > > > > > > --- a/lib/ring/rte_ring_c11_pvt.h
> > > > > > > +++ b/lib/ring/rte_ring_c11_pvt.h
> > > > > > > @@ -104,10 +104,10 @@ __rte_ring_headtail_move_head(struct
> > > > > > > rte_ring_headtail *d,
> > > > > > >  			n = (behavior == RTE_RING_QUEUE_FIXED) ?
> > > > > > >  					0 : *entries;
> > > > > > >
> > > > > > > +		*new_head = *old_head + n;
> > > > > > >  		if (n == 0)
> > > > > > >  			return 0;
> > > > > > >
> > > > > > > -		*new_head = *old_head + n;
> > > > > > >  		if (is_st) {
> > > > > > >  			d->head = *new_head;
> > > > > > >  			success = 1;
> > > > > >
> > > > > > Is there a need to assign a value to *new_head if n==0?
> > > > >
> > > > > Not really, main reason I just moved this line up - to keep
> > > compiler
> > > > > happy.
> > > > > Otherwise it complained that *new_head might be left
> uninitialized.
> > > >
> > > > Your change might give the impression that *new_head is used by a
> > > caller. (Like I asked about.)
> > > > To please the compiler, you could mark new_head __rte_unused, or:
> > > >
> > > > -		if (n == 0)
> > > > +		if (n == 0) {
> > > > +			RTE_SET_USED(new_head);
> > > > 			return 0;
> > > > +		}
> 
> Actually, that wouldn't help.
> By some reason, after introducing RTE_ASSERT()  gcc13 believes that now
> cons_next can
> be used (stored) unfinalized here:
> 
>         n = __rte_ring_move_cons_head(r, (int)is_sc, n, behavior,
>                         &cons_head, &cons_next, &entries);
>         if (n == 0)
>                 goto end;
> 
>         __rte_ring_dequeue_elems(r, cons_head, obj_table, esize, n);
> 
>         __rte_ring_update_tail(&r->cons, cons_head, cons_next, is_sc,
> 0);
> 
> end:
>    ...
> 
> For me it is a false positive, somehow it missed that if (n==0) then
> update_table()
> wouldn't be called  at all. Full error message below.
> So making new_head always initialized, even if we are not going to use,
> seems
> like the simplest and cleanest way to fix it.

NAK.
Initializing new_head with potential garbage will prevent the compiler from detecting that it is being used uninitialized afterwards.

If the compiler is too stupid to understand "goto end", then please rewrite the affected code instead:
-	if (n == 0)
-		goto end;
+	if (n != 0) {
	__rte_ring_dequeue_elems();
	__rte_ring_update_tail();
-end:
+	}
	...

> 
> est-pipeline_runtime.c.o -c ../app/test-pipeline/runtime.c
> In file included from ../lib/eal/include/rte_bitops.h:24,
>                  from ../lib/eal/include/rte_memory.h:18,
>                  from ../app/test-pipeline/runtime.c:19:
> In function '__rte_ring_update_tail',
>     inlined from '__rte_ring_do_dequeue_elem' at
> ../lib/ring/rte_ring_elem_pvt.h:472:2,
>     inlined from 'rte_ring_sc_dequeue_bulk_elem' at
> ../lib/ring/rte_ring_elem.h:344:9,
>     inlined from 'rte_ring_sc_dequeue_bulk' at
> ../lib/ring/rte_ring.h:402:9,
>     inlined from 'app_main_loop_worker' at ../app/test-
> pipeline/runtime.c:91:10:
> ../lib/eal/include/rte_stdatomic.h:139:9: error: 'cons_next' may be
> used uninitialized [-Werror=maybe-uninitialized]
>   139 |         __atomic_store_n(ptr, val, memorder)
>       |         ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
> ../lib/ring/rte_ring_c11_pvt.h:39:9: note: in expansion of macro
> 'rte_atomic_store_explicit'
>    39 |         rte_atomic_store_explicit(&ht->tail, new_val,
> rte_memory_order_release);
>       |         ^~~~~~~~~~~~~~~~~~~~~~~~~
> In file included from ../lib/ring/rte_ring_elem.h:20,
>                  from ../lib/ring/rte_ring.h:38,
>                  from ../lib/mempool/rte_mempool.h:49,
>                  from ../lib/mbuf/rte_mbuf.h:38,
>                  from ../lib/net/rte_ether.h:20,
>                  from ../app/test-pipeline/runtime.c:31:
> ../lib/ring/rte_ring_elem_pvt.h: In function 'app_main_loop_worker':
> ../lib/ring/rte_ring_elem_pvt.h:462:29: note: 'cons_next' was declared
> here
>   462 |         uint32_t cons_head, cons_next;
>       |                             ^~~~~~~~~
> In function '__rte_ring_update_tail',
>     inlined from '__rte_ring_do_enqueue_elem' at
> ../lib/ring/rte_ring_elem_pvt.h:425:2,
>     inlined from 'rte_ring_sp_enqueue_bulk_elem' at
> ../lib/ring/rte_ring_elem.h:159:9,
>     inlined from 'rte_ring_sp_enqueue_bulk' at
> ../lib/ring/rte_ring.h:267:9,
>     inlined from 'app_main_loop_worker' at ../app/test-
> pipeline/runtime.c:101:11
> 
> > > >
> > > > >
> > >
> > > Makes sense, will re-spin.
> 
> 
> > I'm having second thoughts about treating this compiler warning as a
> false positive!
> >
> > Are you 100 % sure that no caller uses *new_head?
> 
> Yes, I believe so. If not, then we do have a severe bug in our rt_ring,
> 
> > I suppose you are, because it wasn't set before this patch either, so
> the existing code would have a bug if some caller uses it.
> > But the documentation does not mention that *new_head is not set if
> the function returns 0.
> > So, some future caller might use *new_head, thus introducing a bug
> when n==0.
> >
> > But most importantly for the performance discussion, the costly CAS
> is only pointless when n==0.
> > So, if n==0 is very unlikely, we could accept this pointless cost.
> > And it would save us the cost of "if (n==0) return 0;" in the hot
> code path.
> >
> > And, as a consequence, some of the callers of this function could
> also remove their special handing of
> > __rte_ring_headtail_move_head() returning 0. (Likewise, only if a
> return value of 0 is unlikely, and the special handling has a cost in
> > the hot cod path for non-zero return values.)
> 
> I don't think it is a good idea.
> First of all about the cost - I suppose that situation when 'n==0' is
> not so uncommon:
> There is a contention on the ring (many threads try to enqueue/dequeue)
> to/from it.
> Doing unnecessary CAS() (when n==0) we introduce extra cache-snooping
> to already busy memory sub-system,
> i.e. we slowing down not only current threads, but all other
> producers/consumers of the ring.
> About removing extra branches: I don't think there would be many to
> remove.
> Usually after move_head() finishes we have to do two operations:
> __rte_ring_dequeue_elems() ;
> __rte_ring_update_tail();
> Both have to be performed only when 'n != 0'.

OK, thank you for the analysis.
You convinced me my suggestion was not a good idea.

> Regarding the doc for the move_head() function - sure it can be updated
> to note explicitly
> that both *old_head and *new_head will contain up-to-date values only
> when return value
> is not zero.

Maybe there's a ripple-down effect regarding documentation...
Some of the output parameters of the functions calling __rte_ring_headtail_move_head() are also conditionally valid.

And if any of these output parameters are used anyway, your patch series might have uncovered a bug.

Note:
For the multi-threaded rings, the output parameter *entries is only up-to-date when the return value is non-zero.
If d->head was fetched into *old_head, and another thread raced to update d->head before the CAS() (which is not called when n==0, but would have failed), then *entries was set based on the old *old_head.
But that race might as well occur when returning from the function, so it shouldn't be a problem.
Your added RTE_ASSERT(*entries <= r->capacity)'s should be valid, regardless if *entries is up-to-date or based on the old *old_head.
This also means that the documentation needs no update regarding the validity of the *entries output parameter.

> 
> > > > > > I don't think your suggestion is multi-thread safe.
> > > > > > If d->head moves, the value in *new_head will be incorrect.
> > > > >
> > > > > If d->head moves, then *old_head will also be incorrect.
> > > > > For that case we do have CAS() below, it will return zero if
> (d-
> > > >head
> > > > > != *old_head)
> > > > > and we shall go to the next iteration (attempt).
> > > >
> > > > Exactly.
> > > > And with my suggestion the same will happen if n==0, and the next
> > > attempt will update them both, until they are both correct.
> > > >
> > > > > Basically - if n == 0, your *old_head and *new_head might be
> > > invalid
> > > > > and should not be used
> > > > > (and they are not used).
> > > > >
> > > > > > Instead, suggest:
> > > > > >
> > > > > > -		if (n == 0)
> > > > > > -			return 0;
> > > > > >
> > > > > > 		*new_head = *old_head + n;
> > > > > > 		if (is_st) {
> > > > > > 			d->head = *new_head;
> > > > > > 			success = 1;
> > > > > > 		} else
> > > > > > 			/* on failure, *old_head is updated */
> > > > > > 			success =
> > > > > rte_atomic_compare_exchange_strong_explicit(
> > > > > > 					&d->head, old_head, *new_head,
> > > > > > 					rte_memory_order_relaxed,
> > > > > > 					rte_memory_order_relaxed);
> > > > > > 	} while (unlikely(success == 0));
> > > > >
> > > > > That's possible, but if (n ==0) we probably don't want to
> update
> > > the
> > > > > head -
> > > > > as we are not moving head - it is pointless, while still
> expensive.
> > > >
> > > > Agree. Let's avoid unnecessary cost!
> > > > My suggestion was only relevant if *new_head needed to be updated
> > > when n==0.
> > > >
> >
  

Patch

diff --git a/lib/ring/rte_ring_c11_pvt.h b/lib/ring/rte_ring_c11_pvt.h
index b9388af0da..0845cd6dcf 100644
--- a/lib/ring/rte_ring_c11_pvt.h
+++ b/lib/ring/rte_ring_c11_pvt.h
@@ -104,10 +104,10 @@  __rte_ring_headtail_move_head(struct rte_ring_headtail *d,
 			n = (behavior == RTE_RING_QUEUE_FIXED) ?
 					0 : *entries;
 
+		*new_head = *old_head + n;
 		if (n == 0)
 			return 0;
 
-		*new_head = *old_head + n;
 		if (is_st) {
 			d->head = *new_head;
 			success = 1;
diff --git a/lib/ring/rte_ring_elem_pvt.h b/lib/ring/rte_ring_elem_pvt.h
index 6eafae121f..2e71040830 100644
--- a/lib/ring/rte_ring_elem_pvt.h
+++ b/lib/ring/rte_ring_elem_pvt.h
@@ -341,8 +341,10 @@  __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
 		uint32_t *old_head, uint32_t *new_head,
 		uint32_t *free_entries)
 {
-	return __rte_ring_headtail_move_head(&r->prod, &r->cons, r->capacity,
+	n = __rte_ring_headtail_move_head(&r->prod, &r->cons, r->capacity,
 			is_sp, n, behavior, old_head, new_head, free_entries);
+	RTE_ASSERT(*free_entries <= r->capacity);
+	return n;
 }
 
 /**
@@ -374,8 +376,10 @@  __rte_ring_move_cons_head(struct rte_ring *r, unsigned int is_sc,
 		uint32_t *old_head, uint32_t *new_head,
 		uint32_t *entries)
 {
-	return __rte_ring_headtail_move_head(&r->cons, &r->prod, 0,
+	n = __rte_ring_headtail_move_head(&r->cons, &r->prod, 0,
 			is_sc, n, behavior, old_head, new_head, entries);
+	RTE_ASSERT(*entries <= r->capacity);
+	return n;
 }
 
 /**
diff --git a/lib/ring/rte_ring_hts_elem_pvt.h b/lib/ring/rte_ring_hts_elem_pvt.h
index e2b82dd1e6..c59e5f6420 100644
--- a/lib/ring/rte_ring_hts_elem_pvt.h
+++ b/lib/ring/rte_ring_hts_elem_pvt.h
@@ -136,8 +136,10 @@  __rte_ring_hts_move_prod_head(struct rte_ring *r, unsigned int num,
 	enum rte_ring_queue_behavior behavior, uint32_t *old_head,
 	uint32_t *free_entries)
 {
-	return __rte_ring_hts_move_head(&r->hts_prod, &r->cons,
+	num = __rte_ring_hts_move_head(&r->hts_prod, &r->cons,
 			r->capacity, num, behavior, old_head, free_entries);
+	RTE_ASSERT(*free_entries <= r->capacity);
+	return num;
 }
 
 /**
@@ -148,8 +150,10 @@  __rte_ring_hts_move_cons_head(struct rte_ring *r, unsigned int num,
 	enum rte_ring_queue_behavior behavior, uint32_t *old_head,
 	uint32_t *entries)
 {
-	return __rte_ring_hts_move_head(&r->hts_cons, &r->prod,
+	num = __rte_ring_hts_move_head(&r->hts_cons, &r->prod,
 			0, num, behavior, old_head, entries);
+	RTE_ASSERT(*entries <= r->capacity);
+	return num;
 }
 
 /**
diff --git a/lib/ring/rte_ring_rts_elem_pvt.h b/lib/ring/rte_ring_rts_elem_pvt.h
index 96825931f8..509fa674fb 100644
--- a/lib/ring/rte_ring_rts_elem_pvt.h
+++ b/lib/ring/rte_ring_rts_elem_pvt.h
@@ -152,8 +152,10 @@  __rte_ring_rts_move_prod_head(struct rte_ring *r, uint32_t num,
 	enum rte_ring_queue_behavior behavior, uint32_t *old_head,
 	uint32_t *free_entries)
 {
-	return __rte_ring_rts_move_head(&r->rts_prod, &r->cons,
+	num = __rte_ring_rts_move_head(&r->rts_prod, &r->cons,
 			r->capacity, num, behavior, old_head, free_entries);
+	RTE_ASSERT(*free_entries <= r->capacity);
+	return num;
 }
 
 /**
@@ -164,8 +166,10 @@  __rte_ring_rts_move_cons_head(struct rte_ring *r, uint32_t num,
 	enum rte_ring_queue_behavior behavior, uint32_t *old_head,
 	uint32_t *entries)
 {
-	return __rte_ring_rts_move_head(&r->rts_cons, &r->prod,
+	num = __rte_ring_rts_move_head(&r->rts_cons, &r->prod,
 			0, num, behavior, old_head, entries);
+	RTE_ASSERT(*entries <= r->capacity);
+	return num;
 }
 
 /**
diff --git a/lib/ring/soring.c b/lib/ring/soring.c
index 797484d6bf..21a1a27e24 100644
--- a/lib/ring/soring.c
+++ b/lib/ring/soring.c
@@ -156,6 +156,7 @@  __rte_soring_move_prod_head(struct rte_soring *r, uint32_t num,
 		n = 0;
 	}
 
+	RTE_ASSERT(*free <= r->capacity);
 	return n;
 }
 
@@ -190,6 +191,7 @@  __rte_soring_move_cons_head(struct rte_soring *r, uint32_t stage, uint32_t num,
 		n = 0;
 	}
 
+	RTE_ASSERT(*avail <= r->capacity);
 	return n;
 }