[v7,06/16] test/distributor: synchronize lcores statistics
Checks
Commit Message
Statistics of handled packets are cleared and read on the main lcore,
while they are incremented in worker handlers on different lcores.
Without synchronization, the statistics occasionally showed invalid values.
This patch uses atomic acquire/release mechanisms to synchronize access to them.
Fixes: c3eabff124e6 ("distributor: add unit tests")
Cc: bruce.richardson@intel.com
Cc: stable@dpdk.org
Signed-off-by: Lukasz Wojciechowski <l.wojciechow@partner.samsung.com>
Acked-by: David Hunt <david.hunt@intel.com>
---
app/test/test_distributor.c | 43 +++++++++++++++++++++++++------------
1 file changed, 29 insertions(+), 14 deletions(-)
Comments
Hi Lukasz,
I see that in commit 8/16, the same code is changed again (updating the counters using the RELAXED memory order). It is better to pull the statistics changes from 8/16 into this commit.
Thanks,
Honnappa
> -----Original Message-----
> From: dev <dev-bounces@dpdk.org> On Behalf Of Lukasz Wojciechowski
> Sent: Saturday, October 10, 2020 11:05 AM
> To: David Hunt <david.hunt@intel.com>; Bruce Richardson
> <bruce.richardson@intel.com>
> Cc: dev@dpdk.org; l.wojciechow@partner.samsung.com; stable@dpdk.org
> Subject: [dpdk-dev] [PATCH v7 06/16] test/distributor: synchronize lcores
> statistics
>
> Statistics of handled packets are cleared and read on main lcore, while they
> are increased in workers handlers on different lcores.
>
> Without synchronization occasionally showed invalid values.
> This patch uses atomic acquire/release mechanisms to synchronize.
>
> Fixes: c3eabff124e6 ("distributor: add unit tests")
> Cc: bruce.richardson@intel.com
> Cc: stable@dpdk.org
>
> Signed-off-by: Lukasz Wojciechowski <l.wojciechow@partner.samsung.com>
> Acked-by: David Hunt <david.hunt@intel.com>
> ---
> app/test/test_distributor.c | 43 +++++++++++++++++++++++++------------
> 1 file changed, 29 insertions(+), 14 deletions(-)
>
> diff --git a/app/test/test_distributor.c b/app/test/test_distributor.c index
> 6cd7a2edd..838459392 100644
> --- a/app/test/test_distributor.c
> +++ b/app/test/test_distributor.c
> @@ -43,7 +43,8 @@ total_packet_count(void) {
> unsigned i, count = 0;
> for (i = 0; i < worker_idx; i++)
> - count += worker_stats[i].handled_packets;
> + count +=
> __atomic_load_n(&worker_stats[i].handled_packets,
> + __ATOMIC_ACQUIRE);
For example, this line is changed in commit 8/16 as well. It would be better to pull the changes from 8/16 into this commit.
> return count;
> }
>
> @@ -51,7 +52,10 @@ total_packet_count(void) static inline void
> clear_packet_count(void)
> {
> - memset(&worker_stats, 0, sizeof(worker_stats));
> + unsigned int i;
> + for (i = 0; i < RTE_MAX_LCORE; i++)
> + __atomic_store_n(&worker_stats[i].handled_packets, 0,
> + __ATOMIC_RELEASE);
> }
>
> /* this is the basic worker function for sanity test @@ -69,13 +73,13 @@
> handle_work(void *arg)
> num = rte_distributor_get_pkt(db, id, buf, NULL, 0);
> while (!quit) {
> __atomic_fetch_add(&worker_stats[id].handled_packets,
> num,
> - __ATOMIC_RELAXED);
> + __ATOMIC_ACQ_REL);
> count += num;
> num = rte_distributor_get_pkt(db, id,
> buf, buf, num);
> }
> __atomic_fetch_add(&worker_stats[id].handled_packets, num,
> - __ATOMIC_RELAXED);
> + __ATOMIC_ACQ_REL);
> count += num;
> rte_distributor_return_pkt(db, id, buf, num);
> return 0;
> @@ -131,7 +135,8 @@ sanity_test(struct worker_params *wp, struct
> rte_mempool *p)
>
> for (i = 0; i < rte_lcore_count() - 1; i++)
> printf("Worker %u handled %u packets\n", i,
> - worker_stats[i].handled_packets);
> + __atomic_load_n(&worker_stats[i].handled_packets,
> + __ATOMIC_ACQUIRE));
> printf("Sanity test with all zero hashes done.\n");
>
> /* pick two flows and check they go correctly */ @@ -156,7 +161,9
> @@ sanity_test(struct worker_params *wp, struct rte_mempool *p)
>
> for (i = 0; i < rte_lcore_count() - 1; i++)
> printf("Worker %u handled %u packets\n", i,
> - worker_stats[i].handled_packets);
> + __atomic_load_n(
> + &worker_stats[i].handled_packets,
> + __ATOMIC_ACQUIRE));
> printf("Sanity test with two hash values done\n");
> }
>
> @@ -182,7 +189,8 @@ sanity_test(struct worker_params *wp, struct
> rte_mempool *p)
>
> for (i = 0; i < rte_lcore_count() - 1; i++)
> printf("Worker %u handled %u packets\n", i,
> - worker_stats[i].handled_packets);
> + __atomic_load_n(&worker_stats[i].handled_packets,
> + __ATOMIC_ACQUIRE));
> printf("Sanity test with non-zero hashes done\n");
>
> rte_mempool_put_bulk(p, (void *)bufs, BURST); @@ -275,14
> +283,16 @@ handle_work_with_free_mbufs(void *arg)
>
> num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
> while (!quit) {
> - worker_stats[id].handled_packets += num;
> count += num;
> + __atomic_fetch_add(&worker_stats[id].handled_packets,
> num,
> + __ATOMIC_ACQ_REL);
> for (i = 0; i < num; i++)
> rte_pktmbuf_free(buf[i]);
> num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
> }
> - worker_stats[id].handled_packets += num;
> count += num;
> + __atomic_fetch_add(&worker_stats[id].handled_packets, num,
> + __ATOMIC_ACQ_REL);
> rte_distributor_return_pkt(d, id, buf, num);
> return 0;
> }
> @@ -358,8 +368,9 @@ handle_work_for_shutdown_test(void *arg)
> /* wait for quit single globally, or for worker zero, wait
> * for zero_quit */
> while (!quit && !(id == zero_id && zero_quit)) {
> - worker_stats[id].handled_packets += num;
> count += num;
> + __atomic_fetch_add(&worker_stats[id].handled_packets,
> num,
> + __ATOMIC_ACQ_REL);
> for (i = 0; i < num; i++)
> rte_pktmbuf_free(buf[i]);
> num = rte_distributor_get_pkt(d, id, buf, NULL, 0); @@ -
> 373,10 +384,11 @@ handle_work_for_shutdown_test(void *arg)
>
> total += num;
> }
> - worker_stats[id].handled_packets += num;
> count += num;
> returned = rte_distributor_return_pkt(d, id, buf, num);
>
> + __atomic_fetch_add(&worker_stats[id].handled_packets, num,
> + __ATOMIC_ACQ_REL);
> if (id == zero_id) {
> /* for worker zero, allow it to restart to pick up last packet
> * when all workers are shutting down.
> @@ -387,7 +399,8 @@ handle_work_for_shutdown_test(void *arg)
> num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
>
> while (!quit) {
> - worker_stats[id].handled_packets += num;
> +
> __atomic_fetch_add(&worker_stats[id].handled_packets,
> + num, __ATOMIC_ACQ_REL);
> count += num;
> rte_pktmbuf_free(pkt);
> num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
> @@ -454,7 +467,8 @@ sanity_test_with_worker_shutdown(struct
> worker_params *wp,
>
> for (i = 0; i < rte_lcore_count() - 1; i++)
> printf("Worker %u handled %u packets\n", i,
> - worker_stats[i].handled_packets);
> + __atomic_load_n(&worker_stats[i].handled_packets,
> + __ATOMIC_ACQUIRE));
>
> if (total_packet_count() != BURST * 2) {
> printf("Line %d: Error, not all packets flushed. "
> @@ -507,7 +521,8 @@ test_flush_with_worker_shutdown(struct
> worker_params *wp,
> zero_quit = 0;
> for (i = 0; i < rte_lcore_count() - 1; i++)
> printf("Worker %u handled %u packets\n", i,
> - worker_stats[i].handled_packets);
> + __atomic_load_n(&worker_stats[i].handled_packets,
> + __ATOMIC_ACQUIRE));
>
> if (total_packet_count() != BURST) {
> printf("Line %d: Error, not all packets flushed. "
> --
> 2.17.1
Hi Honnappa,
W dniu 16.10.2020 o 07:13, Honnappa Nagarahalli pisze:
> Hi Lukasz,
> I see that in commit 8/16, the same code is changed again (updating the counters using the RELAXED memory order). It is better to pull the statistics changes from 8/16 into this commit.
I reordered the patches "synchronize lcores statistics" and "fix freeing
mbufs" to avoid changing the same code.
Many thanks for the review
Lukasz
>
> Thanks,
> Honnappa
>
>> -----Original Message-----
>> From: dev <dev-bounces@dpdk.org> On Behalf Of Lukasz Wojciechowski
>> Sent: Saturday, October 10, 2020 11:05 AM
>> To: David Hunt <david.hunt@intel.com>; Bruce Richardson
>> <bruce.richardson@intel.com>
>> Cc: dev@dpdk.org; l.wojciechow@partner.samsung.com; stable@dpdk.org
>> Subject: [dpdk-dev] [PATCH v7 06/16] test/distributor: synchronize lcores
>> statistics
>>
>> Statistics of handled packets are cleared and read on main lcore, while they
>> are increased in workers handlers on different lcores.
>>
>> Without synchronization occasionally showed invalid values.
>> This patch uses atomic acquire/release mechanisms to synchronize.
>>
>> Fixes: c3eabff124e6 ("distributor: add unit tests")
>> Cc: bruce.richardson@intel.com
>> Cc: stable@dpdk.org
>>
>> Signed-off-by: Lukasz Wojciechowski <l.wojciechow@partner.samsung.com>
>> Acked-by: David Hunt <david.hunt@intel.com>
>> ---
>> app/test/test_distributor.c | 43 +++++++++++++++++++++++++------------
>> 1 file changed, 29 insertions(+), 14 deletions(-)
>>
>> diff --git a/app/test/test_distributor.c b/app/test/test_distributor.c index
>> 6cd7a2edd..838459392 100644
>> --- a/app/test/test_distributor.c
>> +++ b/app/test/test_distributor.c
>> @@ -43,7 +43,8 @@ total_packet_count(void) {
>> unsigned i, count = 0;
>> for (i = 0; i < worker_idx; i++)
>> - count += worker_stats[i].handled_packets;
>> + count +=
>> __atomic_load_n(&worker_stats[i].handled_packets,
>> + __ATOMIC_ACQUIRE);
> For ex: this line is changed in commit 8/16 as well. It is better to pull the changes from 8/16 to this commit.
>
>> return count;
>> }
>>
>> @@ -51,7 +52,10 @@ total_packet_count(void) static inline void
>> clear_packet_count(void)
>> {
>> - memset(&worker_stats, 0, sizeof(worker_stats));
>> + unsigned int i;
>> + for (i = 0; i < RTE_MAX_LCORE; i++)
>> + __atomic_store_n(&worker_stats[i].handled_packets, 0,
>> + __ATOMIC_RELEASE);
>> }
>>
>> /* this is the basic worker function for sanity test @@ -69,13 +73,13 @@
>> handle_work(void *arg)
>> num = rte_distributor_get_pkt(db, id, buf, NULL, 0);
>> while (!quit) {
>> __atomic_fetch_add(&worker_stats[id].handled_packets,
>> num,
>> - __ATOMIC_RELAXED);
>> + __ATOMIC_ACQ_REL);
>> count += num;
>> num = rte_distributor_get_pkt(db, id,
>> buf, buf, num);
>> }
>> __atomic_fetch_add(&worker_stats[id].handled_packets, num,
>> - __ATOMIC_RELAXED);
>> + __ATOMIC_ACQ_REL);
>> count += num;
>> rte_distributor_return_pkt(db, id, buf, num);
>> return 0;
>> @@ -131,7 +135,8 @@ sanity_test(struct worker_params *wp, struct
>> rte_mempool *p)
>>
>> for (i = 0; i < rte_lcore_count() - 1; i++)
>> printf("Worker %u handled %u packets\n", i,
>> - worker_stats[i].handled_packets);
>> + __atomic_load_n(&worker_stats[i].handled_packets,
>> + __ATOMIC_ACQUIRE));
>> printf("Sanity test with all zero hashes done.\n");
>>
>> /* pick two flows and check they go correctly */ @@ -156,7 +161,9
>> @@ sanity_test(struct worker_params *wp, struct rte_mempool *p)
>>
>> for (i = 0; i < rte_lcore_count() - 1; i++)
>> printf("Worker %u handled %u packets\n", i,
>> - worker_stats[i].handled_packets);
>> + __atomic_load_n(
>> + &worker_stats[i].handled_packets,
>> + __ATOMIC_ACQUIRE));
>> printf("Sanity test with two hash values done\n");
>> }
>>
>> @@ -182,7 +189,8 @@ sanity_test(struct worker_params *wp, struct
>> rte_mempool *p)
>>
>> for (i = 0; i < rte_lcore_count() - 1; i++)
>> printf("Worker %u handled %u packets\n", i,
>> - worker_stats[i].handled_packets);
>> + __atomic_load_n(&worker_stats[i].handled_packets,
>> + __ATOMIC_ACQUIRE));
>> printf("Sanity test with non-zero hashes done\n");
>>
>> rte_mempool_put_bulk(p, (void *)bufs, BURST); @@ -275,14
>> +283,16 @@ handle_work_with_free_mbufs(void *arg)
>>
>> num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
>> while (!quit) {
>> - worker_stats[id].handled_packets += num;
>> count += num;
>> + __atomic_fetch_add(&worker_stats[id].handled_packets,
>> num,
>> + __ATOMIC_ACQ_REL);
>> for (i = 0; i < num; i++)
>> rte_pktmbuf_free(buf[i]);
>> num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
>> }
>> - worker_stats[id].handled_packets += num;
>> count += num;
>> + __atomic_fetch_add(&worker_stats[id].handled_packets, num,
>> + __ATOMIC_ACQ_REL);
>> rte_distributor_return_pkt(d, id, buf, num);
>> return 0;
>> }
>> @@ -358,8 +368,9 @@ handle_work_for_shutdown_test(void *arg)
>> /* wait for quit single globally, or for worker zero, wait
>> * for zero_quit */
>> while (!quit && !(id == zero_id && zero_quit)) {
>> - worker_stats[id].handled_packets += num;
>> count += num;
>> + __atomic_fetch_add(&worker_stats[id].handled_packets,
>> num,
>> + __ATOMIC_ACQ_REL);
>> for (i = 0; i < num; i++)
>> rte_pktmbuf_free(buf[i]);
>> num = rte_distributor_get_pkt(d, id, buf, NULL, 0); @@ -
>> 373,10 +384,11 @@ handle_work_for_shutdown_test(void *arg)
>>
>> total += num;
>> }
>> - worker_stats[id].handled_packets += num;
>> count += num;
>> returned = rte_distributor_return_pkt(d, id, buf, num);
>>
>> + __atomic_fetch_add(&worker_stats[id].handled_packets, num,
>> + __ATOMIC_ACQ_REL);
>> if (id == zero_id) {
>> /* for worker zero, allow it to restart to pick up last packet
>> * when all workers are shutting down.
>> @@ -387,7 +399,8 @@ handle_work_for_shutdown_test(void *arg)
>> num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
>>
>> while (!quit) {
>> - worker_stats[id].handled_packets += num;
>> +
>> __atomic_fetch_add(&worker_stats[id].handled_packets,
>> + num, __ATOMIC_ACQ_REL);
>> count += num;
>> rte_pktmbuf_free(pkt);
>> num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
>> @@ -454,7 +467,8 @@ sanity_test_with_worker_shutdown(struct
>> worker_params *wp,
>>
>> for (i = 0; i < rte_lcore_count() - 1; i++)
>> printf("Worker %u handled %u packets\n", i,
>> - worker_stats[i].handled_packets);
>> + __atomic_load_n(&worker_stats[i].handled_packets,
>> + __ATOMIC_ACQUIRE));
>>
>> if (total_packet_count() != BURST * 2) {
>> printf("Line %d: Error, not all packets flushed. "
>> @@ -507,7 +521,8 @@ test_flush_with_worker_shutdown(struct
>> worker_params *wp,
>> zero_quit = 0;
>> for (i = 0; i < rte_lcore_count() - 1; i++)
>> printf("Worker %u handled %u packets\n", i,
>> - worker_stats[i].handled_packets);
>> + __atomic_load_n(&worker_stats[i].handled_packets,
>> + __ATOMIC_ACQUIRE));
>>
>> if (total_packet_count() != BURST) {
>> printf("Line %d: Error, not all packets flushed. "
>> --
>> 2.17.1
@@ -43,7 +43,8 @@ total_packet_count(void)
{
unsigned i, count = 0;
for (i = 0; i < worker_idx; i++)
- count += worker_stats[i].handled_packets;
+ count += __atomic_load_n(&worker_stats[i].handled_packets,
+ __ATOMIC_ACQUIRE);
return count;
}
@@ -51,7 +52,10 @@ total_packet_count(void)
static inline void
clear_packet_count(void)
{
- memset(&worker_stats, 0, sizeof(worker_stats));
+ unsigned int i;
+ for (i = 0; i < RTE_MAX_LCORE; i++)
+ __atomic_store_n(&worker_stats[i].handled_packets, 0,
+ __ATOMIC_RELEASE);
}
/* this is the basic worker function for sanity test
@@ -69,13 +73,13 @@ handle_work(void *arg)
num = rte_distributor_get_pkt(db, id, buf, NULL, 0);
while (!quit) {
__atomic_fetch_add(&worker_stats[id].handled_packets, num,
- __ATOMIC_RELAXED);
+ __ATOMIC_ACQ_REL);
count += num;
num = rte_distributor_get_pkt(db, id,
buf, buf, num);
}
__atomic_fetch_add(&worker_stats[id].handled_packets, num,
- __ATOMIC_RELAXED);
+ __ATOMIC_ACQ_REL);
count += num;
rte_distributor_return_pkt(db, id, buf, num);
return 0;
@@ -131,7 +135,8 @@ sanity_test(struct worker_params *wp, struct rte_mempool *p)
for (i = 0; i < rte_lcore_count() - 1; i++)
printf("Worker %u handled %u packets\n", i,
- worker_stats[i].handled_packets);
+ __atomic_load_n(&worker_stats[i].handled_packets,
+ __ATOMIC_ACQUIRE));
printf("Sanity test with all zero hashes done.\n");
/* pick two flows and check they go correctly */
@@ -156,7 +161,9 @@ sanity_test(struct worker_params *wp, struct rte_mempool *p)
for (i = 0; i < rte_lcore_count() - 1; i++)
printf("Worker %u handled %u packets\n", i,
- worker_stats[i].handled_packets);
+ __atomic_load_n(
+ &worker_stats[i].handled_packets,
+ __ATOMIC_ACQUIRE));
printf("Sanity test with two hash values done\n");
}
@@ -182,7 +189,8 @@ sanity_test(struct worker_params *wp, struct rte_mempool *p)
for (i = 0; i < rte_lcore_count() - 1; i++)
printf("Worker %u handled %u packets\n", i,
- worker_stats[i].handled_packets);
+ __atomic_load_n(&worker_stats[i].handled_packets,
+ __ATOMIC_ACQUIRE));
printf("Sanity test with non-zero hashes done\n");
rte_mempool_put_bulk(p, (void *)bufs, BURST);
@@ -275,14 +283,16 @@ handle_work_with_free_mbufs(void *arg)
num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
while (!quit) {
- worker_stats[id].handled_packets += num;
count += num;
+ __atomic_fetch_add(&worker_stats[id].handled_packets, num,
+ __ATOMIC_ACQ_REL);
for (i = 0; i < num; i++)
rte_pktmbuf_free(buf[i]);
num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
}
- worker_stats[id].handled_packets += num;
count += num;
+ __atomic_fetch_add(&worker_stats[id].handled_packets, num,
+ __ATOMIC_ACQ_REL);
rte_distributor_return_pkt(d, id, buf, num);
return 0;
}
@@ -358,8 +368,9 @@ handle_work_for_shutdown_test(void *arg)
/* wait for quit single globally, or for worker zero, wait
* for zero_quit */
while (!quit && !(id == zero_id && zero_quit)) {
- worker_stats[id].handled_packets += num;
count += num;
+ __atomic_fetch_add(&worker_stats[id].handled_packets, num,
+ __ATOMIC_ACQ_REL);
for (i = 0; i < num; i++)
rte_pktmbuf_free(buf[i]);
num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
@@ -373,10 +384,11 @@ handle_work_for_shutdown_test(void *arg)
total += num;
}
- worker_stats[id].handled_packets += num;
count += num;
returned = rte_distributor_return_pkt(d, id, buf, num);
+ __atomic_fetch_add(&worker_stats[id].handled_packets, num,
+ __ATOMIC_ACQ_REL);
if (id == zero_id) {
/* for worker zero, allow it to restart to pick up last packet
* when all workers are shutting down.
@@ -387,7 +399,8 @@ handle_work_for_shutdown_test(void *arg)
num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
while (!quit) {
- worker_stats[id].handled_packets += num;
+ __atomic_fetch_add(&worker_stats[id].handled_packets,
+ num, __ATOMIC_ACQ_REL);
count += num;
rte_pktmbuf_free(pkt);
num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
@@ -454,7 +467,8 @@ sanity_test_with_worker_shutdown(struct worker_params *wp,
for (i = 0; i < rte_lcore_count() - 1; i++)
printf("Worker %u handled %u packets\n", i,
- worker_stats[i].handled_packets);
+ __atomic_load_n(&worker_stats[i].handled_packets,
+ __ATOMIC_ACQUIRE));
if (total_packet_count() != BURST * 2) {
printf("Line %d: Error, not all packets flushed. "
@@ -507,7 +521,8 @@ test_flush_with_worker_shutdown(struct worker_params *wp,
zero_quit = 0;
for (i = 0; i < rte_lcore_count() - 1; i++)
printf("Worker %u handled %u packets\n", i,
- worker_stats[i].handled_packets);
+ __atomic_load_n(&worker_stats[i].handled_packets,
+ __ATOMIC_ACQUIRE));
if (total_packet_count() != BURST) {
printf("Line %d: Error, not all packets flushed. "