[V6,06/17] pipeline: add support for pipeline I/O specification

Message ID 20220728151147.603265-7-cristian.dumitrescu@intel.com (mailing list archive)
State Accepted, archived
Delegated to: Thomas Monjalon
Headers
Series pipeline: pipeline configuration and build improvements |

Checks

Context Check Description
ci/checkpatch success coding style OK

Commit Message

Cristian Dumitrescu July 28, 2022, 3:11 p.m. UTC
  Add specification data structure and API for the pipeline I/O ports
and related pipeline configuration such as packet mirroring.

Signed-off-by: Cristian Dumitrescu <cristian.dumitrescu@intel.com>
Signed-off-by: Kamalakannan R. <kamalakannan.r@intel.com>
---
 lib/pipeline/rte_swx_pipeline_spec.c | 852 +++++++++++++++++++++++++++
 lib/pipeline/rte_swx_pipeline_spec.h |  58 ++
 2 files changed, 910 insertions(+)
  

Patch

diff --git a/lib/pipeline/rte_swx_pipeline_spec.c b/lib/pipeline/rte_swx_pipeline_spec.c
index bf21fe17ba..f34803793d 100644
--- a/lib/pipeline/rte_swx_pipeline_spec.c
+++ b/lib/pipeline/rte_swx_pipeline_spec.c
@@ -9,6 +9,12 @@ 
 #include <errno.h>
 
 #include <rte_common.h>
+#include <rte_mempool.h>
+
+#include <rte_swx_port_ethdev.h>
+#include <rte_swx_port_ring.h>
+#include <rte_swx_port_source_sink.h>
+#include <rte_swx_port_fd.h>
 
 #include "rte_swx_pipeline_spec.h"
 
@@ -3566,3 +3572,849 @@  rte_swx_pipeline_build_from_spec(struct rte_swx_pipeline *p,
 	pipeline_spec_free(s);
 	return status;
 }
+
+static void
+port_in_params_free(void *params, const char *port_type)
+{
+	uintptr_t dev_name;
+
+	if (!params || !port_type)
+		return;
+
+	if (!strcmp(port_type, "ethdev")) {
+		struct rte_swx_port_ethdev_reader_params *p = params;
+
+		dev_name = (uintptr_t)p->dev_name;
+	} else if (!strcmp(port_type, "ring")) {
+		struct rte_swx_port_ring_reader_params *p = params;
+
+		dev_name = (uintptr_t)p->name;
+	} else if (!strcmp(port_type, "source")) {
+		struct rte_swx_port_source_params *p = params;
+
+		dev_name = (uintptr_t)p->file_name;
+	} else
+		dev_name = (uintptr_t)NULL;
+
+	free((void *)dev_name);
+	free(params);
+}
+
+static void
+port_out_params_free(void *params, const char *port_type)
+{
+	uintptr_t dev_name;
+
+	if (!params || !port_type)
+		return;
+
+	if (!strcmp(port_type, "ethdev")) {
+		struct rte_swx_port_ethdev_writer_params *p = params;
+
+		dev_name = (uintptr_t)p->dev_name;
+	} else if (!strcmp(port_type, "ring")) {
+		struct rte_swx_port_ring_writer_params *p = params;
+
+		dev_name = (uintptr_t)p->name;
+	} else if (!strcmp(port_type, "sink")) {
+		struct rte_swx_port_sink_params *p = params;
+
+		dev_name = (uintptr_t)p->file_name;
+	} else
+		dev_name = (uintptr_t)NULL;
+
+	free((void *)dev_name);
+	free(params);
+}
+
+void
+pipeline_iospec_free(struct pipeline_iospec *s)
+{
+	uint32_t i;
+
+	if (!s)
+		return;
+
+	/* Input ports. */
+	for (i = 0; i < s->n_ports_in; i++) {
+		uintptr_t name = (uintptr_t)s->port_in_type[i];
+
+		port_in_params_free(s->port_in_params[i], s->port_in_type[i]);
+		free((void *)name);
+	}
+
+	free(s->port_in_type);
+	free(s->port_in_params);
+
+	/* Output ports. */
+	for (i = 0; i < s->n_ports_out; i++) {
+		uintptr_t name = (uintptr_t)s->port_out_type[i];
+
+		port_out_params_free(s->port_out_params[i], s->port_out_type[i]);
+		free((void *)name);
+	}
+
+	free(s->port_out_type);
+	free(s->port_out_params);
+
+	free(s);
+}
+
+static int
+mirroring_parse(struct rte_swx_pipeline_mirroring_params *p,
+		char **tokens,
+		uint32_t n_tokens,
+		const char **err_msg)
+{
+	char *token;
+
+	if ((n_tokens != 4) || strcmp(tokens[0], "slots") || strcmp(tokens[2], "sessions")) {
+		if (err_msg)
+			*err_msg = "Invalid statement.";
+		return -EINVAL;
+	}
+
+	/* <n_slots>. */
+	token = tokens[1];
+	p->n_slots = strtoul(token, &token, 0);
+	if (token[0]) {
+		if (err_msg)
+			*err_msg = "Invalid <n_slots> parameter.";
+		return -EINVAL;
+	}
+
+	/* <n_sessions>. */
+	token = tokens[3];
+	p->n_sessions = strtoul(token, &token, 0);
+	if (token[0]) {
+		if (err_msg)
+			*err_msg = "Invalid <n_sessions> parameter.";
+		return -EINVAL;
+	}
+
+	return 0;
+}
+
+static void *
+port_in_ethdev_parse(char **tokens, uint32_t n_tokens, const char **err_msg)
+{
+	struct rte_swx_port_ethdev_reader_params *p = NULL;
+	char *token, *dev_name = NULL;
+	uint32_t queue_id, burst_size;
+
+	if ((n_tokens != 5) || strcmp(tokens[1], "rxq") || strcmp(tokens[3], "bsz")) {
+		if (err_msg)
+			*err_msg = "Invalid statement.";
+		return NULL;
+	}
+
+	/* <queue_id>. */
+	token = tokens[2];
+	queue_id = strtoul(token, &token, 0);
+	if (token[0]) {
+		if (err_msg)
+			*err_msg = "Invalid <queue_id> parameter.";
+		return NULL;
+	}
+
+	/* <burst_size>. */
+	token = tokens[4];
+	burst_size = strtoul(token, &token, 0);
+	if (token[0]) {
+		if (err_msg)
+			*err_msg = "Invalid <burst_size> parameter.";
+		return NULL;
+	}
+
+	/* Memory allocation. */
+	dev_name = strdup(tokens[0]);
+	p = malloc(sizeof(struct rte_swx_port_ethdev_reader_params));
+	if (!dev_name || !p) {
+		free(dev_name);
+		free(p);
+
+		if (err_msg)
+			*err_msg = "Memory allocation failed.";
+		return NULL;
+	}
+
+	/* Initialization. */
+	p->dev_name = dev_name;
+	p->queue_id = queue_id;
+	p->burst_size = burst_size;
+
+	return p;
+}
+
+static void *
+port_in_ring_parse(char **tokens, uint32_t n_tokens, const char **err_msg)
+{
+	struct rte_swx_port_ring_reader_params *p = NULL;
+	char *token, *name = NULL;
+	uint32_t burst_size;
+
+	if ((n_tokens != 3) || strcmp(tokens[1], "bsz")) {
+		if (err_msg)
+			*err_msg = "Invalid statement.";
+		return NULL;
+	}
+
+	/* <burst_size>. */
+	token = tokens[2];
+	burst_size = strtoul(token, &token, 0);
+	if (token[0]) {
+		if (err_msg)
+			*err_msg = "Invalid <burst_size> parameter.";
+		return NULL;
+	}
+
+	/* Memory allocation. */
+	name = strdup(tokens[0]);
+	p = malloc(sizeof(struct rte_swx_port_ring_reader_params));
+	if (!name || !p) {
+		free(name);
+		free(p);
+
+		if (err_msg)
+			*err_msg = "Memory allocation failed.";
+		return NULL;
+	}
+
+	/* Initialization. */
+	p->name = name;
+	p->burst_size = burst_size;
+
+	return p;
+}
+
+static void *
+port_in_source_parse(char **tokens, uint32_t n_tokens, const char **err_msg)
+{
+	struct rte_swx_port_source_params *p = NULL;
+	struct rte_mempool *pool = NULL;
+	char *token, *file_name = NULL;
+	uint32_t n_loops, n_pkts_max;
+
+	if ((n_tokens != 8) ||
+	    strcmp(tokens[0], "mempool") ||
+	    strcmp(tokens[2], "file") ||
+	    strcmp(tokens[4], "loop") ||
+	    strcmp(tokens[6], "packets")) {
+		if (err_msg)
+			*err_msg = "Invalid statement.";
+		return NULL;
+	}
+
+	/* <mempool_name>. */
+	pool = rte_mempool_lookup(tokens[1]);
+	if (!pool) {
+		if (err_msg)
+			*err_msg = "Invalid <mempool_name> parameter.";
+		return NULL;
+	}
+
+	/* <n_loops>. */
+	token = tokens[5];
+	n_loops = strtoul(token, &token, 0);
+	if (token[0]) {
+		if (err_msg)
+			*err_msg = "Invalid <n_loops> parameter.";
+		return NULL;
+	}
+
+	/* <n_pkts_max>. */
+	token = tokens[7];
+	n_pkts_max = strtoul(token, &token, 0);
+	if (token[0]) {
+		if (err_msg)
+			*err_msg = "Invalid <n_pkts_max> parameter.";
+		return NULL;
+	}
+
+	/* Memory allocation. */
+	file_name = strdup(tokens[3]);
+	p = malloc(sizeof(struct rte_swx_port_source_params));
+	if (!file_name || !p) {
+		free(file_name);
+		free(p);
+
+		if (err_msg)
+			*err_msg = "Memory allocation failed.";
+		return NULL;
+	}
+
+	/* Initialization. */
+	p->pool = pool;
+	p->file_name = file_name;
+	p->n_loops = n_loops;
+	p->n_pkts_max = n_pkts_max;
+
+	return p;
+}
+
+static void *
+port_in_fd_parse(char **tokens,
+		 uint32_t n_tokens,
+		 const char **err_msg)
+{
+	struct rte_swx_port_fd_reader_params *p = NULL;
+	struct rte_mempool *mempool = NULL;
+	char *token;
+	uint32_t mtu, burst_size;
+	int fd;
+
+	if ((n_tokens != 7) ||
+	    strcmp(tokens[1], "mtu") ||
+	    strcmp(tokens[3], "mempool") ||
+	    strcmp(tokens[5], "bsz")) {
+		if (err_msg)
+			*err_msg = "Invalid statement.";
+		return NULL;
+	}
+
+	/* <file_descriptor>. */
+	token = tokens[0];
+	fd = strtol(token, &token, 0);
+	if (token[0]) {
+		if (err_msg)
+			*err_msg = "Invalid <file_descriptor> parameter.";
+		return NULL;
+	}
+
+	/* <mtu>. */
+	token = tokens[2];
+	mtu = strtoul(token, &token, 0);
+	if (token[0]) {
+		if (err_msg)
+			*err_msg = "Invalid <mtu> parameter.";
+		return NULL;
+	}
+
+	/* <mempool_name>. */
+	mempool = rte_mempool_lookup(tokens[4]);
+	if (!mempool) {
+		if (err_msg)
+			*err_msg = "Invalid <mempool_name> parameter.";
+		return NULL;
+	}
+
+	/* <burst_size>. */
+	token = tokens[6];
+	burst_size = strtoul(token, &token, 0);
+	if (token[0]) {
+		if (err_msg)
+			*err_msg = "Invalid <burst_size> parameter.";
+		return NULL;
+	}
+
+	/* Memory allocation. */
+	p = malloc(sizeof(struct rte_swx_port_fd_reader_params));
+	if (!p) {
+		if (err_msg)
+			*err_msg = "Memory allocation failed.";
+		return NULL;
+	}
+
+	/* Initialization. */
+	p->fd = fd;
+	p->mtu = mtu;
+	p->mempool = mempool;
+	p->burst_size = burst_size;
+
+	return p;
+}
+
+static void *
+port_out_ethdev_parse(char **tokens, uint32_t n_tokens, const char **err_msg)
+{
+	struct rte_swx_port_ethdev_writer_params *p = NULL;
+	char *token, *dev_name = NULL;
+	uint32_t queue_id, burst_size;
+
+	if ((n_tokens != 5) || strcmp(tokens[1], "txq") || strcmp(tokens[3], "bsz")) {
+		if (err_msg)
+			*err_msg = "Invalid statement.";
+		return NULL;
+	}
+
+	/* <queue_id>. */
+	token = tokens[2];
+	queue_id = strtoul(token, &token, 0);
+	if (token[0]) {
+		if (err_msg)
+			*err_msg = "Invalid <queue_id> parameter.";
+		return NULL;
+	}
+
+	/* <burst_size>. */
+	token = tokens[4];
+	burst_size = strtoul(token, &token, 0);
+	if (token[0]) {
+		if (err_msg)
+			*err_msg = "Invalid <burst_size> parameter.";
+		return NULL;
+	}
+
+	/* Memory allocation. */
+	dev_name = strdup(tokens[0]);
+	p = malloc(sizeof(struct rte_swx_port_ethdev_writer_params));
+	if (!dev_name || !p) {
+		free(dev_name);
+		free(p);
+
+		if (err_msg)
+			*err_msg = "Memory allocation failed.";
+		return NULL;
+	}
+
+	/* Initialization. */
+	p->dev_name = dev_name;
+	p->queue_id = queue_id;
+	p->burst_size = burst_size;
+
+	return p;
+}
+
+static void *
+port_out_ring_parse(char **tokens, uint32_t n_tokens, const char **err_msg)
+{
+	struct rte_swx_port_ring_writer_params *p = NULL;
+	char *token, *name = NULL;
+	uint32_t burst_size;
+
+	if ((n_tokens != 3) || strcmp(tokens[1], "bsz")) {
+		if (err_msg)
+			*err_msg = "Invalid statement.";
+		return NULL;
+	}
+
+	/* <burst_size>. */
+	token = tokens[2];
+	burst_size = strtoul(token, &token, 0);
+	if (token[0]) {
+		if (err_msg)
+			*err_msg = "Invalid <burst_size> parameter.";
+		return NULL;
+	}
+
+	/* Memory allocation. */
+	name = strdup(tokens[0]);
+	p = malloc(sizeof(struct rte_swx_port_ring_writer_params));
+	if (!name || !p) {
+		free(name);
+		free(p);
+
+		if (err_msg)
+			*err_msg = "Memory allocation failed.";
+		return NULL;
+	}
+
+	/* Initialization. */
+	p->name = name;
+	p->burst_size = burst_size;
+
+	return p;
+}
+
+static void *
+port_out_sink_parse(char **tokens, uint32_t n_tokens, const char **err_msg)
+{
+	struct rte_swx_port_sink_params *p = NULL;
+	char *file_name = NULL;
+	int file_name_valid = 0;
+
+	if ((n_tokens != 2) || strcmp(tokens[0], "file")) {
+		if (err_msg)
+			*err_msg = "Invalid statement.";
+		return NULL;
+	}
+
+	/* Memory allocation. */
+	if (strcmp(tokens[1], "none")) {
+		file_name_valid = 1;
+		file_name = strdup(tokens[1]);
+	}
+
+	p = malloc(sizeof(struct rte_swx_port_ring_writer_params));
+	if ((file_name_valid && !file_name) || !p) {
+		free(file_name);
+		free(p);
+
+		if (err_msg)
+			*err_msg = "Memory allocation failed.";
+		return NULL;
+	}
+
+	/* Initialization. */
+	p->file_name = file_name;
+
+	return p;
+}
+
+static void *
+port_out_fd_parse(char **tokens,
+		  uint32_t n_tokens,
+		  const char **err_msg)
+{
+	struct rte_swx_port_fd_writer_params *p = NULL;
+	char *token;
+	uint32_t burst_size;
+	int fd;
+
+	if ((n_tokens != 3) || strcmp(tokens[1], "bsz")) {
+		if (err_msg)
+			*err_msg = "Invalid statement.";
+		return NULL;
+	}
+
+	/* <file_descriptor>. */
+	token = tokens[0];
+	fd = strtol(token, &token, 0);
+	if (token[0]) {
+		if (err_msg)
+			*err_msg = "Invalid <file_descriptor> parameter.";
+		return NULL;
+	}
+
+	/* <burst_size>. */
+	token = tokens[2];
+	burst_size = strtoul(token, &token, 0);
+	if (token[0]) {
+		if (err_msg)
+			*err_msg = "Invalid <burst_size> parameter.";
+		return NULL;
+	}
+
+	/* Memory allocation. */
+	p = malloc(sizeof(struct rte_swx_port_fd_writer_params));
+	if (!p) {
+		if (err_msg)
+			*err_msg = "Memory allocation failed.";
+		return NULL;
+	}
+
+	/* Initialization. */
+	p->fd = fd;
+	p->burst_size = burst_size;
+
+	return p;
+}
+
+struct pipeline_iospec *
+pipeline_iospec_parse(FILE *spec,
+		      uint32_t *err_line,
+		      const char **err_msg)
+{
+	struct pipeline_iospec *s = NULL;
+	uint32_t n_lines = 0;
+
+	/* Check the input arguments. */
+	if (!spec) {
+		if (err_line)
+			*err_line = n_lines;
+		if (err_msg)
+			*err_msg = "Invalid input argument.";
+		goto error;
+	}
+
+	/* Memory allocation. */
+	s = calloc(sizeof(struct pipeline_iospec), 1);
+	if (!s) {
+		if (err_line)
+			*err_line = n_lines;
+		if (err_msg)
+			*err_msg = "Memory allocation failed.";
+		goto error;
+	}
+
+	/* Initialize with the default values. */
+	s->mirroring_params.n_slots = RTE_SWX_PACKET_MIRRORING_SLOTS_DEFAULT;
+	s->mirroring_params.n_sessions = RTE_SWX_PACKET_MIRRORING_SESSIONS_DEFAULT;
+
+	for (n_lines = 1; ; n_lines++) {
+		char line[MAX_LINE_LENGTH];
+		char *tokens[MAX_TOKENS], *ptr = line;
+		uint32_t n_tokens = 0;
+
+		/* Read next line. */
+		if (!fgets(line, sizeof(line), spec))
+			break;
+
+		/* Parse the line into tokens. */
+		for ( ; ; ) {
+			char *token;
+
+			/* Get token. */
+			token = strtok_r(ptr, " \f\n\r\t\v", &ptr);
+			if (!token)
+				break;
+
+			/* Handle comments. */
+			if ((token[0] == '#') ||
+			    (token[0] == ';') ||
+			    ((token[0] == '/') && (token[1] == '/'))) {
+				break;
+			}
+
+			/* Handle excessively long lines. */
+			if (n_tokens >= RTE_DIM(tokens)) {
+				if (err_line)
+					*err_line = n_lines;
+				if (err_msg)
+					*err_msg = "Too many tokens.";
+				goto error;
+			}
+
+			/* Handle excessively long tokens. */
+			if (strnlen(token, RTE_SWX_NAME_SIZE) >=
+			    RTE_SWX_NAME_SIZE) {
+				if (err_line)
+					*err_line = n_lines;
+				if (err_msg)
+					*err_msg = "Token too big.";
+				goto error;
+			}
+
+			/* Save token. */
+			tokens[n_tokens] = token;
+			n_tokens++;
+		}
+
+		/* Handle empty lines. */
+		if (!n_tokens)
+			continue;
+
+		/* mirroring. */
+		if ((n_tokens >= 1) && !strcmp(tokens[0], "mirroring")) {
+			int status = 0;
+
+			status = mirroring_parse(&s->mirroring_params,
+						 &tokens[1],
+						 n_tokens - 1,
+						 err_msg);
+			if (status) {
+				if (err_line)
+					*err_line = n_lines;
+				goto error;
+			}
+
+			continue;
+		}
+
+		/* port in. */
+		if ((n_tokens >= 4) && !strcmp(tokens[0], "port") && !strcmp(tokens[1], "in")) {
+			char *token = tokens[2];
+			uint32_t *new_id = NULL;
+			const char **new_type = NULL, *port_type = NULL;
+			void **new_params = NULL, *p = NULL;
+			uint32_t port_id;
+
+			/* <port_id>. */
+			port_id = strtoul(token, &token, 0);
+			if (token[0]) {
+				if (err_line)
+					*err_line = n_lines;
+				if (err_msg)
+					*err_msg = "Invalid port ID.";
+				goto error;
+			}
+
+			/* <port_type>. */
+			if (!strcmp(tokens[3], "ethdev"))
+				p = port_in_ethdev_parse(&tokens[4], n_tokens - 4, err_msg);
+			else if (!strcmp(tokens[3], "ring"))
+				p = port_in_ring_parse(&tokens[4], n_tokens - 4, err_msg);
+			else if (!strcmp(tokens[3], "source"))
+				p = port_in_source_parse(&tokens[4], n_tokens - 4, err_msg);
+			else if (!strcmp(tokens[3], "fd"))
+				p = port_in_fd_parse(&tokens[4], n_tokens - 4, err_msg);
+			else {
+				p = NULL;
+				if (err_msg)
+					*err_msg = "Invalid port type.";
+			}
+
+			if (!p) {
+				if (err_line)
+					*err_line = n_lines;
+				goto error;
+			}
+
+			/* New port. */
+			port_type = strdup(tokens[3]);
+			new_id = realloc(s->port_in_id,
+					 (s->n_ports_in + 1) * sizeof(uint32_t));
+			new_type = realloc(s->port_in_type,
+					   (s->n_ports_in + 1) * sizeof(char *));
+			new_params = realloc(s->port_in_params,
+					     (s->n_ports_in + 1) * sizeof(void *));
+			if (!port_type || !new_id || !new_type || !new_params) {
+				uintptr_t pt = (uintptr_t)port_type;
+
+				port_in_params_free(p, tokens[3]);
+				free((void *)pt);
+				free(new_id);
+				free(new_type);
+				free(new_params);
+
+				if (err_line)
+					*err_line = n_lines;
+				if (err_msg)
+					*err_msg = "Memory allocation failed.";
+				goto error;
+			}
+
+			s->port_in_id = new_id;
+			s->port_in_type = new_type;
+			s->port_in_params = new_params;
+
+			s->port_in_id[s->n_ports_in] = port_id;
+			s->port_in_type[s->n_ports_in] = port_type;
+			s->port_in_params[s->n_ports_in] = p;
+			s->n_ports_in++;
+
+			continue;
+		}
+
+		/* port out. */
+		if ((n_tokens >= 4) && !strcmp(tokens[0], "port") && !strcmp(tokens[1], "out")) {
+			char *token = tokens[2];
+			uint32_t *new_id = NULL;
+			const char **new_type = NULL, *port_type = NULL;
+			void **new_params = NULL, *p = NULL;
+			uint32_t port_id;
+
+			/* <port_id>. */
+			port_id = strtoul(token, &token, 0);
+			if (token[0]) {
+				if (err_line)
+					*err_line = n_lines;
+				if (err_msg)
+					*err_msg = "Invalid port ID.";
+				goto error;
+			}
+
+			/* <port_type>. */
+			if (!strcmp(tokens[3], "ethdev"))
+				p = port_out_ethdev_parse(&tokens[4], n_tokens - 4, err_msg);
+			else if (!strcmp(tokens[3], "ring"))
+				p = port_out_ring_parse(&tokens[4], n_tokens - 4, err_msg);
+			else if (!strcmp(tokens[3], "sink"))
+				p = port_out_sink_parse(&tokens[4], n_tokens - 4, err_msg);
+			else if (!strcmp(tokens[3], "fd"))
+				p = port_out_fd_parse(&tokens[4], n_tokens - 4, err_msg);
+			else {
+				p = NULL;
+				if (err_msg)
+					*err_msg = "Invalid port type.";
+			}
+
+			if (!p) {
+				if (err_line)
+					*err_line = n_lines;
+				goto error;
+			}
+
+			/* New port. */
+			port_type = strdup(tokens[3]);
+			new_id = realloc(s->port_out_id,
+					 (s->n_ports_out + 1) * sizeof(uint32_t));
+			new_type = realloc(s->port_out_type,
+					   (s->n_ports_out + 1) * sizeof(char *));
+			new_params = realloc(s->port_out_params,
+					     (s->n_ports_out + 1) * sizeof(void *));
+			if (!port_type || !new_id || !new_type || !new_params) {
+				uintptr_t pt = (uintptr_t)port_type;
+
+				port_out_params_free(p, tokens[3]);
+				free((void *)pt);
+				free(new_id);
+				free(new_type);
+				free(new_params);
+
+				if (err_line)
+					*err_line = n_lines;
+				if (err_msg)
+					*err_msg = "Memory allocation failed.";
+				goto error;
+			}
+
+			s->port_out_id = new_id;
+			s->port_out_type = new_type;
+			s->port_out_params = new_params;
+
+			s->port_out_id[s->n_ports_out] = port_id;
+			s->port_out_type[s->n_ports_out] = port_type;
+			s->port_out_params[s->n_ports_out] = p;
+			s->n_ports_out++;
+
+			continue;
+		}
+
+		/* Anything else. */
+		if (err_line)
+			*err_line = n_lines;
+		if (err_msg)
+			*err_msg = "Unknown I/O statement.";
+		goto error;
+	}
+
+	return s;
+
+error:
+	pipeline_iospec_free(s);
+
+	return NULL;
+}
+
+int
+pipeline_iospec_configure(struct rte_swx_pipeline *p,
+			  struct pipeline_iospec *s,
+			  const char **err_msg)
+{
+	uint32_t i;
+	int status = 0;
+
+	/* Check input arguments. */
+	if (!p || !s) {
+		if (err_msg)
+			*err_msg = "Invalid input argument";
+		return -EINVAL;
+	}
+
+	/* Mirroring. */
+	status = rte_swx_pipeline_mirroring_config(p, &s->mirroring_params);
+	if (status) {
+		if (err_msg)
+			*err_msg = "Pipeline mirroring configuration error.";
+		return status;
+	}
+
+	/* Input ports. */
+	for (i = 0; i < s->n_ports_in; i++) {
+		status = rte_swx_pipeline_port_in_config(p,
+							 i,
+							 s->port_in_type[i],
+							 s->port_in_params[i]);
+		if (status) {
+			if (err_msg)
+				*err_msg = "Pipeline input port configuration error.";
+			return status;
+		}
+	}
+
+	/* Output ports. */
+	for (i = 0; i < s->n_ports_out; i++) {
+		status = rte_swx_pipeline_port_out_config(p,
+							  i,
+							  s->port_out_type[i],
+							  s->port_out_params[i]);
+		if (status) {
+			if (err_msg)
+				*err_msg = "Pipeline output port configuration error.";
+			return status;
+		}
+	}
+
+	return 0;
+}
diff --git a/lib/pipeline/rte_swx_pipeline_spec.h b/lib/pipeline/rte_swx_pipeline_spec.h
index 707b99ba09..62ac4ecfc4 100644
--- a/lib/pipeline/rte_swx_pipeline_spec.h
+++ b/lib/pipeline/rte_swx_pipeline_spec.h
@@ -1,6 +1,13 @@ 
 /* SPDX-License-Identifier: BSD-3-Clause
  * Copyright(c) 2022 Intel Corporation
  */
+#ifndef __INCLUDE_RTE_SWX_PIPELINE_SPEC_H__
+#define __INCLUDE_RTE_SWX_PIPELINE_SPEC_H__
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
 #include <stdint.h>
 #include <stdio.h>
 
@@ -204,6 +211,38 @@  struct pipeline_spec {
 	uint32_t n_apply;
 };
 
+/*
+ * Mirroring:
+ *      mirroring slots <n_slots> sessions <n_sessions>
+ *
+ * Input ports:
+ *      port in <port_id> ethdev <ethdev_name> rxq <queue_id> bsz <burst_size>
+ *      port in <port_id> ring <ring_name> bsz <burst_size>
+ *      port in <port_id> source mempool <mempool_name> file <file_name> loop <n_loops>
+ *                               packets <n_pkts_max>
+ *      port in <port_id> fd <file_descriptor> mtu <mtu> mempool <mempool_name> bsz <burst_size>
+ *
+ * Output ports:
+ *      port out <port_id> ethdev <ethdev_name> txq <queue_id> bsz <burst_size>
+ *      port out <port_id> ring <ring_name> bsz <burst_size>
+ *      port out <port_id> sink file <file_name> | none
+ *      port out <port_id> fd <file_descriptor> bsz <burst_size>
+ */
+struct pipeline_iospec {
+	struct rte_swx_pipeline_mirroring_params mirroring_params;
+
+	uint32_t *port_in_id;
+	const char **port_in_type;
+	void **port_in_params;
+
+	uint32_t *port_out_id;
+	const char **port_out_type;
+	void **port_out_params;
+
+	uint32_t n_ports_in;
+	uint32_t n_ports_out;
+};
+
 void
 pipeline_spec_free(struct pipeline_spec *s);
 
@@ -220,3 +259,22 @@  int
 pipeline_spec_configure(struct rte_swx_pipeline *p,
 			struct pipeline_spec *s,
 			const char **err_msg);
+
+void
+pipeline_iospec_free(struct pipeline_iospec *s);
+
+struct pipeline_iospec *
+pipeline_iospec_parse(FILE *spec,
+		      uint32_t *err_line,
+		      const char **err_msg);
+
+int
+pipeline_iospec_configure(struct rte_swx_pipeline *p,
+			  struct pipeline_iospec *s,
+			  const char **err_msg);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif