[RFC,4/5] app: add IO proxy app using shared memory interfaces

Message ID 20230922081912.7090-5-bruce.richardson@intel.com (mailing list archive)
State New
Delegated to: Thomas Monjalon
Headers
Series Using shared mempools for zero-copy IO proxying |

Checks

Context Check Description
ci/checkpatch warning coding style issues

Commit Message

Bruce Richardson Sept. 22, 2023, 8:19 a.m. UTC
  This app uses the shared memory pool, and shared ethdev infrastructure
to act as a zero-copy IO proxy to other applications. It has been tested
and verified to work successfully proxying data to testpmd instances on
the system, with those testpmd instances each being passed a unix socket
to work with via the shared memory bus "-a sock:/path/to/sock..."
parameter.

Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
---
 app/io-proxy/command_fns.c | 160 ++++++++++
 app/io-proxy/commands.list |   6 +
 app/io-proxy/datapath.c    | 595 +++++++++++++++++++++++++++++++++++++
 app/io-proxy/datapath.h    |  37 +++
 app/io-proxy/datapath_mp.c |  78 +++++
 app/io-proxy/main.c        |  71 +++++
 app/io-proxy/meson.build   |  12 +
 app/meson.build            |   1 +
 8 files changed, 960 insertions(+)
 create mode 100644 app/io-proxy/command_fns.c
 create mode 100644 app/io-proxy/commands.list
 create mode 100644 app/io-proxy/datapath.c
 create mode 100644 app/io-proxy/datapath.h
 create mode 100644 app/io-proxy/datapath_mp.c
 create mode 100644 app/io-proxy/main.c
 create mode 100644 app/io-proxy/meson.build
  

Patch

diff --git a/app/io-proxy/command_fns.c b/app/io-proxy/command_fns.c
new file mode 100644
index 0000000000..f48921e005
--- /dev/null
+++ b/app/io-proxy/command_fns.c
@@ -0,0 +1,160 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2023 Intel Corporation
+ */
+
+#include <stdlib.h>
+#include <unistd.h>
+#include <stdbool.h>
+
+#include <rte_ethdev.h>
+
+#include "datapath.h"
+#include "commands.h"
+
+extern volatile bool quit;
+extern volatile bool running_startup_script;
+
+void
+cmd_add_socket_parsed(void *parsed_result, struct cmdline *cl __rte_unused,
+		void *data __rte_unused)
+{
+	struct cmd_add_socket_result *res = parsed_result;
+	uint64_t maxmem = 0;
+	char *endchar;
+
+	maxmem = strtoull(res->memsize, &endchar, 0);
+	switch (*endchar) {
+	case 'G': case 'g':
+		maxmem *= 1024;
+		/* fall-through */
+	case 'M': case 'm':
+		maxmem *= 1024;
+		/* fall-through */
+	case 'K': case 'k':
+		maxmem *= 1024;
+		break;
+	}
+	if (res->port >= MAX_PORTS_SUPPORTED) {
+		fprintf(stderr, "Port id out of range. Must be <%u\n", MAX_PORTS_SUPPORTED);
+		goto err;
+	}
+	if (res->queue >= MAX_QUEUES_SUPPORTED) {
+		fprintf(stderr, "Queue id out of range. Must be <%u\n", MAX_QUEUES_SUPPORTED);
+		goto err;
+	}
+	if (listen_unix_socket(res->path, maxmem, res->port, res->queue) != 0) {
+		fprintf(stderr, "error initializing socket: %s\n", res->path);
+		goto err;
+	}
+
+	printf("Created socket = %s with memsize = %s using port = %u, queue = %u\n",
+			res->path, res->memsize, res->port, res->queue);
+	return;
+
+err:
+	if (running_startup_script) {
+		quit = true;
+		/* wait for main thread to quit. Just spin here for condition which
+		 * will never actually come true, as main thread should just exit
+		 */
+		while (quit)
+			usleep(100);
+	}
+	/* if running interactively, do nothing on error except report it above */
+}
+
+void
+cmd_list_sockets_parsed(__rte_unused void *parsed_result,
+		__rte_unused struct cmdline *cl,
+		__rte_unused void *data)
+{
+	const char *path;
+	int sock;
+	uint64_t maxmem;
+	uint16_t port, queue;
+	bool connected;
+
+	for (int i = get_next_socket(0, &path, &sock, &maxmem, &port, &queue, &connected);
+			i < MAX_SOCKETS;
+			i = get_next_socket(i + 1, &path, &sock, &maxmem, &port,
+					&queue, &connected)) {
+		char memstr[32];
+		if (maxmem % (1UL << 30) == 0)
+			snprintf(memstr, sizeof(memstr), "%" PRIu64 "G", maxmem >> 30);
+		else if (maxmem % (1UL << 20) == 0)
+			snprintf(memstr, sizeof(memstr), "%" PRIu64 "M", maxmem >> 20);
+		else if (maxmem % (1UL << 10) == 0)
+			snprintf(memstr, sizeof(memstr), "%" PRIu64 "K", maxmem >> 10);
+		else
+			snprintf(memstr, sizeof(memstr), "%" PRIu64, maxmem);
+
+		printf("Socket %s [%s]: mem=%s, port=%u, queue=%u\n",
+				path, connected ? "connected" : "idle", memstr, port, queue);
+	}
+}
+
+void
+cmd_list_ports_parsed(__rte_unused void *parsed_result,
+		__rte_unused struct cmdline *cl,
+		__rte_unused void *data)
+{
+	for (int i = 0; i < rte_eth_dev_count_avail(); i++) {
+		struct rte_ether_addr addr;
+		int retval = rte_eth_macaddr_get(i, &addr);
+		if (retval != 0) {
+			printf("Port %d - MAC UNKNOWN\n", i);
+			continue;
+		}
+		printf("Port %d - "RTE_ETHER_ADDR_PRT_FMT"\n", i, RTE_ETHER_ADDR_BYTES(&addr));
+	}
+}
+
+void
+cmd_show_port_stats_parsed(__rte_unused void *parsed_result,
+		__rte_unused struct cmdline *cl,
+		__rte_unused void *data)
+{
+	for (int i = 0; i < rte_eth_dev_count_avail(); i++) {
+		struct rte_eth_stats stats = {0};
+		int retval = rte_eth_stats_get(i, &stats);
+		if (retval != 0) {
+			printf("Port %d - Cannot get stats\n", i);
+			continue;
+		}
+		printf("Port %d - ipkts: %"PRIu64", imissed: %"PRIu64
+				", ierrors: %"PRIu64", opkts: %"PRIu64"\n",
+				i, stats.ipackets, stats.imissed, stats.ierrors, stats.opackets);
+	}
+}
+
+void
+cmd_show_socket_stats_parsed(__rte_unused void *parsed_result,
+		__rte_unused struct cmdline *cl,
+		__rte_unused void *data)
+{
+	const char *path;
+	int sock;
+	uint64_t maxmem;
+	uint16_t port, queue;
+	bool connected;
+
+	for (int i = get_next_socket(0, &path, &sock, &maxmem, &port, &queue, &connected);
+			i < MAX_SOCKETS;
+			i = get_next_socket(i + 1, &path, &sock, &maxmem, &port,
+					&queue, &connected)) {
+		if (connected || dp_stats[i].rx != 0 || dp_stats[i].deq != 0)
+			printf("Socket %u [port %u, q %u]: RX %" PRIu64 ", Enq_drops %" PRIu64
+					", Deq %" PRIu64 ", TX_drops %" PRIu64 "\n",
+					i, i / MAX_QUEUES_SUPPORTED, i % MAX_QUEUES_SUPPORTED,
+					dp_stats[i].rx, dp_stats[i].enq_drop,
+					dp_stats[i].deq, dp_stats[i].tx_drop);
+
+	}
+}
+
+void
+cmd_quit_parsed(__rte_unused void *parsed_result, struct cmdline *cl,
+		__rte_unused void *data)
+{
+	cmdline_quit(cl);
+}
diff --git a/app/io-proxy/commands.list b/app/io-proxy/commands.list
new file mode 100644
index 0000000000..9dab9bba28
--- /dev/null
+++ b/app/io-proxy/commands.list
@@ -0,0 +1,6 @@ 
+add socket <STRING>path <STRING>memsize <UINT16>port <UINT16>queue
+list sockets
+list ports
+show port stats
+show socket stats
+quit
diff --git a/app/io-proxy/datapath.c b/app/io-proxy/datapath.c
new file mode 100644
index 0000000000..1f7162de18
--- /dev/null
+++ b/app/io-proxy/datapath.c
@@ -0,0 +1,595 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2023 Intel Corporation
+ */
+#include <errno.h>
+#include <stdarg.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <libgen.h>
+#include <sys/un.h>
+#include <sys/stat.h>
+#include <sys/mman.h>
+#include <sys/socket.h>
+#include <linux/memfd.h>
+
+#include <rte_eal.h>
+#include <rte_dev.h>
+#include <rte_malloc.h>
+#include <rte_ethdev.h>
+#include <rte_common.h>
+#include <rte_config.h>
+#include <rte_mempool.h>
+#include <shared_mem_bus.h>
+
+#include "datapath.h"
+
+static int mempool_ops_index = -1;
+static struct rte_mempool *default_mempool;
+static volatile unsigned long long port_poll_mask;
+static volatile unsigned long long used_poll_mask;
+
+struct listen_socket_params {
+	const char *path;
+	int sock;
+	uint16_t port_id;
+	uint16_t qid;
+	uint64_t maxmem;
+};
+
+#define S_IDX(p, q) (((p) * MAX_QUEUES_SUPPORTED) + (q))
+static struct rte_ring *rx_rings[MAX_SOCKETS];
+static struct rte_ring *tx_rings[MAX_SOCKETS];
+static uintptr_t base_addrs[MAX_SOCKETS];
+static uint64_t lengths[MAX_SOCKETS];
+static struct rte_mempool *mps[MAX_SOCKETS];
+static struct listen_socket_params sock_params[MAX_SOCKETS];
+struct rxtx_stats dp_stats[MAX_SOCKETS] = {0};
+
+int
+get_next_socket(int start, const char **path, int *sock, uint64_t *maxmem,
+		uint16_t *port, uint16_t *queue, bool *connected)
+{
+	int i; /* slot index; the out-params are written only when an in-use slot is found */
+	for (i = start; i < MAX_SOCKETS; i++) {
+		if (sock_params[i].sock > 0) {
+			*path = sock_params[i].path;
+			*sock = sock_params[i].sock;
+			*maxmem = sock_params[i].maxmem;
+			*port = sock_params[i].port_id;
+			*queue = sock_params[i].qid;
+			*connected = (port_poll_mask & (1ULL << i)) != 0; /* shift at mask width */
+			break;
+		}
+	}
+	return i; /* MAX_SOCKETS when no in-use slot exists at or after start */
+}
+
+static int
+init_port(uint16_t port_id, struct rte_mempool *mbuf_pool)
+{
+	struct rte_eth_conf port_conf = {
+		.rxmode = { .mq_mode = RTE_ETH_MQ_RX_RSS, },
+		.rx_adv_conf = {
+			.rss_conf = { .rss_hf = RTE_ETH_RSS_IP | RTE_ETH_RSS_UDP, },
+		},
+	};
+	struct rte_eth_dev_info dev_info;
+	int socket = rte_socket_id();
+
+	int retval = rte_eth_dev_info_get(port_id, &dev_info);
+	if (retval != 0) {
+		printf("Error during getting device (port %u) info: %s\n",
+				port_id, strerror(-retval));
+		return retval;
+	}
+
+	if (dev_info.tx_offload_capa & RTE_ETH_TX_OFFLOAD_MBUF_FAST_FREE)
+		port_conf.txmode.offloads |= RTE_ETH_TX_OFFLOAD_MBUF_FAST_FREE;
+
+	port_conf.rx_adv_conf.rss_conf.rss_hf &= dev_info.flow_type_rss_offloads;
+
+	if (rte_eth_dev_configure(port_id, MAX_QUEUES_SUPPORTED, MAX_QUEUES_SUPPORTED,
+			&port_conf) < 0) {
+		printf("Error configuring port\n");
+		return -1;
+	}
+
+	for (uint16_t q = 0; q < MAX_QUEUES_SUPPORTED; q++) {
+		retval = rte_eth_rx_queue_setup(port_id, q, 128, socket, NULL, mbuf_pool);
+		if (retval < 0) {
+			printf("Error running rx_queue_setup\n");
+			return retval;
+		}
+		retval = rte_eth_tx_queue_setup(port_id, q, 256, socket, NULL);
+		if (retval < 0) {
+			printf("Error running tx_queue_setup\n");
+			return retval;
+		}
+	}
+
+	retval = rte_eth_dev_start(port_id);
+	if (retval < 0) {
+		printf("Error running dev_start\n");
+		return retval;
+	}
+	printf("Port %u started ok\n", port_id);
+
+	if (rte_eth_promiscuous_enable(port_id) < 0)
+		printf("Warning: could not enable promisc mode on port %u\n", port_id);
+
+	return 0;
+}
+
+int
+datapath_init(const char *corelist)
+{
+	/* eal init requires non-const parameters, so copy */
+	char *cl = strdup(corelist); /* todo, free copy */
+	char l_flag[] = "-l";
+	char in_mem[] = "--in-memory";
+	char use_avx512[] = "--force-max-simd-bitwidth=512";
+	char *argv[] = {
+			program_invocation_short_name,
+			l_flag, cl,
+			in_mem,
+			use_avx512,
+			NULL,
+	};
+
+	RTE_BUILD_BUG_ON(sizeof(port_poll_mask) * CHAR_BIT < MAX_SOCKETS);
+
+	int ret = rte_eal_init(RTE_DIM(argv) - 1, argv);
+	if (ret < 0)
+		return ret;
+
+	mempool_ops_index = check_mempool_ops();
+	if (mempool_ops_index == -1)
+		rte_panic("Cannot get mempool ops");
+	printf("Mempool ops index is %d\n", mempool_ops_index);
+
+	default_mempool = rte_pktmbuf_pool_create("proxy_def",
+			MAX_SOCKETS * 200, 32, 0,
+			RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
+	if (default_mempool == NULL)
+		rte_panic("Cannot create default mempool\n");
+
+	int nb_ethdevs = rte_eth_dev_count_avail();
+	if (nb_ethdevs > MAX_PORTS_SUPPORTED) {
+		fprintf(stderr, "More ports available than supported, some will be unused\n");
+		nb_ethdevs = MAX_PORTS_SUPPORTED;
+	}
+	for (int i = 0; i < nb_ethdevs; i++) {
+		if (init_port(i, default_mempool) != 0)
+			rte_panic("Cannot init port %d\n", i);
+	}
+	return 0;
+}
+
+static int
+send_fd(int to, int fd, uint64_t fd_size, rte_iova_t iova, uint64_t pg_size)
+{
+	struct iovec iov = {0};
+	struct msghdr msg = {0};
+	const size_t cmsgspace = CMSG_SPACE(sizeof(fd)); /* header + payload + padding */
+	struct cmsghdr *cmhdr = calloc(1, cmsgspace); /* zeroed, so padding bytes are defined */
+	int ret = 0;
+
+	struct {
+		uint64_t fd_size;
+		rte_iova_t iova;
+		uint64_t pg_size;
+	} data_message = {fd_size, iova, pg_size};
+
+	if (cmhdr == NULL)
+		return -1;
+	iov.iov_base = (void *)&data_message;
+	iov.iov_len = sizeof(data_message);
+	msg.msg_iov = &iov;
+	msg.msg_iovlen = 1;
+	cmhdr->cmsg_level = SOL_SOCKET;
+	cmhdr->cmsg_type = SCM_RIGHTS;
+	cmhdr->cmsg_len = CMSG_LEN(sizeof(fd));
+	msg.msg_control = cmhdr;
+	msg.msg_controllen = cmsgspace;
+	memcpy(CMSG_DATA(cmhdr), &fd, sizeof(fd)); /* the fd travels as SCM_RIGHTS data */
+
+	if (sendmsg(to, &msg, 0) != (int)iov.iov_len) {
+		printf("Error sending message to client, %s\n", strerror(errno));
+		ret = -1;
+	}
+	free(cmhdr);
+	return ret; /* 0 on success, -1 on allocation or send failure */
+}
+
+static int
+reconfigure_queue(uint16_t port_id, uint16_t qid, struct rte_mempool *p)
+{
+	if (rte_eth_dev_rx_queue_stop(port_id, qid) != 0) {
+		printf("Error with rx_queue_stop\n");
+		return -1;
+	}
+	if (rte_eth_dev_tx_queue_stop(port_id, qid) != 0) {
+		printf("Error with tx_queue_stop\n");
+		return -1;
+	}
+	if (rte_eth_rx_queue_setup(port_id, qid, 1024,
+			rte_socket_id(), NULL, p) != 0) {
+		printf("Error with rx_queue_setup\n");
+		return -1;
+	}
+	if (rte_eth_dev_tx_queue_start(port_id, qid) != 0) {
+		printf("Error with tx_queue_start\n");
+		return -1;
+	}
+	if (rte_eth_dev_rx_queue_start(port_id, qid) != 0) {
+		printf("Error with rx_queue_start\n");
+		return -1;
+	}
+	return 0;
+}
+
+static void
+handle_connection(int client, void *const client_mem, uint64_t memsize,
+		uint16_t port_id, uint16_t qid)
+{
+	uintptr_t client_mmap_addr = 0;
+	struct rte_ring *rx_ring, *tx_ring;
+	struct rte_mempool *local_mp;
+	size_t mempool_memsize = sizeof(*local_mp)
+					+ sizeof(local_mp->local_cache[0]) * RTE_MAX_LCORE
+					+ sizeof(struct rte_pktmbuf_pool_private);
+	local_mp = rte_malloc(NULL, mempool_memsize, 0);
+	if (local_mp == NULL) {
+		printf("Error allocating mempool struct\n");
+		return;
+	}
+	memset(local_mp, 0, mempool_memsize);
+	*local_mp = (struct rte_mempool){
+		.name = "proxy_mp",
+		.cache_size = 256,
+		.ops_index = mempool_ops_index,
+		.pool_config = client_mem,
+		.private_data_size = sizeof(struct rte_pktmbuf_pool_private),
+		.local_cache = RTE_PTR_ADD(local_mp, sizeof(*local_mp)),
+	};
+	for (uint i = 0; i < RTE_MAX_LCORE; i++) {
+		local_mp->local_cache[i].size = 256;
+		local_mp->local_cache[i].flushthresh = 300;
+	}
+	/* receive buffer: one message struct plus 1024 spare bytes; reads use the same size */
+	struct eth_shared_mem_msg *msg = malloc(sizeof(*msg) + 1024);
+	if (msg == NULL) {
+		printf("Error mallocing message buffer\n");
+		goto out;
+	}
+	int bytes_read = read(client, msg, sizeof(*msg) + 1024);
+	while (bytes_read > 0) { /* 0: client closed the socket, < 0: read error */
+		switch (msg->type) {
+		case MSG_TYPE_MMAP_BASE_ADDR:
+			client_mmap_addr = msg->offset;
+			printf("Got mmap base addr of %p\n", (void *)client_mmap_addr);
+			break;
+		case MSG_TYPE_MEMPOOL_OFFSET: {
+			struct rte_mempool *remote_pool;
+			uintptr_t remote_pd_offset;
+
+			remote_pool = RTE_PTR_ADD(client_mem, msg->offset);
+			remote_pd_offset = (uintptr_t)remote_pool->pool_data - client_mmap_addr;
+			local_mp->pool_data = RTE_PTR_ADD(client_mem, remote_pd_offset);
+			memcpy(rte_mempool_get_priv(local_mp), rte_mempool_get_priv(remote_pool),
+					sizeof(struct rte_pktmbuf_pool_private));
+
+			printf("Got mempool offset of %p, stack name is %s\n",
+					(void *)msg->offset, (char *)local_mp->pool_data);
+			struct rte_mbuf *mb = rte_pktmbuf_alloc(local_mp);
+			if (mb == NULL) {
+				printf("Error allocating buffer\n");
+				goto out; /* shared teardown frees msg and local_mp */
+			}
+			if ((uintptr_t)mb->buf_addr != (uintptr_t)mb + 128)
+				rte_panic("Error, bad buffer\n");
+			rte_pktmbuf_free(mb);
+			break;
+		}
+		case MSG_TYPE_RX_RING_OFFSET:
+			printf("Got Rx ring offset of %p\n", (void *)msg->offset);
+			rx_ring = RTE_PTR_ADD(client_mem, msg->offset);
+			rx_rings[S_IDX(port_id, qid)] = rx_ring;
+			break;
+		case MSG_TYPE_TX_RING_OFFSET:
+			printf("Got Tx ring offset of %p\n", (void *)msg->offset);
+			tx_ring = RTE_PTR_ADD(client_mem, msg->offset);
+			tx_rings[S_IDX(port_id, qid)] = tx_ring;
+			break;
+
+		case MSG_TYPE_START:
+			base_addrs[S_IDX(port_id, qid)] = (uintptr_t)client_mem;
+			lengths[S_IDX(port_id, qid)] = memsize;
+			mps[S_IDX(port_id, qid)] = local_mp;
+			if (reconfigure_queue(port_id, qid, local_mp) < 0)
+				goto out;
+
+			port_poll_mask |= (1UL << S_IDX(port_id, qid));
+			while (used_poll_mask != port_poll_mask) /* wait for handle_forwarding() */
+				usleep(10);
+
+			*msg = (struct eth_shared_mem_msg){ .type = MSG_TYPE_ACK, };
+			if (write(client, msg, sizeof(*msg)) < (int)sizeof(*msg))
+				goto out;
+
+			dp_stats[S_IDX(port_id, qid)] = (struct rxtx_stats){0};
+			break;
+
+		case MSG_TYPE_GET_MAC:
+			*msg = (struct eth_shared_mem_msg){
+				.type = MSG_TYPE_REPORT_MAC,
+			};
+			rte_eth_macaddr_get(port_id, &msg->ethaddr);
+			if (write(client, msg, sizeof(*msg)) < (int)sizeof(*msg))
+				goto out;
+			break;
+		default:
+			printf("Unknown message\n");
+		}
+		bytes_read = read(client, msg, sizeof(*msg) + 1024);
+	}
+out:
+	port_poll_mask &= ~(1UL << S_IDX(port_id, qid));
+	while (used_poll_mask != port_poll_mask) /* wait until the queue is no longer polled */
+		usleep(10);
+
+	reconfigure_queue(port_id, qid, default_mempool);
+
+	free(msg);
+	rte_free(local_mp);
+
+	printf("Client disconnect\n");
+}
+
+static int
+accept_client(const int sock, uint64_t maxmem, uint16_t port_id, uint16_t qid)
+{
+	int ret = 0;
+	rte_iova_t *iovas = NULL;
+	const int client = accept(sock, NULL, NULL);
+	if (client < 0) {
+		ret = errno; /* capture before printf() gets a chance to change it */
+		printf("Error with accept\n");
+		return ret;
+	}
+	printf("Client connected\n");
+	char filename[32];
+	int flags = MFD_HUGETLB;
+	uint32_t pgsize = (1 << 21);
+	if (maxmem % (1 << 30) == 0) {
+		flags |= MFD_HUGE_1GB;
+		pgsize = (1 << 30);
+	}
+	snprintf(filename, sizeof(filename), "client_memory_%d", client);
+
+	const int memfd = memfd_create(filename, flags);
+	if (memfd < 0) {
+		ret = errno;
+		printf("Error with memfd_create\n");
+		goto out_client;
+	}
+	if (ftruncate(memfd, maxmem) < 0) {
+		ret = errno;
+		printf("Error with ftruncate\n");
+		goto out_memfd;
+	}
+	void * const client_mem = mmap(NULL, maxmem, PROT_READ | PROT_WRITE, MAP_SHARED, memfd, 0);
+	if (client_mem == MAP_FAILED) {
+		ret = errno;
+		printf("Error with mmap\n");
+		goto out_memfd;
+	}
+
+	const int nb_pages = maxmem / pgsize;
+	printf("Registering %d pages of memory with DPDK\n", nb_pages);
+	iovas = malloc(sizeof(*iovas) * nb_pages);
+	if (iovas == NULL) {
+		printf("Error with malloc for iovas\n");
+		ret = ENOMEM;
+		goto out_unmap;
+	}
+	/* assume vfio, VA = IOVA */
+	iovas[0] = (uintptr_t)client_mem;
+	for (int i = 1; i < nb_pages; i++)
+		iovas[i] = iovas[i - 1] + pgsize;
+
+	if (rte_extmem_register(client_mem, maxmem, iovas, nb_pages, pgsize) < 0) {
+		printf("Error registering memory with DPDK, %s\n", strerror(rte_errno));
+		ret = rte_errno;
+		goto out_unmap;
+	}
+	printf("Registered memory: %" PRIu64 " @ %p in heap %s\n", maxmem, client_mem, filename);
+
+	struct rte_eth_dev_info info;
+	if (rte_eth_dev_info_get(port_id, &info) < 0) {
+		printf("Error getting ethdev info\n");
+		ret = -1;
+		goto out_unreg;
+	}
+	if (rte_dev_dma_map(info.device, client_mem, iovas[0], maxmem) < 0) {
+		printf("Error mapping dma for device, %s\n", strerror(rte_errno));
+		ret = rte_errno;
+		goto out_unreg;
+	}
+	if (send_fd(client, memfd, maxmem, iovas[0], pgsize) != 0) {
+		printf("Error sending fd to client\n");
+		ret = errno;
+		goto out_dma;
+	}
+	printf("Sent FD to client for mapping\n");
+	handle_connection(client, client_mem, maxmem, port_id, qid);
+out_dma: /* the labels below undo the setup steps in reverse order */
+	rte_dev_dma_unmap(info.device, client_mem, iovas[0], maxmem);
+out_unreg:
+	printf("Unregistering memory: %" PRIu64 " @ %p in heap %s\n", maxmem, client_mem, filename);
+	if (rte_extmem_unregister(client_mem, maxmem) < 0)
+		printf("Error unregistering memory, %s\n", strerror(rte_errno));
+out_unmap:
+	free(iovas); /* free(NULL) is a no-op when the allocation itself failed */
+	munmap(client_mem, maxmem);
+out_memfd:
+	close(memfd);
+out_client:
+	close(client);
+	return ret; /* 0 after a served connection, otherwise the saved error value */
+}
+
+static void *
+listen_fn(void *param)
+{
+	/* param is a slot of the static sock_params[] table, filled by listen_unix_socket() */
+	struct listen_socket_params *p = param;
+	int ret = 0;
+
+	rte_thread_register();
+
+	/* serve one client after another until accept_client() reports an error */
+	do {
+		ret = accept_client(p->sock, p->maxmem, p->port_id, p->qid);
+	} while (ret == 0);
+
+	/* p is not heap memory, so it must not be handed to free() */
+	return (void *)(uintptr_t)ret;
+}
+
+int
+listen_unix_socket(const char *path, const uint64_t maxmem, uint16_t port_id, uint16_t qid)
+{
+	if (sock_params[S_IDX(port_id, qid)].sock != 0) {
+		printf("Error, port already in use\n");
+		return EEXIST;
+	}
+
+	if (port_id >= rte_eth_dev_count_avail()) {
+		printf("Error, port %u does not exist\n", port_id);
+		return EINVAL;
+	}
+
+	printf("Opening and listening on socket: %s\n", path);
+	char *pathcp = strdup(path);
+	if (pathcp == NULL) {
+		printf("Error with strdup()\n");
+		free(pathcp);
+		return ENOMEM;
+	}
+	char *dirpath = dirname(pathcp);
+	mkdir(dirpath, 0700);
+	free(pathcp);
+
+	int sock = socket(AF_UNIX, SOCK_SEQPACKET, 0);
+	if (sock < 0) {
+		printf("Error creating socket\n");
+		return errno;
+	}
+
+	struct sockaddr_un sun = {.sun_family = AF_UNIX};
+	strlcpy(sun.sun_path, path, sizeof(sun.sun_path));
+	printf("Attempting socket bind to path '%s'\n", path);
+	printf("Associated parameters are: maxmem = %"PRIu64", port = %u, qid = %u\n",
+			maxmem, port_id, qid);
+
+	if (bind(sock, (void *) &sun, sizeof(sun)) < 0) {
+		printf("Initial bind to socket '%s' failed.\n", path);
+
+		/* check if current socket is active */
+		if (connect(sock, (void *)&sun, sizeof(sun)) == 0) {
+			close(sock);
+			return EADDRINUSE;
+		}
+
+		/* socket is not active, delete and attempt rebind */
+		printf("Attempting unlink and retrying bind\n");
+		unlink(sun.sun_path);
+		if (bind(sock, (void *) &sun, sizeof(sun)) < 0) {
+			printf("Error binding socket: %s\n", strerror(errno));
+			close(sock);
+			return errno; /* if unlink failed, this will be -EADDRINUSE as above */
+		}
+	}
+
+	if (listen(sock, 1) < 0) {
+		printf("Error calling listen for socket: %s\n", strerror(errno));
+		unlink(sun.sun_path);
+		close(sock);
+		return errno;
+	}
+	printf("Socket %s listening ok\n", path);
+
+	struct listen_socket_params *p = &sock_params[S_IDX(port_id, qid)];
+	pthread_t listen_thread;
+	*p = (struct listen_socket_params){strdup(path), sock, port_id, qid, maxmem};
+	pthread_create(&listen_thread, NULL, listen_fn, p);
+	pthread_detach(listen_thread);
+	return 0;
+}
+
+void
+handle_forwarding(void)
+{
+	const typeof(port_poll_mask) to_poll = port_poll_mask;
+	if (used_poll_mask != to_poll) {
+		printf("Poll mask is now %#llx\n", to_poll);
+		used_poll_mask = to_poll;
+	}
+	if (to_poll == 0) {
+		usleep(100);
+		return;
+	}
+	for (uint16_t i = 0; i < sizeof(to_poll) * CHAR_BIT; i++) {
+		struct rte_mbuf *mbs[32];
+		void *offsets[32];
+		if (((1UL << i) & to_poll) == 0)
+			continue;
+
+		uint16_t port_id = i / MAX_QUEUES_SUPPORTED;
+		uint16_t qid = i % MAX_QUEUES_SUPPORTED;
+		uint16_t nb_rx = rte_eth_rx_burst(port_id, qid, mbs, RTE_DIM(mbs));
+		if (nb_rx != 0) {
+			dp_stats[i].rx += nb_rx;
+			for (uint pkt = 0; pkt < nb_rx; pkt++) {
+				mbs[pkt]->buf_addr = RTE_PTR_SUB(mbs[pkt]->buf_addr, base_addrs[i]);
+				offsets[pkt] = RTE_PTR_SUB(mbs[pkt], base_addrs[i]);
+			}
+			uint16_t nb_enq = rte_ring_enqueue_burst(rx_rings[i], offsets, nb_rx, NULL);
+			if (nb_enq != nb_rx) {
+				dp_stats[i].enq_drop += nb_rx - nb_enq;
+				for (uint pkt = nb_enq; pkt < nb_rx; pkt++) {
+					mbs[pkt]->buf_addr = RTE_PTR_ADD(mbs[pkt]->buf_addr,
+							base_addrs[i]);
+					mbs[pkt]->pool = mps[i];
+				}
+				rte_mempool_put_bulk(mps[i], (void *)&mbs[nb_enq], nb_rx - nb_enq);
+			}
+		}
+
+		uint16_t nb_deq = rte_ring_dequeue_burst(tx_rings[i], offsets,
+				RTE_DIM(offsets), NULL);
+		if (nb_deq != 0) {
+			dp_stats[i].deq += nb_deq;
+			for (uint pkt = 0; pkt < nb_deq; pkt++) {
+				mbs[pkt] = RTE_PTR_ADD(offsets[pkt], base_addrs[i]);
+				rte_prefetch0_write(mbs[pkt]);
+			}
+			for (uint pkt = 0; pkt < nb_deq; pkt++) {
+				mbs[pkt]->pool = mps[i];
+				mbs[pkt]->buf_addr = RTE_PTR_ADD(mbs[pkt]->buf_addr, base_addrs[i]);
+			}
+			uint16_t nb_tx = rte_eth_tx_burst(port_id, qid, mbs, nb_deq);
+			if (nb_tx != nb_deq) {
+				dp_stats[i].tx_drop += (nb_deq - nb_tx);
+				rte_pktmbuf_free_bulk(&mbs[nb_tx], nb_deq - nb_tx);
+			}
+		}
+	}
+}
+
+unsigned int
+lcore_id(void)
+{
+	return rte_lcore_id();
+}
diff --git a/app/io-proxy/datapath.h b/app/io-proxy/datapath.h
new file mode 100644
index 0000000000..ec5b395164
--- /dev/null
+++ b/app/io-proxy/datapath.h
@@ -0,0 +1,37 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2023 Intel Corporation
+ */
+#ifndef DATAPATH_H_INC
+#define DATAPATH_H_INC
+
+#include <stdbool.h>
+#include <stdint.h>
+
+#define MEMPOOL_OPS_NAME "proxy_mp"
+#define MAX_PORTS_SUPPORTED 8
+#define MAX_QUEUES_SUPPORTED 2
+#define MAX_SOCKETS (MAX_PORTS_SUPPORTED * MAX_QUEUES_SUPPORTED) /* one per port/queue pair */
+
+struct rxtx_stats {
+	uint64_t rx;		/* packets received from the ethdev queue */
+	uint64_t enq_drop;	/* received packets that did not fit in the client rx ring */
+	uint64_t deq;		/* packets dequeued from the client tx ring */
+	uint64_t tx_drop;	/* dequeued packets the ethdev did not accept */
+};
+
+extern struct rxtx_stats dp_stats[MAX_SOCKETS];
+
+int check_mempool_ops(void); /* index of MEMPOOL_OPS_NAME in the ops table, or -1 */
+
+int datapath_init(const char *corelist);
+
+int listen_unix_socket(const char *path, uint64_t maxmem, uint16_t port, uint16_t qid);
+
+void handle_forwarding(void);
+
+unsigned int lcore_id(void);
+
+int get_next_socket(int start, const char **path, int *sock, uint64_t *maxmem,
+		uint16_t *port, uint16_t *queue, bool *connected);
+
+#endif
diff --git a/app/io-proxy/datapath_mp.c b/app/io-proxy/datapath_mp.c
new file mode 100644
index 0000000000..bba21a5b14
--- /dev/null
+++ b/app/io-proxy/datapath_mp.c
@@ -0,0 +1,78 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2023 Intel Corporation
+ */
+#include <sys/types.h>
+#include <rte_stack.h>
+#include <rte_mempool.h>
+#include <rte_mbuf.h>
+#include "datapath.h"
+
+/* Mempool value "pool_config" contains pointer to base address for this mapping */
+/* no alloc/free etc. functions for this pool, as we never create/destroy it, only use
+ * enqueue and dequeue from it.
+ */
+
+static int
+proxy_mp_enqueue(struct rte_mempool *mp, void * const *obj_table,
+	      unsigned int n)
+{
+	struct rte_stack *s = mp->pool_data;
+	void *offset_table[n];
+	uintptr_t mempool_base = (uintptr_t)mp->pool_config;
+
+	for (uint i = 0; i < n; i++)
+		offset_table[i] = RTE_PTR_SUB(obj_table[i], mempool_base);
+
+	return rte_stack_push(s, offset_table, n) == 0 ? -ENOBUFS : 0;
+}
+
+static int
+proxy_mp_dequeue(struct rte_mempool *mp, void **obj_table,
+	      unsigned int n)
+{
+	struct rte_stack *s = mp->pool_data;
+	uintptr_t mempool_base = (uintptr_t)mp->pool_config;
+
+	if (rte_stack_pop(s, obj_table, n) == 0)
+		return -ENOBUFS;
+	for (uint i = 0; i < n; i++) {
+		obj_table[i] = RTE_PTR_ADD(obj_table[i], mempool_base);
+		struct rte_mbuf *mb = obj_table[i];
+		mb->buf_addr = RTE_PTR_ADD(mb, sizeof(struct rte_mbuf) + rte_pktmbuf_priv_size(mp));
+		mb->pool = mp;
+	}
+	return 0;
+}
+
+static int
+proxy_mp_alloc(struct rte_mempool *mp __rte_unused)
+{
+	rte_panic("Should not be called\n");
+}
+
+static unsigned int
+proxy_mp_get_count(const struct rte_mempool *mp __rte_unused)
+{
+	rte_panic("Should not be called\n");
+}
+
+
+static struct rte_mempool_ops ops_proxy_mp = {
+	.name = MEMPOOL_OPS_NAME,
+	.alloc = proxy_mp_alloc,
+	.enqueue = proxy_mp_enqueue,
+	.dequeue = proxy_mp_dequeue,
+	.get_count = proxy_mp_get_count,
+};
+
+RTE_MEMPOOL_REGISTER_OPS(ops_proxy_mp);
+
+int
+check_mempool_ops(void)
+{
+	for (uint i = 0; i < rte_mempool_ops_table.num_ops; i++) {
+		if (strcmp(rte_mempool_ops_table.ops[i].name, MEMPOOL_OPS_NAME) == 0)
+			return i;
+	}
+	return -1;
+}
diff --git a/app/io-proxy/main.c b/app/io-proxy/main.c
new file mode 100644
index 0000000000..82eef81fb0
--- /dev/null
+++ b/app/io-proxy/main.c
@@ -0,0 +1,71 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2023 Intel Corporation
+ */
+#include <stdio.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <stdlib.h>
+#include <unistd.h>
+
+#include <rte_eal.h>
+#include <rte_common.h>
+#include <cmdline.h>
+#include <cmdline_socket.h>
+
+#include "datapath.h"
+#include "commands.h"
+
+volatile bool quit;
+volatile bool running_startup_script;
+static const char *startup_file = "dpdk-io-proxy.cmds";
+
+static void *
+run_cmdline(void *arg __rte_unused)
+{
+	struct cmdline *cl;
+	int fd = open(startup_file, O_RDONLY);
+
+	if (fd >= 0) {
+		running_startup_script = true;
+		cl = cmdline_new(ctx, "\n# ", fd, STDOUT_FILENO);
+		if (cl == NULL) {
+			fprintf(stderr, "Error processing %s\n", startup_file);
+			goto end_startup;
+		}
+		cmdline_interact(cl);
+		cmdline_quit(cl);
+end_startup:
+		running_startup_script = false;
+		close(fd);
+	}
+
+	cl = cmdline_stdin_new(ctx, "\nProxy>> ");
+	if (cl == NULL)
+		goto out;
+
+	cmdline_interact(cl);
+	cmdline_stdin_exit(cl);
+
+out:
+	quit = true;
+	return NULL;
+}
+
+int
+main(int argc, char *argv[])
+{
+	pthread_t cmdline_th;
+
+	if (argc != 2 || datapath_init(argv[1]) < 0) {
+		fprintf(stderr, "Usage %s <corelist>\n", program_invocation_short_name);
+		rte_exit(EXIT_FAILURE, "Cannot init\n");
+	}
+	/* pthread_create() returns a positive error number on failure, so test for non-zero */
+	if (pthread_create(&cmdline_th, NULL, run_cmdline, NULL) != 0)
+		rte_exit(EXIT_FAILURE, "Cannot spawn cmdline thread\n");
+	pthread_detach(cmdline_th);
+	/* forward packets on this thread until another thread sets the quit flag */
+	while (!quit)
+		handle_forwarding();
+	return 0;
+}
diff --git a/app/io-proxy/meson.build b/app/io-proxy/meson.build
new file mode 100644
index 0000000000..f03783b68f
--- /dev/null
+++ b/app/io-proxy/meson.build
@@ -0,0 +1,12 @@ 
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2023 Intel Corporation
+
+cmd_h = custom_target('commands_hdr',
+        output: 'commands.h',
+        input: files('commands.list'),
+        capture: true,
+        command: [cmdline_gen_cmd, '@INPUT@']
+)
+sources += files('datapath.c', 'datapath_mp.c', 'main.c', 'command_fns.c')
+sources += cmd_h
+deps += ['cmdline', 'ethdev', 'stack', 'bus_shared_mem']
diff --git a/app/meson.build b/app/meson.build
index e4bf5c531c..27f69d883e 100644
--- a/app/meson.build
+++ b/app/meson.build
@@ -18,6 +18,7 @@  apps = [
         'dumpcap',
         'pdump',
         'proc-info',
+        'io-proxy',
         'test-acl',
         'test-bbdev',
         'test-cmdline',