[v1,2/6] app/test: synchronize statistics between lcores

Message ID 20200915193449.13310-3-l.wojciechow@partner.samsung.com (mailing list archive)
State Superseded, archived
Delegated to: David Marchand
Series: fix distributor synchronization issues

Checks

Context        Check     Description
ci/checkpatch  success   coding style OK

Commit Message

Lukasz Wojciechowski Sept. 15, 2020, 7:34 p.m. UTC
Statistics of handled packets are cleared and read on the main lcore,
while they are increased in the worker handlers on different lcores.

Without synchronization, the counters occasionally showed invalid values.
This patch uses atomic acquire/release mechanisms to synchronize.

Fixes: c3eabff124e6 ("distributor: add unit tests")
Cc: bruce.richardson@intel.com
Cc: stable@dpdk.org

Signed-off-by: Lukasz Wojciechowski <l.wojciechow@partner.samsung.com>
---
 app/test/test_distributor.c | 39 ++++++++++++++++++++++++-------------
 1 file changed, 26 insertions(+), 13 deletions(-)
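
For context, the synchronization pattern applied throughout the patch is a
release/acquire pairing on the per-worker counters: worker lcores publish
their increments with an atomic fetch-add, the main lcore reads them with
acquire loads, and the clearing memset is published with a release fence.
Below is a minimal standalone sketch of that pattern using the GCC __atomic
builtins directly; the struct layout and the count_handled() helper are
simplified stand-ins for the actual test code, and __atomic_thread_fence()
stands in for DPDK's rte_atomic_thread_fence() wrapper used in the patch.

#include <stdio.h>
#include <string.h>

#define MAX_WORKERS 8

struct worker_stats {
	unsigned int handled_packets;
};
static struct worker_stats worker_stats[MAX_WORKERS];
static unsigned int worker_idx = MAX_WORKERS;

/* main lcore: reset the counters, then publish the cleared state */
static inline void
clear_packet_count(void)
{
	memset(&worker_stats, 0, sizeof(worker_stats));
	__atomic_thread_fence(__ATOMIC_RELEASE);
}

/* worker lcore: count handled packets and publish the update */
static inline void
count_handled(unsigned int id, unsigned int num)
{
	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
			__ATOMIC_ACQ_REL);
}

/* main lcore: read what the workers have published so far */
static inline unsigned int
total_packet_count(void)
{
	unsigned int i, count = 0;

	for (i = 0; i < worker_idx; i++)
		count += __atomic_load_n(&worker_stats[i].handled_packets,
				__ATOMIC_ACQUIRE);
	return count;
}

int
main(void)
{
	clear_packet_count();
	count_handled(0, 32);	/* would run on a worker lcore */
	count_handled(1, 32);
	printf("total: %u\n", total_packet_count());	/* prints 64 */
	return 0;
}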
  

Comments

Hunt, David Sept. 17, 2020, 11:50 a.m. UTC | #1
On 15/9/2020 8:34 PM, Lukasz Wojciechowski wrote:
> Statistics of handled packets are cleared and read on the main lcore,
> while they are increased in the worker handlers on different lcores.
>
> Without synchronization, the counters occasionally showed invalid values.
> This patch uses atomic acquire/release mechanisms to synchronize.
>
> Fixes: c3eabff124e6 ("distributor: add unit tests")
> Cc: bruce.richardson@intel.com
> Cc: stable@dpdk.org
>
> Signed-off-by: Lukasz Wojciechowski <l.wojciechow@partner.samsung.com>
> ---
>   app/test/test_distributor.c | 39 ++++++++++++++++++++++++-------------
>   1 file changed, 26 insertions(+), 13 deletions(-)
>
> diff --git a/app/test/test_distributor.c b/app/test/test_distributor.c
> index 35b25463a..0e49e3714 100644
> --- a/app/test/test_distributor.c
> +++ b/app/test/test_distributor.c
> @@ -43,7 +43,8 @@ total_packet_count(void)
>   {
>   	unsigned i, count = 0;
>   	for (i = 0; i < worker_idx; i++)
> -		count += worker_stats[i].handled_packets;
> +		count += __atomic_load_n(&worker_stats[i].handled_packets,
> +				__ATOMIC_ACQUIRE);
>   	return count;
>   }
>   
> @@ -52,6 +53,7 @@ static inline void
>   clear_packet_count(void)
>   {
>   	memset(&worker_stats, 0, sizeof(worker_stats));
> +	rte_atomic_thread_fence(__ATOMIC_RELEASE);
>   }
>   
>   /* this is the basic worker function for sanity test
> @@ -72,13 +74,13 @@ handle_work(void *arg)
>   	num = rte_distributor_get_pkt(db, id, buf, buf, num);
>   	while (!quit) {
>   		__atomic_fetch_add(&worker_stats[id].handled_packets, num,
> -				__ATOMIC_RELAXED);
> +				__ATOMIC_ACQ_REL);
>   		count += num;
>   		num = rte_distributor_get_pkt(db, id,
>   				buf, buf, num);
>   	}
>   	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
> -			__ATOMIC_RELAXED);
> +			__ATOMIC_ACQ_REL);
>   	count += num;
>   	rte_distributor_return_pkt(db, id, buf, num);
>   	return 0;
> @@ -134,7 +136,8 @@ sanity_test(struct worker_params *wp, struct rte_mempool *p)
>   
>   	for (i = 0; i < rte_lcore_count() - 1; i++)
>   		printf("Worker %u handled %u packets\n", i,
> -				worker_stats[i].handled_packets);
> +			__atomic_load_n(&worker_stats[i].handled_packets,
> +					__ATOMIC_ACQUIRE));
>   	printf("Sanity test with all zero hashes done.\n");
>   
>   	/* pick two flows and check they go correctly */
> @@ -159,7 +162,9 @@ sanity_test(struct worker_params *wp, struct rte_mempool *p)
>   
>   		for (i = 0; i < rte_lcore_count() - 1; i++)
>   			printf("Worker %u handled %u packets\n", i,
> -					worker_stats[i].handled_packets);
> +				__atomic_load_n(
> +					&worker_stats[i].handled_packets,
> +					__ATOMIC_ACQUIRE));
>   		printf("Sanity test with two hash values done\n");
>   	}
>   
> @@ -185,7 +190,8 @@ sanity_test(struct worker_params *wp, struct rte_mempool *p)
>   
>   	for (i = 0; i < rte_lcore_count() - 1; i++)
>   		printf("Worker %u handled %u packets\n", i,
> -				worker_stats[i].handled_packets);
> +			__atomic_load_n(&worker_stats[i].handled_packets,
> +					__ATOMIC_ACQUIRE));
>   	printf("Sanity test with non-zero hashes done\n");
>   
>   	rte_mempool_put_bulk(p, (void *)bufs, BURST);
> @@ -280,15 +286,17 @@ handle_work_with_free_mbufs(void *arg)
>   		buf[i] = NULL;
>   	num = rte_distributor_get_pkt(d, id, buf, buf, num);
>   	while (!quit) {
> -		worker_stats[id].handled_packets += num;
>   		count += num;
> +		__atomic_fetch_add(&worker_stats[id].handled_packets, num,
> +				__ATOMIC_ACQ_REL);
>   		for (i = 0; i < num; i++)
>   			rte_pktmbuf_free(buf[i]);
>   		num = rte_distributor_get_pkt(d,
>   				id, buf, buf, num);
>   	}
> -	worker_stats[id].handled_packets += num;
>   	count += num;
> +	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
> +			__ATOMIC_ACQ_REL);
>   	rte_distributor_return_pkt(d, id, buf, num);
>   	return 0;
>   }
> @@ -363,8 +371,9 @@ handle_work_for_shutdown_test(void *arg)
>   	/* wait for quit single globally, or for worker zero, wait
>   	 * for zero_quit */
>   	while (!quit && !(id == zero_id && zero_quit)) {
> -		worker_stats[id].handled_packets += num;
>   		count += num;
> +		__atomic_fetch_add(&worker_stats[id].handled_packets, num,
> +				__ATOMIC_ACQ_REL);
>   		for (i = 0; i < num; i++)
>   			rte_pktmbuf_free(buf[i]);
>   		num = rte_distributor_get_pkt(d,
> @@ -379,10 +388,11 @@ handle_work_for_shutdown_test(void *arg)
>   
>   		total += num;
>   	}
> -	worker_stats[id].handled_packets += num;
>   	count += num;
>   	returned = rte_distributor_return_pkt(d, id, buf, num);
>   
> +	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
> +			__ATOMIC_ACQ_REL);
>   	if (id == zero_id) {
>   		/* for worker zero, allow it to restart to pick up last packet
>   		 * when all workers are shutting down.
> @@ -394,10 +404,11 @@ handle_work_for_shutdown_test(void *arg)
>   				id, buf, buf, num);
>   
>   		while (!quit) {
> -			worker_stats[id].handled_packets += num;
>   			count += num;
>   			rte_pktmbuf_free(pkt);
>   			num = rte_distributor_get_pkt(d, id, buf, buf, num);
> +			__atomic_fetch_add(&worker_stats[id].handled_packets,
> +					num, __ATOMIC_ACQ_REL);
>   		}
>   		returned = rte_distributor_return_pkt(d,
>   				id, buf, num);
> @@ -461,7 +472,8 @@ sanity_test_with_worker_shutdown(struct worker_params *wp,
>   
>   	for (i = 0; i < rte_lcore_count() - 1; i++)
>   		printf("Worker %u handled %u packets\n", i,
> -				worker_stats[i].handled_packets);
> +			__atomic_load_n(&worker_stats[i].handled_packets,
> +					__ATOMIC_ACQUIRE));
>   
>   	if (total_packet_count() != BURST * 2) {
>   		printf("Line %d: Error, not all packets flushed. "
> @@ -514,7 +526,8 @@ test_flush_with_worker_shutdown(struct worker_params *wp,
>   	zero_quit = 0;
>   	for (i = 0; i < rte_lcore_count() - 1; i++)
>   		printf("Worker %u handled %u packets\n", i,
> -				worker_stats[i].handled_packets);
> +			__atomic_load_n(&worker_stats[i].handled_packets,
> +					__ATOMIC_ACQUIRE));
>   
>   	if (total_packet_count() != BURST) {
>   		printf("Line %d: Error, not all packets flushed. "


Thanks.

Acked-by: David Hunt <david.hunt@intel.com>
  

Patch

diff --git a/app/test/test_distributor.c b/app/test/test_distributor.c
index 35b25463a..0e49e3714 100644
--- a/app/test/test_distributor.c
+++ b/app/test/test_distributor.c
@@ -43,7 +43,8 @@  total_packet_count(void)
 {
 	unsigned i, count = 0;
 	for (i = 0; i < worker_idx; i++)
-		count += worker_stats[i].handled_packets;
+		count += __atomic_load_n(&worker_stats[i].handled_packets,
+				__ATOMIC_ACQUIRE);
 	return count;
 }
 
@@ -52,6 +53,7 @@  static inline void
 clear_packet_count(void)
 {
 	memset(&worker_stats, 0, sizeof(worker_stats));
+	rte_atomic_thread_fence(__ATOMIC_RELEASE);
 }
 
 /* this is the basic worker function for sanity test
@@ -72,13 +74,13 @@  handle_work(void *arg)
 	num = rte_distributor_get_pkt(db, id, buf, buf, num);
 	while (!quit) {
 		__atomic_fetch_add(&worker_stats[id].handled_packets, num,
-				__ATOMIC_RELAXED);
+				__ATOMIC_ACQ_REL);
 		count += num;
 		num = rte_distributor_get_pkt(db, id,
 				buf, buf, num);
 	}
 	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
-			__ATOMIC_RELAXED);
+			__ATOMIC_ACQ_REL);
 	count += num;
 	rte_distributor_return_pkt(db, id, buf, num);
 	return 0;
@@ -134,7 +136,8 @@  sanity_test(struct worker_params *wp, struct rte_mempool *p)
 
 	for (i = 0; i < rte_lcore_count() - 1; i++)
 		printf("Worker %u handled %u packets\n", i,
-				worker_stats[i].handled_packets);
+			__atomic_load_n(&worker_stats[i].handled_packets,
+					__ATOMIC_ACQUIRE));
 	printf("Sanity test with all zero hashes done.\n");
 
 	/* pick two flows and check they go correctly */
@@ -159,7 +162,9 @@  sanity_test(struct worker_params *wp, struct rte_mempool *p)
 
 		for (i = 0; i < rte_lcore_count() - 1; i++)
 			printf("Worker %u handled %u packets\n", i,
-					worker_stats[i].handled_packets);
+				__atomic_load_n(
+					&worker_stats[i].handled_packets,
+					__ATOMIC_ACQUIRE));
 		printf("Sanity test with two hash values done\n");
 	}
 
@@ -185,7 +190,8 @@  sanity_test(struct worker_params *wp, struct rte_mempool *p)
 
 	for (i = 0; i < rte_lcore_count() - 1; i++)
 		printf("Worker %u handled %u packets\n", i,
-				worker_stats[i].handled_packets);
+			__atomic_load_n(&worker_stats[i].handled_packets,
+					__ATOMIC_ACQUIRE));
 	printf("Sanity test with non-zero hashes done\n");
 
 	rte_mempool_put_bulk(p, (void *)bufs, BURST);
@@ -280,15 +286,17 @@  handle_work_with_free_mbufs(void *arg)
 		buf[i] = NULL;
 	num = rte_distributor_get_pkt(d, id, buf, buf, num);
 	while (!quit) {
-		worker_stats[id].handled_packets += num;
 		count += num;
+		__atomic_fetch_add(&worker_stats[id].handled_packets, num,
+				__ATOMIC_ACQ_REL);
 		for (i = 0; i < num; i++)
 			rte_pktmbuf_free(buf[i]);
 		num = rte_distributor_get_pkt(d,
 				id, buf, buf, num);
 	}
-	worker_stats[id].handled_packets += num;
 	count += num;
+	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
+			__ATOMIC_ACQ_REL);
 	rte_distributor_return_pkt(d, id, buf, num);
 	return 0;
 }
@@ -363,8 +371,9 @@  handle_work_for_shutdown_test(void *arg)
 	/* wait for quit single globally, or for worker zero, wait
 	 * for zero_quit */
 	while (!quit && !(id == zero_id && zero_quit)) {
-		worker_stats[id].handled_packets += num;
 		count += num;
+		__atomic_fetch_add(&worker_stats[id].handled_packets, num,
+				__ATOMIC_ACQ_REL);
 		for (i = 0; i < num; i++)
 			rte_pktmbuf_free(buf[i]);
 		num = rte_distributor_get_pkt(d,
@@ -379,10 +388,11 @@  handle_work_for_shutdown_test(void *arg)
 
 		total += num;
 	}
-	worker_stats[id].handled_packets += num;
 	count += num;
 	returned = rte_distributor_return_pkt(d, id, buf, num);
 
+	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
+			__ATOMIC_ACQ_REL);
 	if (id == zero_id) {
 		/* for worker zero, allow it to restart to pick up last packet
 		 * when all workers are shutting down.
@@ -394,10 +404,11 @@  handle_work_for_shutdown_test(void *arg)
 				id, buf, buf, num);
 
 		while (!quit) {
-			worker_stats[id].handled_packets += num;
 			count += num;
 			rte_pktmbuf_free(pkt);
 			num = rte_distributor_get_pkt(d, id, buf, buf, num);
+			__atomic_fetch_add(&worker_stats[id].handled_packets,
+					num, __ATOMIC_ACQ_REL);
 		}
 		returned = rte_distributor_return_pkt(d,
 				id, buf, num);
@@ -461,7 +472,8 @@  sanity_test_with_worker_shutdown(struct worker_params *wp,
 
 	for (i = 0; i < rte_lcore_count() - 1; i++)
 		printf("Worker %u handled %u packets\n", i,
-				worker_stats[i].handled_packets);
+			__atomic_load_n(&worker_stats[i].handled_packets,
+					__ATOMIC_ACQUIRE));
 
 	if (total_packet_count() != BURST * 2) {
 		printf("Line %d: Error, not all packets flushed. "
@@ -514,7 +526,8 @@  test_flush_with_worker_shutdown(struct worker_params *wp,
 	zero_quit = 0;
 	for (i = 0; i < rte_lcore_count() - 1; i++)
 		printf("Worker %u handled %u packets\n", i,
-				worker_stats[i].handled_packets);
+			__atomic_load_n(&worker_stats[i].handled_packets,
+					__ATOMIC_ACQUIRE));
 
 	if (total_packet_count() != BURST) {
 		printf("Line %d: Error, not all packets flushed. "