[v7,08/16] test/distributor: fix freeing mbufs

Message ID 20201010160508.19709-9-l.wojciechow@partner.samsung.com (mailing list archive)
State Superseded, archived
Delegated to: David Marchand
Headers
Series fix distributor synchronization issues |

Checks

Context Check Description
ci/checkpatch success coding style OK

Commit Message

Lukasz Wojciechowski Oct. 10, 2020, 4:04 p.m. UTC
  Sanity tests with mbuf alloc and shutdown tests assume that
mbufs passed to worker cores are freed in handlers.
Such packets should not be returned to the distributor's main
core. The only packets that should be returned are the packets
send after completion of the tests in quit_workers function.

This patch stops returning mbufs to distributor's core.
In case of shutdown tests it is impossible to determine
how worker and distributor threads would synchronize.
Packets used by tests should be freed and packets used during
quit_workers() shouldn't. That's why returning mbufs to mempool
is moved to test procedure run on distributor thread
from worker threads.

Additionally this patch cleans up unused variables.

Fixes: c0de0eb82e40 ("distributor: switch over to new API")
Cc: david.hunt@intel.com
Cc: stable@dpdk.org

Signed-off-by: Lukasz Wojciechowski <l.wojciechow@partner.samsung.com>
Acked-by: David Hunt <david.hunt@intel.com>
---
 app/test/test_distributor.c | 96 ++++++++++++++++++-------------------
 1 file changed, 47 insertions(+), 49 deletions(-)
  

Comments

Honnappa Nagarahalli Oct. 16, 2020, 5:12 a.m. UTC | #1
<snip>

> 
> Sanity tests with mbuf alloc and shutdown tests assume that mbufs passed
> to worker cores are freed in handlers.
> Such packets should not be returned to the distributor's main core. The only
> packets that should be returned are the packets send after completion of
> the tests in quit_workers function.
> 
> This patch stops returning mbufs to distributor's core.
> In case of shutdown tests it is impossible to determine how worker and
> distributor threads would synchronize.
> Packets used by tests should be freed and packets used during
> quit_workers() shouldn't. That's why returning mbufs to mempool is moved
> to test procedure run on distributor thread from worker threads.
> 
> Additionally this patch cleans up unused variables.
> 
> Fixes: c0de0eb82e40 ("distributor: switch over to new API")
> Cc: david.hunt@intel.com
> Cc: stable@dpdk.org
> 
> Signed-off-by: Lukasz Wojciechowski <l.wojciechow@partner.samsung.com>
> Acked-by: David Hunt <david.hunt@intel.com>
> ---
>  app/test/test_distributor.c | 96 ++++++++++++++++++-------------------
>  1 file changed, 47 insertions(+), 49 deletions(-)
> 
> diff --git a/app/test/test_distributor.c b/app/test/test_distributor.c index
> 838459392..06e01ff9d 100644
> --- a/app/test/test_distributor.c
> +++ b/app/test/test_distributor.c
> @@ -44,7 +44,7 @@ total_packet_count(void)
>  	unsigned i, count = 0;
>  	for (i = 0; i < worker_idx; i++)
>  		count +=
> __atomic_load_n(&worker_stats[i].handled_packets,
> -				__ATOMIC_ACQUIRE);
> +				__ATOMIC_RELAXED);
I think it is better to make this and other statistics changes below in commit 6/16. It will be in line with the commit log as well.

>  	return count;
>  }
> 
> @@ -55,7 +55,7 @@ clear_packet_count(void)
>  	unsigned int i;
>  	for (i = 0; i < RTE_MAX_LCORE; i++)
>  		__atomic_store_n(&worker_stats[i].handled_packets, 0,
> -			__ATOMIC_RELEASE);
> +			__ATOMIC_RELAXED);
>  }
> 
>  /* this is the basic worker function for sanity test @@ -67,20 +67,18 @@
> handle_work(void *arg)
>  	struct rte_mbuf *buf[8] __rte_cache_aligned;
>  	struct worker_params *wp = arg;
>  	struct rte_distributor *db = wp->dist;
> -	unsigned int count = 0, num;
> +	unsigned int num;
>  	unsigned int id = __atomic_fetch_add(&worker_idx, 1,
> __ATOMIC_RELAXED);
> 
>  	num = rte_distributor_get_pkt(db, id, buf, NULL, 0);
>  	while (!quit) {
>  		__atomic_fetch_add(&worker_stats[id].handled_packets,
> num,
> -				__ATOMIC_ACQ_REL);
> -		count += num;
> +				__ATOMIC_RELAXED);
>  		num = rte_distributor_get_pkt(db, id,
>  				buf, buf, num);
>  	}
>  	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
> -			__ATOMIC_ACQ_REL);
> -	count += num;
> +			__ATOMIC_RELAXED);
>  	rte_distributor_return_pkt(db, id, buf, num);
>  	return 0;
>  }
> @@ -136,7 +134,7 @@ sanity_test(struct worker_params *wp, struct
> rte_mempool *p)
>  	for (i = 0; i < rte_lcore_count() - 1; i++)
>  		printf("Worker %u handled %u packets\n", i,
>  			__atomic_load_n(&worker_stats[i].handled_packets,
> -					__ATOMIC_ACQUIRE));
> +					__ATOMIC_RELAXED));
>  	printf("Sanity test with all zero hashes done.\n");
> 
>  	/* pick two flows and check they go correctly */ @@ -163,7 +161,7
> @@ sanity_test(struct worker_params *wp, struct rte_mempool *p)
>  			printf("Worker %u handled %u packets\n", i,
>  				__atomic_load_n(
>  					&worker_stats[i].handled_packets,
> -					__ATOMIC_ACQUIRE));
> +					__ATOMIC_RELAXED));
>  		printf("Sanity test with two hash values done\n");
>  	}
> 
> @@ -190,7 +188,7 @@ sanity_test(struct worker_params *wp, struct
> rte_mempool *p)
>  	for (i = 0; i < rte_lcore_count() - 1; i++)
>  		printf("Worker %u handled %u packets\n", i,
>  			__atomic_load_n(&worker_stats[i].handled_packets,
> -					__ATOMIC_ACQUIRE));
> +					__ATOMIC_RELAXED));
>  	printf("Sanity test with non-zero hashes done\n");
> 
>  	rte_mempool_put_bulk(p, (void *)bufs, BURST); @@ -276,23
> +274,20 @@ handle_work_with_free_mbufs(void *arg)
>  	struct rte_mbuf *buf[8] __rte_cache_aligned;
>  	struct worker_params *wp = arg;
>  	struct rte_distributor *d = wp->dist;
> -	unsigned int count = 0;
>  	unsigned int i;
>  	unsigned int num;
>  	unsigned int id = __atomic_fetch_add(&worker_idx, 1,
> __ATOMIC_RELAXED);
> 
>  	num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
>  	while (!quit) {
> -		count += num;
>  		__atomic_fetch_add(&worker_stats[id].handled_packets,
> num,
> -				__ATOMIC_ACQ_REL);
> +				__ATOMIC_RELAXED);
>  		for (i = 0; i < num; i++)
>  			rte_pktmbuf_free(buf[i]);
>  		num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
>  	}
> -	count += num;
>  	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
> -			__ATOMIC_ACQ_REL);
> +			__ATOMIC_RELAXED);
>  	rte_distributor_return_pkt(d, id, buf, num);
>  	return 0;
>  }
> @@ -318,7 +313,6 @@ sanity_test_with_mbuf_alloc(struct worker_params
> *wp, struct rte_mempool *p)
>  			rte_distributor_process(d, NULL, 0);
>  		for (j = 0; j < BURST; j++) {
>  			bufs[j]->hash.usr = (i+j) << 1;
> -			rte_mbuf_refcnt_set(bufs[j], 1);
>  		}
> 
>  		rte_distributor_process(d, bufs, BURST); @@ -342,15 +336,10
> @@ sanity_test_with_mbuf_alloc(struct worker_params *wp, struct
> rte_mempool *p)  static int  handle_work_for_shutdown_test(void *arg)  {
> -	struct rte_mbuf *pkt = NULL;
>  	struct rte_mbuf *buf[8] __rte_cache_aligned;
>  	struct worker_params *wp = arg;
>  	struct rte_distributor *d = wp->dist;
> -	unsigned int count = 0;
>  	unsigned int num;
> -	unsigned int total = 0;
> -	unsigned int i;
> -	unsigned int returned = 0;
>  	unsigned int zero_id = 0;
>  	unsigned int zero_unset;
>  	const unsigned int id = __atomic_fetch_add(&worker_idx, 1, @@ -
> 368,11 +357,8 @@ handle_work_for_shutdown_test(void *arg)
>  	/* wait for quit single globally, or for worker zero, wait
>  	 * for zero_quit */
>  	while (!quit && !(id == zero_id && zero_quit)) {
> -		count += num;
>  		__atomic_fetch_add(&worker_stats[id].handled_packets,
> num,
> -				__ATOMIC_ACQ_REL);
> -		for (i = 0; i < num; i++)
> -			rte_pktmbuf_free(buf[i]);
> +				__ATOMIC_RELAXED);
>  		num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
> 
>  		if (num > 0) {
> @@ -381,15 +367,12 @@ handle_work_for_shutdown_test(void *arg)
>  				false, __ATOMIC_ACQ_REL,
> __ATOMIC_ACQUIRE);
>  		}
>  		zero_id = __atomic_load_n(&zero_idx,
> __ATOMIC_ACQUIRE);
> -
> -		total += num;
>  	}
> -	count += num;
> -	returned = rte_distributor_return_pkt(d, id, buf, num);
> -
>  	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
> -			__ATOMIC_ACQ_REL);
> +			__ATOMIC_RELAXED);
>  	if (id == zero_id) {
> +		rte_distributor_return_pkt(d, id, NULL, 0);
> +
>  		/* for worker zero, allow it to restart to pick up last packet
>  		 * when all workers are shutting down.
>  		 */
> @@ -400,15 +383,11 @@ handle_work_for_shutdown_test(void *arg)
> 
>  		while (!quit) {
> 
> 	__atomic_fetch_add(&worker_stats[id].handled_packets,
> -					num, __ATOMIC_ACQ_REL);
> -			count += num;
> -			rte_pktmbuf_free(pkt);
> +					num, __ATOMIC_RELAXED);
>  			num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
>  		}
> -		returned = rte_distributor_return_pkt(d,
> -				id, buf, num);
> -		printf("Num returned = %d\n", returned);
>  	}
> +	rte_distributor_return_pkt(d, id, buf, num);
>  	return 0;
>  }
> 
> @@ -424,7 +403,9 @@ sanity_test_with_worker_shutdown(struct
> worker_params *wp,  {
>  	struct rte_distributor *d = wp->dist;
>  	struct rte_mbuf *bufs[BURST];
> -	unsigned i;
> +	struct rte_mbuf *bufs2[BURST];
> +	unsigned int i;
> +	unsigned int failed = 0;
> 
>  	printf("=== Sanity test of worker shutdown ===\n");
> 
> @@ -450,16 +431,17 @@ sanity_test_with_worker_shutdown(struct
> worker_params *wp,
>  	 */
> 
>  	/* get more buffers to queue up, again setting them to the same
> flow */
> -	if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
> +	if (rte_mempool_get_bulk(p, (void *)bufs2, BURST) != 0) {
>  		printf("line %d: Error getting mbufs from pool\n", __LINE__);
> +		rte_mempool_put_bulk(p, (void *)bufs, BURST);
>  		return -1;
>  	}
>  	for (i = 0; i < BURST; i++)
> -		bufs[i]->hash.usr = 1;
> +		bufs2[i]->hash.usr = 1;
> 
>  	/* get worker zero to quit */
>  	zero_quit = 1;
> -	rte_distributor_process(d, bufs, BURST);
> +	rte_distributor_process(d, bufs2, BURST);
> 
>  	/* flush the distributor */
>  	rte_distributor_flush(d);
> @@ -468,15 +450,21 @@ sanity_test_with_worker_shutdown(struct
> worker_params *wp,
>  	for (i = 0; i < rte_lcore_count() - 1; i++)
>  		printf("Worker %u handled %u packets\n", i,
>  			__atomic_load_n(&worker_stats[i].handled_packets,
> -					__ATOMIC_ACQUIRE));
> +					__ATOMIC_RELAXED));
> 
>  	if (total_packet_count() != BURST * 2) {
>  		printf("Line %d: Error, not all packets flushed. "
>  				"Expected %u, got %u\n",
>  				__LINE__, BURST * 2, total_packet_count());
> -		return -1;
> +		failed = 1;
>  	}
> 
> +	rte_mempool_put_bulk(p, (void *)bufs, BURST);
> +	rte_mempool_put_bulk(p, (void *)bufs2, BURST);
> +
> +	if (failed)
> +		return -1;
> +
>  	printf("Sanity test with worker shutdown passed\n\n");
>  	return 0;
>  }
> @@ -490,7 +478,8 @@ test_flush_with_worker_shutdown(struct
> worker_params *wp,  {
>  	struct rte_distributor *d = wp->dist;
>  	struct rte_mbuf *bufs[BURST];
> -	unsigned i;
> +	unsigned int i;
> +	unsigned int failed = 0;
> 
>  	printf("=== Test flush fn with worker shutdown (%s) ===\n", wp-
> >name);
> 
> @@ -522,15 +511,20 @@ test_flush_with_worker_shutdown(struct
> worker_params *wp,
>  	for (i = 0; i < rte_lcore_count() - 1; i++)
>  		printf("Worker %u handled %u packets\n", i,
>  			__atomic_load_n(&worker_stats[i].handled_packets,
> -					__ATOMIC_ACQUIRE));
> +					__ATOMIC_RELAXED));
> 
>  	if (total_packet_count() != BURST) {
>  		printf("Line %d: Error, not all packets flushed. "
>  				"Expected %u, got %u\n",
>  				__LINE__, BURST, total_packet_count());
> -		return -1;
> +		failed = 1;
>  	}
> 
> +	rte_mempool_put_bulk(p, (void *)bufs, BURST);
> +
> +	if (failed)
> +		return -1;
> +
>  	printf("Flush test with worker shutdown passed\n\n");
>  	return 0;
>  }
> @@ -596,7 +590,10 @@ quit_workers(struct worker_params *wp, struct
> rte_mempool *p)
>  	const unsigned num_workers = rte_lcore_count() - 1;
>  	unsigned i;
>  	struct rte_mbuf *bufs[RTE_MAX_LCORE];
> -	rte_mempool_get_bulk(p, (void *)bufs, num_workers);
> +	if (rte_mempool_get_bulk(p, (void *)bufs, num_workers) != 0) {
> +		printf("line %d: Error getting mbufs from pool\n", __LINE__);
> +		return;
> +	}
> 
>  	zero_quit = 0;
>  	quit = 1;
> @@ -604,11 +601,12 @@ quit_workers(struct worker_params *wp, struct
> rte_mempool *p)
>  		bufs[i]->hash.usr = i << 1;
>  	rte_distributor_process(d, bufs, num_workers);
> 
> -	rte_mempool_put_bulk(p, (void *)bufs, num_workers);
> -
>  	rte_distributor_process(d, NULL, 0);
>  	rte_distributor_flush(d);
>  	rte_eal_mp_wait_lcore();
> +
> +	rte_mempool_put_bulk(p, (void *)bufs, num_workers);
> +
>  	quit = 0;
>  	worker_idx = 0;
>  	zero_idx = RTE_MAX_LCORE;
> --
> 2.17.1
  
Lukasz Wojciechowski Oct. 17, 2020, 3:28 a.m. UTC | #2
Hi Honnappa,

W dniu 16.10.2020 o 07:12, Honnappa Nagarahalli pisze:
> <snip>
>
>> Sanity tests with mbuf alloc and shutdown tests assume that mbufs passed
>> to worker cores are freed in handlers.
>> Such packets should not be returned to the distributor's main core. The only
>> packets that should be returned are the packets send after completion of
>> the tests in quit_workers function.
>>
>> This patch stops returning mbufs to distributor's core.
>> In case of shutdown tests it is impossible to determine how worker and
>> distributor threads would synchronize.
>> Packets used by tests should be freed and packets used during
>> quit_workers() shouldn't. That's why returning mbufs to mempool is moved
>> to test procedure run on distributor thread from worker threads.
>>
>> Additionally this patch cleans up unused variables.
>>
>> Fixes: c0de0eb82e40 ("distributor: switch over to new API")
>> Cc: david.hunt@intel.com
>> Cc: stable@dpdk.org
>>
>> Signed-off-by: Lukasz Wojciechowski <l.wojciechow@partner.samsung.com>
>> Acked-by: David Hunt <david.hunt@intel.com>
>> ---
>>   app/test/test_distributor.c | 96 ++++++++++++++++++-------------------
>>   1 file changed, 47 insertions(+), 49 deletions(-)
>>
>> diff --git a/app/test/test_distributor.c b/app/test/test_distributor.c index
>> 838459392..06e01ff9d 100644
>> --- a/app/test/test_distributor.c
>> +++ b/app/test/test_distributor.c
>> @@ -44,7 +44,7 @@ total_packet_count(void)
>>   	unsigned i, count = 0;
>>   	for (i = 0; i < worker_idx; i++)
>>   		count +=
>> __atomic_load_n(&worker_stats[i].handled_packets,
>> -				__ATOMIC_ACQUIRE);
>> +				__ATOMIC_RELAXED);
> I think it is better to make this and other statistics changes below in commit 6/16. It will be in line with the commit log as well.
I changed the order of patches to avoid duplicated changes in the code.
>
>>   	return count;
>>   }
>>
>> @@ -55,7 +55,7 @@ clear_packet_count(void)
>>   	unsigned int i;
>>   	for (i = 0; i < RTE_MAX_LCORE; i++)
>>   		__atomic_store_n(&worker_stats[i].handled_packets, 0,
>> -			__ATOMIC_RELEASE);
>> +			__ATOMIC_RELAXED);
>>   }
>>
>>   /* this is the basic worker function for sanity test @@ -67,20 +67,18 @@
>> handle_work(void *arg)
>>   	struct rte_mbuf *buf[8] __rte_cache_aligned;
>>   	struct worker_params *wp = arg;
>>   	struct rte_distributor *db = wp->dist;
>> -	unsigned int count = 0, num;
>> +	unsigned int num;
>>   	unsigned int id = __atomic_fetch_add(&worker_idx, 1,
>> __ATOMIC_RELAXED);
>>
>>   	num = rte_distributor_get_pkt(db, id, buf, NULL, 0);
>>   	while (!quit) {
>>   		__atomic_fetch_add(&worker_stats[id].handled_packets,
>> num,
>> -				__ATOMIC_ACQ_REL);
>> -		count += num;
>> +				__ATOMIC_RELAXED);
>>   		num = rte_distributor_get_pkt(db, id,
>>   				buf, buf, num);
>>   	}
>>   	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
>> -			__ATOMIC_ACQ_REL);
>> -	count += num;
>> +			__ATOMIC_RELAXED);
>>   	rte_distributor_return_pkt(db, id, buf, num);
>>   	return 0;
>>   }
>> @@ -136,7 +134,7 @@ sanity_test(struct worker_params *wp, struct
>> rte_mempool *p)
>>   	for (i = 0; i < rte_lcore_count() - 1; i++)
>>   		printf("Worker %u handled %u packets\n", i,
>>   			__atomic_load_n(&worker_stats[i].handled_packets,
>> -					__ATOMIC_ACQUIRE));
>> +					__ATOMIC_RELAXED));
>>   	printf("Sanity test with all zero hashes done.\n");
>>
>>   	/* pick two flows and check they go correctly */ @@ -163,7 +161,7
>> @@ sanity_test(struct worker_params *wp, struct rte_mempool *p)
>>   			printf("Worker %u handled %u packets\n", i,
>>   				__atomic_load_n(
>>   					&worker_stats[i].handled_packets,
>> -					__ATOMIC_ACQUIRE));
>> +					__ATOMIC_RELAXED));
>>   		printf("Sanity test with two hash values done\n");
>>   	}
>>
>> @@ -190,7 +188,7 @@ sanity_test(struct worker_params *wp, struct
>> rte_mempool *p)
>>   	for (i = 0; i < rte_lcore_count() - 1; i++)
>>   		printf("Worker %u handled %u packets\n", i,
>>   			__atomic_load_n(&worker_stats[i].handled_packets,
>> -					__ATOMIC_ACQUIRE));
>> +					__ATOMIC_RELAXED));
>>   	printf("Sanity test with non-zero hashes done\n");
>>
>>   	rte_mempool_put_bulk(p, (void *)bufs, BURST); @@ -276,23
>> +274,20 @@ handle_work_with_free_mbufs(void *arg)
>>   	struct rte_mbuf *buf[8] __rte_cache_aligned;
>>   	struct worker_params *wp = arg;
>>   	struct rte_distributor *d = wp->dist;
>> -	unsigned int count = 0;
>>   	unsigned int i;
>>   	unsigned int num;
>>   	unsigned int id = __atomic_fetch_add(&worker_idx, 1,
>> __ATOMIC_RELAXED);
>>
>>   	num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
>>   	while (!quit) {
>> -		count += num;
>>   		__atomic_fetch_add(&worker_stats[id].handled_packets,
>> num,
>> -				__ATOMIC_ACQ_REL);
>> +				__ATOMIC_RELAXED);
>>   		for (i = 0; i < num; i++)
>>   			rte_pktmbuf_free(buf[i]);
>>   		num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
>>   	}
>> -	count += num;
>>   	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
>> -			__ATOMIC_ACQ_REL);
>> +			__ATOMIC_RELAXED);
>>   	rte_distributor_return_pkt(d, id, buf, num);
>>   	return 0;
>>   }
>> @@ -318,7 +313,6 @@ sanity_test_with_mbuf_alloc(struct worker_params
>> *wp, struct rte_mempool *p)
>>   			rte_distributor_process(d, NULL, 0);
>>   		for (j = 0; j < BURST; j++) {
>>   			bufs[j]->hash.usr = (i+j) << 1;
>> -			rte_mbuf_refcnt_set(bufs[j], 1);
>>   		}
>>
>>   		rte_distributor_process(d, bufs, BURST); @@ -342,15 +336,10
>> @@ sanity_test_with_mbuf_alloc(struct worker_params *wp, struct
>> rte_mempool *p)  static int  handle_work_for_shutdown_test(void *arg)  {
>> -	struct rte_mbuf *pkt = NULL;
>>   	struct rte_mbuf *buf[8] __rte_cache_aligned;
>>   	struct worker_params *wp = arg;
>>   	struct rte_distributor *d = wp->dist;
>> -	unsigned int count = 0;
>>   	unsigned int num;
>> -	unsigned int total = 0;
>> -	unsigned int i;
>> -	unsigned int returned = 0;
>>   	unsigned int zero_id = 0;
>>   	unsigned int zero_unset;
>>   	const unsigned int id = __atomic_fetch_add(&worker_idx, 1, @@ -
>> 368,11 +357,8 @@ handle_work_for_shutdown_test(void *arg)
>>   	/* wait for quit single globally, or for worker zero, wait
>>   	 * for zero_quit */
>>   	while (!quit && !(id == zero_id && zero_quit)) {
>> -		count += num;
>>   		__atomic_fetch_add(&worker_stats[id].handled_packets,
>> num,
>> -				__ATOMIC_ACQ_REL);
>> -		for (i = 0; i < num; i++)
>> -			rte_pktmbuf_free(buf[i]);
>> +				__ATOMIC_RELAXED);
>>   		num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
>>
>>   		if (num > 0) {
>> @@ -381,15 +367,12 @@ handle_work_for_shutdown_test(void *arg)
>>   				false, __ATOMIC_ACQ_REL,
>> __ATOMIC_ACQUIRE);
>>   		}
>>   		zero_id = __atomic_load_n(&zero_idx,
>> __ATOMIC_ACQUIRE);
>> -
>> -		total += num;
>>   	}
>> -	count += num;
>> -	returned = rte_distributor_return_pkt(d, id, buf, num);
>> -
>>   	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
>> -			__ATOMIC_ACQ_REL);
>> +			__ATOMIC_RELAXED);
>>   	if (id == zero_id) {
>> +		rte_distributor_return_pkt(d, id, NULL, 0);
>> +
>>   		/* for worker zero, allow it to restart to pick up last packet
>>   		 * when all workers are shutting down.
>>   		 */
>> @@ -400,15 +383,11 @@ handle_work_for_shutdown_test(void *arg)
>>
>>   		while (!quit) {
>>
>> 	__atomic_fetch_add(&worker_stats[id].handled_packets,
>> -					num, __ATOMIC_ACQ_REL);
>> -			count += num;
>> -			rte_pktmbuf_free(pkt);
>> +					num, __ATOMIC_RELAXED);
>>   			num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
>>   		}
>> -		returned = rte_distributor_return_pkt(d,
>> -				id, buf, num);
>> -		printf("Num returned = %d\n", returned);
>>   	}
>> +	rte_distributor_return_pkt(d, id, buf, num);
>>   	return 0;
>>   }
>>
>> @@ -424,7 +403,9 @@ sanity_test_with_worker_shutdown(struct
>> worker_params *wp,  {
>>   	struct rte_distributor *d = wp->dist;
>>   	struct rte_mbuf *bufs[BURST];
>> -	unsigned i;
>> +	struct rte_mbuf *bufs2[BURST];
>> +	unsigned int i;
>> +	unsigned int failed = 0;
>>
>>   	printf("=== Sanity test of worker shutdown ===\n");
>>
>> @@ -450,16 +431,17 @@ sanity_test_with_worker_shutdown(struct
>> worker_params *wp,
>>   	 */
>>
>>   	/* get more buffers to queue up, again setting them to the same
>> flow */
>> -	if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
>> +	if (rte_mempool_get_bulk(p, (void *)bufs2, BURST) != 0) {
>>   		printf("line %d: Error getting mbufs from pool\n", __LINE__);
>> +		rte_mempool_put_bulk(p, (void *)bufs, BURST);
>>   		return -1;
>>   	}
>>   	for (i = 0; i < BURST; i++)
>> -		bufs[i]->hash.usr = 1;
>> +		bufs2[i]->hash.usr = 1;
>>
>>   	/* get worker zero to quit */
>>   	zero_quit = 1;
>> -	rte_distributor_process(d, bufs, BURST);
>> +	rte_distributor_process(d, bufs2, BURST);
>>
>>   	/* flush the distributor */
>>   	rte_distributor_flush(d);
>> @@ -468,15 +450,21 @@ sanity_test_with_worker_shutdown(struct
>> worker_params *wp,
>>   	for (i = 0; i < rte_lcore_count() - 1; i++)
>>   		printf("Worker %u handled %u packets\n", i,
>>   			__atomic_load_n(&worker_stats[i].handled_packets,
>> -					__ATOMIC_ACQUIRE));
>> +					__ATOMIC_RELAXED));
>>
>>   	if (total_packet_count() != BURST * 2) {
>>   		printf("Line %d: Error, not all packets flushed. "
>>   				"Expected %u, got %u\n",
>>   				__LINE__, BURST * 2, total_packet_count());
>> -		return -1;
>> +		failed = 1;
>>   	}
>>
>> +	rte_mempool_put_bulk(p, (void *)bufs, BURST);
>> +	rte_mempool_put_bulk(p, (void *)bufs2, BURST);
>> +
>> +	if (failed)
>> +		return -1;
>> +
>>   	printf("Sanity test with worker shutdown passed\n\n");
>>   	return 0;
>>   }
>> @@ -490,7 +478,8 @@ test_flush_with_worker_shutdown(struct
>> worker_params *wp,  {
>>   	struct rte_distributor *d = wp->dist;
>>   	struct rte_mbuf *bufs[BURST];
>> -	unsigned i;
>> +	unsigned int i;
>> +	unsigned int failed = 0;
>>
>>   	printf("=== Test flush fn with worker shutdown (%s) ===\n", wp-
>>> name);
>> @@ -522,15 +511,20 @@ test_flush_with_worker_shutdown(struct
>> worker_params *wp,
>>   	for (i = 0; i < rte_lcore_count() - 1; i++)
>>   		printf("Worker %u handled %u packets\n", i,
>>   			__atomic_load_n(&worker_stats[i].handled_packets,
>> -					__ATOMIC_ACQUIRE));
>> +					__ATOMIC_RELAXED));
>>
>>   	if (total_packet_count() != BURST) {
>>   		printf("Line %d: Error, not all packets flushed. "
>>   				"Expected %u, got %u\n",
>>   				__LINE__, BURST, total_packet_count());
>> -		return -1;
>> +		failed = 1;
>>   	}
>>
>> +	rte_mempool_put_bulk(p, (void *)bufs, BURST);
>> +
>> +	if (failed)
>> +		return -1;
>> +
>>   	printf("Flush test with worker shutdown passed\n\n");
>>   	return 0;
>>   }
>> @@ -596,7 +590,10 @@ quit_workers(struct worker_params *wp, struct
>> rte_mempool *p)
>>   	const unsigned num_workers = rte_lcore_count() - 1;
>>   	unsigned i;
>>   	struct rte_mbuf *bufs[RTE_MAX_LCORE];
>> -	rte_mempool_get_bulk(p, (void *)bufs, num_workers);
>> +	if (rte_mempool_get_bulk(p, (void *)bufs, num_workers) != 0) {
>> +		printf("line %d: Error getting mbufs from pool\n", __LINE__);
>> +		return;
>> +	}
>>
>>   	zero_quit = 0;
>>   	quit = 1;
>> @@ -604,11 +601,12 @@ quit_workers(struct worker_params *wp, struct
>> rte_mempool *p)
>>   		bufs[i]->hash.usr = i << 1;
>>   	rte_distributor_process(d, bufs, num_workers);
>>
>> -	rte_mempool_put_bulk(p, (void *)bufs, num_workers);
>> -
>>   	rte_distributor_process(d, NULL, 0);
>>   	rte_distributor_flush(d);
>>   	rte_eal_mp_wait_lcore();
>> +
>> +	rte_mempool_put_bulk(p, (void *)bufs, num_workers);
>> +
>>   	quit = 0;
>>   	worker_idx = 0;
>>   	zero_idx = RTE_MAX_LCORE;
>> --
>> 2.17.1
  

Patch

diff --git a/app/test/test_distributor.c b/app/test/test_distributor.c
index 838459392..06e01ff9d 100644
--- a/app/test/test_distributor.c
+++ b/app/test/test_distributor.c
@@ -44,7 +44,7 @@  total_packet_count(void)
 	unsigned i, count = 0;
 	for (i = 0; i < worker_idx; i++)
 		count += __atomic_load_n(&worker_stats[i].handled_packets,
-				__ATOMIC_ACQUIRE);
+				__ATOMIC_RELAXED);
 	return count;
 }
 
@@ -55,7 +55,7 @@  clear_packet_count(void)
 	unsigned int i;
 	for (i = 0; i < RTE_MAX_LCORE; i++)
 		__atomic_store_n(&worker_stats[i].handled_packets, 0,
-			__ATOMIC_RELEASE);
+			__ATOMIC_RELAXED);
 }
 
 /* this is the basic worker function for sanity test
@@ -67,20 +67,18 @@  handle_work(void *arg)
 	struct rte_mbuf *buf[8] __rte_cache_aligned;
 	struct worker_params *wp = arg;
 	struct rte_distributor *db = wp->dist;
-	unsigned int count = 0, num;
+	unsigned int num;
 	unsigned int id = __atomic_fetch_add(&worker_idx, 1, __ATOMIC_RELAXED);
 
 	num = rte_distributor_get_pkt(db, id, buf, NULL, 0);
 	while (!quit) {
 		__atomic_fetch_add(&worker_stats[id].handled_packets, num,
-				__ATOMIC_ACQ_REL);
-		count += num;
+				__ATOMIC_RELAXED);
 		num = rte_distributor_get_pkt(db, id,
 				buf, buf, num);
 	}
 	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
-			__ATOMIC_ACQ_REL);
-	count += num;
+			__ATOMIC_RELAXED);
 	rte_distributor_return_pkt(db, id, buf, num);
 	return 0;
 }
@@ -136,7 +134,7 @@  sanity_test(struct worker_params *wp, struct rte_mempool *p)
 	for (i = 0; i < rte_lcore_count() - 1; i++)
 		printf("Worker %u handled %u packets\n", i,
 			__atomic_load_n(&worker_stats[i].handled_packets,
-					__ATOMIC_ACQUIRE));
+					__ATOMIC_RELAXED));
 	printf("Sanity test with all zero hashes done.\n");
 
 	/* pick two flows and check they go correctly */
@@ -163,7 +161,7 @@  sanity_test(struct worker_params *wp, struct rte_mempool *p)
 			printf("Worker %u handled %u packets\n", i,
 				__atomic_load_n(
 					&worker_stats[i].handled_packets,
-					__ATOMIC_ACQUIRE));
+					__ATOMIC_RELAXED));
 		printf("Sanity test with two hash values done\n");
 	}
 
@@ -190,7 +188,7 @@  sanity_test(struct worker_params *wp, struct rte_mempool *p)
 	for (i = 0; i < rte_lcore_count() - 1; i++)
 		printf("Worker %u handled %u packets\n", i,
 			__atomic_load_n(&worker_stats[i].handled_packets,
-					__ATOMIC_ACQUIRE));
+					__ATOMIC_RELAXED));
 	printf("Sanity test with non-zero hashes done\n");
 
 	rte_mempool_put_bulk(p, (void *)bufs, BURST);
@@ -276,23 +274,20 @@  handle_work_with_free_mbufs(void *arg)
 	struct rte_mbuf *buf[8] __rte_cache_aligned;
 	struct worker_params *wp = arg;
 	struct rte_distributor *d = wp->dist;
-	unsigned int count = 0;
 	unsigned int i;
 	unsigned int num;
 	unsigned int id = __atomic_fetch_add(&worker_idx, 1, __ATOMIC_RELAXED);
 
 	num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
 	while (!quit) {
-		count += num;
 		__atomic_fetch_add(&worker_stats[id].handled_packets, num,
-				__ATOMIC_ACQ_REL);
+				__ATOMIC_RELAXED);
 		for (i = 0; i < num; i++)
 			rte_pktmbuf_free(buf[i]);
 		num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
 	}
-	count += num;
 	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
-			__ATOMIC_ACQ_REL);
+			__ATOMIC_RELAXED);
 	rte_distributor_return_pkt(d, id, buf, num);
 	return 0;
 }
@@ -318,7 +313,6 @@  sanity_test_with_mbuf_alloc(struct worker_params *wp, struct rte_mempool *p)
 			rte_distributor_process(d, NULL, 0);
 		for (j = 0; j < BURST; j++) {
 			bufs[j]->hash.usr = (i+j) << 1;
-			rte_mbuf_refcnt_set(bufs[j], 1);
 		}
 
 		rte_distributor_process(d, bufs, BURST);
@@ -342,15 +336,10 @@  sanity_test_with_mbuf_alloc(struct worker_params *wp, struct rte_mempool *p)
 static int
 handle_work_for_shutdown_test(void *arg)
 {
-	struct rte_mbuf *pkt = NULL;
 	struct rte_mbuf *buf[8] __rte_cache_aligned;
 	struct worker_params *wp = arg;
 	struct rte_distributor *d = wp->dist;
-	unsigned int count = 0;
 	unsigned int num;
-	unsigned int total = 0;
-	unsigned int i;
-	unsigned int returned = 0;
 	unsigned int zero_id = 0;
 	unsigned int zero_unset;
 	const unsigned int id = __atomic_fetch_add(&worker_idx, 1,
@@ -368,11 +357,8 @@  handle_work_for_shutdown_test(void *arg)
 	/* wait for quit single globally, or for worker zero, wait
 	 * for zero_quit */
 	while (!quit && !(id == zero_id && zero_quit)) {
-		count += num;
 		__atomic_fetch_add(&worker_stats[id].handled_packets, num,
-				__ATOMIC_ACQ_REL);
-		for (i = 0; i < num; i++)
-			rte_pktmbuf_free(buf[i]);
+				__ATOMIC_RELAXED);
 		num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
 
 		if (num > 0) {
@@ -381,15 +367,12 @@  handle_work_for_shutdown_test(void *arg)
 				false, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE);
 		}
 		zero_id = __atomic_load_n(&zero_idx, __ATOMIC_ACQUIRE);
-
-		total += num;
 	}
-	count += num;
-	returned = rte_distributor_return_pkt(d, id, buf, num);
-
 	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
-			__ATOMIC_ACQ_REL);
+			__ATOMIC_RELAXED);
 	if (id == zero_id) {
+		rte_distributor_return_pkt(d, id, NULL, 0);
+
 		/* for worker zero, allow it to restart to pick up last packet
 		 * when all workers are shutting down.
 		 */
@@ -400,15 +383,11 @@  handle_work_for_shutdown_test(void *arg)
 
 		while (!quit) {
 			__atomic_fetch_add(&worker_stats[id].handled_packets,
-					num, __ATOMIC_ACQ_REL);
-			count += num;
-			rte_pktmbuf_free(pkt);
+					num, __ATOMIC_RELAXED);
 			num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
 		}
-		returned = rte_distributor_return_pkt(d,
-				id, buf, num);
-		printf("Num returned = %d\n", returned);
 	}
+	rte_distributor_return_pkt(d, id, buf, num);
 	return 0;
 }
 
@@ -424,7 +403,9 @@  sanity_test_with_worker_shutdown(struct worker_params *wp,
 {
 	struct rte_distributor *d = wp->dist;
 	struct rte_mbuf *bufs[BURST];
-	unsigned i;
+	struct rte_mbuf *bufs2[BURST];
+	unsigned int i;
+	unsigned int failed = 0;
 
 	printf("=== Sanity test of worker shutdown ===\n");
 
@@ -450,16 +431,17 @@  sanity_test_with_worker_shutdown(struct worker_params *wp,
 	 */
 
 	/* get more buffers to queue up, again setting them to the same flow */
-	if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
+	if (rte_mempool_get_bulk(p, (void *)bufs2, BURST) != 0) {
 		printf("line %d: Error getting mbufs from pool\n", __LINE__);
+		rte_mempool_put_bulk(p, (void *)bufs, BURST);
 		return -1;
 	}
 	for (i = 0; i < BURST; i++)
-		bufs[i]->hash.usr = 1;
+		bufs2[i]->hash.usr = 1;
 
 	/* get worker zero to quit */
 	zero_quit = 1;
-	rte_distributor_process(d, bufs, BURST);
+	rte_distributor_process(d, bufs2, BURST);
 
 	/* flush the distributor */
 	rte_distributor_flush(d);
@@ -468,15 +450,21 @@  sanity_test_with_worker_shutdown(struct worker_params *wp,
 	for (i = 0; i < rte_lcore_count() - 1; i++)
 		printf("Worker %u handled %u packets\n", i,
 			__atomic_load_n(&worker_stats[i].handled_packets,
-					__ATOMIC_ACQUIRE));
+					__ATOMIC_RELAXED));
 
 	if (total_packet_count() != BURST * 2) {
 		printf("Line %d: Error, not all packets flushed. "
 				"Expected %u, got %u\n",
 				__LINE__, BURST * 2, total_packet_count());
-		return -1;
+		failed = 1;
 	}
 
+	rte_mempool_put_bulk(p, (void *)bufs, BURST);
+	rte_mempool_put_bulk(p, (void *)bufs2, BURST);
+
+	if (failed)
+		return -1;
+
 	printf("Sanity test with worker shutdown passed\n\n");
 	return 0;
 }
@@ -490,7 +478,8 @@  test_flush_with_worker_shutdown(struct worker_params *wp,
 {
 	struct rte_distributor *d = wp->dist;
 	struct rte_mbuf *bufs[BURST];
-	unsigned i;
+	unsigned int i;
+	unsigned int failed = 0;
 
 	printf("=== Test flush fn with worker shutdown (%s) ===\n", wp->name);
 
@@ -522,15 +511,20 @@  test_flush_with_worker_shutdown(struct worker_params *wp,
 	for (i = 0; i < rte_lcore_count() - 1; i++)
 		printf("Worker %u handled %u packets\n", i,
 			__atomic_load_n(&worker_stats[i].handled_packets,
-					__ATOMIC_ACQUIRE));
+					__ATOMIC_RELAXED));
 
 	if (total_packet_count() != BURST) {
 		printf("Line %d: Error, not all packets flushed. "
 				"Expected %u, got %u\n",
 				__LINE__, BURST, total_packet_count());
-		return -1;
+		failed = 1;
 	}
 
+	rte_mempool_put_bulk(p, (void *)bufs, BURST);
+
+	if (failed)
+		return -1;
+
 	printf("Flush test with worker shutdown passed\n\n");
 	return 0;
 }
@@ -596,7 +590,10 @@  quit_workers(struct worker_params *wp, struct rte_mempool *p)
 	const unsigned num_workers = rte_lcore_count() - 1;
 	unsigned i;
 	struct rte_mbuf *bufs[RTE_MAX_LCORE];
-	rte_mempool_get_bulk(p, (void *)bufs, num_workers);
+	if (rte_mempool_get_bulk(p, (void *)bufs, num_workers) != 0) {
+		printf("line %d: Error getting mbufs from pool\n", __LINE__);
+		return;
+	}
 
 	zero_quit = 0;
 	quit = 1;
@@ -604,11 +601,12 @@  quit_workers(struct worker_params *wp, struct rte_mempool *p)
 		bufs[i]->hash.usr = i << 1;
 	rte_distributor_process(d, bufs, num_workers);
 
-	rte_mempool_put_bulk(p, (void *)bufs, num_workers);
-
 	rte_distributor_process(d, NULL, 0);
 	rte_distributor_flush(d);
 	rte_eal_mp_wait_lcore();
+
+	rte_mempool_put_bulk(p, (void *)bufs, num_workers);
+
 	quit = 0;
 	worker_idx = 0;
 	zero_idx = RTE_MAX_LCORE;