[v4,2/8] test/distributor: synchronize lcores statistics
diff mbox series

Message ID 20200925224209.12173-3-l.wojciechow@partner.samsung.com
State Superseded
Delegated to: David Marchand
Headers show
Series
  • fix distributor synchronization issues
Related show

Checks

Context Check Description
ci/checkpatch success coding style OK

Commit Message

Lukasz Wojciechowski Sept. 25, 2020, 10:42 p.m. UTC
Statistics of handled packets are cleared and read on main lcore,
while they are increased in workers handlers on different lcores.

Without synchronization occasionally showed invalid values.
This patch uses atomic acquire/release mechanisms to synchronize.

Fixes: c3eabff124e6 ("distributor: add unit tests")
Cc: bruce.richardson@intel.com
Cc: stable@dpdk.org

Signed-off-by: Lukasz Wojciechowski <l.wojciechow@partner.samsung.com>
Acked-by: David Hunt <david.hunt@intel.com>
---
 app/test/test_distributor.c | 39 ++++++++++++++++++++++++-------------
 1 file changed, 26 insertions(+), 13 deletions(-)

Comments

Honnappa Nagarahalli Sept. 29, 2020, 5:49 a.m. UTC | #1
<snip>

> 
> Statistics of handled packets are cleared and read on main lcore, while they
> are increased in workers handlers on different lcores.
> 
> Without synchronization occasionally showed invalid values.
> This patch uses atomic acquire/release mechanisms to synchronize.
In general, load-acquire and store-release memory orderings are required while synchronizing data (that cannot be updated atomically) between threads. In the situation, making counters atomic is enough.

> 
> Fixes: c3eabff124e6 ("distributor: add unit tests")
> Cc: bruce.richardson@intel.com
> Cc: stable@dpdk.org
> 
> Signed-off-by: Lukasz Wojciechowski <l.wojciechow@partner.samsung.com>
> Acked-by: David Hunt <david.hunt@intel.com>
> ---
>  app/test/test_distributor.c | 39 ++++++++++++++++++++++++-------------
>  1 file changed, 26 insertions(+), 13 deletions(-)
> 
> diff --git a/app/test/test_distributor.c b/app/test/test_distributor.c index
> 35b25463a..0e49e3714 100644
> --- a/app/test/test_distributor.c
> +++ b/app/test/test_distributor.c
> @@ -43,7 +43,8 @@ total_packet_count(void)  {
>  	unsigned i, count = 0;
>  	for (i = 0; i < worker_idx; i++)
> -		count += worker_stats[i].handled_packets;
> +		count +=
> __atomic_load_n(&worker_stats[i].handled_packets,
> +				__ATOMIC_ACQUIRE);
RELAXED memory order is sufficient. For ex: the worker threads are not 'releasing' any data that is not atomically updated to the main thread.

>  	return count;
>  }
> 
> @@ -52,6 +53,7 @@ static inline void
>  clear_packet_count(void)
>  {
>  	memset(&worker_stats, 0, sizeof(worker_stats));
> +	rte_atomic_thread_fence(__ATOMIC_RELEASE);
Ideally, the counters should be set to 0 atomically rather than using a memset.

>  }
> 
>  /* this is the basic worker function for sanity test @@ -72,13 +74,13 @@
> handle_work(void *arg)
>  	num = rte_distributor_get_pkt(db, id, buf, buf, num);
>  	while (!quit) {
>  		__atomic_fetch_add(&worker_stats[id].handled_packets,
> num,
> -				__ATOMIC_RELAXED);
> +				__ATOMIC_ACQ_REL);
Using the __ATOMIC_ACQ_REL order does not mean anything to the main thread. The main thread might still see the updates from different threads in different order.

>  		count += num;
>  		num = rte_distributor_get_pkt(db, id,
>  				buf, buf, num);
>  	}
>  	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
> -			__ATOMIC_RELAXED);
> +			__ATOMIC_ACQ_REL);
Same here, do not see why this change is required.

>  	count += num;
>  	rte_distributor_return_pkt(db, id, buf, num);
>  	return 0;
> @@ -134,7 +136,8 @@ sanity_test(struct worker_params *wp, struct
> rte_mempool *p)
> 
>  	for (i = 0; i < rte_lcore_count() - 1; i++)
>  		printf("Worker %u handled %u packets\n", i,
> -				worker_stats[i].handled_packets);
> +			__atomic_load_n(&worker_stats[i].handled_packets,
> +					__ATOMIC_ACQUIRE));
__ATOMIC_RELAXED is enough.

>  	printf("Sanity test with all zero hashes done.\n");
> 
>  	/* pick two flows and check they go correctly */ @@ -159,7 +162,9
> @@ sanity_test(struct worker_params *wp, struct rte_mempool *p)
> 
>  		for (i = 0; i < rte_lcore_count() - 1; i++)
>  			printf("Worker %u handled %u packets\n", i,
> -					worker_stats[i].handled_packets);
> +				__atomic_load_n(
> +					&worker_stats[i].handled_packets,
> +					__ATOMIC_ACQUIRE));
__ATOMIC_RELAXED is enough

>  		printf("Sanity test with two hash values done\n");
>  	}
> 
> @@ -185,7 +190,8 @@ sanity_test(struct worker_params *wp, struct
> rte_mempool *p)
> 
>  	for (i = 0; i < rte_lcore_count() - 1; i++)
>  		printf("Worker %u handled %u packets\n", i,
> -				worker_stats[i].handled_packets);
> +			__atomic_load_n(&worker_stats[i].handled_packets,
> +					__ATOMIC_ACQUIRE));
__ATOMIC_RELAXED is enough

>  	printf("Sanity test with non-zero hashes done\n");
> 
>  	rte_mempool_put_bulk(p, (void *)bufs, BURST); @@ -280,15
> +286,17 @@ handle_work_with_free_mbufs(void *arg)
>  		buf[i] = NULL;
>  	num = rte_distributor_get_pkt(d, id, buf, buf, num);
>  	while (!quit) {
> -		worker_stats[id].handled_packets += num;
>  		count += num;
> +		__atomic_fetch_add(&worker_stats[id].handled_packets,
> num,
> +				__ATOMIC_ACQ_REL);
IMO, the problem would be the non-atomic update of the statistics. So, __ATOMIC_RELAXED is enough

>  		for (i = 0; i < num; i++)
>  			rte_pktmbuf_free(buf[i]);
>  		num = rte_distributor_get_pkt(d,
>  				id, buf, buf, num);
>  	}
> -	worker_stats[id].handled_packets += num;
>  	count += num;
> +	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
> +			__ATOMIC_ACQ_REL);
Same here, the problem is non-atomic update of the statistics, __ATOMIC_RELAXED is enough.
Similarly, for changes below, __ATOMIC_RELAXED is enough.

>  	rte_distributor_return_pkt(d, id, buf, num);
>  	return 0;
>  }
> @@ -363,8 +371,9 @@ handle_work_for_shutdown_test(void *arg)
>  	/* wait for quit single globally, or for worker zero, wait
>  	 * for zero_quit */
>  	while (!quit && !(id == zero_id && zero_quit)) {
> -		worker_stats[id].handled_packets += num;
>  		count += num;
> +		__atomic_fetch_add(&worker_stats[id].handled_packets,
> num,
> +				__ATOMIC_ACQ_REL);
>  		for (i = 0; i < num; i++)
>  			rte_pktmbuf_free(buf[i]);
>  		num = rte_distributor_get_pkt(d,
> @@ -379,10 +388,11 @@ handle_work_for_shutdown_test(void *arg)
> 
>  		total += num;
>  	}
> -	worker_stats[id].handled_packets += num;
>  	count += num;
>  	returned = rte_distributor_return_pkt(d, id, buf, num);
> 
> +	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
> +			__ATOMIC_ACQ_REL);
>  	if (id == zero_id) {
>  		/* for worker zero, allow it to restart to pick up last packet
>  		 * when all workers are shutting down.
> @@ -394,10 +404,11 @@ handle_work_for_shutdown_test(void *arg)
>  				id, buf, buf, num);
> 
>  		while (!quit) {
> -			worker_stats[id].handled_packets += num;
>  			count += num;
>  			rte_pktmbuf_free(pkt);
>  			num = rte_distributor_get_pkt(d, id, buf, buf, num);
> +
> 	__atomic_fetch_add(&worker_stats[id].handled_packets,
> +					num, __ATOMIC_ACQ_REL);
>  		}
>  		returned = rte_distributor_return_pkt(d,
>  				id, buf, num);
> @@ -461,7 +472,8 @@ sanity_test_with_worker_shutdown(struct
> worker_params *wp,
> 
>  	for (i = 0; i < rte_lcore_count() - 1; i++)
>  		printf("Worker %u handled %u packets\n", i,
> -				worker_stats[i].handled_packets);
> +			__atomic_load_n(&worker_stats[i].handled_packets,
> +					__ATOMIC_ACQUIRE));
> 
>  	if (total_packet_count() != BURST * 2) {
>  		printf("Line %d: Error, not all packets flushed. "
> @@ -514,7 +526,8 @@ test_flush_with_worker_shutdown(struct
> worker_params *wp,
>  	zero_quit = 0;
>  	for (i = 0; i < rte_lcore_count() - 1; i++)
>  		printf("Worker %u handled %u packets\n", i,
> -				worker_stats[i].handled_packets);
> +			__atomic_load_n(&worker_stats[i].handled_packets,
> +					__ATOMIC_ACQUIRE));
> 
>  	if (total_packet_count() != BURST) {
>  		printf("Line %d: Error, not all packets flushed. "
> --
> 2.17.1
Lukasz Wojciechowski Oct. 2, 2020, 11:25 a.m. UTC | #2
Hi Honnappa,

Many thanks for the review!

I'll write my answers here not inline as it would be easier to read them 
in one place, I think.
So first of all I agree with you in 2 things:
1) all uses of statistics must be atomic and lack of that caused most of 
the problems
2) it would be better to replace barrier and memset in 
clear_packet_count() with atomic stores as you suggested

So I will apply both of above.

However I wasn't not fully convinced on changing acquire/release to 
relaxed. It wood be perfectly ok
if it would look like in this Herb Sutter's example: 
https://youtu.be/KeLBd2EJLOU?t=4170
But in his case the counters are cleared before worker threads start and 
are printout after they are completed.

In case of the dpdk distributor tests both worker and main cores are 
running at the same time. In the sanity_test, the statistics are cleared 
and verified few times for different hashes of packages. The worker 
cores are not stopped at this time and they continue their loops in 
handle procedure. Verification made in main core is an exchange of data 
as the current statistics indicate how the test will result.

So as I wasn't convinced, I run some tests with both both relaxed and 
acquire/release modes and they both fail :(
The failures caused by statistics errors to number of tests ratio for 
200000 tests was:
for relaxed: 0,000790562
for acq/rel: 0,000091321


That's why I'm going to modify tests in such way, that they would:
1) clear statistics
2) launch worker threads
3) run test
4) wait for workers procedures to complete
5) check stats, verify results and print them out

This way worker main core will use (clear or verify) stats only when 
there are no worker threads. This would make things simpler and allowing 
to focus on testing the distributor not tests. And of course relaxed 
mode would be enough!


Best regards
Lukasz


W dniu 29.09.2020 o 07:49, Honnappa Nagarahalli pisze:
> <snip>
>
>> Statistics of handled packets are cleared and read on main lcore, while they
>> are increased in workers handlers on different lcores.
>>
>> Without synchronization occasionally showed invalid values.
>> This patch uses atomic acquire/release mechanisms to synchronize.
> In general, load-acquire and store-release memory orderings are required while synchronizing data (that cannot be updated atomically) between threads. In the situation, making counters atomic is enough.
>
>> Fixes: c3eabff124e6 ("distributor: add unit tests")
>> Cc: bruce.richardson@intel.com
>> Cc: stable@dpdk.org
>>
>> Signed-off-by: Lukasz Wojciechowski <l.wojciechow@partner.samsung.com>
>> Acked-by: David Hunt <david.hunt@intel.com>
>> ---
>>   app/test/test_distributor.c | 39 ++++++++++++++++++++++++-------------
>>   1 file changed, 26 insertions(+), 13 deletions(-)
>>
>> diff --git a/app/test/test_distributor.c b/app/test/test_distributor.c index
>> 35b25463a..0e49e3714 100644
>> --- a/app/test/test_distributor.c
>> +++ b/app/test/test_distributor.c
>> @@ -43,7 +43,8 @@ total_packet_count(void)  {
>>   	unsigned i, count = 0;
>>   	for (i = 0; i < worker_idx; i++)
>> -		count += worker_stats[i].handled_packets;
>> +		count +=
>> __atomic_load_n(&worker_stats[i].handled_packets,
>> +				__ATOMIC_ACQUIRE);
> RELAXED memory order is sufficient. For ex: the worker threads are not 'releasing' any data that is not atomically updated to the main thread.
>
>>   	return count;
>>   }
>>
>> @@ -52,6 +53,7 @@ static inline void
>>   clear_packet_count(void)
>>   {
>>   	memset(&worker_stats, 0, sizeof(worker_stats));
>> +	rte_atomic_thread_fence(__ATOMIC_RELEASE);
> Ideally, the counters should be set to 0 atomically rather than using a memset.
>
>>   }
>>
>>   /* this is the basic worker function for sanity test @@ -72,13 +74,13 @@
>> handle_work(void *arg)
>>   	num = rte_distributor_get_pkt(db, id, buf, buf, num);
>>   	while (!quit) {
>>   		__atomic_fetch_add(&worker_stats[id].handled_packets,
>> num,
>> -				__ATOMIC_RELAXED);
>> +				__ATOMIC_ACQ_REL);
> Using the __ATOMIC_ACQ_REL order does not mean anything to the main thread. The main thread might still see the updates from different threads in different order.
>
>>   		count += num;
>>   		num = rte_distributor_get_pkt(db, id,
>>   				buf, buf, num);
>>   	}
>>   	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
>> -			__ATOMIC_RELAXED);
>> +			__ATOMIC_ACQ_REL);
> Same here, do not see why this change is required.
>
>>   	count += num;
>>   	rte_distributor_return_pkt(db, id, buf, num);
>>   	return 0;
>> @@ -134,7 +136,8 @@ sanity_test(struct worker_params *wp, struct
>> rte_mempool *p)
>>
>>   	for (i = 0; i < rte_lcore_count() - 1; i++)
>>   		printf("Worker %u handled %u packets\n", i,
>> -				worker_stats[i].handled_packets);
>> +			__atomic_load_n(&worker_stats[i].handled_packets,
>> +					__ATOMIC_ACQUIRE));
> __ATOMIC_RELAXED is enough.
>
>>   	printf("Sanity test with all zero hashes done.\n");
>>
>>   	/* pick two flows and check they go correctly */ @@ -159,7 +162,9
>> @@ sanity_test(struct worker_params *wp, struct rte_mempool *p)
>>
>>   		for (i = 0; i < rte_lcore_count() - 1; i++)
>>   			printf("Worker %u handled %u packets\n", i,
>> -					worker_stats[i].handled_packets);
>> +				__atomic_load_n(
>> +					&worker_stats[i].handled_packets,
>> +					__ATOMIC_ACQUIRE));
> __ATOMIC_RELAXED is enough
>
>>   		printf("Sanity test with two hash values done\n");
>>   	}
>>
>> @@ -185,7 +190,8 @@ sanity_test(struct worker_params *wp, struct
>> rte_mempool *p)
>>
>>   	for (i = 0; i < rte_lcore_count() - 1; i++)
>>   		printf("Worker %u handled %u packets\n", i,
>> -				worker_stats[i].handled_packets);
>> +			__atomic_load_n(&worker_stats[i].handled_packets,
>> +					__ATOMIC_ACQUIRE));
> __ATOMIC_RELAXED is enough
>
>>   	printf("Sanity test with non-zero hashes done\n");
>>
>>   	rte_mempool_put_bulk(p, (void *)bufs, BURST); @@ -280,15
>> +286,17 @@ handle_work_with_free_mbufs(void *arg)
>>   		buf[i] = NULL;
>>   	num = rte_distributor_get_pkt(d, id, buf, buf, num);
>>   	while (!quit) {
>> -		worker_stats[id].handled_packets += num;
>>   		count += num;
>> +		__atomic_fetch_add(&worker_stats[id].handled_packets,
>> num,
>> +				__ATOMIC_ACQ_REL);
> IMO, the problem would be the non-atomic update of the statistics. So, __ATOMIC_RELAXED is enough
>
>>   		for (i = 0; i < num; i++)
>>   			rte_pktmbuf_free(buf[i]);
>>   		num = rte_distributor_get_pkt(d,
>>   				id, buf, buf, num);
>>   	}
>> -	worker_stats[id].handled_packets += num;
>>   	count += num;
>> +	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
>> +			__ATOMIC_ACQ_REL);
> Same here, the problem is non-atomic update of the statistics, __ATOMIC_RELAXED is enough.
> Similarly, for changes below, __ATOMIC_RELAXED is enough.
>
>>   	rte_distributor_return_pkt(d, id, buf, num);
>>   	return 0;
>>   }
>> @@ -363,8 +371,9 @@ handle_work_for_shutdown_test(void *arg)
>>   	/* wait for quit single globally, or for worker zero, wait
>>   	 * for zero_quit */
>>   	while (!quit && !(id == zero_id && zero_quit)) {
>> -		worker_stats[id].handled_packets += num;
>>   		count += num;
>> +		__atomic_fetch_add(&worker_stats[id].handled_packets,
>> num,
>> +				__ATOMIC_ACQ_REL);
>>   		for (i = 0; i < num; i++)
>>   			rte_pktmbuf_free(buf[i]);
>>   		num = rte_distributor_get_pkt(d,
>> @@ -379,10 +388,11 @@ handle_work_for_shutdown_test(void *arg)
>>
>>   		total += num;
>>   	}
>> -	worker_stats[id].handled_packets += num;
>>   	count += num;
>>   	returned = rte_distributor_return_pkt(d, id, buf, num);
>>
>> +	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
>> +			__ATOMIC_ACQ_REL);
>>   	if (id == zero_id) {
>>   		/* for worker zero, allow it to restart to pick up last packet
>>   		 * when all workers are shutting down.
>> @@ -394,10 +404,11 @@ handle_work_for_shutdown_test(void *arg)
>>   				id, buf, buf, num);
>>
>>   		while (!quit) {
>> -			worker_stats[id].handled_packets += num;
>>   			count += num;
>>   			rte_pktmbuf_free(pkt);
>>   			num = rte_distributor_get_pkt(d, id, buf, buf, num);
>> +
>> 	__atomic_fetch_add(&worker_stats[id].handled_packets,
>> +					num, __ATOMIC_ACQ_REL);
>>   		}
>>   		returned = rte_distributor_return_pkt(d,
>>   				id, buf, num);
>> @@ -461,7 +472,8 @@ sanity_test_with_worker_shutdown(struct
>> worker_params *wp,
>>
>>   	for (i = 0; i < rte_lcore_count() - 1; i++)
>>   		printf("Worker %u handled %u packets\n", i,
>> -				worker_stats[i].handled_packets);
>> +			__atomic_load_n(&worker_stats[i].handled_packets,
>> +					__ATOMIC_ACQUIRE));
>>
>>   	if (total_packet_count() != BURST * 2) {
>>   		printf("Line %d: Error, not all packets flushed. "
>> @@ -514,7 +526,8 @@ test_flush_with_worker_shutdown(struct
>> worker_params *wp,
>>   	zero_quit = 0;
>>   	for (i = 0; i < rte_lcore_count() - 1; i++)
>>   		printf("Worker %u handled %u packets\n", i,
>> -				worker_stats[i].handled_packets);
>> +			__atomic_load_n(&worker_stats[i].handled_packets,
>> +					__ATOMIC_ACQUIRE));
>>
>>   	if (total_packet_count() != BURST) {
>>   		printf("Line %d: Error, not all packets flushed. "
>> --
>> 2.17.1
Lukasz Wojciechowski Oct. 8, 2020, 8:47 p.m. UTC | #3
Hi Honappa,

I pushed v5 of the patches today.

However all I fixed in this patch is replacing memset to loop of 
operations on atomic. I didn't move clearing and checking the statistics 
out of the test yet (to make sure no worker is running).

After fixing few more distributor issues I wasn't able now to catch even 
a single failure using the ACQ/REL memory models.
I'll test it maybe tomorrow with RELAXED to see if it will work.

Best regards

Lukasz

W dniu 02.10.2020 o 13:25, Lukasz Wojciechowski pisze:
> Hi Honnappa,
>
> Many thanks for the review!
>
> I'll write my answers here not inline as it would be easier to read them
> in one place, I think.
> So first of all I agree with you in 2 things:
> 1) all uses of statistics must be atomic and lack of that caused most of
> the problems
> 2) it would be better to replace barrier and memset in
> clear_packet_count() with atomic stores as you suggested
>
> So I will apply both of above.
>
> However I wasn't not fully convinced on changing acquire/release to
> relaxed. It wood be perfectly ok
> if it would look like in this Herb Sutter's example:
> https://youtu.be/KeLBd2EJLOU?t=4170
> But in his case the counters are cleared before worker threads start and
> are printout after they are completed.
>
> In case of the dpdk distributor tests both worker and main cores are
> running at the same time. In the sanity_test, the statistics are cleared
> and verified few times for different hashes of packages. The worker
> cores are not stopped at this time and they continue their loops in
> handle procedure. Verification made in main core is an exchange of data
> as the current statistics indicate how the test will result.
>
> So as I wasn't convinced, I run some tests with both both relaxed and
> acquire/release modes and they both fail :(
> The failures caused by statistics errors to number of tests ratio for
> 200000 tests was:
> for relaxed: 0,000790562
> for acq/rel: 0,000091321
>
>
> That's why I'm going to modify tests in such way, that they would:
> 1) clear statistics
> 2) launch worker threads
> 3) run test
> 4) wait for workers procedures to complete
> 5) check stats, verify results and print them out
>
> This way worker main core will use (clear or verify) stats only when
> there are no worker threads. This would make things simpler and allowing
> to focus on testing the distributor not tests. And of course relaxed
> mode would be enough!
>
>
> Best regards
> Lukasz
>
>
> W dniu 29.09.2020 o 07:49, Honnappa Nagarahalli pisze:
>> <snip>
>>
>>> Statistics of handled packets are cleared and read on main lcore, while they
>>> are increased in workers handlers on different lcores.
>>>
>>> Without synchronization occasionally showed invalid values.
>>> This patch uses atomic acquire/release mechanisms to synchronize.
>> In general, load-acquire and store-release memory orderings are required while synchronizing data (that cannot be updated atomically) between threads. In the situation, making counters atomic is enough.
>>
>>> Fixes: c3eabff124e6 ("distributor: add unit tests")
>>> Cc: bruce.richardson@intel.com
>>> Cc: stable@dpdk.org
>>>
>>> Signed-off-by: Lukasz Wojciechowski <l.wojciechow@partner.samsung.com>
>>> Acked-by: David Hunt <david.hunt@intel.com>
>>> ---
>>>    app/test/test_distributor.c | 39 ++++++++++++++++++++++++-------------
>>>    1 file changed, 26 insertions(+), 13 deletions(-)
>>>
>>> diff --git a/app/test/test_distributor.c b/app/test/test_distributor.c index
>>> 35b25463a..0e49e3714 100644
>>> --- a/app/test/test_distributor.c
>>> +++ b/app/test/test_distributor.c
>>> @@ -43,7 +43,8 @@ total_packet_count(void)  {
>>>    	unsigned i, count = 0;
>>>    	for (i = 0; i < worker_idx; i++)
>>> -		count += worker_stats[i].handled_packets;
>>> +		count +=
>>> __atomic_load_n(&worker_stats[i].handled_packets,
>>> +				__ATOMIC_ACQUIRE);
>> RELAXED memory order is sufficient. For ex: the worker threads are not 'releasing' any data that is not atomically updated to the main thread.
>>
>>>    	return count;
>>>    }
>>>
>>> @@ -52,6 +53,7 @@ static inline void
>>>    clear_packet_count(void)
>>>    {
>>>    	memset(&worker_stats, 0, sizeof(worker_stats));
>>> +	rte_atomic_thread_fence(__ATOMIC_RELEASE);
>> Ideally, the counters should be set to 0 atomically rather than using a memset.
>>
>>>    }
>>>
>>>    /* this is the basic worker function for sanity test @@ -72,13 +74,13 @@
>>> handle_work(void *arg)
>>>    	num = rte_distributor_get_pkt(db, id, buf, buf, num);
>>>    	while (!quit) {
>>>    		__atomic_fetch_add(&worker_stats[id].handled_packets,
>>> num,
>>> -				__ATOMIC_RELAXED);
>>> +				__ATOMIC_ACQ_REL);
>> Using the __ATOMIC_ACQ_REL order does not mean anything to the main thread. The main thread might still see the updates from different threads in different order.
>>
>>>    		count += num;
>>>    		num = rte_distributor_get_pkt(db, id,
>>>    				buf, buf, num);
>>>    	}
>>>    	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
>>> -			__ATOMIC_RELAXED);
>>> +			__ATOMIC_ACQ_REL);
>> Same here, do not see why this change is required.
>>
>>>    	count += num;
>>>    	rte_distributor_return_pkt(db, id, buf, num);
>>>    	return 0;
>>> @@ -134,7 +136,8 @@ sanity_test(struct worker_params *wp, struct
>>> rte_mempool *p)
>>>
>>>    	for (i = 0; i < rte_lcore_count() - 1; i++)
>>>    		printf("Worker %u handled %u packets\n", i,
>>> -				worker_stats[i].handled_packets);
>>> +			__atomic_load_n(&worker_stats[i].handled_packets,
>>> +					__ATOMIC_ACQUIRE));
>> __ATOMIC_RELAXED is enough.
>>
>>>    	printf("Sanity test with all zero hashes done.\n");
>>>
>>>    	/* pick two flows and check they go correctly */ @@ -159,7 +162,9
>>> @@ sanity_test(struct worker_params *wp, struct rte_mempool *p)
>>>
>>>    		for (i = 0; i < rte_lcore_count() - 1; i++)
>>>    			printf("Worker %u handled %u packets\n", i,
>>> -					worker_stats[i].handled_packets);
>>> +				__atomic_load_n(
>>> +					&worker_stats[i].handled_packets,
>>> +					__ATOMIC_ACQUIRE));
>> __ATOMIC_RELAXED is enough
>>
>>>    		printf("Sanity test with two hash values done\n");
>>>    	}
>>>
>>> @@ -185,7 +190,8 @@ sanity_test(struct worker_params *wp, struct
>>> rte_mempool *p)
>>>
>>>    	for (i = 0; i < rte_lcore_count() - 1; i++)
>>>    		printf("Worker %u handled %u packets\n", i,
>>> -				worker_stats[i].handled_packets);
>>> +			__atomic_load_n(&worker_stats[i].handled_packets,
>>> +					__ATOMIC_ACQUIRE));
>> __ATOMIC_RELAXED is enough
>>
>>>    	printf("Sanity test with non-zero hashes done\n");
>>>
>>>    	rte_mempool_put_bulk(p, (void *)bufs, BURST); @@ -280,15
>>> +286,17 @@ handle_work_with_free_mbufs(void *arg)
>>>    		buf[i] = NULL;
>>>    	num = rte_distributor_get_pkt(d, id, buf, buf, num);
>>>    	while (!quit) {
>>> -		worker_stats[id].handled_packets += num;
>>>    		count += num;
>>> +		__atomic_fetch_add(&worker_stats[id].handled_packets,
>>> num,
>>> +				__ATOMIC_ACQ_REL);
>> IMO, the problem would be the non-atomic update of the statistics. So, __ATOMIC_RELAXED is enough
>>
>>>    		for (i = 0; i < num; i++)
>>>    			rte_pktmbuf_free(buf[i]);
>>>    		num = rte_distributor_get_pkt(d,
>>>    				id, buf, buf, num);
>>>    	}
>>> -	worker_stats[id].handled_packets += num;
>>>    	count += num;
>>> +	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
>>> +			__ATOMIC_ACQ_REL);
>> Same here, the problem is non-atomic update of the statistics, __ATOMIC_RELAXED is enough.
>> Similarly, for changes below, __ATOMIC_RELAXED is enough.
>>
>>>    	rte_distributor_return_pkt(d, id, buf, num);
>>>    	return 0;
>>>    }
>>> @@ -363,8 +371,9 @@ handle_work_for_shutdown_test(void *arg)
>>>    	/* wait for quit single globally, or for worker zero, wait
>>>    	 * for zero_quit */
>>>    	while (!quit && !(id == zero_id && zero_quit)) {
>>> -		worker_stats[id].handled_packets += num;
>>>    		count += num;
>>> +		__atomic_fetch_add(&worker_stats[id].handled_packets,
>>> num,
>>> +				__ATOMIC_ACQ_REL);
>>>    		for (i = 0; i < num; i++)
>>>    			rte_pktmbuf_free(buf[i]);
>>>    		num = rte_distributor_get_pkt(d,
>>> @@ -379,10 +388,11 @@ handle_work_for_shutdown_test(void *arg)
>>>
>>>    		total += num;
>>>    	}
>>> -	worker_stats[id].handled_packets += num;
>>>    	count += num;
>>>    	returned = rte_distributor_return_pkt(d, id, buf, num);
>>>
>>> +	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
>>> +			__ATOMIC_ACQ_REL);
>>>    	if (id == zero_id) {
>>>    		/* for worker zero, allow it to restart to pick up last packet
>>>    		 * when all workers are shutting down.
>>> @@ -394,10 +404,11 @@ handle_work_for_shutdown_test(void *arg)
>>>    				id, buf, buf, num);
>>>
>>>    		while (!quit) {
>>> -			worker_stats[id].handled_packets += num;
>>>    			count += num;
>>>    			rte_pktmbuf_free(pkt);
>>>    			num = rte_distributor_get_pkt(d, id, buf, buf, num);
>>> +
>>> 	__atomic_fetch_add(&worker_stats[id].handled_packets,
>>> +					num, __ATOMIC_ACQ_REL);
>>>    		}
>>>    		returned = rte_distributor_return_pkt(d,
>>>    				id, buf, num);
>>> @@ -461,7 +472,8 @@ sanity_test_with_worker_shutdown(struct
>>> worker_params *wp,
>>>
>>>    	for (i = 0; i < rte_lcore_count() - 1; i++)
>>>    		printf("Worker %u handled %u packets\n", i,
>>> -				worker_stats[i].handled_packets);
>>> +			__atomic_load_n(&worker_stats[i].handled_packets,
>>> +					__ATOMIC_ACQUIRE));
>>>
>>>    	if (total_packet_count() != BURST * 2) {
>>>    		printf("Line %d: Error, not all packets flushed. "
>>> @@ -514,7 +526,8 @@ test_flush_with_worker_shutdown(struct
>>> worker_params *wp,
>>>    	zero_quit = 0;
>>>    	for (i = 0; i < rte_lcore_count() - 1; i++)
>>>    		printf("Worker %u handled %u packets\n", i,
>>> -				worker_stats[i].handled_packets);
>>> +			__atomic_load_n(&worker_stats[i].handled_packets,
>>> +					__ATOMIC_ACQUIRE));
>>>
>>>    	if (total_packet_count() != BURST) {
>>>    		printf("Line %d: Error, not all packets flushed. "
>>> --
>>> 2.17.1
Honnappa Nagarahalli Oct. 16, 2020, 5:43 a.m. UTC | #4
<snip>

> 
> Hi Honnappa,
> 
> Many thanks for the review!
> 
> I'll write my answers here not inline as it would be easier to read them in one
> place, I think.
> So first of all I agree with you in 2 things:
> 1) all uses of statistics must be atomic and lack of that caused most of the
> problems
> 2) it would be better to replace barrier and memset in
> clear_packet_count() with atomic stores as you suggested
> 
> So I will apply both of above.
> 
> However I wasn't not fully convinced on changing acquire/release to relaxed.
> It wood be perfectly ok if it would look like in this Herb Sutter's example:
> https://youtu.be/KeLBd2[]  EJLOU?t=4170
> But in his case the counters are cleared before worker threads start and are
> printout after they are completed.
> 
> In case of the dpdk distributor tests both worker and main cores are running
> at the same time. In the sanity_test, the statistics are cleared and verified
> few times for different hashes of packages. The worker cores are not
> stopped at this time and they continue their loops in handle procedure.
> Verification made in main core is an exchange of data as the current statistics
> indicate how the test will result.
Agree. The key point we have to note is that the data that is exchanged between the two threads is already atomic (handled_packets is atomic).

> 
> So as I wasn't convinced, I run some tests with both both relaxed and
> acquire/release modes and they both fail :( The failures caused by statistics
> errors to number of tests ratio for
> 200000 tests was:
> for relaxed: 0,000790562
> for acq/rel: 0,000091321
> 
> 
> That's why I'm going to modify tests in such way, that they would:
> 1) clear statistics
> 2) launch worker threads
> 3) run test
> 4) wait for workers procedures to complete
> 5) check stats, verify results and print them out
> 
> This way worker main core will use (clear or verify) stats only when there are
> no worker threads. This would make things simpler and allowing to focus on
> testing the distributor not tests. And of course relaxed mode would be
> enough!
Agree, this would be the only way to ensure that the main thread sees the correct statistics (just like in the video)

> 
> 
> Best regards
> Lukasz
> 
> 
> W dniu 29.09.2020 o 07:49, Honnappa Nagarahalli pisze:
> > <snip>
> >
> >> Statistics of handled packets are cleared and read on main lcore,
> >> while they are increased in workers handlers on different lcores.
> >>
> >> Without synchronization occasionally showed invalid values.
> >> This patch uses atomic acquire/release mechanisms to synchronize.
> > In general, load-acquire and store-release memory orderings are required
> while synchronizing data (that cannot be updated atomically) between
> threads. In the situation, making counters atomic is enough.
> >
> >> Fixes: c3eabff124e6 ("distributor: add unit tests")
> >> Cc: bruce.richardson@intel.com
> >> Cc: stable@dpdk.org
> >>
> >> Signed-off-by: Lukasz Wojciechowski
> >> <l.wojciechow@partner.samsung.com>
> >> Acked-by: David Hunt <david.hunt@intel.com>
> >> ---
> >>   app/test/test_distributor.c | 39 ++++++++++++++++++++++++-----------
> --
> >>   1 file changed, 26 insertions(+), 13 deletions(-)
> >>
> >> diff --git a/app/test/test_distributor.c
> >> b/app/test/test_distributor.c index
> >> 35b25463a..0e49e3714 100644
> >> --- a/app/test/test_distributor.c
> >> +++ b/app/test/test_distributor.c
> >> @@ -43,7 +43,8 @@ total_packet_count(void)  {
> >>   	unsigned i, count = 0;
> >>   	for (i = 0; i < worker_idx; i++)
> >> -		count += worker_stats[i].handled_packets;
> >> +		count +=
> >> __atomic_load_n(&worker_stats[i].handled_packets,
> >> +				__ATOMIC_ACQUIRE);
> > RELAXED memory order is sufficient. For ex: the worker threads are not
> 'releasing' any data that is not atomically updated to the main thread.
> >
> >>   	return count;
> >>   }
> >>
> >> @@ -52,6 +53,7 @@ static inline void
> >>   clear_packet_count(void)
> >>   {
> >>   	memset(&worker_stats, 0, sizeof(worker_stats));
> >> +	rte_atomic_thread_fence(__ATOMIC_RELEASE);
> > Ideally, the counters should be set to 0 atomically rather than using a
> memset.
> >
> >>   }
> >>
> >>   /* this is the basic worker function for sanity test @@ -72,13
> >> +74,13 @@ handle_work(void *arg)
> >>   	num = rte_distributor_get_pkt(db, id, buf, buf, num);
> >>   	while (!quit) {
> >>   		__atomic_fetch_add(&worker_stats[id].handled_packets,
> >> num,
> >> -				__ATOMIC_RELAXED);
> >> +				__ATOMIC_ACQ_REL);
> > Using the __ATOMIC_ACQ_REL order does not mean anything to the main
> thread. The main thread might still see the updates from different threads in
> different order.
> >
> >>   		count += num;
> >>   		num = rte_distributor_get_pkt(db, id,
> >>   				buf, buf, num);
> >>   	}
> >>   	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
> >> -			__ATOMIC_RELAXED);
> >> +			__ATOMIC_ACQ_REL);
> > Same here, do not see why this change is required.
> >
> >>   	count += num;
> >>   	rte_distributor_return_pkt(db, id, buf, num);
> >>   	return 0;
> >> @@ -134,7 +136,8 @@ sanity_test(struct worker_params *wp, struct
> >> rte_mempool *p)
> >>
> >>   	for (i = 0; i < rte_lcore_count() - 1; i++)
> >>   		printf("Worker %u handled %u packets\n", i,
> >> -				worker_stats[i].handled_packets);
> >> +			__atomic_load_n(&worker_stats[i].handled_packets,
> >> +					__ATOMIC_ACQUIRE));
> > __ATOMIC_RELAXED is enough.
> >
> >>   	printf("Sanity test with all zero hashes done.\n");
> >>
> >>   	/* pick two flows and check they go correctly */ @@ -159,7 +162,9
> >> @@ sanity_test(struct worker_params *wp, struct rte_mempool *p)
> >>
> >>   		for (i = 0; i < rte_lcore_count() - 1; i++)
> >>   			printf("Worker %u handled %u packets\n", i,
> >> -					worker_stats[i].handled_packets);
> >> +				__atomic_load_n(
> >> +					&worker_stats[i].handled_packets,
> >> +					__ATOMIC_ACQUIRE));
> > __ATOMIC_RELAXED is enough
> >
> >>   		printf("Sanity test with two hash values done\n");
> >>   	}
> >>
> >> @@ -185,7 +190,8 @@ sanity_test(struct worker_params *wp, struct
> >> rte_mempool *p)
> >>
> >>   	for (i = 0; i < rte_lcore_count() - 1; i++)
> >>   		printf("Worker %u handled %u packets\n", i,
> >> -				worker_stats[i].handled_packets);
> >> +			__atomic_load_n(&worker_stats[i].handled_packets,
> >> +					__ATOMIC_ACQUIRE));
> > __ATOMIC_RELAXED is enough
> >
> >>   	printf("Sanity test with non-zero hashes done\n");
> >>
> >>   	rte_mempool_put_bulk(p, (void *)bufs, BURST); @@ -280,15
> >> +286,17 @@ handle_work_with_free_mbufs(void *arg)
> >>   		buf[i] = NULL;
> >>   	num = rte_distributor_get_pkt(d, id, buf, buf, num);
> >>   	while (!quit) {
> >> -		worker_stats[id].handled_packets += num;
> >>   		count += num;
> >> +		__atomic_fetch_add(&worker_stats[id].handled_packets,
> >> num,
> >> +				__ATOMIC_ACQ_REL);
> > IMO, the problem would be the non-atomic update of the statistics. So,
> > __ATOMIC_RELAXED is enough
> >
> >>   		for (i = 0; i < num; i++)
> >>   			rte_pktmbuf_free(buf[i]);
> >>   		num = rte_distributor_get_pkt(d,
> >>   				id, buf, buf, num);
> >>   	}
> >> -	worker_stats[id].handled_packets += num;
> >>   	count += num;
> >> +	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
> >> +			__ATOMIC_ACQ_REL);
> > Same here, the problem is non-atomic update of the statistics,
> __ATOMIC_RELAXED is enough.
> > Similarly, for changes below, __ATOMIC_RELAXED is enough.
> >
> >>   	rte_distributor_return_pkt(d, id, buf, num);
> >>   	return 0;
> >>   }
> >> @@ -363,8 +371,9 @@ handle_work_for_shutdown_test(void *arg)
> >>   	/* wait for quit single globally, or for worker zero, wait
> >>   	 * for zero_quit */
> >>   	while (!quit && !(id == zero_id && zero_quit)) {
> >> -		worker_stats[id].handled_packets += num;
> >>   		count += num;
> >> +		__atomic_fetch_add(&worker_stats[id].handled_packets,
> >> num,
> >> +				__ATOMIC_ACQ_REL);
> >>   		for (i = 0; i < num; i++)
> >>   			rte_pktmbuf_free(buf[i]);
> >>   		num = rte_distributor_get_pkt(d,
> >> @@ -379,10 +388,11 @@ handle_work_for_shutdown_test(void *arg)
> >>
> >>   		total += num;
> >>   	}
> >> -	worker_stats[id].handled_packets += num;
> >>   	count += num;
> >>   	returned = rte_distributor_return_pkt(d, id, buf, num);
> >>
> >> +	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
> >> +			__ATOMIC_ACQ_REL);
> >>   	if (id == zero_id) {
> >>   		/* for worker zero, allow it to restart to pick up last packet
> >>   		 * when all workers are shutting down.
> >> @@ -394,10 +404,11 @@ handle_work_for_shutdown_test(void *arg)
> >>   				id, buf, buf, num);
> >>
> >>   		while (!quit) {
> >> -			worker_stats[id].handled_packets += num;
> >>   			count += num;
> >>   			rte_pktmbuf_free(pkt);
> >>   			num = rte_distributor_get_pkt(d, id, buf, buf, num);
> >> +
> >> 	__atomic_fetch_add(&worker_stats[id].handled_packets,
> >> +					num, __ATOMIC_ACQ_REL);
> >>   		}
> >>   		returned = rte_distributor_return_pkt(d,
> >>   				id, buf, num);
> >> @@ -461,7 +472,8 @@ sanity_test_with_worker_shutdown(struct
> >> worker_params *wp,
> >>
> >>   	for (i = 0; i < rte_lcore_count() - 1; i++)
> >>   		printf("Worker %u handled %u packets\n", i,
> >> -				worker_stats[i].handled_packets);
> >> +			__atomic_load_n(&worker_stats[i].handled_packets,
> >> +					__ATOMIC_ACQUIRE));
> >>
> >>   	if (total_packet_count() != BURST * 2) {
> >>   		printf("Line %d: Error, not all packets flushed. "
> >> @@ -514,7 +526,8 @@ test_flush_with_worker_shutdown(struct
> >> worker_params *wp,
> >>   	zero_quit = 0;
> >>   	for (i = 0; i < rte_lcore_count() - 1; i++)
> >>   		printf("Worker %u handled %u packets\n", i,
> >> -				worker_stats[i].handled_packets);
> >> +			__atomic_load_n(&worker_stats[i].handled_packets,
> >> +					__ATOMIC_ACQUIRE));
> >>
> >>   	if (total_packet_count() != BURST) {
> >>   		printf("Line %d: Error, not all packets flushed. "
> >> --
> >> 2.17.1
> 
> --
> Lukasz Wojciechowski
> Principal Software Engineer
> 
> Samsung R&D Institute Poland
> Samsung Electronics
> Office +48 22 377 88 25
> l.wojciechow@partner.samsung.com
Lukasz Wojciechowski Oct. 16, 2020, 12:43 p.m. UTC | #5
Hi Honnappa,

Thank you for your answer.
In the current v7 version I followed your advise and used RELAXED memory model.
And it works without any issues. I guess after fixing other issues found since v4 the distributor works more stable.
I didn't have time to rearrange all tests in the way I proposed, but I guess if they work like this it's not a top priority.

Can you give an ack on the series? I believe David Marchand is waiting for your opinion to process it.

Best regards
Lukasz

W dniu 16.10.2020 o 07:43, Honnappa Nagarahalli pisze:
> <snip>
>
>> Hi Honnappa,
>>
>> Many thanks for the review!
>>
>> I'll write my answers here not inline as it would be easier to read them in one
>> place, I think.
>> So first of all I agree with you in 2 things:
>> 1) all uses of statistics must be atomic and lack of that caused most of the
>> problems
>> 2) it would be better to replace barrier and memset in
>> clear_packet_count() with atomic stores as you suggested
>>
>> So I will apply both of above.
>>
>> However I wasn't not fully convinced on changing acquire/release to relaxed.
>> It wood be perfectly ok if it would look like in this Herb Sutter's example:
>> https://youtu.be/KeLBd2[]  EJLOU?t=4170
>> But in his case the counters are cleared before worker threads start and are
>> printout after they are completed.
>>
>> In case of the dpdk distributor tests both worker and main cores are running
>> at the same time. In the sanity_test, the statistics are cleared and verified
>> few times for different hashes of packages. The worker cores are not
>> stopped at this time and they continue their loops in handle procedure.
>> Verification made in main core is an exchange of data as the current statistics
>> indicate how the test will result.
> Agree. The key point we have to note is that the data that is exchanged between the two threads is already atomic (handled_packets is atomic).
>
>> So as I wasn't convinced, I run some tests with both both relaxed and
>> acquire/release modes and they both fail :( The failures caused by statistics
>> errors to number of tests ratio for
>> 200000 tests was:
>> for relaxed: 0,000790562
>> for acq/rel: 0,000091321
>>
>>
>> That's why I'm going to modify tests in such way, that they would:
>> 1) clear statistics
>> 2) launch worker threads
>> 3) run test
>> 4) wait for workers procedures to complete
>> 5) check stats, verify results and print them out
>>
>> This way worker main core will use (clear or verify) stats only when there are
>> no worker threads. This would make things simpler and allowing to focus on
>> testing the distributor not tests. And of course relaxed mode would be
>> enough!
> Agree, this would be the only way to ensure that the main thread sees the correct statistics (just like in the video)
>
>>
>> Best regards
>> Lukasz
>>
>>
>> W dniu 29.09.2020 o 07:49, Honnappa Nagarahalli pisze:
>>> <snip>
>>>
>>>> Statistics of handled packets are cleared and read on main lcore,
>>>> while they are increased in workers handlers on different lcores.
>>>>
>>>> Without synchronization occasionally showed invalid values.
>>>> This patch uses atomic acquire/release mechanisms to synchronize.
>>> In general, load-acquire and store-release memory orderings are required
>> while synchronizing data (that cannot be updated atomically) between
>> threads. In the situation, making counters atomic is enough.
>>>> Fixes: c3eabff124e6 ("distributor: add unit tests")
>>>> Cc: bruce.richardson@intel.com
>>>> Cc: stable@dpdk.org
>>>>
>>>> Signed-off-by: Lukasz Wojciechowski
>>>> <l.wojciechow@partner.samsung.com>
>>>> Acked-by: David Hunt <david.hunt@intel.com>
>>>> ---
>>>>    app/test/test_distributor.c | 39 ++++++++++++++++++++++++-----------
>> --
>>>>    1 file changed, 26 insertions(+), 13 deletions(-)
>>>>
>>>> diff --git a/app/test/test_distributor.c
>>>> b/app/test/test_distributor.c index
>>>> 35b25463a..0e49e3714 100644
>>>> --- a/app/test/test_distributor.c
>>>> +++ b/app/test/test_distributor.c
>>>> @@ -43,7 +43,8 @@ total_packet_count(void)  {
>>>>    	unsigned i, count = 0;
>>>>    	for (i = 0; i < worker_idx; i++)
>>>> -		count += worker_stats[i].handled_packets;
>>>> +		count +=
>>>> __atomic_load_n(&worker_stats[i].handled_packets,
>>>> +				__ATOMIC_ACQUIRE);
>>> RELAXED memory order is sufficient. For ex: the worker threads are not
>> 'releasing' any data that is not atomically updated to the main thread.
>>>>    	return count;
>>>>    }
>>>>
>>>> @@ -52,6 +53,7 @@ static inline void
>>>>    clear_packet_count(void)
>>>>    {
>>>>    	memset(&worker_stats, 0, sizeof(worker_stats));
>>>> +	rte_atomic_thread_fence(__ATOMIC_RELEASE);
>>> Ideally, the counters should be set to 0 atomically rather than using a
>> memset.
>>>>    }
>>>>
>>>>    /* this is the basic worker function for sanity test @@ -72,13
>>>> +74,13 @@ handle_work(void *arg)
>>>>    	num = rte_distributor_get_pkt(db, id, buf, buf, num);
>>>>    	while (!quit) {
>>>>    		__atomic_fetch_add(&worker_stats[id].handled_packets,
>>>> num,
>>>> -				__ATOMIC_RELAXED);
>>>> +				__ATOMIC_ACQ_REL);
>>> Using the __ATOMIC_ACQ_REL order does not mean anything to the main
>> thread. The main thread might still see the updates from different threads in
>> different order.
>>>>    		count += num;
>>>>    		num = rte_distributor_get_pkt(db, id,
>>>>    				buf, buf, num);
>>>>    	}
>>>>    	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
>>>> -			__ATOMIC_RELAXED);
>>>> +			__ATOMIC_ACQ_REL);
>>> Same here, do not see why this change is required.
>>>
>>>>    	count += num;
>>>>    	rte_distributor_return_pkt(db, id, buf, num);
>>>>    	return 0;
>>>> @@ -134,7 +136,8 @@ sanity_test(struct worker_params *wp, struct
>>>> rte_mempool *p)
>>>>
>>>>    	for (i = 0; i < rte_lcore_count() - 1; i++)
>>>>    		printf("Worker %u handled %u packets\n", i,
>>>> -				worker_stats[i].handled_packets);
>>>> +			__atomic_load_n(&worker_stats[i].handled_packets,
>>>> +					__ATOMIC_ACQUIRE));
>>> __ATOMIC_RELAXED is enough.
>>>
>>>>    	printf("Sanity test with all zero hashes done.\n");
>>>>
>>>>    	/* pick two flows and check they go correctly */ @@ -159,7 +162,9
>>>> @@ sanity_test(struct worker_params *wp, struct rte_mempool *p)
>>>>
>>>>    		for (i = 0; i < rte_lcore_count() - 1; i++)
>>>>    			printf("Worker %u handled %u packets\n", i,
>>>> -					worker_stats[i].handled_packets);
>>>> +				__atomic_load_n(
>>>> +					&worker_stats[i].handled_packets,
>>>> +					__ATOMIC_ACQUIRE));
>>> __ATOMIC_RELAXED is enough
>>>
>>>>    		printf("Sanity test with two hash values done\n");
>>>>    	}
>>>>
>>>> @@ -185,7 +190,8 @@ sanity_test(struct worker_params *wp, struct
>>>> rte_mempool *p)
>>>>
>>>>    	for (i = 0; i < rte_lcore_count() - 1; i++)
>>>>    		printf("Worker %u handled %u packets\n", i,
>>>> -				worker_stats[i].handled_packets);
>>>> +			__atomic_load_n(&worker_stats[i].handled_packets,
>>>> +					__ATOMIC_ACQUIRE));
>>> __ATOMIC_RELAXED is enough
>>>
>>>>    	printf("Sanity test with non-zero hashes done\n");
>>>>
>>>>    	rte_mempool_put_bulk(p, (void *)bufs, BURST); @@ -280,15
>>>> +286,17 @@ handle_work_with_free_mbufs(void *arg)
>>>>    		buf[i] = NULL;
>>>>    	num = rte_distributor_get_pkt(d, id, buf, buf, num);
>>>>    	while (!quit) {
>>>> -		worker_stats[id].handled_packets += num;
>>>>    		count += num;
>>>> +		__atomic_fetch_add(&worker_stats[id].handled_packets,
>>>> num,
>>>> +				__ATOMIC_ACQ_REL);
>>> IMO, the problem would be the non-atomic update of the statistics. So,
>>> __ATOMIC_RELAXED is enough
>>>
>>>>    		for (i = 0; i < num; i++)
>>>>    			rte_pktmbuf_free(buf[i]);
>>>>    		num = rte_distributor_get_pkt(d,
>>>>    				id, buf, buf, num);
>>>>    	}
>>>> -	worker_stats[id].handled_packets += num;
>>>>    	count += num;
>>>> +	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
>>>> +			__ATOMIC_ACQ_REL);
>>> Same here, the problem is non-atomic update of the statistics,
>> __ATOMIC_RELAXED is enough.
>>> Similarly, for changes below, __ATOMIC_RELAXED is enough.
>>>
>>>>    	rte_distributor_return_pkt(d, id, buf, num);
>>>>    	return 0;
>>>>    }
>>>> @@ -363,8 +371,9 @@ handle_work_for_shutdown_test(void *arg)
>>>>    	/* wait for quit single globally, or for worker zero, wait
>>>>    	 * for zero_quit */
>>>>    	while (!quit && !(id == zero_id && zero_quit)) {
>>>> -		worker_stats[id].handled_packets += num;
>>>>    		count += num;
>>>> +		__atomic_fetch_add(&worker_stats[id].handled_packets,
>>>> num,
>>>> +				__ATOMIC_ACQ_REL);
>>>>    		for (i = 0; i < num; i++)
>>>>    			rte_pktmbuf_free(buf[i]);
>>>>    		num = rte_distributor_get_pkt(d,
>>>> @@ -379,10 +388,11 @@ handle_work_for_shutdown_test(void *arg)
>>>>
>>>>    		total += num;
>>>>    	}
>>>> -	worker_stats[id].handled_packets += num;
>>>>    	count += num;
>>>>    	returned = rte_distributor_return_pkt(d, id, buf, num);
>>>>
>>>> +	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
>>>> +			__ATOMIC_ACQ_REL);
>>>>    	if (id == zero_id) {
>>>>    		/* for worker zero, allow it to restart to pick up last packet
>>>>    		 * when all workers are shutting down.
>>>> @@ -394,10 +404,11 @@ handle_work_for_shutdown_test(void *arg)
>>>>    				id, buf, buf, num);
>>>>
>>>>    		while (!quit) {
>>>> -			worker_stats[id].handled_packets += num;
>>>>    			count += num;
>>>>    			rte_pktmbuf_free(pkt);
>>>>    			num = rte_distributor_get_pkt(d, id, buf, buf, num);
>>>> +
>>>> 	__atomic_fetch_add(&worker_stats[id].handled_packets,
>>>> +					num, __ATOMIC_ACQ_REL);
>>>>    		}
>>>>    		returned = rte_distributor_return_pkt(d,
>>>>    				id, buf, num);
>>>> @@ -461,7 +472,8 @@ sanity_test_with_worker_shutdown(struct
>>>> worker_params *wp,
>>>>
>>>>    	for (i = 0; i < rte_lcore_count() - 1; i++)
>>>>    		printf("Worker %u handled %u packets\n", i,
>>>> -				worker_stats[i].handled_packets);
>>>> +			__atomic_load_n(&worker_stats[i].handled_packets,
>>>> +					__ATOMIC_ACQUIRE));
>>>>
>>>>    	if (total_packet_count() != BURST * 2) {
>>>>    		printf("Line %d: Error, not all packets flushed. "
>>>> @@ -514,7 +526,8 @@ test_flush_with_worker_shutdown(struct
>>>> worker_params *wp,
>>>>    	zero_quit = 0;
>>>>    	for (i = 0; i < rte_lcore_count() - 1; i++)
>>>>    		printf("Worker %u handled %u packets\n", i,
>>>> -				worker_stats[i].handled_packets);
>>>> +			__atomic_load_n(&worker_stats[i].handled_packets,
>>>> +					__ATOMIC_ACQUIRE));
>>>>
>>>>    	if (total_packet_count() != BURST) {
>>>>    		printf("Line %d: Error, not all packets flushed. "
>>>> --
>>>> 2.17.1
>> --
>> Lukasz Wojciechowski
>> Principal Software Engineer
>>
>> Samsung R&D Institute Poland
>> Samsung Electronics
>> Office +48 22 377 88 25
>> l.wojciechow@partner.samsung.com
Lukasz Wojciechowski Oct. 16, 2020, 12:58 p.m. UTC | #6
W dniu 16.10.2020 o 14:43, Lukasz Wojciechowski pisze:
> Hi Honnappa,
>
> Thank you for your answer.
> In the current v7 version I followed your advise and used RELAXED memory model.
> And it works without any issues. I guess after fixing other issues found since v4 the distributor works more stable.
> I didn't have time to rearrange all tests in the way I proposed, but I guess if they work like this it's not a top priority.
>
> Can you give an ack on the series? I believe David Marchand is waiting for your opinion to process it.
I'm sorry I didn't see your other comments. I'll try to fix them them today.
>
> Best regards
> Lukasz
>
> W dniu 16.10.2020 o 07:43, Honnappa Nagarahalli pisze:
>> <snip>
>>
>>> Hi Honnappa,
>>>
>>> Many thanks for the review!
>>>
>>> I'll write my answers here not inline as it would be easier to read them in one
>>> place, I think.
>>> So first of all I agree with you in 2 things:
>>> 1) all uses of statistics must be atomic and lack of that caused most of the
>>> problems
>>> 2) it would be better to replace barrier and memset in
>>> clear_packet_count() with atomic stores as you suggested
>>>
>>> So I will apply both of above.
>>>
>>> However I wasn't not fully convinced on changing acquire/release to relaxed.
>>> It wood be perfectly ok if it would look like in this Herb Sutter's example:
>>> https://youtu.be/KeLBd2[]  EJLOU?t=4170
>>> But in his case the counters are cleared before worker threads start and are
>>> printout after they are completed.
>>>
>>> In case of the dpdk distributor tests both worker and main cores are running
>>> at the same time. In the sanity_test, the statistics are cleared and verified
>>> few times for different hashes of packages. The worker cores are not
>>> stopped at this time and they continue their loops in handle procedure.
>>> Verification made in main core is an exchange of data as the current statistics
>>> indicate how the test will result.
>> Agree. The key point we have to note is that the data that is exchanged between the two threads is already atomic (handled_packets is atomic).
>>
>>> So as I wasn't convinced, I run some tests with both both relaxed and
>>> acquire/release modes and they both fail :( The failures caused by statistics
>>> errors to number of tests ratio for
>>> 200000 tests was:
>>> for relaxed: 0,000790562
>>> for acq/rel: 0,000091321
>>>
>>>
>>> That's why I'm going to modify tests in such way, that they would:
>>> 1) clear statistics
>>> 2) launch worker threads
>>> 3) run test
>>> 4) wait for workers procedures to complete
>>> 5) check stats, verify results and print them out
>>>
>>> This way worker main core will use (clear or verify) stats only when there are
>>> no worker threads. This would make things simpler and allowing to focus on
>>> testing the distributor not tests. And of course relaxed mode would be
>>> enough!
>> Agree, this would be the only way to ensure that the main thread sees the correct statistics (just like in the video)
>>
>>> Best regards
>>> Lukasz
>>>
>>>
>>> W dniu 29.09.2020 o 07:49, Honnappa Nagarahalli pisze:
>>>> <snip>
>>>>
>>>>> Statistics of handled packets are cleared and read on main lcore,
>>>>> while they are increased in workers handlers on different lcores.
>>>>>
>>>>> Without synchronization occasionally showed invalid values.
>>>>> This patch uses atomic acquire/release mechanisms to synchronize.
>>>> In general, load-acquire and store-release memory orderings are required
>>> while synchronizing data (that cannot be updated atomically) between
>>> threads. In the situation, making counters atomic is enough.
>>>>> Fixes: c3eabff124e6 ("distributor: add unit tests")
>>>>> Cc: bruce.richardson@intel.com
>>>>> Cc: stable@dpdk.org
>>>>>
>>>>> Signed-off-by: Lukasz Wojciechowski
>>>>> <l.wojciechow@partner.samsung.com>
>>>>> Acked-by: David Hunt <david.hunt@intel.com>
>>>>> ---
>>>>>     app/test/test_distributor.c | 39 ++++++++++++++++++++++++-----------
>>> --
>>>>>     1 file changed, 26 insertions(+), 13 deletions(-)
>>>>>
>>>>> diff --git a/app/test/test_distributor.c
>>>>> b/app/test/test_distributor.c index
>>>>> 35b25463a..0e49e3714 100644
>>>>> --- a/app/test/test_distributor.c
>>>>> +++ b/app/test/test_distributor.c
>>>>> @@ -43,7 +43,8 @@ total_packet_count(void)  {
>>>>>     	unsigned i, count = 0;
>>>>>     	for (i = 0; i < worker_idx; i++)
>>>>> -		count += worker_stats[i].handled_packets;
>>>>> +		count +=
>>>>> __atomic_load_n(&worker_stats[i].handled_packets,
>>>>> +				__ATOMIC_ACQUIRE);
>>>> RELAXED memory order is sufficient. For ex: the worker threads are not
>>> 'releasing' any data that is not atomically updated to the main thread.
>>>>>     	return count;
>>>>>     }
>>>>>
>>>>> @@ -52,6 +53,7 @@ static inline void
>>>>>     clear_packet_count(void)
>>>>>     {
>>>>>     	memset(&worker_stats, 0, sizeof(worker_stats));
>>>>> +	rte_atomic_thread_fence(__ATOMIC_RELEASE);
>>>> Ideally, the counters should be set to 0 atomically rather than using a
>>> memset.
>>>>>     }
>>>>>
>>>>>     /* this is the basic worker function for sanity test @@ -72,13
>>>>> +74,13 @@ handle_work(void *arg)
>>>>>     	num = rte_distributor_get_pkt(db, id, buf, buf, num);
>>>>>     	while (!quit) {
>>>>>     		__atomic_fetch_add(&worker_stats[id].handled_packets,
>>>>> num,
>>>>> -				__ATOMIC_RELAXED);
>>>>> +				__ATOMIC_ACQ_REL);
>>>> Using the __ATOMIC_ACQ_REL order does not mean anything to the main
>>> thread. The main thread might still see the updates from different threads in
>>> different order.
>>>>>     		count += num;
>>>>>     		num = rte_distributor_get_pkt(db, id,
>>>>>     				buf, buf, num);
>>>>>     	}
>>>>>     	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
>>>>> -			__ATOMIC_RELAXED);
>>>>> +			__ATOMIC_ACQ_REL);
>>>> Same here, do not see why this change is required.
>>>>
>>>>>     	count += num;
>>>>>     	rte_distributor_return_pkt(db, id, buf, num);
>>>>>     	return 0;
>>>>> @@ -134,7 +136,8 @@ sanity_test(struct worker_params *wp, struct
>>>>> rte_mempool *p)
>>>>>
>>>>>     	for (i = 0; i < rte_lcore_count() - 1; i++)
>>>>>     		printf("Worker %u handled %u packets\n", i,
>>>>> -				worker_stats[i].handled_packets);
>>>>> +			__atomic_load_n(&worker_stats[i].handled_packets,
>>>>> +					__ATOMIC_ACQUIRE));
>>>> __ATOMIC_RELAXED is enough.
>>>>
>>>>>     	printf("Sanity test with all zero hashes done.\n");
>>>>>
>>>>>     	/* pick two flows and check they go correctly */ @@ -159,7 +162,9
>>>>> @@ sanity_test(struct worker_params *wp, struct rte_mempool *p)
>>>>>
>>>>>     		for (i = 0; i < rte_lcore_count() - 1; i++)
>>>>>     			printf("Worker %u handled %u packets\n", i,
>>>>> -					worker_stats[i].handled_packets);
>>>>> +				__atomic_load_n(
>>>>> +					&worker_stats[i].handled_packets,
>>>>> +					__ATOMIC_ACQUIRE));
>>>> __ATOMIC_RELAXED is enough
>>>>
>>>>>     		printf("Sanity test with two hash values done\n");
>>>>>     	}
>>>>>
>>>>> @@ -185,7 +190,8 @@ sanity_test(struct worker_params *wp, struct
>>>>> rte_mempool *p)
>>>>>
>>>>>     	for (i = 0; i < rte_lcore_count() - 1; i++)
>>>>>     		printf("Worker %u handled %u packets\n", i,
>>>>> -				worker_stats[i].handled_packets);
>>>>> +			__atomic_load_n(&worker_stats[i].handled_packets,
>>>>> +					__ATOMIC_ACQUIRE));
>>>> __ATOMIC_RELAXED is enough
>>>>
>>>>>     	printf("Sanity test with non-zero hashes done\n");
>>>>>
>>>>>     	rte_mempool_put_bulk(p, (void *)bufs, BURST); @@ -280,15
>>>>> +286,17 @@ handle_work_with_free_mbufs(void *arg)
>>>>>     		buf[i] = NULL;
>>>>>     	num = rte_distributor_get_pkt(d, id, buf, buf, num);
>>>>>     	while (!quit) {
>>>>> -		worker_stats[id].handled_packets += num;
>>>>>     		count += num;
>>>>> +		__atomic_fetch_add(&worker_stats[id].handled_packets,
>>>>> num,
>>>>> +				__ATOMIC_ACQ_REL);
>>>> IMO, the problem would be the non-atomic update of the statistics. So,
>>>> __ATOMIC_RELAXED is enough
>>>>
>>>>>     		for (i = 0; i < num; i++)
>>>>>     			rte_pktmbuf_free(buf[i]);
>>>>>     		num = rte_distributor_get_pkt(d,
>>>>>     				id, buf, buf, num);
>>>>>     	}
>>>>> -	worker_stats[id].handled_packets += num;
>>>>>     	count += num;
>>>>> +	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
>>>>> +			__ATOMIC_ACQ_REL);
>>>> Same here, the problem is non-atomic update of the statistics,
>>> __ATOMIC_RELAXED is enough.
>>>> Similarly, for changes below, __ATOMIC_RELAXED is enough.
>>>>
>>>>>     	rte_distributor_return_pkt(d, id, buf, num);
>>>>>     	return 0;
>>>>>     }
>>>>> @@ -363,8 +371,9 @@ handle_work_for_shutdown_test(void *arg)
>>>>>     	/* wait for quit single globally, or for worker zero, wait
>>>>>     	 * for zero_quit */
>>>>>     	while (!quit && !(id == zero_id && zero_quit)) {
>>>>> -		worker_stats[id].handled_packets += num;
>>>>>     		count += num;
>>>>> +		__atomic_fetch_add(&worker_stats[id].handled_packets,
>>>>> num,
>>>>> +				__ATOMIC_ACQ_REL);
>>>>>     		for (i = 0; i < num; i++)
>>>>>     			rte_pktmbuf_free(buf[i]);
>>>>>     		num = rte_distributor_get_pkt(d,
>>>>> @@ -379,10 +388,11 @@ handle_work_for_shutdown_test(void *arg)
>>>>>
>>>>>     		total += num;
>>>>>     	}
>>>>> -	worker_stats[id].handled_packets += num;
>>>>>     	count += num;
>>>>>     	returned = rte_distributor_return_pkt(d, id, buf, num);
>>>>>
>>>>> +	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
>>>>> +			__ATOMIC_ACQ_REL);
>>>>>     	if (id == zero_id) {
>>>>>     		/* for worker zero, allow it to restart to pick up last packet
>>>>>     		 * when all workers are shutting down.
>>>>> @@ -394,10 +404,11 @@ handle_work_for_shutdown_test(void *arg)
>>>>>     				id, buf, buf, num);
>>>>>
>>>>>     		while (!quit) {
>>>>> -			worker_stats[id].handled_packets += num;
>>>>>     			count += num;
>>>>>     			rte_pktmbuf_free(pkt);
>>>>>     			num = rte_distributor_get_pkt(d, id, buf, buf, num);
>>>>> +
>>>>> 	__atomic_fetch_add(&worker_stats[id].handled_packets,
>>>>> +					num, __ATOMIC_ACQ_REL);
>>>>>     		}
>>>>>     		returned = rte_distributor_return_pkt(d,
>>>>>     				id, buf, num);
>>>>> @@ -461,7 +472,8 @@ sanity_test_with_worker_shutdown(struct
>>>>> worker_params *wp,
>>>>>
>>>>>     	for (i = 0; i < rte_lcore_count() - 1; i++)
>>>>>     		printf("Worker %u handled %u packets\n", i,
>>>>> -				worker_stats[i].handled_packets);
>>>>> +			__atomic_load_n(&worker_stats[i].handled_packets,
>>>>> +					__ATOMIC_ACQUIRE));
>>>>>
>>>>>     	if (total_packet_count() != BURST * 2) {
>>>>>     		printf("Line %d: Error, not all packets flushed. "
>>>>> @@ -514,7 +526,8 @@ test_flush_with_worker_shutdown(struct
>>>>> worker_params *wp,
>>>>>     	zero_quit = 0;
>>>>>     	for (i = 0; i < rte_lcore_count() - 1; i++)
>>>>>     		printf("Worker %u handled %u packets\n", i,
>>>>> -				worker_stats[i].handled_packets);
>>>>> +			__atomic_load_n(&worker_stats[i].handled_packets,
>>>>> +					__ATOMIC_ACQUIRE));
>>>>>
>>>>>     	if (total_packet_count() != BURST) {
>>>>>     		printf("Line %d: Error, not all packets flushed. "
>>>>> --
>>>>> 2.17.1
>>> --
>>> Lukasz Wojciechowski
>>> Principal Software Engineer
>>>
>>> Samsung R&D Institute Poland
>>> Samsung Electronics
>>> Office +48 22 377 88 25
>>> l.wojciechow@partner.samsung.com
Honnappa Nagarahalli Oct. 16, 2020, 3:42 p.m. UTC | #7
<snip>

> 
> W dniu 16.10.2020 o 14:43, Lukasz Wojciechowski pisze:
> > Hi Honnappa,
> >
> > Thank you for your answer.
> > In the current v7 version I followed your advise and used RELAXED memory
> model.
> > And it works without any issues. I guess after fixing other issues found
> since v4 the distributor works more stable.
> > I didn't have time to rearrange all tests in the way I proposed, but I guess if
> they work like this it's not a top priority.
Agree, not a top priority.

> >
> > Can you give an ack on the series? I believe David Marchand is waiting for
> your opinion to process it.
> I'm sorry I didn't see your other comments. I'll try to fix them them today.
No problem, I can review the next series quickly.

> >
> > Best regards
> > Lukasz
> >
> > W dniu 16.10.2020 o 07:43, Honnappa Nagarahalli pisze:
> >> <snip>
> >>
> >>> Hi Honnappa,
> >>>
> >>> Many thanks for the review!
> >>>
> >>> I'll write my answers here not inline as it would be easier to read
> >>> them in one place, I think.
> >>> So first of all I agree with you in 2 things:
> >>> 1) all uses of statistics must be atomic and lack of that caused
> >>> most of the problems
> >>> 2) it would be better to replace barrier and memset in
> >>> clear_packet_count() with atomic stores as you suggested
> >>>
> >>> So I will apply both of above.
> >>>
> >>> However I wasn't not fully convinced on changing acquire/release to
> relaxed.
> >>> It wood be perfectly ok if it would look like in this Herb Sutter's example:
> >>> https://youtu.be/KeLBd2[]  EJLOU?t=4170 But in his case the counters
> >>> are cleared before worker threads start and are printout after they
> >>> are completed.
> >>>
> >>> In case of the dpdk distributor tests both worker and main cores are
> >>> running at the same time. In the sanity_test, the statistics are
> >>> cleared and verified few times for different hashes of packages. The
> >>> worker cores are not stopped at this time and they continue their loops
> in handle procedure.
> >>> Verification made in main core is an exchange of data as the current
> >>> statistics indicate how the test will result.
> >> Agree. The key point we have to note is that the data that is exchanged
> between the two threads is already atomic (handled_packets is atomic).
> >>
> >>> So as I wasn't convinced, I run some tests with both both relaxed
> >>> and acquire/release modes and they both fail :( The failures caused
> >>> by statistics errors to number of tests ratio for
> >>> 200000 tests was:
> >>> for relaxed: 0,000790562
> >>> for acq/rel: 0,000091321
> >>>
> >>>
> >>> That's why I'm going to modify tests in such way, that they would:
> >>> 1) clear statistics
> >>> 2) launch worker threads
> >>> 3) run test
> >>> 4) wait for workers procedures to complete
> >>> 5) check stats, verify results and print them out
> >>>
> >>> This way worker main core will use (clear or verify) stats only when
> >>> there are no worker threads. This would make things simpler and
> >>> allowing to focus on testing the distributor not tests. And of
> >>> course relaxed mode would be enough!
> >> Agree, this would be the only way to ensure that the main thread sees
> >> the correct statistics (just like in the video)
> >>
> >>> Best regards
> >>> Lukasz
> >>>
> >>>
> >>> W dniu 29.09.2020 o 07:49, Honnappa Nagarahalli pisze:
> >>>> <snip>
> >>>>
> >>>>> Statistics of handled packets are cleared and read on main lcore,
> >>>>> while they are increased in workers handlers on different lcores.
> >>>>>
> >>>>> Without synchronization occasionally showed invalid values.
> >>>>> This patch uses atomic acquire/release mechanisms to synchronize.
> >>>> In general, load-acquire and store-release memory orderings are
> >>>> required
> >>> while synchronizing data (that cannot be updated atomically) between
> >>> threads. In the situation, making counters atomic is enough.
> >>>>> Fixes: c3eabff124e6 ("distributor: add unit tests")
> >>>>> Cc: bruce.richardson@intel.com
> >>>>> Cc: stable@dpdk.org
> >>>>>
> >>>>> Signed-off-by: Lukasz Wojciechowski
> >>>>> <l.wojciechow@partner.samsung.com>
> >>>>> Acked-by: David Hunt <david.hunt@intel.com>
> >>>>> ---
> >>>>>     app/test/test_distributor.c | 39
> >>>>> ++++++++++++++++++++++++-----------
> >>> --
> >>>>>     1 file changed, 26 insertions(+), 13 deletions(-)
> >>>>>
> >>>>> diff --git a/app/test/test_distributor.c
> >>>>> b/app/test/test_distributor.c index
> >>>>> 35b25463a..0e49e3714 100644
> >>>>> --- a/app/test/test_distributor.c
> >>>>> +++ b/app/test/test_distributor.c
> >>>>> @@ -43,7 +43,8 @@ total_packet_count(void)  {
> >>>>>     	unsigned i, count = 0;
> >>>>>     	for (i = 0; i < worker_idx; i++)
> >>>>> -		count += worker_stats[i].handled_packets;
> >>>>> +		count +=
> >>>>> __atomic_load_n(&worker_stats[i].handled_packets,
> >>>>> +				__ATOMIC_ACQUIRE);
> >>>> RELAXED memory order is sufficient. For ex: the worker threads are
> >>>> not
> >>> 'releasing' any data that is not atomically updated to the main thread.
> >>>>>     	return count;
> >>>>>     }
> >>>>>
> >>>>> @@ -52,6 +53,7 @@ static inline void
> >>>>>     clear_packet_count(void)
> >>>>>     {
> >>>>>     	memset(&worker_stats, 0, sizeof(worker_stats));
> >>>>> +	rte_atomic_thread_fence(__ATOMIC_RELEASE);
> >>>> Ideally, the counters should be set to 0 atomically rather than
> >>>> using a
> >>> memset.
> >>>>>     }
> >>>>>
> >>>>>     /* this is the basic worker function for sanity test @@ -72,13
> >>>>> +74,13 @@ handle_work(void *arg)
> >>>>>     	num = rte_distributor_get_pkt(db, id, buf, buf, num);
> >>>>>     	while (!quit) {
> >>>>>
> 	__atomic_fetch_add(&worker_stats[id].handled_packets,
> >>>>> num,
> >>>>> -				__ATOMIC_RELAXED);
> >>>>> +				__ATOMIC_ACQ_REL);
> >>>> Using the __ATOMIC_ACQ_REL order does not mean anything to the
> main
> >>> thread. The main thread might still see the updates from different
> >>> threads in different order.
> >>>>>     		count += num;
> >>>>>     		num = rte_distributor_get_pkt(db, id,
> >>>>>     				buf, buf, num);
> >>>>>     	}
> >>>>>     	__atomic_fetch_add(&worker_stats[id].handled_packets,
> num,
> >>>>> -			__ATOMIC_RELAXED);
> >>>>> +			__ATOMIC_ACQ_REL);
> >>>> Same here, do not see why this change is required.
> >>>>
> >>>>>     	count += num;
> >>>>>     	rte_distributor_return_pkt(db, id, buf, num);
> >>>>>     	return 0;
> >>>>> @@ -134,7 +136,8 @@ sanity_test(struct worker_params *wp, struct
> >>>>> rte_mempool *p)
> >>>>>
> >>>>>     	for (i = 0; i < rte_lcore_count() - 1; i++)
> >>>>>     		printf("Worker %u handled %u packets\n", i,
> >>>>> -				worker_stats[i].handled_packets);
> >>>>> +
> 	__atomic_load_n(&worker_stats[i].handled_packets,
> >>>>> +					__ATOMIC_ACQUIRE));
> >>>> __ATOMIC_RELAXED is enough.
> >>>>
> >>>>>     	printf("Sanity test with all zero hashes done.\n");
> >>>>>
> >>>>>     	/* pick two flows and check they go correctly */ @@ -159,7
> >>>>> +162,9 @@ sanity_test(struct worker_params *wp, struct
> rte_mempool
> >>>>> *p)
> >>>>>
> >>>>>     		for (i = 0; i < rte_lcore_count() - 1; i++)
> >>>>>     			printf("Worker %u handled %u packets\n", i,
> >>>>> -
> 	worker_stats[i].handled_packets);
> >>>>> +				__atomic_load_n(
> >>>>> +
> 	&worker_stats[i].handled_packets,
> >>>>> +					__ATOMIC_ACQUIRE));
> >>>> __ATOMIC_RELAXED is enough
> >>>>
> >>>>>     		printf("Sanity test with two hash values done\n");
> >>>>>     	}
> >>>>>
> >>>>> @@ -185,7 +190,8 @@ sanity_test(struct worker_params *wp, struct
> >>>>> rte_mempool *p)
> >>>>>
> >>>>>     	for (i = 0; i < rte_lcore_count() - 1; i++)
> >>>>>     		printf("Worker %u handled %u packets\n", i,
> >>>>> -				worker_stats[i].handled_packets);
> >>>>> +
> 	__atomic_load_n(&worker_stats[i].handled_packets,
> >>>>> +					__ATOMIC_ACQUIRE));
> >>>> __ATOMIC_RELAXED is enough
> >>>>
> >>>>>     	printf("Sanity test with non-zero hashes done\n");
> >>>>>
> >>>>>     	rte_mempool_put_bulk(p, (void *)bufs, BURST); @@ -280,15
> >>>>> +286,17 @@ handle_work_with_free_mbufs(void *arg)
> >>>>>     		buf[i] = NULL;
> >>>>>     	num = rte_distributor_get_pkt(d, id, buf, buf, num);
> >>>>>     	while (!quit) {
> >>>>> -		worker_stats[id].handled_packets += num;
> >>>>>     		count += num;
> >>>>> +
> 	__atomic_fetch_add(&worker_stats[id].handled_packets,
> >>>>> num,
> >>>>> +				__ATOMIC_ACQ_REL);
> >>>> IMO, the problem would be the non-atomic update of the statistics.
> >>>> So, __ATOMIC_RELAXED is enough
> >>>>
> >>>>>     		for (i = 0; i < num; i++)
> >>>>>     			rte_pktmbuf_free(buf[i]);
> >>>>>     		num = rte_distributor_get_pkt(d,
> >>>>>     				id, buf, buf, num);
> >>>>>     	}
> >>>>> -	worker_stats[id].handled_packets += num;
> >>>>>     	count += num;
> >>>>> +	__atomic_fetch_add(&worker_stats[id].handled_packets,
> num,
> >>>>> +			__ATOMIC_ACQ_REL);
> >>>> Same here, the problem is non-atomic update of the statistics,
> >>> __ATOMIC_RELAXED is enough.
> >>>> Similarly, for changes below, __ATOMIC_RELAXED is enough.
> >>>>
> >>>>>     	rte_distributor_return_pkt(d, id, buf, num);
> >>>>>     	return 0;
> >>>>>     }
> >>>>> @@ -363,8 +371,9 @@ handle_work_for_shutdown_test(void *arg)
> >>>>>     	/* wait for quit single globally, or for worker zero, wait
> >>>>>     	 * for zero_quit */
> >>>>>     	while (!quit && !(id == zero_id && zero_quit)) {
> >>>>> -		worker_stats[id].handled_packets += num;
> >>>>>     		count += num;
> >>>>> +
> 	__atomic_fetch_add(&worker_stats[id].handled_packets,
> >>>>> num,
> >>>>> +				__ATOMIC_ACQ_REL);
> >>>>>     		for (i = 0; i < num; i++)
> >>>>>     			rte_pktmbuf_free(buf[i]);
> >>>>>     		num = rte_distributor_get_pkt(d, @@ -379,10
> +388,11 @@
> >>>>> handle_work_for_shutdown_test(void *arg)
> >>>>>
> >>>>>     		total += num;
> >>>>>     	}
> >>>>> -	worker_stats[id].handled_packets += num;
> >>>>>     	count += num;
> >>>>>     	returned = rte_distributor_return_pkt(d, id, buf, num);
> >>>>>
> >>>>> +	__atomic_fetch_add(&worker_stats[id].handled_packets,
> num,
> >>>>> +			__ATOMIC_ACQ_REL);
> >>>>>     	if (id == zero_id) {
> >>>>>     		/* for worker zero, allow it to restart to pick up last
> packet
> >>>>>     		 * when all workers are shutting down.
> >>>>> @@ -394,10 +404,11 @@ handle_work_for_shutdown_test(void *arg)
> >>>>>     				id, buf, buf, num);
> >>>>>
> >>>>>     		while (!quit) {
> >>>>> -			worker_stats[id].handled_packets += num;
> >>>>>     			count += num;
> >>>>>     			rte_pktmbuf_free(pkt);
> >>>>>     			num = rte_distributor_get_pkt(d, id, buf, buf,
> num);
> >>>>> +
> >>>>> 	__atomic_fetch_add(&worker_stats[id].handled_packets,
> >>>>> +					num, __ATOMIC_ACQ_REL);
> >>>>>     		}
> >>>>>     		returned = rte_distributor_return_pkt(d,
> >>>>>     				id, buf, num);
> >>>>> @@ -461,7 +472,8 @@ sanity_test_with_worker_shutdown(struct
> >>>>> worker_params *wp,
> >>>>>
> >>>>>     	for (i = 0; i < rte_lcore_count() - 1; i++)
> >>>>>     		printf("Worker %u handled %u packets\n", i,
> >>>>> -				worker_stats[i].handled_packets);
> >>>>> +
> 	__atomic_load_n(&worker_stats[i].handled_packets,
> >>>>> +					__ATOMIC_ACQUIRE));
> >>>>>
> >>>>>     	if (total_packet_count() != BURST * 2) {
> >>>>>     		printf("Line %d: Error, not all packets flushed. "
> >>>>> @@ -514,7 +526,8 @@ test_flush_with_worker_shutdown(struct
> >>>>> worker_params *wp,
> >>>>>     	zero_quit = 0;
> >>>>>     	for (i = 0; i < rte_lcore_count() - 1; i++)
> >>>>>     		printf("Worker %u handled %u packets\n", i,
> >>>>> -				worker_stats[i].handled_packets);
> >>>>> +
> 	__atomic_load_n(&worker_stats[i].handled_packets,
> >>>>> +					__ATOMIC_ACQUIRE));
> >>>>>
> >>>>>     	if (total_packet_count() != BURST) {
> >>>>>     		printf("Line %d: Error, not all packets flushed. "
> >>>>> --
> >>>>> 2.17.1
> >>> --
> >>> Lukasz Wojciechowski
> >>> Principal Software Engineer
> >>>
> >>> Samsung R&D Institute Poland
> >>> Samsung Electronics
> >>> Office +48 22 377 88 25
> >>> l.wojciechow@partner.samsung.com
> 
> --
> Lukasz Wojciechowski
> Principal Software Engineer
> 
> Samsung R&D Institute Poland
> Samsung Electronics
> Office +48 22 377 88 25
> l.wojciechow@partner.samsung.com
Lukasz Wojciechowski Oct. 17, 2020, 3:34 a.m. UTC | #8
W dniu 16.10.2020 o 17:42, Honnappa Nagarahalli pisze:
> <snip>
>
>> W dniu 16.10.2020 o 14:43, Lukasz Wojciechowski pisze:
>>> Hi Honnappa,
>>>
>>> Thank you for your answer.
>>> In the current v7 version I followed your advise and used RELAXED memory
>> model.
>>> And it works without any issues. I guess after fixing other issues found
>> since v4 the distributor works more stable.
>>> I didn't have time to rearrange all tests in the way I proposed, but I guess if
>> they work like this it's not a top priority.
> Agree, not a top priority.
>
>>> Can you give an ack on the series? I believe David Marchand is waiting for
>> your opinion to process it.
>> I'm sorry I didn't see your other comments. I'll try to fix them them today.
> No problem, I can review the next series quickly.
So it's already there, the v8.
but you have one patch more to look at :)
I was ready to submit new version, but I run some tests ... and one of 
executions hanged, so I attached myself with gdb and found one issue more.
>
>>> Best regards
>>> Lukasz
>>>
>>> W dniu 16.10.2020 o 07:43, Honnappa Nagarahalli pisze:
>>>> <snip>
>>>>
>>>>> Hi Honnappa,
>>>>>
>>>>> Many thanks for the review!
>>>>>
>>>>> I'll write my answers here not inline as it would be easier to read
>>>>> them in one place, I think.
>>>>> So first of all I agree with you in 2 things:
>>>>> 1) all uses of statistics must be atomic and lack of that caused
>>>>> most of the problems
>>>>> 2) it would be better to replace barrier and memset in
>>>>> clear_packet_count() with atomic stores as you suggested
>>>>>
>>>>> So I will apply both of above.
>>>>>
>>>>> However I wasn't not fully convinced on changing acquire/release to
>> relaxed.
>>>>> It wood be perfectly ok if it would look like in this Herb Sutter's example:
>>>>> https://youtu.be/KeLBd2[]  EJLOU?t=4170 But in his case the counters
>>>>> are cleared before worker threads start and are printout after they
>>>>> are completed.
>>>>>
>>>>> In case of the dpdk distributor tests both worker and main cores are
>>>>> running at the same time. In the sanity_test, the statistics are
>>>>> cleared and verified few times for different hashes of packages. The
>>>>> worker cores are not stopped at this time and they continue their loops
>> in handle procedure.
>>>>> Verification made in main core is an exchange of data as the current
>>>>> statistics indicate how the test will result.
>>>> Agree. The key point we have to note is that the data that is exchanged
>> between the two threads is already atomic (handled_packets is atomic).
>>>>> So as I wasn't convinced, I run some tests with both both relaxed
>>>>> and acquire/release modes and they both fail :( The failures caused
>>>>> by statistics errors to number of tests ratio for
>>>>> 200000 tests was:
>>>>> for relaxed: 0,000790562
>>>>> for acq/rel: 0,000091321
>>>>>
>>>>>
>>>>> That's why I'm going to modify tests in such way, that they would:
>>>>> 1) clear statistics
>>>>> 2) launch worker threads
>>>>> 3) run test
>>>>> 4) wait for workers procedures to complete
>>>>> 5) check stats, verify results and print them out
>>>>>
>>>>> This way worker main core will use (clear or verify) stats only when
>>>>> there are no worker threads. This would make things simpler and
>>>>> allowing to focus on testing the distributor not tests. And of
>>>>> course relaxed mode would be enough!
>>>> Agree, this would be the only way to ensure that the main thread sees
>>>> the correct statistics (just like in the video)
>>>>
>>>>> Best regards
>>>>> Lukasz
>>>>>
>>>>>
>>>>> W dniu 29.09.2020 o 07:49, Honnappa Nagarahalli pisze:
>>>>>> <snip>
>>>>>>
>>>>>>> Statistics of handled packets are cleared and read on main lcore,
>>>>>>> while they are increased in workers handlers on different lcores.
>>>>>>>
>>>>>>> Without synchronization occasionally showed invalid values.
>>>>>>> This patch uses atomic acquire/release mechanisms to synchronize.
>>>>>> In general, load-acquire and store-release memory orderings are
>>>>>> required
>>>>> while synchronizing data (that cannot be updated atomically) between
>>>>> threads. In the situation, making counters atomic is enough.
>>>>>>> Fixes: c3eabff124e6 ("distributor: add unit tests")
>>>>>>> Cc: bruce.richardson@intel.com
>>>>>>> Cc: stable@dpdk.org
>>>>>>>
>>>>>>> Signed-off-by: Lukasz Wojciechowski
>>>>>>> <l.wojciechow@partner.samsung.com>
>>>>>>> Acked-by: David Hunt <david.hunt@intel.com>
>>>>>>> ---
>>>>>>>      app/test/test_distributor.c | 39
>>>>>>> ++++++++++++++++++++++++-----------
>>>>> --
>>>>>>>      1 file changed, 26 insertions(+), 13 deletions(-)
>>>>>>>
>>>>>>> diff --git a/app/test/test_distributor.c
>>>>>>> b/app/test/test_distributor.c index
>>>>>>> 35b25463a..0e49e3714 100644
>>>>>>> --- a/app/test/test_distributor.c
>>>>>>> +++ b/app/test/test_distributor.c
>>>>>>> @@ -43,7 +43,8 @@ total_packet_count(void)  {
>>>>>>>      	unsigned i, count = 0;
>>>>>>>      	for (i = 0; i < worker_idx; i++)
>>>>>>> -		count += worker_stats[i].handled_packets;
>>>>>>> +		count +=
>>>>>>> __atomic_load_n(&worker_stats[i].handled_packets,
>>>>>>> +				__ATOMIC_ACQUIRE);
>>>>>> RELAXED memory order is sufficient. For ex: the worker threads are
>>>>>> not
>>>>> 'releasing' any data that is not atomically updated to the main thread.
>>>>>>>      	return count;
>>>>>>>      }
>>>>>>>
>>>>>>> @@ -52,6 +53,7 @@ static inline void
>>>>>>>      clear_packet_count(void)
>>>>>>>      {
>>>>>>>      	memset(&worker_stats, 0, sizeof(worker_stats));
>>>>>>> +	rte_atomic_thread_fence(__ATOMIC_RELEASE);
>>>>>> Ideally, the counters should be set to 0 atomically rather than
>>>>>> using a
>>>>> memset.
>>>>>>>      }
>>>>>>>
>>>>>>>      /* this is the basic worker function for sanity test @@ -72,13
>>>>>>> +74,13 @@ handle_work(void *arg)
>>>>>>>      	num = rte_distributor_get_pkt(db, id, buf, buf, num);
>>>>>>>      	while (!quit) {
>>>>>>>
>> 	__atomic_fetch_add(&worker_stats[id].handled_packets,
>>>>>>> num,
>>>>>>> -				__ATOMIC_RELAXED);
>>>>>>> +				__ATOMIC_ACQ_REL);
>>>>>> Using the __ATOMIC_ACQ_REL order does not mean anything to the
>> main
>>>>> thread. The main thread might still see the updates from different
>>>>> threads in different order.
>>>>>>>      		count += num;
>>>>>>>      		num = rte_distributor_get_pkt(db, id,
>>>>>>>      				buf, buf, num);
>>>>>>>      	}
>>>>>>>      	__atomic_fetch_add(&worker_stats[id].handled_packets,
>> num,
>>>>>>> -			__ATOMIC_RELAXED);
>>>>>>> +			__ATOMIC_ACQ_REL);
>>>>>> Same here, do not see why this change is required.
>>>>>>
>>>>>>>      	count += num;
>>>>>>>      	rte_distributor_return_pkt(db, id, buf, num);
>>>>>>>      	return 0;
>>>>>>> @@ -134,7 +136,8 @@ sanity_test(struct worker_params *wp, struct
>>>>>>> rte_mempool *p)
>>>>>>>
>>>>>>>      	for (i = 0; i < rte_lcore_count() - 1; i++)
>>>>>>>      		printf("Worker %u handled %u packets\n", i,
>>>>>>> -				worker_stats[i].handled_packets);
>>>>>>> +
>> 	__atomic_load_n(&worker_stats[i].handled_packets,
>>>>>>> +					__ATOMIC_ACQUIRE));
>>>>>> __ATOMIC_RELAXED is enough.
>>>>>>
>>>>>>>      	printf("Sanity test with all zero hashes done.\n");
>>>>>>>
>>>>>>>      	/* pick two flows and check they go correctly */ @@ -159,7
>>>>>>> +162,9 @@ sanity_test(struct worker_params *wp, struct
>> rte_mempool
>>>>>>> *p)
>>>>>>>
>>>>>>>      		for (i = 0; i < rte_lcore_count() - 1; i++)
>>>>>>>      			printf("Worker %u handled %u packets\n", i,
>>>>>>> -
>> 	worker_stats[i].handled_packets);
>>>>>>> +				__atomic_load_n(
>>>>>>> +
>> 	&worker_stats[i].handled_packets,
>>>>>>> +					__ATOMIC_ACQUIRE));
>>>>>> __ATOMIC_RELAXED is enough
>>>>>>
>>>>>>>      		printf("Sanity test with two hash values done\n");
>>>>>>>      	}
>>>>>>>
>>>>>>> @@ -185,7 +190,8 @@ sanity_test(struct worker_params *wp, struct
>>>>>>> rte_mempool *p)
>>>>>>>
>>>>>>>      	for (i = 0; i < rte_lcore_count() - 1; i++)
>>>>>>>      		printf("Worker %u handled %u packets\n", i,
>>>>>>> -				worker_stats[i].handled_packets);
>>>>>>> +
>> 	__atomic_load_n(&worker_stats[i].handled_packets,
>>>>>>> +					__ATOMIC_ACQUIRE));
>>>>>> __ATOMIC_RELAXED is enough
>>>>>>
>>>>>>>      	printf("Sanity test with non-zero hashes done\n");
>>>>>>>
>>>>>>>      	rte_mempool_put_bulk(p, (void *)bufs, BURST); @@ -280,15
>>>>>>> +286,17 @@ handle_work_with_free_mbufs(void *arg)
>>>>>>>      		buf[i] = NULL;
>>>>>>>      	num = rte_distributor_get_pkt(d, id, buf, buf, num);
>>>>>>>      	while (!quit) {
>>>>>>> -		worker_stats[id].handled_packets += num;
>>>>>>>      		count += num;
>>>>>>> +
>> 	__atomic_fetch_add(&worker_stats[id].handled_packets,
>>>>>>> num,
>>>>>>> +				__ATOMIC_ACQ_REL);
>>>>>> IMO, the problem would be the non-atomic update of the statistics.
>>>>>> So, __ATOMIC_RELAXED is enough
>>>>>>
>>>>>>>      		for (i = 0; i < num; i++)
>>>>>>>      			rte_pktmbuf_free(buf[i]);
>>>>>>>      		num = rte_distributor_get_pkt(d,
>>>>>>>      				id, buf, buf, num);
>>>>>>>      	}
>>>>>>> -	worker_stats[id].handled_packets += num;
>>>>>>>      	count += num;
>>>>>>> +	__atomic_fetch_add(&worker_stats[id].handled_packets,
>> num,
>>>>>>> +			__ATOMIC_ACQ_REL);
>>>>>> Same here, the problem is non-atomic update of the statistics,
>>>>> __ATOMIC_RELAXED is enough.
>>>>>> Similarly, for changes below, __ATOMIC_RELAXED is enough.
>>>>>>
>>>>>>>      	rte_distributor_return_pkt(d, id, buf, num);
>>>>>>>      	return 0;
>>>>>>>      }
>>>>>>> @@ -363,8 +371,9 @@ handle_work_for_shutdown_test(void *arg)
>>>>>>>      	/* wait for quit single globally, or for worker zero, wait
>>>>>>>      	 * for zero_quit */
>>>>>>>      	while (!quit && !(id == zero_id && zero_quit)) {
>>>>>>> -		worker_stats[id].handled_packets += num;
>>>>>>>      		count += num;
>>>>>>> +
>> 	__atomic_fetch_add(&worker_stats[id].handled_packets,
>>>>>>> num,
>>>>>>> +				__ATOMIC_ACQ_REL);
>>>>>>>      		for (i = 0; i < num; i++)
>>>>>>>      			rte_pktmbuf_free(buf[i]);
>>>>>>>      		num = rte_distributor_get_pkt(d, @@ -379,10
>> +388,11 @@
>>>>>>> handle_work_for_shutdown_test(void *arg)
>>>>>>>
>>>>>>>      		total += num;
>>>>>>>      	}
>>>>>>> -	worker_stats[id].handled_packets += num;
>>>>>>>      	count += num;
>>>>>>>      	returned = rte_distributor_return_pkt(d, id, buf, num);
>>>>>>>
>>>>>>> +	__atomic_fetch_add(&worker_stats[id].handled_packets,
>> num,
>>>>>>> +			__ATOMIC_ACQ_REL);
>>>>>>>      	if (id == zero_id) {
>>>>>>>      		/* for worker zero, allow it to restart to pick up last
>> packet
>>>>>>>      		 * when all workers are shutting down.
>>>>>>> @@ -394,10 +404,11 @@ handle_work_for_shutdown_test(void *arg)
>>>>>>>      				id, buf, buf, num);
>>>>>>>
>>>>>>>      		while (!quit) {
>>>>>>> -			worker_stats[id].handled_packets += num;
>>>>>>>      			count += num;
>>>>>>>      			rte_pktmbuf_free(pkt);
>>>>>>>      			num = rte_distributor_get_pkt(d, id, buf, buf,
>> num);
>>>>>>> +
>>>>>>> 	__atomic_fetch_add(&worker_stats[id].handled_packets,
>>>>>>> +					num, __ATOMIC_ACQ_REL);
>>>>>>>      		}
>>>>>>>      		returned = rte_distributor_return_pkt(d,
>>>>>>>      				id, buf, num);
>>>>>>> @@ -461,7 +472,8 @@ sanity_test_with_worker_shutdown(struct
>>>>>>> worker_params *wp,
>>>>>>>
>>>>>>>      	for (i = 0; i < rte_lcore_count() - 1; i++)
>>>>>>>      		printf("Worker %u handled %u packets\n", i,
>>>>>>> -				worker_stats[i].handled_packets);
>>>>>>> +
>> 	__atomic_load_n(&worker_stats[i].handled_packets,
>>>>>>> +					__ATOMIC_ACQUIRE));
>>>>>>>
>>>>>>>      	if (total_packet_count() != BURST * 2) {
>>>>>>>      		printf("Line %d: Error, not all packets flushed. "
>>>>>>> @@ -514,7 +526,8 @@ test_flush_with_worker_shutdown(struct
>>>>>>> worker_params *wp,
>>>>>>>      	zero_quit = 0;
>>>>>>>      	for (i = 0; i < rte_lcore_count() - 1; i++)
>>>>>>>      		printf("Worker %u handled %u packets\n", i,
>>>>>>> -				worker_stats[i].handled_packets);
>>>>>>> +
>> 	__atomic_load_n(&worker_stats[i].handled_packets,
>>>>>>> +					__ATOMIC_ACQUIRE));
>>>>>>>
>>>>>>>      	if (total_packet_count() != BURST) {
>>>>>>>      		printf("Line %d: Error, not all packets flushed. "
>>>>>>> --
>>>>>>> 2.17.1
>>>>> --
>>>>> Lukasz Wojciechowski
>>>>> Principal Software Engineer
>>>>>
>>>>> Samsung R&D Institute Poland
>>>>> Samsung Electronics
>>>>> Office +48 22 377 88 25
>>>>> l.wojciechow@partner.samsung.com
>> --
>> Lukasz Wojciechowski
>> Principal Software Engineer
>>
>> Samsung R&D Institute Poland
>> Samsung Electronics
>> Office +48 22 377 88 25
>> l.wojciechow@partner.samsung.com

Patch
diff mbox series

diff --git a/app/test/test_distributor.c b/app/test/test_distributor.c
index 35b25463a..0e49e3714 100644
--- a/app/test/test_distributor.c
+++ b/app/test/test_distributor.c
@@ -43,7 +43,8 @@  total_packet_count(void)
 {
 	unsigned i, count = 0;
 	for (i = 0; i < worker_idx; i++)
-		count += worker_stats[i].handled_packets;
+		count += __atomic_load_n(&worker_stats[i].handled_packets,
+				__ATOMIC_ACQUIRE);
 	return count;
 }
 
@@ -52,6 +53,7 @@  static inline void
 clear_packet_count(void)
 {
 	memset(&worker_stats, 0, sizeof(worker_stats));
+	rte_atomic_thread_fence(__ATOMIC_RELEASE);
 }
 
 /* this is the basic worker function for sanity test
@@ -72,13 +74,13 @@  handle_work(void *arg)
 	num = rte_distributor_get_pkt(db, id, buf, buf, num);
 	while (!quit) {
 		__atomic_fetch_add(&worker_stats[id].handled_packets, num,
-				__ATOMIC_RELAXED);
+				__ATOMIC_ACQ_REL);
 		count += num;
 		num = rte_distributor_get_pkt(db, id,
 				buf, buf, num);
 	}
 	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
-			__ATOMIC_RELAXED);
+			__ATOMIC_ACQ_REL);
 	count += num;
 	rte_distributor_return_pkt(db, id, buf, num);
 	return 0;
@@ -134,7 +136,8 @@  sanity_test(struct worker_params *wp, struct rte_mempool *p)
 
 	for (i = 0; i < rte_lcore_count() - 1; i++)
 		printf("Worker %u handled %u packets\n", i,
-				worker_stats[i].handled_packets);
+			__atomic_load_n(&worker_stats[i].handled_packets,
+					__ATOMIC_ACQUIRE));
 	printf("Sanity test with all zero hashes done.\n");
 
 	/* pick two flows and check they go correctly */
@@ -159,7 +162,9 @@  sanity_test(struct worker_params *wp, struct rte_mempool *p)
 
 		for (i = 0; i < rte_lcore_count() - 1; i++)
 			printf("Worker %u handled %u packets\n", i,
-					worker_stats[i].handled_packets);
+				__atomic_load_n(
+					&worker_stats[i].handled_packets,
+					__ATOMIC_ACQUIRE));
 		printf("Sanity test with two hash values done\n");
 	}
 
@@ -185,7 +190,8 @@  sanity_test(struct worker_params *wp, struct rte_mempool *p)
 
 	for (i = 0; i < rte_lcore_count() - 1; i++)
 		printf("Worker %u handled %u packets\n", i,
-				worker_stats[i].handled_packets);
+			__atomic_load_n(&worker_stats[i].handled_packets,
+					__ATOMIC_ACQUIRE));
 	printf("Sanity test with non-zero hashes done\n");
 
 	rte_mempool_put_bulk(p, (void *)bufs, BURST);
@@ -280,15 +286,17 @@  handle_work_with_free_mbufs(void *arg)
 		buf[i] = NULL;
 	num = rte_distributor_get_pkt(d, id, buf, buf, num);
 	while (!quit) {
-		worker_stats[id].handled_packets += num;
 		count += num;
+		__atomic_fetch_add(&worker_stats[id].handled_packets, num,
+				__ATOMIC_ACQ_REL);
 		for (i = 0; i < num; i++)
 			rte_pktmbuf_free(buf[i]);
 		num = rte_distributor_get_pkt(d,
 				id, buf, buf, num);
 	}
-	worker_stats[id].handled_packets += num;
 	count += num;
+	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
+			__ATOMIC_ACQ_REL);
 	rte_distributor_return_pkt(d, id, buf, num);
 	return 0;
 }
@@ -363,8 +371,9 @@  handle_work_for_shutdown_test(void *arg)
 	/* wait for quit single globally, or for worker zero, wait
 	 * for zero_quit */
 	while (!quit && !(id == zero_id && zero_quit)) {
-		worker_stats[id].handled_packets += num;
 		count += num;
+		__atomic_fetch_add(&worker_stats[id].handled_packets, num,
+				__ATOMIC_ACQ_REL);
 		for (i = 0; i < num; i++)
 			rte_pktmbuf_free(buf[i]);
 		num = rte_distributor_get_pkt(d,
@@ -379,10 +388,11 @@  handle_work_for_shutdown_test(void *arg)
 
 		total += num;
 	}
-	worker_stats[id].handled_packets += num;
 	count += num;
 	returned = rte_distributor_return_pkt(d, id, buf, num);
 
+	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
+			__ATOMIC_ACQ_REL);
 	if (id == zero_id) {
 		/* for worker zero, allow it to restart to pick up last packet
 		 * when all workers are shutting down.
@@ -394,10 +404,11 @@  handle_work_for_shutdown_test(void *arg)
 				id, buf, buf, num);
 
 		while (!quit) {
-			worker_stats[id].handled_packets += num;
 			count += num;
 			rte_pktmbuf_free(pkt);
 			num = rte_distributor_get_pkt(d, id, buf, buf, num);
+			__atomic_fetch_add(&worker_stats[id].handled_packets,
+					num, __ATOMIC_ACQ_REL);
 		}
 		returned = rte_distributor_return_pkt(d,
 				id, buf, num);
@@ -461,7 +472,8 @@  sanity_test_with_worker_shutdown(struct worker_params *wp,
 
 	for (i = 0; i < rte_lcore_count() - 1; i++)
 		printf("Worker %u handled %u packets\n", i,
-				worker_stats[i].handled_packets);
+			__atomic_load_n(&worker_stats[i].handled_packets,
+					__ATOMIC_ACQUIRE));
 
 	if (total_packet_count() != BURST * 2) {
 		printf("Line %d: Error, not all packets flushed. "
@@ -514,7 +526,8 @@  test_flush_with_worker_shutdown(struct worker_params *wp,
 	zero_quit = 0;
 	for (i = 0; i < rte_lcore_count() - 1; i++)
 		printf("Worker %u handled %u packets\n", i,
-				worker_stats[i].handled_packets);
+			__atomic_load_n(&worker_stats[i].handled_packets,
+					__ATOMIC_ACQUIRE));
 
 	if (total_packet_count() != BURST) {
 		printf("Line %d: Error, not all packets flushed. "