[RFC,1/5] bus: new driver to accept shared memory over unix socket

Message ID 20230922081912.7090-2-bruce.richardson@intel.com (mailing list archive)
State New
Delegated to: Thomas Monjalon
Headers
Series Using shared mempools for zero-copy IO proxying |

Checks

Context Check Description
ci/checkpatch warning coding style issues

Commit Message

Bruce Richardson Sept. 22, 2023, 8:19 a.m. UTC
  Add a new driver to DPDK which supports taking in memory e.g. hugepage
memory via a unix socket connection and maps it into the DPDK process
replacing the current socket memory as the default memory for use by
future requests.

Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
---
 drivers/bus/meson.build                 |   1 +
 drivers/bus/shared_mem/meson.build      |  11 +
 drivers/bus/shared_mem/shared_mem_bus.c | 323 ++++++++++++++++++++++++
 drivers/bus/shared_mem/shared_mem_bus.h |  75 ++++++
 drivers/bus/shared_mem/version.map      |  11 +
 5 files changed, 421 insertions(+)
 create mode 100644 drivers/bus/shared_mem/meson.build
 create mode 100644 drivers/bus/shared_mem/shared_mem_bus.c
 create mode 100644 drivers/bus/shared_mem/shared_mem_bus.h
 create mode 100644 drivers/bus/shared_mem/version.map
  

Comments

Jerin Jacob Nov. 23, 2023, 2:50 p.m. UTC | #1
On Fri, Sep 22, 2023 at 1:49 PM Bruce Richardson
<bruce.richardson@intel.com> wrote:
>
> Add a new driver to DPDK which supports taking in memory e.g. hugepage
> memory via a unix socket connection and maps it into the DPDK process
> replacing the current socket memory as the default memory for use by
> future requests.
>
> Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>

Thanks Bruce for this work. IMO, This will open up a lot of use cases
like CPU based offload on different process.

> +
> +enum shared_mem_msg_type {
> +       MSG_TYPE_ACK = 0,
> +       MSG_TYPE_MMAP_BASE_ADDR,
> +       MSG_TYPE_MEMPOOL_OFFSET,
> +       MSG_TYPE_RX_RING_OFFSET,
> +       MSG_TYPE_TX_RING_OFFSET,
> +       MSG_TYPE_START,
> +       MSG_TYPE_GET_MAC,
> +       MSG_TYPE_REPORT_MAC,
> +};

In order to cater to different use cases, IMO, drivers/bus/shared_mem/
should be generic and act only
as transport for communicating with other process. That would
translate to the following
1) drivers/bus/shared_mem/ provides means to register message type and
its callback
2) The consumers(drivers/mempool/shared_mem and drivers/net/sharedmem)
register the
the callback. Definition the callback and message type should be in consumer.

Also, We may change the bus/sharedmem to bus/socket or so, to limit
the scope of bus
driver as communion mechanism to talk to different process. That way
there can different DPDK driver
can based on socket bus in the future.
  

Patch

diff --git a/drivers/bus/meson.build b/drivers/bus/meson.build
index a78b4283bf..0e64959d1a 100644
--- a/drivers/bus/meson.build
+++ b/drivers/bus/meson.build
@@ -9,6 +9,7 @@  drivers = [
         'ifpga',
         'pci',
         'platform',
+        'shared_mem',
         'vdev',
         'vmbus',
 ]
diff --git a/drivers/bus/shared_mem/meson.build b/drivers/bus/shared_mem/meson.build
new file mode 100644
index 0000000000..1fa21f3a09
--- /dev/null
+++ b/drivers/bus/shared_mem/meson.build
@@ -0,0 +1,11 @@ 
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2023 Intel Corporation
+
+if is_windows
+    build = false
+    reason = 'not supported on Windows'
+endif
+
+sources = files('shared_mem_bus.c')
+require_iova_in_mbuf = false
+deps += ['mbuf', 'net']
diff --git a/drivers/bus/shared_mem/shared_mem_bus.c b/drivers/bus/shared_mem/shared_mem_bus.c
new file mode 100644
index 0000000000..e0369ed416
--- /dev/null
+++ b/drivers/bus/shared_mem/shared_mem_bus.c
@@ -0,0 +1,323 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2023 Intel Corporation
+ */
+#include <errno.h>
+#include <malloc.h>
+#include <inttypes.h>
+#include <sys/un.h>
+#include <sys/stat.h>
+#include <sys/mman.h>
+#include <sys/socket.h>
+
+#include <rte_log.h>
+#include <rte_lcore.h>
+#include <rte_errno.h>
+#include <rte_malloc.h>
+#include <rte_devargs.h>
+#include <rte_mbuf_pool_ops.h>
+
+#include <bus_driver.h>
+#include <dev_driver.h>
+#include "shared_mem_bus.h"
+
+RTE_LOG_REGISTER_DEFAULT(shared_mem_bus_logtype, DEBUG);
+#define BUS_LOG(level, fmt, args...) rte_log(RTE_LOG_ ## level, \
+		shared_mem_bus_logtype, "## SHARED MEM BUS: %s(): " fmt "\n", __func__, ##args)
+#define BUS_ERR(fmt, args...)  BUS_LOG(ERR, fmt, ## args)
+#define BUS_INFO(fmt, args...)  BUS_LOG(INFO, fmt, ## args)
+#define BUS_DEBUG(fmt, args...)  BUS_LOG(DEBUG, fmt, ## args)
+
+static int dev_scan(void);
+static int dev_probe(void);
+static struct rte_device *find_device(const struct rte_device *start, rte_dev_cmp_t cmp,
+		 const void *data);
+static enum rte_iova_mode get_iommu_class(void);
+static int addr_parse(const char *, void *);
+
+struct socket_device {
+	struct rte_device rte_device;
+	TAILQ_ENTRY(socket_device) next;
+	int fd;
+	uintptr_t membase;
+	uintptr_t memlen;
+};
+
+/** List of devices */
+TAILQ_HEAD(socket_list, socket_device);
+TAILQ_HEAD(device_list, rte_device);
+
+struct shared_mem_bus {
+	struct rte_bus bus;
+	struct socket_list socket_list;
+	struct shared_mem_drv *ethdrv;
+	struct device_list device_list;
+};
+
+static struct shared_mem_bus shared_mem_bus = {
+	.bus = {
+		.scan = dev_scan,
+		.probe = dev_probe,
+		.find_device = find_device,
+		.get_iommu_class = get_iommu_class,
+		.parse = addr_parse,
+	},
+
+	.socket_list = TAILQ_HEAD_INITIALIZER(shared_mem_bus.socket_list),
+	.device_list = TAILQ_HEAD_INITIALIZER(shared_mem_bus.device_list),
+};
+
+RTE_REGISTER_BUS(shared_mem, shared_mem_bus.bus);
+
+int
+rte_shm_bus_send_message(void *msg, size_t msglen)
+{
+	return send(shared_mem_bus.socket_list.tqh_first->fd, msg, msglen, 0);
+}
+
+int
+rte_shm_bus_recv_message(void *msg, size_t msglen)
+{
+	return recv(shared_mem_bus.socket_list.tqh_first->fd, msg, msglen, 0);
+}
+
+uintptr_t
+rte_shm_bus_get_mem_offset(void *ptr)
+{
+	struct socket_device *dev;
+	uintptr_t pval = (uintptr_t)ptr;
+
+	TAILQ_FOREACH(dev, &shared_mem_bus.socket_list, next) {
+		if (dev->membase < pval && dev->membase + dev->memlen > pval)
+			return pval - dev->membase;
+	}
+	return (uintptr_t)-1;
+}
+
+void *
+rte_shm_bus_get_mem_ptr(uintptr_t offset)
+{
+	struct socket_device *dev;
+
+	TAILQ_FOREACH(dev, &shared_mem_bus.socket_list, next) {
+		if (offset < dev->memlen)
+			return RTE_PTR_ADD(dev->membase, offset);
+	}
+	return (void *)-1;
+}
+
+static int
+dev_scan(void)
+{
+	if (shared_mem_bus.bus.conf.scan_mode != RTE_BUS_SCAN_ALLOWLIST)
+		return 0;
+
+	struct rte_devargs *devargs;
+	RTE_EAL_DEVARGS_FOREACH(shared_mem_bus.bus.name, devargs) {
+
+		int fd = socket(AF_UNIX, SOCK_SEQPACKET, 0);
+		if (fd < 0) {
+			BUS_ERR("Error creating socket");
+			return -errno;
+		}
+
+		struct sockaddr_un sun = {.sun_family = AF_UNIX};
+		if (strlen(devargs->name) - 5 >= sizeof(sun.sun_path) ||
+				addr_parse(devargs->name, sun.sun_path) != 0) {
+			BUS_ERR("Error parsing device address");
+			return -EINVAL;
+		}
+
+		if (connect(fd, (void *)&sun, sizeof(sun)) != 0) {
+			BUS_ERR("Error connecting to socket");
+			return -errno;
+		}
+
+		struct socket_device *sdev = malloc(sizeof(*sdev));
+		if (sdev == NULL) {
+			BUS_ERR("Error with malloc");
+			return -ENOMEM;
+		}
+		BUS_INFO("Allocating dev for %s", devargs->name);
+		sdev->rte_device.name = devargs->name;
+		sdev->rte_device.numa_node = rte_socket_id();
+		sdev->rte_device.bus = &shared_mem_bus.bus;
+		sdev->fd = fd;
+		TAILQ_INSERT_TAIL(&shared_mem_bus.socket_list, sdev, next);
+	}
+
+	return 0;
+}
+
+static int
+recv_fd(int from, uint64_t *memsize, rte_iova_t *iova, uint64_t *pg_size)
+{
+	int fd = 0;
+	struct {
+		uint64_t fd_size;
+		rte_iova_t iova;
+		uint64_t pg_size;
+	} data_message;
+
+	size_t cmsglen = CMSG_LEN(sizeof(fd));
+	struct cmsghdr *cmhdr = malloc(cmsglen);
+	if (cmhdr == NULL) {
+		BUS_ERR("Malloc error");
+		return -1;
+	}
+
+	struct iovec iov = {
+			.iov_base = (void *)&data_message,
+			.iov_len = sizeof(data_message)
+	};
+	struct msghdr msg = {
+			.msg_iov = &iov,
+			.msg_iovlen = 1,
+			.msg_control = cmhdr,
+			.msg_controllen = cmsglen,
+	};
+	if (recvmsg(from, &msg, 0) != (int)iov.iov_len) {
+		BUS_ERR("recvmsg error %s", strerror(errno));
+		return -1;
+	}
+	if (msg.msg_controllen != cmsglen) {
+		BUS_ERR("Error with fd on message received");
+		return -1;
+	}
+	fd = *(int *)CMSG_DATA(cmhdr);
+
+	free(cmhdr);
+
+	*memsize = data_message.fd_size;
+	*iova = data_message.iova;
+	*pg_size = data_message.pg_size;
+	return fd;
+}
+
+static int
+dev_probe(void)
+{
+	if (TAILQ_EMPTY(&shared_mem_bus.socket_list))
+		return 0;
+
+	if (rte_mbuf_set_platform_mempool_ops("shared_mem") != 0) {
+		BUS_ERR("Error setting default mempool ops\n");
+		return -1;
+	}
+	BUS_INFO("Set default mempool ops to 'shared_mem'");
+
+	struct socket_device *dev;
+	TAILQ_FOREACH(dev, &shared_mem_bus.socket_list, next) {
+		uint64_t memsize = 0;
+		uint64_t pgsize = 0;
+		rte_iova_t iova = 0;
+		int memfd = recv_fd(dev->fd, &memsize, &iova, &pgsize);
+		/* check memfd is valid, the size is non-zero and multiple of 2MB */
+		if (memfd < 0 || memsize <= 0 || memsize % (1 << 21) != 0) {
+			BUS_ERR("Error getting memfd and size");
+			return -1;
+		}
+		BUS_DEBUG("Received fd %d with memsize %"PRIu64" and pgsize %"PRIu64,
+				memfd, memsize, pgsize);
+
+		void *mem = mmap(NULL, memsize, PROT_READ|PROT_WRITE, MAP_SHARED, memfd, 0);
+		if (mem == MAP_FAILED) {
+			BUS_ERR("Error mmapping the received fd");
+			return -1;
+		}
+		BUS_DEBUG("%u MB of memory mapped at %p\n", (unsigned int)(memsize >> 20), mem);
+		dev->membase = (uintptr_t)mem;
+		dev->memlen = memsize;
+
+		struct eth_shared_mem_msg msg = {
+				.type = MSG_TYPE_MMAP_BASE_ADDR,
+				.offset = dev->membase,
+		};
+		rte_shm_bus_send_message(&msg, sizeof(msg));
+
+		char malloc_heap_name[32];
+		snprintf(malloc_heap_name, sizeof(malloc_heap_name),
+				"socket_%d_ext", rte_socket_id());
+		if (rte_malloc_heap_create(malloc_heap_name) != 0) {
+			BUS_ERR("Error creating heap %s\n", malloc_heap_name);
+			return -1;
+		}
+
+		int nb_pages = (memsize / pgsize);
+		rte_iova_t *iovas = malloc(sizeof(iovas[0]) * nb_pages);
+		iovas[0] = iova;
+		for (int i = 1; i < nb_pages; i++)
+			iovas[i] = iovas[i - 1] + pgsize;
+		BUS_DEBUG("Attempting to add memory to heap: %s", malloc_heap_name);
+		if (rte_malloc_heap_memory_add(malloc_heap_name, mem, memsize,
+				iovas, nb_pages, pgsize) < 0) {
+			BUS_ERR("Error adding to malloc heap: %s", strerror(rte_errno));
+			free(iovas);
+			return -1;
+		}
+		free(iovas);
+		BUS_DEBUG("Added memory to heap");
+		rte_malloc_heap_swap_socket(rte_socket_id(),
+				rte_malloc_heap_get_socket(malloc_heap_name));
+		BUS_DEBUG("Swapped in memory as socket %d memory\n", rte_socket_id());
+
+		if (shared_mem_bus.ethdrv != NULL) {
+			struct rte_device *dev = malloc(sizeof(*dev));
+			if (dev == NULL)
+				return -1;
+			*dev = (struct rte_device){
+				.name = "shared_mem_ethdev",
+				.driver = &shared_mem_bus.ethdrv->driver,
+				.bus = &shared_mem_bus.bus,
+				.numa_node = SOCKET_ID_ANY,
+			};
+			shared_mem_bus.ethdrv->probe(shared_mem_bus.ethdrv, dev);
+		}
+	}
+	return 0;
+}
+static struct rte_device *
+find_device(const struct rte_device *start, rte_dev_cmp_t cmp,
+			 const void *data)
+{
+	RTE_SET_USED(start);
+	RTE_SET_USED(cmp);
+	RTE_SET_USED(data);
+	return NULL;
+}
+
+static enum rte_iova_mode
+get_iommu_class(void)
+{
+	/* if there are no devices, report don't care, otherwise VA mode */
+	return TAILQ_EMPTY(&shared_mem_bus.socket_list) ?  RTE_IOVA_DC : RTE_IOVA_VA;
+}
+
+static int
+addr_parse(const char *name, void *addr)
+{
+	if (strncmp(name, "sock:", 5) != 0) {
+		BUS_DEBUG("no sock: prefix on %s", name);
+		return -1;
+	}
+
+	const char *filename = &name[5];
+	struct stat st;
+	if (stat(filename, &st) < 0 || (st.st_mode & S_IFMT) != S_IFSOCK) {
+		BUS_ERR("stat failed, or not a socket, %s", filename);
+		return -1;
+	}
+	if (addr != NULL)
+		strcpy(addr, filename);
+	BUS_DEBUG("Matched filename: %s", filename);
+	return 0;
+}
+
+int
+shared_mem_register_driver(struct shared_mem_drv *drv)
+{
+	if (drv->probe == NULL)
+		return -1;
+	shared_mem_bus.ethdrv = drv;
+	return 0;
+}
+
diff --git a/drivers/bus/shared_mem/shared_mem_bus.h b/drivers/bus/shared_mem/shared_mem_bus.h
new file mode 100644
index 0000000000..01a9a2a99a
--- /dev/null
+++ b/drivers/bus/shared_mem/shared_mem_bus.h
@@ -0,0 +1,75 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2023 Intel Corporation
+ */
+
+#ifndef DRIVERS_BUS_SHARED_MEM_H_
+#define DRIVERS_BUS_SHARED_MEM_H_
+
+#include <stdint.h>
+#include <rte_common.h>
+#include <rte_ether.h>
+#include <dev_driver.h>
+
+enum shared_mem_msg_type {
+	MSG_TYPE_ACK = 0,
+	MSG_TYPE_MMAP_BASE_ADDR,
+	MSG_TYPE_MEMPOOL_OFFSET,
+	MSG_TYPE_RX_RING_OFFSET,
+	MSG_TYPE_TX_RING_OFFSET,
+	MSG_TYPE_START,
+	MSG_TYPE_GET_MAC,
+	MSG_TYPE_REPORT_MAC,
+};
+
+struct eth_shared_mem_msg {
+	enum shared_mem_msg_type type;  /* type implicitly defines which union member is used */
+	union {
+		uintptr_t offset;    /* for many messages, just pass an offset */
+		struct rte_ether_addr ethaddr; /* allow passing mac address */
+		uintptr_t datalen;   /* for other messages, pass a data length after the data */
+	};
+	char data[];
+};
+
+struct shared_mem_drv;
+
+/**
+ * Initialisation function for the driver
+ */
+typedef int (c_eth_probe_t)(struct shared_mem_drv *drv, struct rte_device *dev);
+
+struct shared_mem_drv {
+	struct rte_driver driver;
+	c_eth_probe_t *probe;            /**< Device probe function. */
+};
+
+/** Helper for PCI device registration from driver (eth, crypto) instance */
+#define RTE_PMD_REGISTER_SHMEM_DRV(nm, c_drv) \
+RTE_INIT(shared_mem_initfn_ ##nm) \
+{\
+	(c_drv).driver.name = RTE_STR(nm);\
+	shared_mem_register_driver(&c_drv); \
+} \
+RTE_PMD_EXPORT_NAME(nm, __COUNTER__)
+
+__rte_internal
+int
+shared_mem_register_driver(struct shared_mem_drv *drv);
+
+__rte_internal
+int
+rte_shm_bus_send_message(void *msg, size_t msglen);
+
+__rte_internal
+int
+rte_shm_bus_recv_message(void *msg, size_t msglen);
+
+__rte_internal
+uintptr_t
+rte_shm_bus_get_mem_offset(void *ptr);
+
+__rte_internal
+void *
+rte_shm_bus_get_mem_ptr(uintptr_t offset);
+
+#endif /* DRIVERS_BUS_SHARED_MEM_H_ */
diff --git a/drivers/bus/shared_mem/version.map b/drivers/bus/shared_mem/version.map
new file mode 100644
index 0000000000..2af82689b1
--- /dev/null
+++ b/drivers/bus/shared_mem/version.map
@@ -0,0 +1,11 @@ 
+INTERNAL {
+	global:
+
+	shared_mem_register_driver;
+	rte_shm_bus_get_mem_offset;
+	rte_shm_bus_get_mem_ptr;
+	rte_shm_bus_recv_message;
+	rte_shm_bus_send_message;
+
+	local: *;
+};