@@ -44,7 +44,7 @@ total_packet_count(void)
unsigned i, count = 0;
for (i = 0; i < worker_idx; i++)
count += __atomic_load_n(&worker_stats[i].handled_packets,
- __ATOMIC_ACQUIRE);
+ __ATOMIC_RELAXED);
return count;
}
@@ -55,7 +55,7 @@ clear_packet_count(void)
unsigned int i;
for (i = 0; i < RTE_MAX_LCORE; i++)
__atomic_store_n(&worker_stats[i].handled_packets, 0,
- __ATOMIC_RELEASE);
+ __ATOMIC_RELAXED);
}
/* this is the basic worker function for sanity test
@@ -67,20 +67,18 @@ handle_work(void *arg)
struct rte_mbuf *buf[8] __rte_cache_aligned;
struct worker_params *wp = arg;
struct rte_distributor *db = wp->dist;
- unsigned int count = 0, num;
+ unsigned int num;
unsigned int id = __atomic_fetch_add(&worker_idx, 1, __ATOMIC_RELAXED);
num = rte_distributor_get_pkt(db, id, buf, NULL, 0);
while (!quit) {
__atomic_fetch_add(&worker_stats[id].handled_packets, num,
- __ATOMIC_ACQ_REL);
- count += num;
+ __ATOMIC_RELAXED);
num = rte_distributor_get_pkt(db, id,
buf, buf, num);
}
__atomic_fetch_add(&worker_stats[id].handled_packets, num,
- __ATOMIC_ACQ_REL);
- count += num;
+ __ATOMIC_RELAXED);
rte_distributor_return_pkt(db, id, buf, num);
return 0;
}
@@ -136,7 +134,7 @@ sanity_test(struct worker_params *wp, struct rte_mempool *p)
for (i = 0; i < rte_lcore_count() - 1; i++)
printf("Worker %u handled %u packets\n", i,
__atomic_load_n(&worker_stats[i].handled_packets,
- __ATOMIC_ACQUIRE));
+ __ATOMIC_RELAXED));
printf("Sanity test with all zero hashes done.\n");
/* pick two flows and check they go correctly */
@@ -163,7 +161,7 @@ sanity_test(struct worker_params *wp, struct rte_mempool *p)
printf("Worker %u handled %u packets\n", i,
__atomic_load_n(
&worker_stats[i].handled_packets,
- __ATOMIC_ACQUIRE));
+ __ATOMIC_RELAXED));
printf("Sanity test with two hash values done\n");
}
@@ -190,7 +188,7 @@ sanity_test(struct worker_params *wp, struct rte_mempool *p)
for (i = 0; i < rte_lcore_count() - 1; i++)
printf("Worker %u handled %u packets\n", i,
__atomic_load_n(&worker_stats[i].handled_packets,
- __ATOMIC_ACQUIRE));
+ __ATOMIC_RELAXED));
printf("Sanity test with non-zero hashes done\n");
rte_mempool_put_bulk(p, (void *)bufs, BURST);
@@ -276,23 +274,20 @@ handle_work_with_free_mbufs(void *arg)
struct rte_mbuf *buf[8] __rte_cache_aligned;
struct worker_params *wp = arg;
struct rte_distributor *d = wp->dist;
- unsigned int count = 0;
unsigned int i;
unsigned int num;
unsigned int id = __atomic_fetch_add(&worker_idx, 1, __ATOMIC_RELAXED);
num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
while (!quit) {
- count += num;
__atomic_fetch_add(&worker_stats[id].handled_packets, num,
- __ATOMIC_ACQ_REL);
+ __ATOMIC_RELAXED);
for (i = 0; i < num; i++)
rte_pktmbuf_free(buf[i]);
num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
}
- count += num;
__atomic_fetch_add(&worker_stats[id].handled_packets, num,
- __ATOMIC_ACQ_REL);
+ __ATOMIC_RELAXED);
rte_distributor_return_pkt(d, id, buf, num);
return 0;
}
@@ -318,7 +313,6 @@ sanity_test_with_mbuf_alloc(struct worker_params *wp, struct rte_mempool *p)
rte_distributor_process(d, NULL, 0);
for (j = 0; j < BURST; j++) {
bufs[j]->hash.usr = (i+j) << 1;
- rte_mbuf_refcnt_set(bufs[j], 1);
}
rte_distributor_process(d, bufs, BURST);
@@ -342,15 +336,10 @@ sanity_test_with_mbuf_alloc(struct worker_params *wp, struct rte_mempool *p)
static int
handle_work_for_shutdown_test(void *arg)
{
- struct rte_mbuf *pkt = NULL;
struct rte_mbuf *buf[8] __rte_cache_aligned;
struct worker_params *wp = arg;
struct rte_distributor *d = wp->dist;
- unsigned int count = 0;
unsigned int num;
- unsigned int total = 0;
- unsigned int i;
- unsigned int returned = 0;
unsigned int zero_id = 0;
unsigned int zero_unset;
const unsigned int id = __atomic_fetch_add(&worker_idx, 1,
@@ -368,11 +357,8 @@ handle_work_for_shutdown_test(void *arg)
/* wait for quit single globally, or for worker zero, wait
* for zero_quit */
while (!quit && !(id == zero_id && zero_quit)) {
- count += num;
__atomic_fetch_add(&worker_stats[id].handled_packets, num,
- __ATOMIC_ACQ_REL);
- for (i = 0; i < num; i++)
- rte_pktmbuf_free(buf[i]);
+ __ATOMIC_RELAXED);
num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
if (num > 0) {
@@ -381,15 +367,12 @@ handle_work_for_shutdown_test(void *arg)
false, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE);
}
zero_id = __atomic_load_n(&zero_idx, __ATOMIC_ACQUIRE);
-
- total += num;
}
- count += num;
- returned = rte_distributor_return_pkt(d, id, buf, num);
-
__atomic_fetch_add(&worker_stats[id].handled_packets, num,
- __ATOMIC_ACQ_REL);
+ __ATOMIC_RELAXED);
if (id == zero_id) {
+ rte_distributor_return_pkt(d, id, NULL, 0);
+
/* for worker zero, allow it to restart to pick up last packet
* when all workers are shutting down.
*/
@@ -400,15 +383,11 @@ handle_work_for_shutdown_test(void *arg)
while (!quit) {
__atomic_fetch_add(&worker_stats[id].handled_packets,
- num, __ATOMIC_ACQ_REL);
- count += num;
- rte_pktmbuf_free(pkt);
+ num, __ATOMIC_RELAXED);
num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
}
- returned = rte_distributor_return_pkt(d,
- id, buf, num);
- printf("Num returned = %d\n", returned);
}
+ rte_distributor_return_pkt(d, id, buf, num);
return 0;
}
@@ -424,7 +403,9 @@ sanity_test_with_worker_shutdown(struct worker_params *wp,
{
struct rte_distributor *d = wp->dist;
struct rte_mbuf *bufs[BURST];
- unsigned i;
+ struct rte_mbuf *bufs2[BURST];
+ unsigned int i;
+ unsigned int failed = 0;
printf("=== Sanity test of worker shutdown ===\n");
@@ -450,16 +431,17 @@ sanity_test_with_worker_shutdown(struct worker_params *wp,
*/
/* get more buffers to queue up, again setting them to the same flow */
- if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
+ if (rte_mempool_get_bulk(p, (void *)bufs2, BURST) != 0) {
printf("line %d: Error getting mbufs from pool\n", __LINE__);
+ rte_mempool_put_bulk(p, (void *)bufs, BURST);
return -1;
}
for (i = 0; i < BURST; i++)
- bufs[i]->hash.usr = 1;
+ bufs2[i]->hash.usr = 1;
/* get worker zero to quit */
zero_quit = 1;
- rte_distributor_process(d, bufs, BURST);
+ rte_distributor_process(d, bufs2, BURST);
/* flush the distributor */
rte_distributor_flush(d);
@@ -468,15 +450,21 @@ sanity_test_with_worker_shutdown(struct worker_params *wp,
for (i = 0; i < rte_lcore_count() - 1; i++)
printf("Worker %u handled %u packets\n", i,
__atomic_load_n(&worker_stats[i].handled_packets,
- __ATOMIC_ACQUIRE));
+ __ATOMIC_RELAXED));
if (total_packet_count() != BURST * 2) {
printf("Line %d: Error, not all packets flushed. "
"Expected %u, got %u\n",
__LINE__, BURST * 2, total_packet_count());
- return -1;
+ failed = 1;
}
+ rte_mempool_put_bulk(p, (void *)bufs, BURST);
+ rte_mempool_put_bulk(p, (void *)bufs2, BURST);
+
+ if (failed)
+ return -1;
+
printf("Sanity test with worker shutdown passed\n\n");
return 0;
}
@@ -490,7 +478,8 @@ test_flush_with_worker_shutdown(struct worker_params *wp,
{
struct rte_distributor *d = wp->dist;
struct rte_mbuf *bufs[BURST];
- unsigned i;
+ unsigned int i;
+ unsigned int failed = 0;
printf("=== Test flush fn with worker shutdown (%s) ===\n", wp->name);
@@ -522,15 +511,20 @@ test_flush_with_worker_shutdown(struct worker_params *wp,
for (i = 0; i < rte_lcore_count() - 1; i++)
printf("Worker %u handled %u packets\n", i,
__atomic_load_n(&worker_stats[i].handled_packets,
- __ATOMIC_ACQUIRE));
+ __ATOMIC_RELAXED));
if (total_packet_count() != BURST) {
printf("Line %d: Error, not all packets flushed. "
"Expected %u, got %u\n",
__LINE__, BURST, total_packet_count());
- return -1;
+ failed = 1;
}
+ rte_mempool_put_bulk(p, (void *)bufs, BURST);
+
+ if (failed)
+ return -1;
+
printf("Flush test with worker shutdown passed\n\n");
return 0;
}
@@ -596,7 +590,10 @@ quit_workers(struct worker_params *wp, struct rte_mempool *p)
const unsigned num_workers = rte_lcore_count() - 1;
unsigned i;
struct rte_mbuf *bufs[RTE_MAX_LCORE];
- rte_mempool_get_bulk(p, (void *)bufs, num_workers);
+ if (rte_mempool_get_bulk(p, (void *)bufs, num_workers) != 0) {
+ printf("line %d: Error getting mbufs from pool\n", __LINE__);
+ return;
+ }
zero_quit = 0;
quit = 1;
@@ -604,11 +601,12 @@ quit_workers(struct worker_params *wp, struct rte_mempool *p)
bufs[i]->hash.usr = i << 1;
rte_distributor_process(d, bufs, num_workers);
- rte_mempool_put_bulk(p, (void *)bufs, num_workers);
-
rte_distributor_process(d, NULL, 0);
rte_distributor_flush(d);
rte_eal_mp_wait_lcore();
+
+ rte_mempool_put_bulk(p, (void *)bufs, num_workers);
+
quit = 0;
worker_idx = 0;
zero_idx = RTE_MAX_LCORE;