From patchwork Fri Sep 22 08:19:11 2023
X-Patchwork-Submitter: Bruce Richardson
X-Patchwork-Id: 131825
X-Patchwork-Delegate: thomas@monjalon.net
From: Bruce Richardson
To: dev@dpdk.org
Cc: Bruce Richardson
Subject: [RFC PATCH 4/5] app: add IO proxy app using shared memory interfaces
Date: Fri, 22 Sep 2023 09:19:11 +0100
Message-Id: <20230922081912.7090-5-bruce.richardson@intel.com>
X-Mailer: git-send-email 2.39.2
In-Reply-To: <20230922081912.7090-1-bruce.richardson@intel.com>
References: <20230922081912.7090-1-bruce.richardson@intel.com>
List-Id: DPDK patches and discussions

This app uses the shared memory pool and shared ethdev infrastructure to
act as a zero-copy IO proxy for other applications. It has been tested and
verified to successfully proxy data to testpmd instances on the same
system, with each testpmd instance being passed a unix socket to work with
via the shared memory bus "-a sock:/path/to/sock..." parameter.
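For reference, a minimal sketch of one possible setup is shown below. The
socket path, core list and memory size are illustrative, and the binary
names assume the default app naming; the proxy reads commands from a
"dpdk-io-proxy.cmds" file in its working directory, or the same commands
can be entered at its interactive prompt:

    # dpdk-io-proxy.cmds - startup commands for the proxy, mapping
    # port 0, queue 0 to a listening unix socket with 1G of client memory
    add socket /tmp/proxy0.sock 1G 0 0
    list sockets

    # launch the proxy, passing the cores it should run on
    ./dpdk-io-proxy 1-2

    # in another terminal, attach a testpmd instance over the shared
    # memory bus socket (other options as needed)
    ./dpdk-testpmd -a sock:/tmp/proxy0.sock ...

Once the client connects and starts, the proxy reconfigures the given
port/queue to use the client's memory and forwards traffic to and from it
via the shared rings.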
Signed-off-by: Bruce Richardson --- app/io-proxy/command_fns.c | 160 ++++++++++ app/io-proxy/commands.list | 6 + app/io-proxy/datapath.c | 595 +++++++++++++++++++++++++++++++++++++ app/io-proxy/datapath.h | 37 +++ app/io-proxy/datapath_mp.c | 78 +++++ app/io-proxy/main.c | 71 +++++ app/io-proxy/meson.build | 12 + app/meson.build | 1 + 8 files changed, 960 insertions(+) create mode 100644 app/io-proxy/command_fns.c create mode 100644 app/io-proxy/commands.list create mode 100644 app/io-proxy/datapath.c create mode 100644 app/io-proxy/datapath.h create mode 100644 app/io-proxy/datapath_mp.c create mode 100644 app/io-proxy/main.c create mode 100644 app/io-proxy/meson.build diff --git a/app/io-proxy/command_fns.c b/app/io-proxy/command_fns.c new file mode 100644 index 0000000000..f48921e005 --- /dev/null +++ b/app/io-proxy/command_fns.c @@ -0,0 +1,160 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * Copyright(c) 2023 Intel Corporation + */ + +#include +#include +#include + +#include + +#include "datapath.h" +#include "commands.h" + +extern volatile bool quit; +extern volatile bool running_startup_script; + +void +cmd_add_socket_parsed(void *parsed_result, struct cmdline *cl __rte_unused, + void *data __rte_unused) +{ + struct cmd_add_socket_result *res = parsed_result; + uint64_t maxmem = 0; + char *endchar; + + maxmem = strtoull(res->memsize, &endchar, 0); + switch (*endchar) { + case 'G': case 'g': + maxmem *= 1024; + /* fall-through */ + case 'M': case 'm': + maxmem *= 1024; + /* fall-through */ + case 'K': case 'k': + maxmem *= 1024; + break; + } + if (res->port >= MAX_PORTS_SUPPORTED) { + fprintf(stderr, "Port id out of range. Must be <%u\n", MAX_PORTS_SUPPORTED); + goto err; + } + if (res->queue >= MAX_QUEUES_SUPPORTED) { + fprintf(stderr, "Queue id out of range. Must be <%u\n", MAX_QUEUES_SUPPORTED); + goto err; + } + if (listen_unix_socket(res->path, maxmem, res->port, res->queue) != 0) { + fprintf(stderr, "error initializing socket: %s\n", res->path); + goto err; + } + + printf("Created socket = %s with memsize = %s using port = %u, queue = %u\n", + res->path, res->memsize, res->port, res->queue); + return; + +err: + if (running_startup_script) { + quit = true; + /* wait for main thread to quit. Just spin here for condition which + * will never actually come true, as main thread should just exit + */ + while (quit) + usleep(100); + } + /* if running interactively, do nothing on error except report it above */ +} + +void +cmd_list_sockets_parsed(__rte_unused void *parsed_result, + __rte_unused struct cmdline *cl, + __rte_unused void *data) +{ + const char *path; + int sock; + uint64_t maxmem; + uint16_t port, queue; + bool connected; + + for (int i = get_next_socket(0, &path, &sock, &maxmem, &port, &queue, &connected); + i < MAX_SOCKETS; + i = get_next_socket(i + 1, &path, &sock, &maxmem, &port, + &queue, &connected)) { + char memstr[32]; + if (maxmem % (1UL << 30) == 0) + snprintf(memstr, sizeof(memstr), "%" PRIu64 "G", maxmem >> 30); + else if (maxmem % (1UL << 20) == 0) + snprintf(memstr, sizeof(memstr), "%" PRIu64 "M", maxmem >> 20); + else if (maxmem % (1UL << 10) == 0) + snprintf(memstr, sizeof(memstr), "%" PRIu64 "K", maxmem >> 10); + else + snprintf(memstr, sizeof(memstr), "%" PRIu64, maxmem); + + printf("Socket %s [%s]: mem=%s, port=%u, queue=%u\n", + path, connected ? 
"connected" : "idle", memstr, port, queue); + } +} + +void +cmd_list_ports_parsed(__rte_unused void *parsed_result, + __rte_unused struct cmdline *cl, + __rte_unused void *data) +{ + for (int i = 0; i < rte_eth_dev_count_avail(); i++) { + struct rte_ether_addr addr; + int retval = rte_eth_macaddr_get(i, &addr); + if (retval != 0) { + printf("Port %d - MAC UNKNOWN\n", i); + continue; + } + printf("Port %d - "RTE_ETHER_ADDR_PRT_FMT"\n", i, RTE_ETHER_ADDR_BYTES(&addr)); + } +} + +void +cmd_show_port_stats_parsed(__rte_unused void *parsed_result, + __rte_unused struct cmdline *cl, + __rte_unused void *data) +{ + for (int i = 0; i < rte_eth_dev_count_avail(); i++) { + struct rte_eth_stats stats = {0}; + int retval = rte_eth_stats_get(i, &stats); + if (retval != 0) { + printf("Port %d - Cannot get stats\n", i); + continue; + } + printf("Port %d - ipkts: %"PRIu64", imissed: %"PRIu64 + ", ierrors: %"PRIu64", opkts: %"PRIu64"\n", + i, stats.ipackets, stats.imissed, stats.ierrors, stats.opackets); + } +} + +void +cmd_show_socket_stats_parsed(__rte_unused void *parsed_result, + __rte_unused struct cmdline *cl, + __rte_unused void *data) +{ + const char *path; + int sock; + uint64_t maxmem; + uint16_t port, queue; + bool connected; + + for (int i = get_next_socket(0, &path, &sock, &maxmem, &port, &queue, &connected); + i < MAX_SOCKETS; + i = get_next_socket(i + 1, &path, &sock, &maxmem, &port, + &queue, &connected)) { + if (connected || dp_stats[i].rx != 0 || dp_stats[i].deq != 0) + printf("Socket %u [port %u, q %u]: RX %" PRIu64 ", Enq_drops %" PRIu64 + ", Deq %" PRIu64 ", TX_drops %" PRIu64 "\n", + i, i / MAX_QUEUES_SUPPORTED, i % MAX_QUEUES_SUPPORTED, + dp_stats[i].rx, dp_stats[i].enq_drop, + dp_stats[i].deq, dp_stats[i].tx_drop); + + } +} + +void +cmd_quit_parsed(__rte_unused void *parsed_result, struct cmdline *cl, + __rte_unused void *data) +{ + cmdline_quit(cl); +} diff --git a/app/io-proxy/commands.list b/app/io-proxy/commands.list new file mode 100644 index 0000000000..9dab9bba28 --- /dev/null +++ b/app/io-proxy/commands.list @@ -0,0 +1,6 @@ +add socket path memsize port queue +list sockets +list ports +show port stats +show socket stats +quit diff --git a/app/io-proxy/datapath.c b/app/io-proxy/datapath.c new file mode 100644 index 0000000000..1f7162de18 --- /dev/null +++ b/app/io-proxy/datapath.c @@ -0,0 +1,595 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * Copyright(c) 2023 Intel Corporation + */ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "datapath.h" + +static int mempool_ops_index = -1; +static struct rte_mempool *default_mempool; +static volatile unsigned long long port_poll_mask; +static volatile unsigned long long used_poll_mask; + +struct listen_socket_params { + const char *path; + int sock; + uint16_t port_id; + uint16_t qid; + uint64_t maxmem; +}; + +#define S_IDX(p, q) (((p) * MAX_QUEUES_SUPPORTED) + (q)) +static struct rte_ring *rx_rings[MAX_SOCKETS]; +static struct rte_ring *tx_rings[MAX_SOCKETS]; +static uintptr_t base_addrs[MAX_SOCKETS]; +static uint64_t lengths[MAX_SOCKETS]; +static struct rte_mempool *mps[MAX_SOCKETS]; +static struct listen_socket_params sock_params[MAX_SOCKETS]; +struct rxtx_stats dp_stats[MAX_SOCKETS] = {0}; + +int +get_next_socket(int start, const char **path, int *sock, uint64_t *maxmem, + uint16_t *port, uint16_t *queue, bool *connected) +{ + int i; + for (i = start; i < MAX_SOCKETS; i++) { + if 
(sock_params[i].sock > 0) { + *path = sock_params[i].path; + *sock = sock_params[i].sock; + *maxmem = sock_params[i].maxmem; + *port = sock_params[i].port_id; + *queue = sock_params[i].qid; + *connected = (port_poll_mask & (1U << i)) != 0; + break; + } + } + return i; +} + +static int +init_port(uint16_t port_id, struct rte_mempool *mbuf_pool) +{ + struct rte_eth_conf port_conf = { + .rxmode = { .mq_mode = RTE_ETH_MQ_RX_RSS, }, + .rx_adv_conf = { + .rss_conf = { .rss_hf = RTE_ETH_RSS_IP | RTE_ETH_RSS_UDP, }, + }, + }; + struct rte_eth_dev_info dev_info; + int socket = rte_socket_id(); + + int retval = rte_eth_dev_info_get(port_id, &dev_info); + if (retval != 0) { + printf("Error during getting device (port %u) info: %s\n", + port_id, strerror(-retval)); + return retval; + } + + if (dev_info.tx_offload_capa & RTE_ETH_TX_OFFLOAD_MBUF_FAST_FREE) + port_conf.txmode.offloads |= RTE_ETH_TX_OFFLOAD_MBUF_FAST_FREE; + + port_conf.rx_adv_conf.rss_conf.rss_hf &= dev_info.flow_type_rss_offloads; + + if (rte_eth_dev_configure(port_id, MAX_QUEUES_SUPPORTED, MAX_QUEUES_SUPPORTED, + &port_conf) < 0) { + printf("Error configuring port\n"); + return -1; + } + + for (uint16_t q = 0; q < MAX_QUEUES_SUPPORTED; q++) { + retval = rte_eth_rx_queue_setup(port_id, q, 128, socket, NULL, mbuf_pool); + if (retval < 0) { + printf("Error running rx_queue_setup\n"); + return retval; + } + retval = rte_eth_tx_queue_setup(port_id, q, 256, socket, NULL); + if (retval < 0) { + printf("Error running tx_queue_setup\n"); + return retval; + } + } + + retval = rte_eth_dev_start(port_id); + if (retval < 0) { + printf("Error running dev_start\n"); + return retval; + } + printf("Port %u started ok\n", port_id); + + if (rte_eth_promiscuous_enable(port_id) < 0) + printf("Warning: could not enable promisc mode on port %u\n", port_id); + + return 0; +} + +int +datapath_init(const char *corelist) +{ + /* eal init requires non-const parameters, so copy */ + char *cl = strdup(corelist); /* todo, free copy */ + char l_flag[] = "-l"; + char in_mem[] = "--in-memory"; + char use_avx512[] = "--force-max-simd-bitwidth=512"; + char *argv[] = { + program_invocation_short_name, + l_flag, cl, + in_mem, + use_avx512, + NULL, + }; + + RTE_BUILD_BUG_ON(sizeof(port_poll_mask) * CHAR_BIT < MAX_SOCKETS); + + int ret = rte_eal_init(RTE_DIM(argv) - 1, argv); + if (ret < 0) + return ret; + + mempool_ops_index = check_mempool_ops(); + if (mempool_ops_index == -1) + rte_panic("Cannot get mempool ops"); + printf("Mempool ops index is %d\n", mempool_ops_index); + + default_mempool = rte_pktmbuf_pool_create("proxy_def", + MAX_SOCKETS * 200, 32, 0, + RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id()); + if (default_mempool == NULL) + rte_panic("Cannot create default mempool\n"); + + int nb_ethdevs = rte_eth_dev_count_avail(); + if (nb_ethdevs > MAX_PORTS_SUPPORTED) { + fprintf(stderr, "More ports available than supported, some will be unused\n"); + nb_ethdevs = MAX_PORTS_SUPPORTED; + } + for (int i = 0; i < nb_ethdevs; i++) { + if (init_port(i, default_mempool) != 0) + rte_panic("Cannot init port %d\n", i); + } + return 0; +} + +static int +send_fd(int to, int fd, uint64_t fd_size, rte_iova_t iova, uint64_t pg_size) +{ + struct iovec iov = {0}; + struct msghdr msg = {0}; + size_t cmsglen = CMSG_LEN(sizeof(fd)); + struct cmsghdr *cmhdr = malloc(cmsglen); + int ret = 0; + + struct { + uint64_t fd_size; + rte_iova_t iova; + uint64_t pg_size; + } data_message = {fd_size, iova, pg_size}; + + if (cmhdr == NULL) + return -1; + iov.iov_base = (void *)&data_message; + iov.iov_len 
= sizeof(data_message); + msg.msg_iov = &iov; + msg.msg_iovlen = 1; + cmhdr->cmsg_level = SOL_SOCKET; + cmhdr->cmsg_type = SCM_RIGHTS; + cmhdr->cmsg_len = cmsglen; + msg.msg_control = cmhdr; + msg.msg_controllen = cmsglen; + *(int *)CMSG_DATA(cmhdr) = fd; + + if (sendmsg(to, &msg, 0) != (int)iov.iov_len) { + printf("Error sending message to client, %s\n", strerror(errno)); + ret = -1; + } + free(cmhdr); + return ret; +} + +static int +reconfigure_queue(uint16_t port_id, uint16_t qid, struct rte_mempool *p) +{ + if (rte_eth_dev_rx_queue_stop(port_id, qid) != 0) { + printf("Error with rx_queue_stop\n"); + return -1; + } + if (rte_eth_dev_tx_queue_stop(port_id, qid) != 0) { + printf("Error with tx_queue_stop\n"); + return -1; + } + if (rte_eth_rx_queue_setup(port_id, qid, 1024, + rte_socket_id(), NULL, p) != 0) { + printf("Error with rx_queue_setup\n"); + return -1; + } + if (rte_eth_dev_tx_queue_start(port_id, qid) != 0) { + printf("Error with tx_queue_start\n"); + return -1; + } + if (rte_eth_dev_rx_queue_start(port_id, qid) != 0) { + printf("Error with rx_queue_start\n"); + return -1; + } + return 0; +} + +static void +handle_connection(int client, void *const client_mem, uint64_t memsize, + uint16_t port_id, uint16_t qid) +{ + uintptr_t client_mmap_addr = 0; + struct rte_ring *rx_ring, *tx_ring; + struct rte_mempool *local_mp; + size_t mempool_memsize = sizeof(*local_mp) + + sizeof(local_mp->local_cache[0]) * RTE_MAX_LCORE + + sizeof(struct rte_pktmbuf_pool_private); + local_mp = rte_malloc(NULL, mempool_memsize, 0); + if (local_mp == NULL) { + printf("Error allocating mempool struct\n"); + return; + } + memset(local_mp, 0, mempool_memsize); + *local_mp = (struct rte_mempool){ + .name = "proxy_mp", + .cache_size = 256, + .ops_index = mempool_ops_index, + .pool_config = client_mem, + .private_data_size = sizeof(struct rte_pktmbuf_pool_private), + .local_cache = RTE_PTR_ADD(local_mp, sizeof(*local_mp)), + }; + for (uint i = 0; i < RTE_MAX_LCORE; i++) { + local_mp->local_cache[i].size = 256; + local_mp->local_cache[i].flushthresh = 300; + } + + struct eth_shared_mem_msg *msg = malloc(sizeof(*msg) + 1024); + if (msg == NULL) { + printf("Error mallocing message buffer\n"); + goto out; + } + int bytes_read = read(client, msg, sizeof(msg) + 1024); + while (bytes_read != 0) { + switch (msg->type) { + case MSG_TYPE_MMAP_BASE_ADDR: + client_mmap_addr = msg->offset; + printf("Got mmap base addr of %p\n", (void *)client_mmap_addr); + break; + case MSG_TYPE_MEMPOOL_OFFSET: { + struct rte_mempool *remote_pool; + uintptr_t remote_pd_offset; + + remote_pool = RTE_PTR_ADD(client_mem, msg->offset); + remote_pd_offset = (uintptr_t)remote_pool->pool_data - client_mmap_addr; + local_mp->pool_data = RTE_PTR_ADD(client_mem, remote_pd_offset); + memcpy(rte_mempool_get_priv(local_mp), rte_mempool_get_priv(remote_pool), + sizeof(struct rte_pktmbuf_pool_private)); + + printf("Got mempool offset of %p, stack name is %s\n", + (void *)msg->offset, (char *)local_mp->pool_data); + struct rte_mbuf *mb = rte_pktmbuf_alloc(local_mp); + if (mb == NULL) { + printf("Error allocating buffer\n"); + return; + } + if ((uintptr_t)mb->buf_addr != (uintptr_t)mb + 128) + rte_panic("Error, bad buffer\n"); + rte_pktmbuf_free(mb); + break; + } + case MSG_TYPE_RX_RING_OFFSET: + printf("Got Rx ring offset of %p\n", (void *)msg->offset); + rx_ring = RTE_PTR_ADD(client_mem, msg->offset); + rx_rings[S_IDX(port_id, qid)] = rx_ring; + break; + case MSG_TYPE_TX_RING_OFFSET: + printf("Got Tx ring offset of %p\n", (void *)msg->offset); + tx_ring 
= RTE_PTR_ADD(client_mem, msg->offset); + tx_rings[S_IDX(port_id, qid)] = tx_ring; + break; + + case MSG_TYPE_START: + base_addrs[S_IDX(port_id, qid)] = (uintptr_t)client_mem; + lengths[S_IDX(port_id, qid)] = memsize; + mps[S_IDX(port_id, qid)] = local_mp; + if (reconfigure_queue(port_id, qid, local_mp) < 0) + goto out; + + port_poll_mask |= (1UL << S_IDX(port_id, qid)); + while (used_poll_mask != port_poll_mask) + usleep(10); + + *msg = (struct eth_shared_mem_msg){ .type = MSG_TYPE_ACK, }; + if (write(client, msg, sizeof(*msg)) < (int)sizeof(*msg)) + goto out; + + dp_stats[S_IDX(port_id, qid)] = (struct rxtx_stats){0}; + break; + + case MSG_TYPE_GET_MAC: + *msg = (struct eth_shared_mem_msg){ + .type = MSG_TYPE_REPORT_MAC, + }; + rte_eth_macaddr_get(port_id, &msg->ethaddr); + if (write(client, msg, sizeof(*msg)) < (int)sizeof(*msg)) + goto out; + break; + default: + printf("Unknown message\n"); + } + bytes_read = read(client, msg, sizeof(msg) + 1024); + } +out: + port_poll_mask &= ~(1UL << S_IDX(port_id, qid)); + while (used_poll_mask != port_poll_mask) + usleep(10); + + reconfigure_queue(port_id, qid, default_mempool); + + free(msg); + rte_free(local_mp); + + printf("Client disconnect\n"); +} + +static int +accept_client(const int sock, uint64_t maxmem, uint16_t port_id, uint16_t qid) +{ + int ret = 0; + rte_iova_t *iovas = NULL; + const int client = accept(sock, NULL, NULL); + if (client < 0) { + printf("Error with accept\n"); + return errno; + } + printf("Client connected\n"); + + char filename[32]; + int flags = MFD_HUGETLB; + uint32_t pgsize = (1 << 21); + if (maxmem % (1 << 30) == 0) { + flags |= MFD_HUGE_1GB; + pgsize = (1 << 30); + } + snprintf(filename, sizeof(filename), "client_memory_%d", client); + + const int memfd = memfd_create(filename, flags); + if (memfd < 0) { + printf("Error with memfd_create\n"); + return errno; + } + if (ftruncate(memfd, maxmem) < 0) { + printf("Error with ftruncate\n"); + close(memfd); + return errno; + } + void * const client_mem = mmap(NULL, maxmem, PROT_READ | PROT_WRITE, + MAP_SHARED, memfd, 0); + if (client_mem == MAP_FAILED) { + printf("Error with mmap\n"); + ret = errno; + goto out; + } + + const int nb_pages = maxmem / pgsize; + printf("Registering %d pages of memory with DPDK\n", nb_pages); + iovas = malloc(sizeof(*iovas) * nb_pages); + if (iovas == NULL) { + printf("Error with malloc for iovas\n"); + ret = ENOMEM; + goto out; + } + /* assume vfio, VA = IOVA */ + iovas[0] = (uintptr_t)client_mem; + for (int i = 1; i < nb_pages; i++) + iovas[i] = iovas[i - 1] + pgsize; + + + if (rte_extmem_register(client_mem, maxmem, iovas, nb_pages, pgsize) < 0) { + printf("Error registering memory with DPDK, %s\n", strerror(rte_errno)); + ret = rte_errno; + goto out; + } + printf("Registered memory: %" PRIu64 " @ %p in heap %s\n", maxmem, client_mem, filename); + + struct rte_eth_dev_info info; + if (rte_eth_dev_info_get(port_id, &info) < 0) { + printf("Error getting ethdev info\n"); + ret = -1; + goto out; + } + if (rte_dev_dma_map(info.device, client_mem, iovas[0], maxmem) < 0) { + printf("Error mapping dma for device, %s\n", strerror(rte_errno)); + ret = rte_errno; + goto out; + } + + if (send_fd(client, memfd, maxmem, iovas[0], pgsize) != 0) { + printf("Error sending fd to client\n"); + ret = errno; + goto out; + } + printf("Sent FD to client for mapping\n"); + + handle_connection(client, client_mem, maxmem, port_id, qid); +out: + if (iovas != NULL) + rte_dev_dma_unmap(info.device, client_mem, iovas[0], maxmem); + printf("Unregistering memory: %" 
PRIu64 " @ %p in heap %s\n", maxmem, client_mem, filename); + if (rte_extmem_unregister(client_mem, maxmem) < 0) + printf("Error unregistering memory, %s\n", strerror(rte_errno)); + close(memfd); + close(client); + if (client_mem != NULL) + munmap(client_mem, maxmem); + return ret; +} + +static void * +listen_fn(void *param) +{ + struct listen_socket_params *p = param; + int ret = 0; + + rte_thread_register(); + + while (1) { + const int ret = accept_client(p->sock, p->maxmem, p->port_id, p->qid); + if (ret != 0) + goto out; + } +out: + free(p); + return (void *)(uintptr_t)ret; +} + +int +listen_unix_socket(const char *path, const uint64_t maxmem, uint16_t port_id, uint16_t qid) +{ + if (sock_params[S_IDX(port_id, qid)].sock != 0) { + printf("Error, port already in use\n"); + return EEXIST; + } + + if (port_id >= rte_eth_dev_count_avail()) { + printf("Error, port %u does not exist\n", port_id); + return EINVAL; + } + + printf("Opening and listening on socket: %s\n", path); + char *pathcp = strdup(path); + if (pathcp == NULL) { + printf("Error with strdup()\n"); + free(pathcp); + return ENOMEM; + } + char *dirpath = dirname(pathcp); + mkdir(dirpath, 0700); + free(pathcp); + + int sock = socket(AF_UNIX, SOCK_SEQPACKET, 0); + if (sock < 0) { + printf("Error creating socket\n"); + return errno; + } + + struct sockaddr_un sun = {.sun_family = AF_UNIX}; + strlcpy(sun.sun_path, path, sizeof(sun.sun_path)); + printf("Attempting socket bind to path '%s'\n", path); + printf("Associated parameters are: maxmem = %"PRIu64", port = %u, qid = %u\n", + maxmem, port_id, qid); + + if (bind(sock, (void *) &sun, sizeof(sun)) < 0) { + printf("Initial bind to socket '%s' failed.\n", path); + + /* check if current socket is active */ + if (connect(sock, (void *)&sun, sizeof(sun)) == 0) { + close(sock); + return EADDRINUSE; + } + + /* socket is not active, delete and attempt rebind */ + printf("Attempting unlink and retrying bind\n"); + unlink(sun.sun_path); + if (bind(sock, (void *) &sun, sizeof(sun)) < 0) { + printf("Error binding socket: %s\n", strerror(errno)); + close(sock); + return errno; /* if unlink failed, this will be -EADDRINUSE as above */ + } + } + + if (listen(sock, 1) < 0) { + printf("Error calling listen for socket: %s\n", strerror(errno)); + unlink(sun.sun_path); + close(sock); + return errno; + } + printf("Socket %s listening ok\n", path); + + struct listen_socket_params *p = &sock_params[S_IDX(port_id, qid)]; + pthread_t listen_thread; + *p = (struct listen_socket_params){strdup(path), sock, port_id, qid, maxmem}; + pthread_create(&listen_thread, NULL, listen_fn, p); + pthread_detach(listen_thread); + return 0; +} + +void +handle_forwarding(void) +{ + const typeof(port_poll_mask) to_poll = port_poll_mask; + if (used_poll_mask != to_poll) { + printf("Poll mask is now %#llx\n", to_poll); + used_poll_mask = to_poll; + } + if (to_poll == 0) { + usleep(100); + return; + } + for (uint16_t i = 0; i < sizeof(to_poll) * CHAR_BIT; i++) { + struct rte_mbuf *mbs[32]; + void *offsets[32]; + if (((1UL << i) & to_poll) == 0) + continue; + + uint16_t port_id = i / MAX_QUEUES_SUPPORTED; + uint16_t qid = i % MAX_QUEUES_SUPPORTED; + uint16_t nb_rx = rte_eth_rx_burst(port_id, qid, mbs, RTE_DIM(mbs)); + if (nb_rx != 0) { + dp_stats[i].rx += nb_rx; + for (uint pkt = 0; pkt < nb_rx; pkt++) { + mbs[pkt]->buf_addr = RTE_PTR_SUB(mbs[pkt]->buf_addr, base_addrs[i]); + offsets[pkt] = RTE_PTR_SUB(mbs[pkt], base_addrs[i]); + } + uint16_t nb_enq = rte_ring_enqueue_burst(rx_rings[i], offsets, nb_rx, NULL); + if (nb_enq != 
nb_rx) { + dp_stats[i].enq_drop += nb_rx - nb_enq; + for (uint pkt = nb_enq; pkt < nb_rx; pkt++) { + mbs[pkt]->buf_addr = RTE_PTR_ADD(mbs[pkt]->buf_addr, + base_addrs[i]); + mbs[pkt]->pool = mps[i]; + } + rte_mempool_put_bulk(mps[i], (void *)&mbs[nb_enq], nb_rx - nb_enq); + } + } + + uint16_t nb_deq = rte_ring_dequeue_burst(tx_rings[i], offsets, + RTE_DIM(offsets), NULL); + if (nb_deq != 0) { + dp_stats[i].deq += nb_deq; + for (uint pkt = 0; pkt < nb_deq; pkt++) { + mbs[pkt] = RTE_PTR_ADD(offsets[pkt], base_addrs[i]); + rte_prefetch0_write(mbs[pkt]); + } + for (uint pkt = 0; pkt < nb_deq; pkt++) { + mbs[pkt]->pool = mps[i]; + mbs[pkt]->buf_addr = RTE_PTR_ADD(mbs[pkt]->buf_addr, base_addrs[i]); + } + uint16_t nb_tx = rte_eth_tx_burst(port_id, qid, mbs, nb_deq); + if (nb_tx != nb_deq) { + dp_stats[i].tx_drop += (nb_deq - nb_tx); + rte_pktmbuf_free_bulk(&mbs[nb_tx], nb_deq - nb_tx); + } + } + } +} + +unsigned int +lcore_id(void) +{ + return rte_lcore_id(); +} diff --git a/app/io-proxy/datapath.h b/app/io-proxy/datapath.h new file mode 100644 index 0000000000..ec5b395164 --- /dev/null +++ b/app/io-proxy/datapath.h @@ -0,0 +1,37 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * Copyright(c) 2023 Intel Corporation + */ +#ifndef DATAPATH_H_INC +#define DATAPATH_H_INC + +#include + +#define MEMPOOL_OPS_NAME "proxy_mp" +#define MAX_PORTS_SUPPORTED 8 +#define MAX_QUEUES_SUPPORTED 2 +#define MAX_SOCKETS (MAX_PORTS_SUPPORTED * MAX_QUEUES_SUPPORTED) + +struct rxtx_stats { + uint64_t rx; + uint64_t enq_drop; + uint64_t deq; + uint64_t tx_drop; +}; + +extern struct rxtx_stats dp_stats[MAX_SOCKETS]; + +int check_mempool_ops(void); + +int datapath_init(const char *corelist); + +int listen_unix_socket(const char *path, uint64_t maxmem, uint16_t port, uint16_t qid); + +void handle_forwarding(void); + +unsigned int lcore_id(void); + +int get_next_socket(int start, const char **path, int *sock, uint64_t *maxmem, + uint16_t *port, uint16_t *queue, bool *connected); + + +#endif diff --git a/app/io-proxy/datapath_mp.c b/app/io-proxy/datapath_mp.c new file mode 100644 index 0000000000..bba21a5b14 --- /dev/null +++ b/app/io-proxy/datapath_mp.c @@ -0,0 +1,78 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * Copyright(c) 2023 Intel Corporation + */ +#include +#include +#include +#include +#include "datapath.h" + +/* Mempool value "pool_config" contains pointer to base address for this mapping */ +/* no alloc/free etc. functions for this pool, as we never create/destroy it, only use + * enqueue and dequeue from it. + */ + +static int +proxy_mp_enqueue(struct rte_mempool *mp, void * const *obj_table, + unsigned int n) +{ + struct rte_stack *s = mp->pool_data; + void *offset_table[n]; + uintptr_t mempool_base = (uintptr_t)mp->pool_config; + + for (uint i = 0; i < n; i++) + offset_table[i] = RTE_PTR_SUB(obj_table[i], mempool_base); + + return rte_stack_push(s, offset_table, n) == 0 ? 
-ENOBUFS : 0; +} + +static int +proxy_mp_dequeue(struct rte_mempool *mp, void **obj_table, + unsigned int n) +{ + struct rte_stack *s = mp->pool_data; + uintptr_t mempool_base = (uintptr_t)mp->pool_config; + + if (rte_stack_pop(s, obj_table, n) == 0) + return -ENOBUFS; + for (uint i = 0; i < n; i++) { + obj_table[i] = RTE_PTR_ADD(obj_table[i], mempool_base); + struct rte_mbuf *mb = obj_table[i]; + mb->buf_addr = RTE_PTR_ADD(mb, sizeof(struct rte_mbuf) + rte_pktmbuf_priv_size(mp)); + mb->pool = mp; + } + return 0; +} + +static int +proxy_mp_alloc(struct rte_mempool *mp __rte_unused) +{ + rte_panic("Should not be called\n"); +} + +static unsigned int +proxy_mp_get_count(const struct rte_mempool *mp __rte_unused) +{ + rte_panic("Should not be called\n"); +} + + +static struct rte_mempool_ops ops_proxy_mp = { + .name = MEMPOOL_OPS_NAME, + .alloc = proxy_mp_alloc, + .enqueue = proxy_mp_enqueue, + .dequeue = proxy_mp_dequeue, + .get_count = proxy_mp_get_count, +}; + +RTE_MEMPOOL_REGISTER_OPS(ops_proxy_mp); + +int +check_mempool_ops(void) +{ + for (uint i = 0; i < rte_mempool_ops_table.num_ops; i++) { + if (strcmp(rte_mempool_ops_table.ops[i].name, MEMPOOL_OPS_NAME) == 0) + return i; + } + return -1; +} diff --git a/app/io-proxy/main.c b/app/io-proxy/main.c new file mode 100644 index 0000000000..82eef81fb0 --- /dev/null +++ b/app/io-proxy/main.c @@ -0,0 +1,71 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * Copyright(c) 2023 Intel Corporation + */ +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include "datapath.h" +#include "commands.h" + +volatile bool quit; +volatile bool running_startup_script; +static const char *startup_file = "dpdk-io-proxy.cmds"; + +static void * +run_cmdline(void *arg __rte_unused) +{ + struct cmdline *cl; + int fd = open(startup_file, O_RDONLY); + + if (fd >= 0) { + running_startup_script = true; + cl = cmdline_new(ctx, "\n# ", fd, STDOUT_FILENO); + if (cl == NULL) { + fprintf(stderr, "Error processing %s\n", startup_file); + goto end_startup; + } + cmdline_interact(cl); + cmdline_quit(cl); +end_startup: + running_startup_script = false; + close(fd); + } + + cl = cmdline_stdin_new(ctx, "\nProxy>> "); + if (cl == NULL) + goto out; + + cmdline_interact(cl); + cmdline_stdin_exit(cl); + +out: + quit = true; + return NULL; +} + +int +main(int argc, char *argv[]) +{ + pthread_t cmdline_th; + + if (argc != 2 || datapath_init(argv[1]) < 0) { + fprintf(stderr, "Usage %s \n", program_invocation_short_name); + rte_exit(EXIT_FAILURE, "Cannot init\n"); + } + + if (pthread_create(&cmdline_th, NULL, run_cmdline, NULL) < 0) + rte_exit(EXIT_FAILURE, "Cannot spawn cmdline thread\n"); + pthread_detach(cmdline_th); + + while (!quit) + handle_forwarding(); + return 0; +} diff --git a/app/io-proxy/meson.build b/app/io-proxy/meson.build new file mode 100644 index 0000000000..f03783b68f --- /dev/null +++ b/app/io-proxy/meson.build @@ -0,0 +1,12 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright(c) 2023 Intel Corporation + +cmd_h = custom_target('commands_hdr', + output: 'commands.h', + input: files('commands.list'), + capture: true, + command: [cmdline_gen_cmd, '@INPUT@'] +) +sources += files('datapath.c', 'datapath_mp.c', 'main.c', 'command_fns.c') +sources += cmd_h +deps += ['cmdline', 'ethdev', 'stack', 'bus_shared_mem'] diff --git a/app/meson.build b/app/meson.build index e4bf5c531c..27f69d883e 100644 --- a/app/meson.build +++ b/app/meson.build @@ -18,6 +18,7 @@ apps = [ 'dumpcap', 'pdump', 'proc-info', + 'io-proxy', 'test-acl', 
'test-bbdev', 'test-cmdline',