[v5,13/15] test/distributor: add test with packets marking

Message ID 20201008052323.11547-14-l.wojciechow@partner.samsung.com (mailing list archive)
State Superseded, archived
Delegated to: David Marchand
Series fix distributor synchronization issues

Checks

Context Check Description
ci/checkpatch warning coding style issues

Commit Message

Lukasz Wojciechowski Oct. 8, 2020, 5:23 a.m. UTC
  All of the former tests analyzed only the statistics
of packets processed by all workers.
The new test also verifies whether packets are
processed by the expected workers.
Every packet processed by a worker is marked
and analyzed after it is returned to the distributor.

This test allows finding issues in matching algorithms.

Signed-off-by: Lukasz Wojciechowski <l.wojciechow@partner.samsung.com>
---
 app/test/test_distributor.c | 141 ++++++++++++++++++++++++++++++++++++
 1 file changed, 141 insertions(+)
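
For reference, here is a minimal standalone sketch of the marking scheme the
test relies on: the packet's sequence number lives in the upper bits of
udata64 and the worker's mark (id + 1) in the lower bits. The constants mirror
seq_shift and the udata64 arithmetic in the patch below; the helper names are
purely illustrative and not part of the patch.

#include <assert.h>
#include <stdint.h>

#define SEQ_SHIFT 10	/* same role as seq_shift in sanity_mark_test() */

/* Encode a packet's sequence number before it is sent to the workers. */
static inline uint64_t mark_encode_seq(unsigned int seq)
{
	return (uint64_t)seq << SEQ_SHIFT;
}

/* A worker adds its mark into the low bits, as handle_and_mark_work()
 * does with buf[i]->udata64 += id + 1.
 */
static inline uint64_t mark_apply(uint64_t udata64, unsigned int worker_id)
{
	return udata64 + worker_id + 1;
}

/* Decode both fields once the packet is returned by the distributor. */
static inline void mark_decode(uint64_t udata64,
		unsigned int *seq, unsigned int *mark)
{
	*seq = udata64 >> SEQ_SHIFT;
	*mark = udata64 - ((uint64_t)*seq << SEQ_SHIFT);
}

int main(void)
{
	unsigned int seq, mark;

	mark_decode(mark_apply(mark_encode_seq(5), 2), &seq, &mark);
	assert(seq == 5 && mark == 3);	/* packet 5, handled by worker 2 */
	return 0;
}

The scheme holds as long as each packet is marked exactly once and the mark
stays below 1 << SEQ_SHIFT, which is the case here because every packet
passes through a single worker and worker ids are small.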
  

Comments

Hunt, David Oct. 9, 2020, 12:50 p.m. UTC | #1
Hi Lukasz,

On 8/10/2020 6:23 AM, Lukasz Wojciechowski wrote:
> All of the former tests analyzed only the statistics
> of packets processed by all workers.
> The new test also verifies whether packets are
> processed by the expected workers.
> Every packet processed by a worker is marked
> and analyzed after it is returned to the distributor.
>
> This test allows finding issues in matching algorithms.
>
> Signed-off-by: Lukasz Wojciechowski <l.wojciechow@partner.samsung.com>
> ---
>   app/test/test_distributor.c | 141 ++++++++++++++++++++++++++++++++++++
>   1 file changed, 141 insertions(+)
>
> diff --git a/app/test/test_distributor.c b/app/test/test_distributor.c
> index 1e0a079ff..0404e463a 100644
> --- a/app/test/test_distributor.c
> +++ b/app/test/test_distributor.c
> @@ -542,6 +542,141 @@ test_flush_with_worker_shutdown(struct worker_params *wp,
>   	return 0;
>   }
>   
> +static int
> +handle_and_mark_work(void *arg)
> +{
> +	struct rte_mbuf *buf[8] __rte_cache_aligned;
> +	struct worker_params *wp = arg;
> +	struct rte_distributor *db = wp->dist;
> +	unsigned int num, i;
> +	unsigned int id = __atomic_fetch_add(&worker_idx, 1, __ATOMIC_RELAXED);
> +	num = rte_distributor_get_pkt(db, id, buf, NULL, 0);
> +	while (!quit) {
> +		__atomic_fetch_add(&worker_stats[id].handled_packets, num,
> +				__ATOMIC_ACQ_REL);
> +		for (i = 0; i < num; i++)
> +			buf[i]->udata64 += id + 1;
> +		num = rte_distributor_get_pkt(db, id,
> +				buf, buf, num);
> +	}
> +	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
> +			__ATOMIC_ACQ_REL);
> +	rte_distributor_return_pkt(db, id, buf, num);
> +	return 0;
> +}
> +
> +/* sanity_mark_test sends packets to workers which mark them.
> + * Every packet also has an encoded sequence number.
> + * The returned packets are sorted and verified to check whether they
> + * were handled by the proper workers.
> + */
> +static int
> +sanity_mark_test(struct worker_params *wp, struct rte_mempool *p)
> +{
> +	const unsigned int buf_count = 24;
> +	const unsigned int burst = 8;
> +	const unsigned int shift = 12;
> +	const unsigned int seq_shift = 10;
> +
> +	struct rte_distributor *db = wp->dist;
> +	struct rte_mbuf *bufs[buf_count];
> +	struct rte_mbuf *returns[buf_count];
> +	unsigned int i, count, id;
> +	unsigned int sorted[buf_count], seq;
> +	unsigned int failed = 0;
> +
> +	printf("=== Marked packets test ===\n");
> +	clear_packet_count();
> +	if (rte_mempool_get_bulk(p, (void *)bufs, buf_count) != 0) {
> +		printf("line %d: Error getting mbufs from pool\n", __LINE__);
> +		return -1;
> +	}
> +
> +/* bufs' hashes will be like these below, but shifted left.
> + * The shifting is for avoiding collisions with backlogs
> + * and in-flight tags left by previous tests.
> + * [1, 1, 1, 1, 1, 1, 1, 1
> + *  1, 1, 1, 1, 2, 2, 2, 2
> + *  2, 2, 2, 2, 1, 1, 1, 1]
> + */

I would suggest indenting the comments to the same indent level as the
code; this would make the flow easier to read. Same with the additional
comments below.
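
For illustration, the first comment block with the suggested indentation
would presumably look something like this (same text, just moved to the
code's indent level, above the loop it describes):

	/* bufs' hashes will be like these below, but shifted left.
	 * The shifting is for avoiding collisions with backlogs
	 * and in-flight tags left by previous tests.
	 * [1, 1, 1, 1, 1, 1, 1, 1
	 *  1, 1, 1, 1, 2, 2, 2, 2
	 *  2, 2, 2, 2, 1, 1, 1, 1]
	 */
	for (i = 0; i < burst; i++) {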


> +	for (i = 0; i < burst; i++) {
> +		bufs[0 * burst + i]->hash.usr = 1 << shift;
> +		bufs[1 * burst + i]->hash.usr = ((i < burst / 2) ? 1 : 2)
> +			<< shift;
> +		bufs[2 * burst + i]->hash.usr = ((i < burst / 2) ? 2 : 1)
> +			<< shift;
> +	}
> +/* Assign a sequence number to each packet. The sequence is shifted,
> + * so that the lower bits of udata64 will hold the mark from the worker.
> + */
> +	for (i = 0; i < buf_count; i++)
> +		bufs[i]->udata64 = i << seq_shift;
> +
> +	count = 0;
> +	for (i = 0; i < buf_count/burst; i++) {
> +		rte_distributor_process(db, &bufs[i * burst], burst);
> +		count += rte_distributor_returned_pkts(db, &returns[count],
> +			buf_count - count);
> +	}
> +
> +	do {
> +		rte_distributor_flush(db);
> +		count += rte_distributor_returned_pkts(db, &returns[count],
> +			buf_count - count);
> +	} while (count < buf_count);
> +
> +	for (i = 0; i < rte_lcore_count() - 1; i++)
> +		printf("Worker %u handled %u packets\n", i,
> +			__atomic_load_n(&worker_stats[i].handled_packets,
> +					__ATOMIC_ACQUIRE));
> +
> +/* Sort returned packets by sent order (sequence numbers). */
> +	for (i = 0; i < buf_count; i++) {
> +		seq = returns[i]->udata64 >> seq_shift;
> +		id = returns[i]->udata64 - (seq << seq_shift);
> +		sorted[seq] = id;
> +	}
> +
> +/* Verify that packets [0-11] and [20-23] were processed
> + * by the same worker
> + */
> +	for (i = 1; i < 12; i++) {
> +		if (sorted[i] != sorted[0]) {
> +			printf("Packet number %u processed by worker %u,"
> +				" but should be processed by worker %u\n",
> +				i, sorted[i], sorted[0]);
> +			failed = 1;
> +		}
> +	}
> +	for (i = 20; i < 24; i++) {
> +		if (sorted[i] != sorted[0]) {
> +			printf("Packet number %u processed by worker %u,"
> +				" but should be processed by worker %u\n",
> +				i, sorted[i], sorted[0]);
> +			failed = 1;
> +		}
> +	}
> +/* And verify that packets [12-19] were processed
> + * by another worker
> + */
> +	for (i = 13; i < 20; i++) {
> +		if (sorted[i] != sorted[12]) {
> +			printf("Packet number %u processed by worker %u,"
> +				" but should be processed by worker %u\n",
> +				i, sorted[i], sorted[12]);
> +			failed = 1;
> +		}
> +	}
> +
> +	rte_mempool_put_bulk(p, (void *)bufs, buf_count);
> +
> +	if (failed)
> +		return -1;
> +
> +	printf("Marked packets test passed\n");
> +	return 0;
> +}
> +
>   static
>   int test_error_distributor_create_name(void)
>   {
> @@ -726,6 +861,12 @@ test_distributor(void)
>   				goto err;
>   			quit_workers(&worker_params, p);
>   
> +			rte_eal_mp_remote_launch(handle_and_mark_work,
> +					&worker_params, SKIP_MASTER);
> +			if (sanity_mark_test(&worker_params, p) < 0)
> +				goto err;
> +			quit_workers(&worker_params, p);
> +
>   		} else {
>   			printf("Too few cores to run worker shutdown test\n");
>   		}


Checking that the flows go to the correct workers is a really good test
to have. Thanks for the effort here.

Apart from the comment indentation nit, the rest looks good to me.

Acked-by: David Hunt <david.hunt@intel.com>
  
Lukasz Wojciechowski Oct. 9, 2020, 9:12 p.m. UTC | #2
Hi David,

W dniu 09.10.2020 o 14:50, David Hunt pisze:
> Hi Lukasz,
>
> On 8/10/2020 6:23 AM, Lukasz Wojciechowski wrote:
>> All of the former tests analyzed only the statistics
>> of packets processed by all workers.
>> The new test also verifies whether packets are
>> processed by the expected workers.
>> Every packet processed by a worker is marked
>> and analyzed after it is returned to the distributor.
>>
>> This test allows finding issues in matching algorithms.
>>
>> Signed-off-by: Lukasz Wojciechowski <l.wojciechow@partner.samsung.com>
>> ---
>>   app/test/test_distributor.c | 141 ++++++++++++++++++++++++++++++++++++
>>   1 file changed, 141 insertions(+)
>>
>> diff --git a/app/test/test_distributor.c b/app/test/test_distributor.c
>> index 1e0a079ff..0404e463a 100644
>> --- a/app/test/test_distributor.c
>> +++ b/app/test/test_distributor.c
>> @@ -542,6 +542,141 @@ test_flush_with_worker_shutdown(struct 
>> worker_params *wp,
>>       return 0;
>>   }
>>   +static int
>> +handle_and_mark_work(void *arg)
>> +{
>> +    struct rte_mbuf *buf[8] __rte_cache_aligned;
>> +    struct worker_params *wp = arg;
>> +    struct rte_distributor *db = wp->dist;
>> +    unsigned int num, i;
>> +    unsigned int id = __atomic_fetch_add(&worker_idx, 1, 
>> __ATOMIC_RELAXED);
>> +    num = rte_distributor_get_pkt(db, id, buf, NULL, 0);
>> +    while (!quit) {
>> + __atomic_fetch_add(&worker_stats[id].handled_packets, num,
>> +                __ATOMIC_ACQ_REL);
>> +        for (i = 0; i < num; i++)
>> +            buf[i]->udata64 += id + 1;
>> +        num = rte_distributor_get_pkt(db, id,
>> +                buf, buf, num);
>> +    }
>> +    __atomic_fetch_add(&worker_stats[id].handled_packets, num,
>> +            __ATOMIC_ACQ_REL);
>> +    rte_distributor_return_pkt(db, id, buf, num);
>> +    return 0;
>> +}
>> +
>> +/* sanity_mark_test sends packets to workers which mark them.
>> + * Every packet also has an encoded sequence number.
>> + * The returned packets are sorted and verified to check whether they
>> + * were handled by the proper workers.
>> + */
>> +static int
>> +sanity_mark_test(struct worker_params *wp, struct rte_mempool *p)
>> +{
>> +    const unsigned int buf_count = 24;
>> +    const unsigned int burst = 8;
>> +    const unsigned int shift = 12;
>> +    const unsigned int seq_shift = 10;
>> +
>> +    struct rte_distributor *db = wp->dist;
>> +    struct rte_mbuf *bufs[buf_count];
>> +    struct rte_mbuf *returns[buf_count];
>> +    unsigned int i, count, id;
>> +    unsigned int sorted[buf_count], seq;
>> +    unsigned int failed = 0;
>> +
>> +    printf("=== Marked packets test ===\n");
>> +    clear_packet_count();
>> +    if (rte_mempool_get_bulk(p, (void *)bufs, buf_count) != 0) {
>> +        printf("line %d: Error getting mbufs from pool\n", __LINE__);
>> +        return -1;
>> +    }
>> +
>> +/* bufs' hashes will be like these below, but shifted left.
>> + * The shifting is for avoiding collisions with backlogs
>> + * and in-flight tags left by previous tests.
>> + * [1, 1, 1, 1, 1, 1, 1, 1
>> + *  1, 1, 1, 1, 2, 2, 2, 2
>> + *  2, 2, 2, 2, 1, 1, 1, 1]
>> + */
>
> I would suggest indenting the comments to the same indent level as the
> code; this would make the flow easier to read. Same with the additional
> comments below.
>
The indentation will be fixed in v6 as you suggested.
>
>> +    for (i = 0; i < burst; i++) {
>> +        bufs[0 * burst + i]->hash.usr = 1 << shift;
>> +        bufs[1 * burst + i]->hash.usr = ((i < burst / 2) ? 1 : 2)
>> +            << shift;
>> +        bufs[2 * burst + i]->hash.usr = ((i < burst / 2) ? 2 : 1)
>> +            << shift;
>> +    }
>> +/* Assign a sequence number to each packet. The sequence is shifted,
>> + * so that the lower bits of udata64 will hold the mark from the worker.
>> + */
>> +    for (i = 0; i < buf_count; i++)
>> +        bufs[i]->udata64 = i << seq_shift;
>> +
>> +    count = 0;
>> +    for (i = 0; i < buf_count/burst; i++) {
>> +        rte_distributor_process(db, &bufs[i * burst], burst);
>> +        count += rte_distributor_returned_pkts(db, &returns[count],
>> +            buf_count - count);
>> +    }
>> +
>> +    do {
>> +        rte_distributor_flush(db);
>> +        count += rte_distributor_returned_pkts(db, &returns[count],
>> +            buf_count - count);
>> +    } while (count < buf_count);
>> +
>> +    for (i = 0; i < rte_lcore_count() - 1; i++)
>> +        printf("Worker %u handled %u packets\n", i,
>> + __atomic_load_n(&worker_stats[i].handled_packets,
>> +                    __ATOMIC_ACQUIRE));
>> +
>> +/* Sort returned packets by sent order (sequence numbers). */
>> +    for (i = 0; i < buf_count; i++) {
>> +        seq = returns[i]->udata64 >> seq_shift;
>> +        id = returns[i]->udata64 - (seq << seq_shift);
>> +        sorted[seq] = id;
>> +    }
>> +
>> +/* Verify that packets [0-11] and [20-23] were processed
>> + * by the same worker
>> + */
>> +    for (i = 1; i < 12; i++) {
>> +        if (sorted[i] != sorted[0]) {
>> +            printf("Packet number %u processed by worker %u,"
>> +                " but should be processed by worker %u\n",
>> +                i, sorted[i], sorted[0]);
>> +            failed = 1;
>> +        }
>> +    }
>> +    for (i = 20; i < 24; i++) {
>> +        if (sorted[i] != sorted[0]) {
>> +            printf("Packet number %u processed by worker %u,"
>> +                " but should be processed by worker %u\n",
>> +                i, sorted[i], sorted[0]);
>> +            failed = 1;
>> +        }
>> +    }
>> +/* And verify that packets [12-19] were processed
>> + * by another worker
>> + */
>> +    for (i = 13; i < 20; i++) {
>> +        if (sorted[i] != sorted[12]) {
>> +            printf("Packet number %u processed by worker %u,"
>> +                " but should be processed by worker %u\n",
>> +                i, sorted[i], sorted[12]);
>> +            failed = 1;
>> +        }
>> +    }
>> +
>> +    rte_mempool_put_bulk(p, (void *)bufs, buf_count);
>> +
>> +    if (failed)
>> +        return -1;
>> +
>> +    printf("Marked packets test passed\n");
>> +    return 0;
>> +}
>> +
>>   static
>>   int test_error_distributor_create_name(void)
>>   {
>> @@ -726,6 +861,12 @@ test_distributor(void)
>>                   goto err;
>>               quit_workers(&worker_params, p);
>>   +            rte_eal_mp_remote_launch(handle_and_mark_work,
>> +                    &worker_params, SKIP_MASTER);
>> +            if (sanity_mark_test(&worker_params, p) < 0)
>> +                goto err;
>> +            quit_workers(&worker_params, p);
>> +
>>           } else {
>>               printf("Too few cores to run worker shutdown test\n");
>>           }
>
>
> Checking that the flows go to the correct workers is a really good test
> to have. Thanks for the effort here.
>
> Apart from the comment indentation nit, the rest looks good to me.
>
> Acked-by: David Hunt <david.hunt@intel.com>

Thanks

Lukasz


Patch

diff --git a/app/test/test_distributor.c b/app/test/test_distributor.c
index 1e0a079ff..0404e463a 100644
--- a/app/test/test_distributor.c
+++ b/app/test/test_distributor.c
@@ -542,6 +542,141 @@  test_flush_with_worker_shutdown(struct worker_params *wp,
 	return 0;
 }
 
+static int
+handle_and_mark_work(void *arg)
+{
+	struct rte_mbuf *buf[8] __rte_cache_aligned;
+	struct worker_params *wp = arg;
+	struct rte_distributor *db = wp->dist;
+	unsigned int num, i;
+	unsigned int id = __atomic_fetch_add(&worker_idx, 1, __ATOMIC_RELAXED);
+	num = rte_distributor_get_pkt(db, id, buf, NULL, 0);
+	while (!quit) {
+		__atomic_fetch_add(&worker_stats[id].handled_packets, num,
+				__ATOMIC_ACQ_REL);
+		for (i = 0; i < num; i++)
+			buf[i]->udata64 += id + 1;
+		num = rte_distributor_get_pkt(db, id,
+				buf, buf, num);
+	}
+	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
+			__ATOMIC_ACQ_REL);
+	rte_distributor_return_pkt(db, id, buf, num);
+	return 0;
+}
+
+/* sanity_mark_test sends packets to workers which mark them.
+ * Every packet also has an encoded sequence number.
+ * The returned packets are sorted and verified to check whether they
+ * were handled by the proper workers.
+ */
+static int
+sanity_mark_test(struct worker_params *wp, struct rte_mempool *p)
+{
+	const unsigned int buf_count = 24;
+	const unsigned int burst = 8;
+	const unsigned int shift = 12;
+	const unsigned int seq_shift = 10;
+
+	struct rte_distributor *db = wp->dist;
+	struct rte_mbuf *bufs[buf_count];
+	struct rte_mbuf *returns[buf_count];
+	unsigned int i, count, id;
+	unsigned int sorted[buf_count], seq;
+	unsigned int failed = 0;
+
+	printf("=== Marked packets test ===\n");
+	clear_packet_count();
+	if (rte_mempool_get_bulk(p, (void *)bufs, buf_count) != 0) {
+		printf("line %d: Error getting mbufs from pool\n", __LINE__);
+		return -1;
+	}
+
+/* bufs' hashes will be like these below, but shifted left.
+ * The shifting is for avoiding collisions with backlogs
+ * and in-flight tags left by previous tests.
+ * [1, 1, 1, 1, 1, 1, 1, 1
+ *  1, 1, 1, 1, 2, 2, 2, 2
+ *  2, 2, 2, 2, 1, 1, 1, 1]
+ */
+	for (i = 0; i < burst; i++) {
+		bufs[0 * burst + i]->hash.usr = 1 << shift;
+		bufs[1 * burst + i]->hash.usr = ((i < burst / 2) ? 1 : 2)
+			<< shift;
+		bufs[2 * burst + i]->hash.usr = ((i < burst / 2) ? 2 : 1)
+			<< shift;
+	}
+/* Assign a sequence number to each packet. The sequence is shifted,
+ * so that the lower bits of udata64 will hold the mark from the worker.
+ */
+	for (i = 0; i < buf_count; i++)
+		bufs[i]->udata64 = i << seq_shift;
+
+	count = 0;
+	for (i = 0; i < buf_count/burst; i++) {
+		rte_distributor_process(db, &bufs[i * burst], burst);
+		count += rte_distributor_returned_pkts(db, &returns[count],
+			buf_count - count);
+	}
+
+	do {
+		rte_distributor_flush(db);
+		count += rte_distributor_returned_pkts(db, &returns[count],
+			buf_count - count);
+	} while (count < buf_count);
+
+	for (i = 0; i < rte_lcore_count() - 1; i++)
+		printf("Worker %u handled %u packets\n", i,
+			__atomic_load_n(&worker_stats[i].handled_packets,
+					__ATOMIC_ACQUIRE));
+
+/* Sort returned packets by sent order (sequence numbers). */
+	for (i = 0; i < buf_count; i++) {
+		seq = returns[i]->udata64 >> seq_shift;
+		id = returns[i]->udata64 - (seq << seq_shift);
+		sorted[seq] = id;
+	}
+
+/* Verify that packets [0-11] and [20-23] were processed
+ * by the same worker
+ */
+	for (i = 1; i < 12; i++) {
+		if (sorted[i] != sorted[0]) {
+			printf("Packet number %u processed by worker %u,"
+				" but should be processed by worker %u\n",
+				i, sorted[i], sorted[0]);
+			failed = 1;
+		}
+	}
+	for (i = 20; i < 24; i++) {
+		if (sorted[i] != sorted[0]) {
+			printf("Packet number %u processed by worker %u,"
+				" but should be processed by worker %u\n",
+				i, sorted[i], sorted[0]);
+			failed = 1;
+		}
+	}
+/* And verify that packets [12-19] were processed
+ * by another worker
+ */
+	for (i = 13; i < 20; i++) {
+		if (sorted[i] != sorted[12]) {
+			printf("Packet number %u processed by worker %u,"
+				" but should be processed by worker %u\n",
+				i, sorted[i], sorted[12]);
+			failed = 1;
+		}
+	}
+
+	rte_mempool_put_bulk(p, (void *)bufs, buf_count);
+
+	if (failed)
+		return -1;
+
+	printf("Marked packets test passed\n");
+	return 0;
+}
+
 static
 int test_error_distributor_create_name(void)
 {
@@ -726,6 +861,12 @@  test_distributor(void)
 				goto err;
 			quit_workers(&worker_params, p);
 
+			rte_eal_mp_remote_launch(handle_and_mark_work,
+					&worker_params, SKIP_MASTER);
+			if (sanity_mark_test(&worker_params, p) < 0)
+				goto err;
+			quit_workers(&worker_params, p);
+
 		} else {
 			printf("Too few cores to run worker shutdown test\n");
 		}
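
To make the expected outcome concrete, a hedged, self-contained sketch is
given below. With the tag layout above and, for example, two workers
(marks 1 and 2), a passing run could decode into the sorted[] values shown;
the test only requires consistency within each index range, not that a
particular worker handles a particular flow.

#include <assert.h>

/* Illustrative only: one possible sorted[] outcome for a two-worker run.
 * Packets 0-11 and 20-23 (tag 1 << 12) carry one worker's mark,
 * packets 12-19 (tag 2 << 12) carry the other worker's mark.
 */
static const unsigned int sorted_example[24] = {
	1, 1, 1, 1, 1, 1, 1, 1,
	1, 1, 1, 1, 2, 2, 2, 2,
	2, 2, 2, 2, 1, 1, 1, 1
};

int main(void)
{
	unsigned int i;

	/* Same checks as sanity_mark_test() performs on its sorted[] array. */
	for (i = 1; i < 12; i++)
		assert(sorted_example[i] == sorted_example[0]);
	for (i = 20; i < 24; i++)
		assert(sorted_example[i] == sorted_example[0]);
	for (i = 13; i < 20; i++)
		assert(sorted_example[i] == sorted_example[12]);
	return 0;
}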