[v2,2/8] app/test: synchronize statistics between lcores
Checks
Commit Message
Statistics of handled packets are cleared and read on main lcore,
while they are increased in workers handlers on different lcores.
Without synchronization, these statistics occasionally showed invalid values.
This patch uses atomic acquire/release mechanisms to synchronize.
Fixes: c3eabff124e6 ("distributor: add unit tests")
Cc: bruce.richardson@intel.com
Cc: stable@dpdk.org
Signed-off-by: Lukasz Wojciechowski <l.wojciechow@partner.samsung.com>
---
app/test/test_distributor.c | 39 ++++++++++++++++++++++++-------------
1 file changed, 26 insertions(+), 13 deletions(-)
Comments
<snip>
>
> Statistics of handled packets are cleared and read on main lcore, while they
> are increased in workers handlers on different lcores.
>
> Without synchronization occasionally showed invalid values.
What exactly do you mean by invalid values? Can you elaborate?
> This patch uses atomic acquire/release mechanisms to synchronize.
>
> Fixes: c3eabff124e6 ("distributor: add unit tests")
> Cc: bruce.richardson@intel.com
> Cc: stable@dpdk.org
>
> Signed-off-by: Lukasz Wojciechowski <l.wojciechow@partner.samsung.com>
> ---
> app/test/test_distributor.c | 39 ++++++++++++++++++++++++-------------
> 1 file changed, 26 insertions(+), 13 deletions(-)
>
> diff --git a/app/test/test_distributor.c b/app/test/test_distributor.c index
> 35b25463a..0e49e3714 100644
> --- a/app/test/test_distributor.c
> +++ b/app/test/test_distributor.c
> @@ -43,7 +43,8 @@ total_packet_count(void) {
> unsigned i, count = 0;
> for (i = 0; i < worker_idx; i++)
> - count += worker_stats[i].handled_packets;
> + count +=
> __atomic_load_n(&worker_stats[i].handled_packets,
> + __ATOMIC_ACQUIRE);
> return count;
> }
>
> @@ -52,6 +53,7 @@ static inline void
> clear_packet_count(void)
> {
> memset(&worker_stats, 0, sizeof(worker_stats));
> + rte_atomic_thread_fence(__ATOMIC_RELEASE);
> }
>
> /* this is the basic worker function for sanity test @@ -72,13 +74,13 @@
> handle_work(void *arg)
> num = rte_distributor_get_pkt(db, id, buf, buf, num);
> while (!quit) {
> __atomic_fetch_add(&worker_stats[id].handled_packets,
> num,
> - __ATOMIC_RELAXED);
> + __ATOMIC_ACQ_REL);
> count += num;
> num = rte_distributor_get_pkt(db, id,
> buf, buf, num);
> }
> __atomic_fetch_add(&worker_stats[id].handled_packets, num,
> - __ATOMIC_RELAXED);
> + __ATOMIC_ACQ_REL);
> count += num;
> rte_distributor_return_pkt(db, id, buf, num);
> return 0;
> @@ -134,7 +136,8 @@ sanity_test(struct worker_params *wp, struct
> rte_mempool *p)
>
> for (i = 0; i < rte_lcore_count() - 1; i++)
> printf("Worker %u handled %u packets\n", i,
> - worker_stats[i].handled_packets);
> + __atomic_load_n(&worker_stats[i].handled_packets,
> + __ATOMIC_ACQUIRE));
> printf("Sanity test with all zero hashes done.\n");
>
> /* pick two flows and check they go correctly */ @@ -159,7 +162,9
> @@ sanity_test(struct worker_params *wp, struct rte_mempool *p)
>
> for (i = 0; i < rte_lcore_count() - 1; i++)
> printf("Worker %u handled %u packets\n", i,
> - worker_stats[i].handled_packets);
> + __atomic_load_n(
> + &worker_stats[i].handled_packets,
> + __ATOMIC_ACQUIRE));
> printf("Sanity test with two hash values done\n");
> }
>
> @@ -185,7 +190,8 @@ sanity_test(struct worker_params *wp, struct
> rte_mempool *p)
>
> for (i = 0; i < rte_lcore_count() - 1; i++)
> printf("Worker %u handled %u packets\n", i,
> - worker_stats[i].handled_packets);
> + __atomic_load_n(&worker_stats[i].handled_packets,
> + __ATOMIC_ACQUIRE));
> printf("Sanity test with non-zero hashes done\n");
>
> rte_mempool_put_bulk(p, (void *)bufs, BURST); @@ -280,15
> +286,17 @@ handle_work_with_free_mbufs(void *arg)
> buf[i] = NULL;
> num = rte_distributor_get_pkt(d, id, buf, buf, num);
> while (!quit) {
> - worker_stats[id].handled_packets += num;
> count += num;
> + __atomic_fetch_add(&worker_stats[id].handled_packets,
> num,
> + __ATOMIC_ACQ_REL);
> for (i = 0; i < num; i++)
> rte_pktmbuf_free(buf[i]);
> num = rte_distributor_get_pkt(d,
> id, buf, buf, num);
> }
> - worker_stats[id].handled_packets += num;
> count += num;
> + __atomic_fetch_add(&worker_stats[id].handled_packets, num,
> + __ATOMIC_ACQ_REL);
> rte_distributor_return_pkt(d, id, buf, num);
> return 0;
> }
> @@ -363,8 +371,9 @@ handle_work_for_shutdown_test(void *arg)
> /* wait for quit single globally, or for worker zero, wait
> * for zero_quit */
> while (!quit && !(id == zero_id && zero_quit)) {
> - worker_stats[id].handled_packets += num;
> count += num;
> + __atomic_fetch_add(&worker_stats[id].handled_packets,
> num,
> + __ATOMIC_ACQ_REL);
> for (i = 0; i < num; i++)
> rte_pktmbuf_free(buf[i]);
> num = rte_distributor_get_pkt(d,
> @@ -379,10 +388,11 @@ handle_work_for_shutdown_test(void *arg)
>
> total += num;
> }
> - worker_stats[id].handled_packets += num;
> count += num;
> returned = rte_distributor_return_pkt(d, id, buf, num);
>
> + __atomic_fetch_add(&worker_stats[id].handled_packets, num,
> + __ATOMIC_ACQ_REL);
> if (id == zero_id) {
> /* for worker zero, allow it to restart to pick up last packet
> * when all workers are shutting down.
> @@ -394,10 +404,11 @@ handle_work_for_shutdown_test(void *arg)
> id, buf, buf, num);
>
> while (!quit) {
> - worker_stats[id].handled_packets += num;
> count += num;
> rte_pktmbuf_free(pkt);
> num = rte_distributor_get_pkt(d, id, buf, buf, num);
> +
> __atomic_fetch_add(&worker_stats[id].handled_packets,
> + num, __ATOMIC_ACQ_REL);
> }
> returned = rte_distributor_return_pkt(d,
> id, buf, num);
> @@ -461,7 +472,8 @@ sanity_test_with_worker_shutdown(struct
> worker_params *wp,
>
> for (i = 0; i < rte_lcore_count() - 1; i++)
> printf("Worker %u handled %u packets\n", i,
> - worker_stats[i].handled_packets);
> + __atomic_load_n(&worker_stats[i].handled_packets,
> + __ATOMIC_ACQUIRE));
>
> if (total_packet_count() != BURST * 2) {
> printf("Line %d: Error, not all packets flushed. "
> @@ -514,7 +526,8 @@ test_flush_with_worker_shutdown(struct
> worker_params *wp,
> zero_quit = 0;
> for (i = 0; i < rte_lcore_count() - 1; i++)
> printf("Worker %u handled %u packets\n", i,
> - worker_stats[i].handled_packets);
> + __atomic_load_n(&worker_stats[i].handled_packets,
> + __ATOMIC_ACQUIRE));
>
> if (total_packet_count() != BURST) {
> printf("Line %d: Error, not all packets flushed. "
> --
> 2.17.1
W dniu 23.09.2020 o 06:30, Honnappa Nagarahalli pisze:
> <snip>
>
>> Statistics of handled packets are cleared and read on main lcore, while they
>> are increased in workers handlers on different lcores.
>>
>> Without synchronization occasionally showed invalid values.
> What exactly do you mean by invalid values? Can you elaborate?
I mean values that shouldn't be there, which are obviously not related to
the number of packets handled by workers.
I reverted the patch and ran stress tests. Failures without this patch
look like these:
=== Test flush fn with worker shutdown (burst) ===
Worker 0 handled 0 packets
Worker 1 handled 0 packets
Worker 2 handled 0 packets
Worker 3 handled 0 packets
Worker 4 handled 32 packets
Worker 5 handled 0 packets
Worker 6 handled 6 packets
Line 519: Error, not all packets flushed. Expected 32, got 38
Test Failed
or:
=== Sanity test of worker shutdown ===
Worker 0 handled 0 packets
Worker 1 handled 0 packets
Worker 2 handled 0 packets
Worker 3 handled 0 packets
Worker 4 handled 0 packets
Worker 5 handled 64 packets
Worker 6 handled 149792 packets
Line 466: Error, not all packets flushed. Expected 64, got 149856
Test Failed
The 6 or 149792 packets reported by worker 6 were never sent to or
processed by the workers.
>> This patch uses atomic acquire/release mechanisms to synchronize.
>>
>> Fixes: c3eabff124e6 ("distributor: add unit tests")
>> Cc: bruce.richardson@intel.com
>> Cc: stable@dpdk.org
>>
>> Signed-off-by: Lukasz Wojciechowski <l.wojciechow@partner.samsung.com>
>> ---
>> app/test/test_distributor.c | 39 ++++++++++++++++++++++++-------------
>> 1 file changed, 26 insertions(+), 13 deletions(-)
>>
>> diff --git a/app/test/test_distributor.c b/app/test/test_distributor.c index
>> 35b25463a..0e49e3714 100644
>> --- a/app/test/test_distributor.c
>> +++ b/app/test/test_distributor.c
>> @@ -43,7 +43,8 @@ total_packet_count(void) {
>> unsigned i, count = 0;
>> for (i = 0; i < worker_idx; i++)
>> - count += worker_stats[i].handled_packets;
>> + count +=
>> __atomic_load_n(&worker_stats[i].handled_packets,
>> + __ATOMIC_ACQUIRE);
>> return count;
>> }
>>
>> @@ -52,6 +53,7 @@ static inline void
>> clear_packet_count(void)
>> {
>> memset(&worker_stats, 0, sizeof(worker_stats));
>> + rte_atomic_thread_fence(__ATOMIC_RELEASE);
>> }
>>
>> /* this is the basic worker function for sanity test @@ -72,13 +74,13 @@
>> handle_work(void *arg)
>> num = rte_distributor_get_pkt(db, id, buf, buf, num);
>> while (!quit) {
>> __atomic_fetch_add(&worker_stats[id].handled_packets,
>> num,
>> - __ATOMIC_RELAXED);
>> + __ATOMIC_ACQ_REL);
>> count += num;
>> num = rte_distributor_get_pkt(db, id,
>> buf, buf, num);
>> }
>> __atomic_fetch_add(&worker_stats[id].handled_packets, num,
>> - __ATOMIC_RELAXED);
>> + __ATOMIC_ACQ_REL);
>> count += num;
>> rte_distributor_return_pkt(db, id, buf, num);
>> return 0;
>> @@ -134,7 +136,8 @@ sanity_test(struct worker_params *wp, struct
>> rte_mempool *p)
>>
>> for (i = 0; i < rte_lcore_count() - 1; i++)
>> printf("Worker %u handled %u packets\n", i,
>> - worker_stats[i].handled_packets);
>> + __atomic_load_n(&worker_stats[i].handled_packets,
>> + __ATOMIC_ACQUIRE));
>> printf("Sanity test with all zero hashes done.\n");
>>
>> /* pick two flows and check they go correctly */ @@ -159,7 +162,9
>> @@ sanity_test(struct worker_params *wp, struct rte_mempool *p)
>>
>> for (i = 0; i < rte_lcore_count() - 1; i++)
>> printf("Worker %u handled %u packets\n", i,
>> - worker_stats[i].handled_packets);
>> + __atomic_load_n(
>> + &worker_stats[i].handled_packets,
>> + __ATOMIC_ACQUIRE));
>> printf("Sanity test with two hash values done\n");
>> }
>>
>> @@ -185,7 +190,8 @@ sanity_test(struct worker_params *wp, struct
>> rte_mempool *p)
>>
>> for (i = 0; i < rte_lcore_count() - 1; i++)
>> printf("Worker %u handled %u packets\n", i,
>> - worker_stats[i].handled_packets);
>> + __atomic_load_n(&worker_stats[i].handled_packets,
>> + __ATOMIC_ACQUIRE));
>> printf("Sanity test with non-zero hashes done\n");
>>
>> rte_mempool_put_bulk(p, (void *)bufs, BURST); @@ -280,15
>> +286,17 @@ handle_work_with_free_mbufs(void *arg)
>> buf[i] = NULL;
>> num = rte_distributor_get_pkt(d, id, buf, buf, num);
>> while (!quit) {
>> - worker_stats[id].handled_packets += num;
>> count += num;
>> + __atomic_fetch_add(&worker_stats[id].handled_packets,
>> num,
>> + __ATOMIC_ACQ_REL);
>> for (i = 0; i < num; i++)
>> rte_pktmbuf_free(buf[i]);
>> num = rte_distributor_get_pkt(d,
>> id, buf, buf, num);
>> }
>> - worker_stats[id].handled_packets += num;
>> count += num;
>> + __atomic_fetch_add(&worker_stats[id].handled_packets, num,
>> + __ATOMIC_ACQ_REL);
>> rte_distributor_return_pkt(d, id, buf, num);
>> return 0;
>> }
>> @@ -363,8 +371,9 @@ handle_work_for_shutdown_test(void *arg)
>> /* wait for quit single globally, or for worker zero, wait
>> * for zero_quit */
>> while (!quit && !(id == zero_id && zero_quit)) {
>> - worker_stats[id].handled_packets += num;
>> count += num;
>> + __atomic_fetch_add(&worker_stats[id].handled_packets,
>> num,
>> + __ATOMIC_ACQ_REL);
>> for (i = 0; i < num; i++)
>> rte_pktmbuf_free(buf[i]);
>> num = rte_distributor_get_pkt(d,
>> @@ -379,10 +388,11 @@ handle_work_for_shutdown_test(void *arg)
>>
>> total += num;
>> }
>> - worker_stats[id].handled_packets += num;
>> count += num;
>> returned = rte_distributor_return_pkt(d, id, buf, num);
>>
>> + __atomic_fetch_add(&worker_stats[id].handled_packets, num,
>> + __ATOMIC_ACQ_REL);
>> if (id == zero_id) {
>> /* for worker zero, allow it to restart to pick up last packet
>> * when all workers are shutting down.
>> @@ -394,10 +404,11 @@ handle_work_for_shutdown_test(void *arg)
>> id, buf, buf, num);
>>
>> while (!quit) {
>> - worker_stats[id].handled_packets += num;
>> count += num;
>> rte_pktmbuf_free(pkt);
>> num = rte_distributor_get_pkt(d, id, buf, buf, num);
>> +
>> __atomic_fetch_add(&worker_stats[id].handled_packets,
>> + num, __ATOMIC_ACQ_REL);
>> }
>> returned = rte_distributor_return_pkt(d,
>> id, buf, num);
>> @@ -461,7 +472,8 @@ sanity_test_with_worker_shutdown(struct
>> worker_params *wp,
>>
>> for (i = 0; i < rte_lcore_count() - 1; i++)
>> printf("Worker %u handled %u packets\n", i,
>> - worker_stats[i].handled_packets);
>> + __atomic_load_n(&worker_stats[i].handled_packets,
>> + __ATOMIC_ACQUIRE));
>>
>> if (total_packet_count() != BURST * 2) {
>> printf("Line %d: Error, not all packets flushed. "
>> @@ -514,7 +526,8 @@ test_flush_with_worker_shutdown(struct
>> worker_params *wp,
>> zero_quit = 0;
>> for (i = 0; i < rte_lcore_count() - 1; i++)
>> printf("Worker %u handled %u packets\n", i,
>> - worker_stats[i].handled_packets);
>> + __atomic_load_n(&worker_stats[i].handled_packets,
>> + __ATOMIC_ACQUIRE));
>>
>> if (total_packet_count() != BURST) {
>> printf("Line %d: Error, not all packets flushed. "
>> --
>> 2.17.1
@@ -43,7 +43,8 @@ total_packet_count(void)
{
unsigned i, count = 0;
for (i = 0; i < worker_idx; i++)
- count += worker_stats[i].handled_packets;
+ count += __atomic_load_n(&worker_stats[i].handled_packets,
+ __ATOMIC_ACQUIRE);
return count;
}
@@ -52,6 +53,7 @@ static inline void
clear_packet_count(void)
{
memset(&worker_stats, 0, sizeof(worker_stats));
+ rte_atomic_thread_fence(__ATOMIC_RELEASE);
}
/* this is the basic worker function for sanity test
@@ -72,13 +74,13 @@ handle_work(void *arg)
num = rte_distributor_get_pkt(db, id, buf, buf, num);
while (!quit) {
__atomic_fetch_add(&worker_stats[id].handled_packets, num,
- __ATOMIC_RELAXED);
+ __ATOMIC_ACQ_REL);
count += num;
num = rte_distributor_get_pkt(db, id,
buf, buf, num);
}
__atomic_fetch_add(&worker_stats[id].handled_packets, num,
- __ATOMIC_RELAXED);
+ __ATOMIC_ACQ_REL);
count += num;
rte_distributor_return_pkt(db, id, buf, num);
return 0;
@@ -134,7 +136,8 @@ sanity_test(struct worker_params *wp, struct rte_mempool *p)
for (i = 0; i < rte_lcore_count() - 1; i++)
printf("Worker %u handled %u packets\n", i,
- worker_stats[i].handled_packets);
+ __atomic_load_n(&worker_stats[i].handled_packets,
+ __ATOMIC_ACQUIRE));
printf("Sanity test with all zero hashes done.\n");
/* pick two flows and check they go correctly */
@@ -159,7 +162,9 @@ sanity_test(struct worker_params *wp, struct rte_mempool *p)
for (i = 0; i < rte_lcore_count() - 1; i++)
printf("Worker %u handled %u packets\n", i,
- worker_stats[i].handled_packets);
+ __atomic_load_n(
+ &worker_stats[i].handled_packets,
+ __ATOMIC_ACQUIRE));
printf("Sanity test with two hash values done\n");
}
@@ -185,7 +190,8 @@ sanity_test(struct worker_params *wp, struct rte_mempool *p)
for (i = 0; i < rte_lcore_count() - 1; i++)
printf("Worker %u handled %u packets\n", i,
- worker_stats[i].handled_packets);
+ __atomic_load_n(&worker_stats[i].handled_packets,
+ __ATOMIC_ACQUIRE));
printf("Sanity test with non-zero hashes done\n");
rte_mempool_put_bulk(p, (void *)bufs, BURST);
@@ -280,15 +286,17 @@ handle_work_with_free_mbufs(void *arg)
buf[i] = NULL;
num = rte_distributor_get_pkt(d, id, buf, buf, num);
while (!quit) {
- worker_stats[id].handled_packets += num;
count += num;
+ __atomic_fetch_add(&worker_stats[id].handled_packets, num,
+ __ATOMIC_ACQ_REL);
for (i = 0; i < num; i++)
rte_pktmbuf_free(buf[i]);
num = rte_distributor_get_pkt(d,
id, buf, buf, num);
}
- worker_stats[id].handled_packets += num;
count += num;
+ __atomic_fetch_add(&worker_stats[id].handled_packets, num,
+ __ATOMIC_ACQ_REL);
rte_distributor_return_pkt(d, id, buf, num);
return 0;
}
@@ -363,8 +371,9 @@ handle_work_for_shutdown_test(void *arg)
/* wait for quit single globally, or for worker zero, wait
* for zero_quit */
while (!quit && !(id == zero_id && zero_quit)) {
- worker_stats[id].handled_packets += num;
count += num;
+ __atomic_fetch_add(&worker_stats[id].handled_packets, num,
+ __ATOMIC_ACQ_REL);
for (i = 0; i < num; i++)
rte_pktmbuf_free(buf[i]);
num = rte_distributor_get_pkt(d,
@@ -379,10 +388,11 @@ handle_work_for_shutdown_test(void *arg)
total += num;
}
- worker_stats[id].handled_packets += num;
count += num;
returned = rte_distributor_return_pkt(d, id, buf, num);
+ __atomic_fetch_add(&worker_stats[id].handled_packets, num,
+ __ATOMIC_ACQ_REL);
if (id == zero_id) {
/* for worker zero, allow it to restart to pick up last packet
* when all workers are shutting down.
@@ -394,10 +404,11 @@ handle_work_for_shutdown_test(void *arg)
id, buf, buf, num);
while (!quit) {
- worker_stats[id].handled_packets += num;
count += num;
rte_pktmbuf_free(pkt);
num = rte_distributor_get_pkt(d, id, buf, buf, num);
+ __atomic_fetch_add(&worker_stats[id].handled_packets,
+ num, __ATOMIC_ACQ_REL);
}
returned = rte_distributor_return_pkt(d,
id, buf, num);
@@ -461,7 +472,8 @@ sanity_test_with_worker_shutdown(struct worker_params *wp,
for (i = 0; i < rte_lcore_count() - 1; i++)
printf("Worker %u handled %u packets\n", i,
- worker_stats[i].handled_packets);
+ __atomic_load_n(&worker_stats[i].handled_packets,
+ __ATOMIC_ACQUIRE));
if (total_packet_count() != BURST * 2) {
printf("Line %d: Error, not all packets flushed. "
@@ -514,7 +526,8 @@ test_flush_with_worker_shutdown(struct worker_params *wp,
zero_quit = 0;
for (i = 0; i < rte_lcore_count() - 1; i++)
printf("Worker %u handled %u packets\n", i,
- worker_stats[i].handled_packets);
+ __atomic_load_n(&worker_stats[i].handled_packets,
+ __ATOMIC_ACQUIRE));
if (total_packet_count() != BURST) {
printf("Line %d: Error, not all packets flushed. "