@@ -67,20 +67,18 @@ handle_work(void *arg)
struct rte_mbuf *buf[8] __rte_cache_aligned;
struct worker_params *wp = arg;
struct rte_distributor *db = wp->dist;
- unsigned int count = 0, num;
+ unsigned int num;
unsigned int id = __atomic_fetch_add(&worker_idx, 1, __ATOMIC_RELAXED);
num = rte_distributor_get_pkt(db, id, buf, NULL, 0);
while (!quit) {
__atomic_fetch_add(&worker_stats[id].handled_packets, num,
__ATOMIC_ACQ_REL);
- count += num;
num = rte_distributor_get_pkt(db, id,
buf, buf, num);
}
__atomic_fetch_add(&worker_stats[id].handled_packets, num,
__ATOMIC_ACQ_REL);
- count += num;
rte_distributor_return_pkt(db, id, buf, num);
return 0;
}
@@ -276,21 +274,18 @@ handle_work_with_free_mbufs(void *arg)
struct rte_mbuf *buf[8] __rte_cache_aligned;
struct worker_params *wp = arg;
struct rte_distributor *d = wp->dist;
- unsigned int count = 0;
unsigned int i;
unsigned int num;
unsigned int id = __atomic_fetch_add(&worker_idx, 1, __ATOMIC_RELAXED);
num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
while (!quit) {
- count += num;
__atomic_fetch_add(&worker_stats[id].handled_packets, num,
__ATOMIC_ACQ_REL);
for (i = 0; i < num; i++)
rte_pktmbuf_free(buf[i]);
num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
}
- count += num;
__atomic_fetch_add(&worker_stats[id].handled_packets, num,
__ATOMIC_ACQ_REL);
rte_distributor_return_pkt(d, id, buf, num);
@@ -318,7 +313,6 @@ sanity_test_with_mbuf_alloc(struct worker_params *wp, struct rte_mempool *p)
rte_distributor_process(d, NULL, 0);
for (j = 0; j < BURST; j++) {
bufs[j]->hash.usr = (i+j) << 1;
- rte_mbuf_refcnt_set(bufs[j], 1);
}
rte_distributor_process(d, bufs, BURST);
@@ -342,15 +336,10 @@ sanity_test_with_mbuf_alloc(struct worker_params *wp, struct rte_mempool *p)
static int
handle_work_for_shutdown_test(void *arg)
{
- struct rte_mbuf *pkt = NULL;
struct rte_mbuf *buf[8] __rte_cache_aligned;
struct worker_params *wp = arg;
struct rte_distributor *d = wp->dist;
- unsigned int count = 0;
unsigned int num;
- unsigned int total = 0;
- unsigned int i;
- unsigned int returned = 0;
unsigned int zero_id = 0;
unsigned int zero_unset;
const unsigned int id = __atomic_fetch_add(&worker_idx, 1,
@@ -368,11 +357,8 @@ handle_work_for_shutdown_test(void *arg)
/* wait for quit single globally, or for worker zero, wait
* for zero_quit */
while (!quit && !(id == zero_id && zero_quit)) {
- count += num;
__atomic_fetch_add(&worker_stats[id].handled_packets, num,
__ATOMIC_ACQ_REL);
- for (i = 0; i < num; i++)
- rte_pktmbuf_free(buf[i]);
num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
if (num > 0) {
@@ -381,15 +367,12 @@ handle_work_for_shutdown_test(void *arg)
false, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE);
}
zero_id = __atomic_load_n(&zero_idx, __ATOMIC_ACQUIRE);
-
- total += num;
}
- count += num;
- returned = rte_distributor_return_pkt(d, id, buf, num);
-
__atomic_fetch_add(&worker_stats[id].handled_packets, num,
__ATOMIC_ACQ_REL);
if (id == zero_id) {
+ rte_distributor_return_pkt(d, id, NULL, 0);
+
/* for worker zero, allow it to restart to pick up last packet
* when all workers are shutting down.
*/
@@ -401,14 +384,10 @@ handle_work_for_shutdown_test(void *arg)
while (!quit) {
__atomic_fetch_add(&worker_stats[id].handled_packets,
num, __ATOMIC_ACQ_REL);
- count += num;
- rte_pktmbuf_free(pkt);
num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
}
- returned = rte_distributor_return_pkt(d,
- id, buf, num);
- printf("Num returned = %d\n", returned);
}
+ rte_distributor_return_pkt(d, id, buf, num);
return 0;
}
@@ -424,7 +403,9 @@ sanity_test_with_worker_shutdown(struct worker_params *wp,
{
struct rte_distributor *d = wp->dist;
struct rte_mbuf *bufs[BURST];
- unsigned i;
+ struct rte_mbuf *bufs2[BURST];
+ unsigned int i;
+ unsigned int failed = 0;
printf("=== Sanity test of worker shutdown ===\n");
@@ -450,16 +431,17 @@ sanity_test_with_worker_shutdown(struct worker_params *wp,
*/
/* get more buffers to queue up, again setting them to the same flow */
- if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
+ if (rte_mempool_get_bulk(p, (void *)bufs2, BURST) != 0) {
printf("line %d: Error getting mbufs from pool\n", __LINE__);
+ rte_mempool_put_bulk(p, (void *)bufs, BURST);
return -1;
}
for (i = 0; i < BURST; i++)
- bufs[i]->hash.usr = 1;
+ bufs2[i]->hash.usr = 1;
/* get worker zero to quit */
zero_quit = 1;
- rte_distributor_process(d, bufs, BURST);
+ rte_distributor_process(d, bufs2, BURST);
/* flush the distributor */
rte_distributor_flush(d);
@@ -474,9 +456,15 @@ sanity_test_with_worker_shutdown(struct worker_params *wp,
printf("Line %d: Error, not all packets flushed. "
"Expected %u, got %u\n",
__LINE__, BURST * 2, total_packet_count());
- return -1;
+ failed = 1;
}
+ rte_mempool_put_bulk(p, (void *)bufs, BURST);
+ rte_mempool_put_bulk(p, (void *)bufs2, BURST);
+
+ if (failed)
+ return -1;
+
printf("Sanity test with worker shutdown passed\n\n");
return 0;
}
@@ -490,7 +478,8 @@ test_flush_with_worker_shutdown(struct worker_params *wp,
{
struct rte_distributor *d = wp->dist;
struct rte_mbuf *bufs[BURST];
- unsigned i;
+ unsigned int i;
+ unsigned int failed = 0;
printf("=== Test flush fn with worker shutdown (%s) ===\n", wp->name);
@@ -528,9 +517,14 @@ test_flush_with_worker_shutdown(struct worker_params *wp,
printf("Line %d: Error, not all packets flushed. "
"Expected %u, got %u\n",
__LINE__, BURST, total_packet_count());
- return -1;
+ failed = 1;
}
+ rte_mempool_put_bulk(p, (void *)bufs, BURST);
+
+ if (failed)
+ return -1;
+
printf("Flush test with worker shutdown passed\n\n");
return 0;
}
@@ -596,7 +590,10 @@ quit_workers(struct worker_params *wp, struct rte_mempool *p)
const unsigned num_workers = rte_lcore_count() - 1;
unsigned i;
struct rte_mbuf *bufs[RTE_MAX_LCORE];
- rte_mempool_get_bulk(p, (void *)bufs, num_workers);
+ if (rte_mempool_get_bulk(p, (void *)bufs, num_workers) != 0) {
+ printf("line %d: Error getting mbufs from pool\n", __LINE__);
+ return;
+ }
zero_quit = 0;
quit = 1;
@@ -604,11 +601,12 @@ quit_workers(struct worker_params *wp, struct rte_mempool *p)
bufs[i]->hash.usr = i << 1;
rte_distributor_process(d, bufs, num_workers);
- rte_mempool_put_bulk(p, (void *)bufs, num_workers);
-
rte_distributor_process(d, NULL, 0);
rte_distributor_flush(d);
rte_eal_mp_wait_lcore();
+
+ rte_mempool_put_bulk(p, (void *)bufs, num_workers);
+
quit = 0;
worker_idx = 0;
zero_idx = RTE_MAX_LCORE;