[10/12] examples/generator: telemetry support

Message ID 20211214141242.3383831-11-ronan.randles@intel.com (mailing list archive)
State Not Applicable, archived
Delegated to: Thomas Monjalon
Headers
Series add packet generator library and example app |

Checks

Context Check Description
ci/checkpatch success coding style OK

Commit Message

Ronan Randles Dec. 14, 2021, 2:12 p.m. UTC
  This commit adds telemetry introducing the callback functions
and returning measured values

Signed-off-by: Ronan Randles <ronan.randles@intel.com>
---
 examples/generator/main.c | 159 ++++++++++++++++++++++++++++++++------
 1 file changed, 135 insertions(+), 24 deletions(-)
  

Patch

diff --git a/examples/generator/main.c b/examples/generator/main.c
index 1ddf4c1603..2525d34b6e 100644
--- a/examples/generator/main.c
+++ b/examples/generator/main.c
@@ -13,6 +13,7 @@ 
 #include <rte_lcore.h>
 #include <rte_mbuf.h>
 #include <rte_gen.h>
+#include <rte_telemetry.h>
 
 #define RX_RING_SIZE 1024
 #define TX_RING_SIZE 1024
@@ -32,6 +33,29 @@  static volatile int done;
 static struct rte_mempool *mbuf_pool;
 struct rte_gen *gen;
 
+struct gen_args {
+	/* Inputs */
+	struct rte_gen *gen;
+
+	/* Outputs */
+	uint64_t tx_total_packets;
+	uint64_t rx_total_packets;
+	uint64_t rx_missed_total;
+	uint64_t tx_failed;
+	uint64_t last_tx_total;
+	uint64_t measured_tx_pps;
+
+
+} __rte_cache_aligned;
+/* Expose a struct as a global so the telemetry callbacks can access the
+ * data required to provide stats etc.
+ */
+struct telemetry_userdata {
+	struct gen_args *prod;
+	struct gen_args *cons;
+};
+static struct telemetry_userdata telemetry_userdata;
+
 static void handle_sigint(int sig);
 
 /* Initializes a given port using global settings and with the RX buffers
@@ -123,10 +147,11 @@  port_init(uint16_t port, struct rte_mempool *mbuf_pool)
  * an input port and writing to an output port.
  */
 static int
-lcore_producer(__rte_unused void *arg)
+lcore_producer(void *arg)
 {
+	struct gen_args *args = arg;
+	struct rte_gen *gen = args->gen;
 	uint16_t port;
-
 	/* Check that the port is on the same NUMA node as the polling thread
 	 * for best performance.
 	 */
@@ -138,25 +163,34 @@  lcore_producer(__rte_unused void *arg)
 					"polling thread.\n\tPerformance will "
 					"not be optimal.\n", port);
 
+	uint64_t tsc_hz = rte_get_tsc_hz();
+	uint64_t last_tsc_reading = 0;
+	uint64_t last_tx_total = 0;
+
 	/* Run until the application is quit or killed. */
 	while (!done) {
 		struct rte_mbuf *bufs[BURST_SIZE];
-		int i;
+		uint16_t nb_tx = 0;
 		/* Receive packets from gen and then tx them over port */
 		RTE_ETH_FOREACH_DEV(port) {
-			int nb_recieved = rte_gen_rx_burst(gen, bufs,
+			int nb_generated = rte_gen_rx_burst(gen, bufs,
 							BURST_SIZE);
-			for (i = 0; i < nb_recieved; i++) {
-				bufs[i]->pkt_len = 64;
-				bufs[i]->data_len = 64;
-			}
 
-			uint16_t nb_tx = rte_eth_tx_burst(port, 0, bufs,
-							nb_recieved);
-			if (nb_tx != nb_recieved)
-				rte_pktmbuf_free_bulk(&bufs[nb_tx],
-							(nb_recieved - nb_tx));
+			uint64_t start_tsc = rte_rdtsc();
+			if (start_tsc > last_tsc_reading + tsc_hz) {
+				args->measured_tx_pps = args->tx_total_packets -
+								last_tx_total;
+				last_tx_total = args->tx_total_packets;
+				last_tsc_reading = start_tsc;
+			}
+			nb_tx = rte_eth_tx_burst(port, 0, bufs, nb_generated);
+			args->tx_total_packets += nb_tx;
 
+			uint64_t tx_failed = nb_generated - nb_tx;
+			if (nb_tx != nb_generated) {
+				rte_pktmbuf_free_bulk(&bufs[nb_tx], tx_failed);
+				args->tx_failed += tx_failed;
+			}
 			if (unlikely(nb_tx == 0))
 				continue;
 
@@ -169,10 +203,11 @@  lcore_producer(__rte_unused void *arg)
  * an input port and writing to an output port.
  */
 static int
-lcore_consumer(__rte_unused void *arg)
+lcore_consumer(void *arg)
 {
+	struct gen_args *args = arg;
+	struct rte_gen *gen = args->gen;
 	uint16_t port;
-
 	/* Check that the port is on the same NUMA node as the polling thread
 	 * for best performance.
 	 */
@@ -195,16 +230,16 @@  lcore_consumer(__rte_unused void *arg)
 			uint64_t latency[BURST_SIZE];
 			uint16_t nb_rx = rte_eth_rx_burst(port, 0, bufs,
 							BURST_SIZE);
-			rte_gen_tx_burst(gen, bufs, latency, nb_rx);
+			if (unlikely(nb_rx == 0))
+				continue;
+
+			args->rx_total_packets += nb_rx;
 
 			int nb_sent = rte_gen_tx_burst(gen, bufs,
 							latency, nb_rx);
 			if (nb_sent != nb_rx)
 				rte_panic("invalid tx quantity\n");
 
-			if (unlikely(nb_rx == 0))
-				continue;
-
 		}
 	}
 	return 0;
@@ -217,6 +252,58 @@  void handle_sigint(int sig)
 	done = 1;
 }
 
+static int
+tele_gen_packet(const char *cmd, const char *params, struct rte_tel_data *d)
+{
+	RTE_SET_USED(cmd);
+	RTE_SET_USED(params);
+	RTE_SET_USED(d);
+
+	rte_tel_data_string(d, "Ether()/IP()");
+	return 0;
+}
+
+static int
+tele_gen_mpps(const char *cmd, const char *params, struct rte_tel_data *d)
+{
+	struct gen_args *args = telemetry_userdata.prod;
+	RTE_SET_USED(cmd);
+	if (params) {
+		rte_tel_data_add_dict_int(d, "TEST",
+					(args->measured_tx_pps/1000000));
+	}
+	rte_tel_data_start_dict(d);
+	rte_tel_data_add_dict_int(d, "mpps",
+					(args->measured_tx_pps/1000000));
+	return 0;
+}
+
+static int
+tele_gen_stats(const char *cmd, const char *params, struct rte_tel_data *d)
+{
+	RTE_SET_USED(cmd);
+	RTE_SET_USED(params);
+
+	struct gen_args *args_prod = telemetry_userdata.prod;
+	struct gen_args *args_cons = telemetry_userdata.cons;
+	rte_tel_data_start_dict(d);
+	static const char * const stats[] = {
+		"tx_total_packets",
+		"rx_total_packets",
+		"measured_tx_pps"
+	};
+
+	uint64_t values[RTE_DIM(stats)] = {0};
+	values[0] = args_prod->tx_total_packets;
+	values[1] = args_cons->rx_total_packets;
+	values[2] = args_prod->measured_tx_pps;
+
+	uint32_t i;
+	for (i = 0; i < RTE_DIM(stats); i++)
+		rte_tel_data_add_dict_int(d, stats[i], values[i]);
+
+	return 0;
+}
 /* The main function, which does initialization and calls the per-lcore
  * functions.
  */
@@ -224,6 +311,10 @@  int
 main(int argc, char *argv[])
 {
 	signal(SIGINT, handle_sigint);
+
+	#define CORE_COUNT 2
+	struct gen_args core_launch_args[CORE_COUNT];
+
 	unsigned int nb_ports;
 	uint16_t portid;
 
@@ -253,7 +344,7 @@  main(int argc, char *argv[])
 			rte_exit(EXIT_FAILURE, "Cannot init port %"PRIu16 "\n",
 					portid);
 
-	gen = rte_gen_create(mbuf_pool);
+	struct rte_gen *gen = rte_gen_create(mbuf_pool);
 	if (!gen)
 		rte_panic("Gen failed to initialize\n");
 
@@ -261,19 +352,39 @@  main(int argc, char *argv[])
 	if (err)
 		rte_panic("Failed to parse input args");
 
+	memset(core_launch_args, 0, sizeof(struct gen_args) * CORE_COUNT);
 	/* launch lcore functions */
 	uint32_t lcore_count = 0;
 	uint32_t lcore_id = 0;
 	RTE_LCORE_FOREACH_WORKER(lcore_id) {
-		if (lcore_count == 0)
-			rte_eal_remote_launch(lcore_producer, NULL, lcore_id);
-		else if (lcore_count == 1)
-			rte_eal_remote_launch(lcore_consumer, NULL, lcore_id);
+		core_launch_args[lcore_count].gen = gen;
+		if (lcore_count == 0) {
+			telemetry_userdata.prod =
+						&core_launch_args[lcore_count];
+			rte_eal_remote_launch(lcore_producer,
+					      telemetry_userdata.prod,
+					      lcore_id);
+		} else if (lcore_count == 1) {
+			telemetry_userdata.cons =
+						&core_launch_args[lcore_count];
+			rte_eal_remote_launch(lcore_consumer,
+					      telemetry_userdata.cons,
+					      lcore_id);
+		}
 		else
 			break;
 
 		lcore_count++;
 	}
+
+	/* Export stats via Telemetry */
+	rte_telemetry_register_cmd("/gen/stats", tele_gen_stats,
+			"Return statistics of the Gen instance.");
+	rte_telemetry_register_cmd("/gen/packet", tele_gen_packet,
+			"Return the Gen string packet being sent.");
+	rte_telemetry_register_cmd("/gen/mpps", tele_gen_mpps,
+			"Get/Set the mpps rate");
+
 	/* Stall the main thread until all other threads have returned. */
 	rte_eal_mp_wait_lcore();