@@ -278,6 +278,7 @@ lcore_rx(struct lcore_params *p)
app_stats.rx.enqueued_pkts += sent;
if (unlikely(sent < nb_ret)) {
+ app_stats.rx.enqdrop_pkts += nb_ret - sent;
RTE_LOG_DP(DEBUG, DISTRAPP,
"%s:Packet loss due to full ring\n", __func__);
while (sent < nb_ret)
@@ -295,13 +296,12 @@ lcore_rx(struct lcore_params *p)
static inline void
flush_one_port(struct output_buffer *outbuf, uint8_t outp)
{
- unsigned nb_tx = rte_eth_tx_burst(outp, 0, outbuf->mbufs,
- outbuf->count);
- app_stats.tx.tx_pkts += nb_tx;
+ unsigned int nb_tx = rte_eth_tx_burst(outp, 0,
+ outbuf->mbufs, outbuf->count);
+ app_stats.tx.tx_pkts += outbuf->count;
if (unlikely(nb_tx < outbuf->count)) {
- RTE_LOG_DP(DEBUG, DISTRAPP,
- "%s:Packet loss with tx_burst\n", __func__);
+ app_stats.tx.enqdrop_pkts += outbuf->count - nb_tx;
do {
rte_pktmbuf_free(outbuf->mbufs[nb_tx]);
} while (++nb_tx < outbuf->count);
@@ -313,6 +313,7 @@ static inline void
flush_all_ports(struct output_buffer *tx_buffers, uint8_t nb_ports)
{
uint8_t outp;
+
for (outp = 0; outp < nb_ports; outp++) {
/* skip ports that are not enabled */
if ((enabled_port_mask & (1 << outp)) == 0)
@@ -405,9 +406,9 @@ lcore_tx(struct rte_ring *in_r)
if ((enabled_port_mask & (1 << port)) == 0)
continue;
- struct rte_mbuf *bufs[BURST_SIZE];
+ struct rte_mbuf *bufs[BURST_SIZE_TX];
const uint16_t nb_rx = rte_ring_dequeue_burst(in_r,
- (void *)bufs, BURST_SIZE);
+ (void *)bufs, BURST_SIZE_TX);
app_stats.tx.dequeue_pkts += nb_rx;
/* if we get no traffic, flush anything we have */
@@ -436,11 +437,12 @@ lcore_tx(struct rte_ring *in_r)
outbuf = &tx_buffers[outp];
outbuf->mbufs[outbuf->count++] = bufs[i];
- if (outbuf->count == BURST_SIZE)
+ if (outbuf->count == BURST_SIZE_TX)
flush_one_port(outbuf, outp);
}
}
}
+ printf("\nCore %u exiting tx task.\n", rte_lcore_id());
return 0;
}
@@ -562,6 +564,8 @@ lcore_worker(struct lcore_params *p)
for (i = 0; i < 8; i++)
buf[i] = NULL;
+ app_stats.worker_pkts[p->worker_id] = 1;
+
printf("\nCore %u acting as worker core.\n", rte_lcore_id());
while (!quit_signal_work) {
num = rte_distributor_get_pkt(d, id, buf, buf, num);
@@ -573,6 +577,10 @@ lcore_worker(struct lcore_params *p)
rte_pause();
buf[i]->port ^= xor_val;
}
+
+ app_stats.worker_pkts[p->worker_id] += num;
+ if (num > 0)
+ app_stats.worker_bursts[p->worker_id][num-1]++;
}
return 0;
}
@@ -677,9 +685,10 @@ main(int argc, char *argv[])
if (ret < 0)
rte_exit(EXIT_FAILURE, "Invalid distributor parameters\n");
- if (rte_lcore_count() < 4)
+ if (rte_lcore_count() < 5)
rte_exit(EXIT_FAILURE, "Error, This application needs at "
- "least 4 logical cores to run:\n"
+ "least 5 logical cores to run:\n"
+ "1 lcore for stats (can be core 0)\n"
"1 lcore for packet RX\n"
"1 lcore for distribution\n"
"1 lcore for packet TX\n"
@@ -721,7 +730,7 @@ main(int argc, char *argv[])
}
d = rte_distributor_create("PKT_DIST", rte_socket_id(),
- rte_lcore_count() - 3,
+ rte_lcore_count() - 4,
RTE_DIST_ALG_BURST);
if (d == NULL)
rte_exit(EXIT_FAILURE, "Cannot create distributor\n");
@@ -760,7 +769,21 @@ main(int argc, char *argv[])
/* tx core */
rte_eal_remote_launch((lcore_function_t *)lcore_tx,
dist_tx_ring, lcore_id);
+ } else if (worker_id == rte_lcore_count() - 2) {
+ printf("Starting rx on worker_id %d, lcore_id %d\n",
+ worker_id, lcore_id);
+ /* rx core */
+ struct lcore_params *p =
+ rte_malloc(NULL, sizeof(*p), 0);
+ if (!p)
+ rte_panic("malloc failure\n");
+ *p = (struct lcore_params){worker_id, d, rx_dist_ring,
+ dist_tx_ring, mbuf_pool};
+ rte_eal_remote_launch((lcore_function_t *)lcore_rx,
+ p, lcore_id);
} else {
+ printf("Starting worker on worker_id %d, lcore_id %d\n",
+ worker_id, lcore_id);
struct lcore_params *p =
rte_malloc(NULL, sizeof(*p), 0);
if (!p)
@@ -773,11 +796,6 @@ main(int argc, char *argv[])
}
worker_id++;
}
- /* call lcore_main on master core only */
- struct lcore_params p = { 0, d, rx_dist_ring, dist_tx_ring, mbuf_pool};
-
- if (lcore_rx(&p) != 0)
- return -1;
freq = rte_get_timer_hz();
t = rte_rdtsc() + freq;