@@ -1,6 +1,6 @@
# BSD LICENSE
#
-# Copyright(c) 2010-2015 Intel Corporation. All rights reserved.
+# Copyright(c) 2010-2016 Intel Corporation. All rights reserved.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
@@ -47,6 +47,9 @@ CFLAGS += -I$(RTE_SDK)/lib/librte_eal/common/include
CFLAGS += -I$(RTE_SDK)/lib/librte_ring
CFLAGS += -I$(RTE_SDK)/lib/librte_mempool
CFLAGS += -I$(RTE_SDK)/lib/librte_ivshmem
+CFLAGS += -I$(RTE_SDK)/lib/librte_mbuf
+CFLAGS += -I$(RTE_SDK)/lib/librte_ether
+CFLAGS += -I$(RTE_SDK)/lib/librte_net
CFLAGS += $(WERROR_FLAGS) -O3
# specific to linuxapp exec-env
@@ -1,7 +1,7 @@
/*-
* BSD LICENSE
*
- * Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
+ * Copyright(c) 2010-2016 Intel Corporation. All rights reserved.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -45,7 +45,11 @@
#include <sys/signalfd.h>
#include <sys/ioctl.h>
#include <sys/eventfd.h>
+#include <sys/socket.h>
+#include <sys/un.h>
#include <assert.h>
+#include <arpa/inet.h>
+#include <sys/stat.h>
#include <rte_common.h>
#include <rte_interrupts.h>
@@ -65,15 +69,40 @@
#include <rte_malloc.h>
#include <rte_errno.h>
#include <rte_spinlock.h>
+#include <rte_memcpy.h>
+#include <rte_mbuf.h>
+#include <rte_ethdev.h>
+#include <rte_ether.h>
+#include <rte_ip.h>
#include "eal_private.h"
#include "eal_vfio.h"
#include "eal_thread.h"
+#include "eal_internal_cfg.h"
#define EAL_INTR_EPOLL_WAIT_FOREVER (-1)
#define NB_OTHER_INTR 1
+#define TCPDUMP_SOCKET_PATH "%s/tcpdump_mp_socket"
+#define TCPDUMP_SOCKET_ERR 0xFF
+#define TCPDUMP_REQ 0x1
+#define RING_SIZE 1024
+#define BURST_SIZE 32
+#define NUM_MBUFS 65536
+#define MBUF_CACHE_SIZE 250
+#define MAX_CBS 54
static RTE_DEFINE_PER_LCORE(int, _epfd) = -1; /**< epoll fd per thread */
+/* Value compared verbatim against the IPv4 header's src_addr field. */
+static uint32_t src_ip_filter;
+/* Listening UNIX socket on which tcpdump requests are accepted. */
+static int tcpdump_socket_fd;
+/* Rings that the RX and TX callbacks enqueue captured mbufs on. */
+struct rte_ring *prim_to_sec_rx;
+struct rte_ring *prim_to_sec_tx;
+/* Pool the RX callback allocates packet copies from. */
+struct rte_mempool *tcpdump_pktmbuf_pool;
+/*
+ * Book-keeping for the RX/TX callbacks installed on a (port, queue) pair,
+ * so that they can be looked up again when a removal request arrives.
+ * The table is sized with MAX_CBS, the same bound the removal loop uses.
+ */
+static struct rxtx_cbs {
+	uint8_t port;
+	uint16_t queue;
+	struct rte_eth_rxtx_callback *rx_cb;
+	struct rte_eth_rxtx_callback *tx_cb;
+} cbs[MAX_CBS];
/**
* union for pipe fds.
@@ -644,6 +673,259 @@ rte_intr_disable(struct rte_intr_handle *intr_handle)
return 0;
}
+/*
+ * Copy a single segment: the metadata fields listed below and data_len
+ * bytes of payload are copied from m into mi.
+ *
+ * pkt_len of the copy is set to the length of this one segment; the caller
+ * is expected to fix it up for multi-segment packets.
+ * NOTE(review): nothing here checks that mi has room for m->data_len bytes;
+ * presumably both pools use the same data room size - confirm.
+ */
+static inline void
+tcpdump_pktmbuf_duplicate(struct rte_mbuf *mi, struct rte_mbuf *m)
+{
+	/* descriptive metadata */
+	mi->port = m->port;
+	mi->ol_flags = m->ol_flags;
+	mi->packet_type = m->packet_type;
+	mi->hash = m->hash;
+	mi->tx_offload = m->tx_offload;
+	mi->vlan_tci = m->vlan_tci;
+	mi->vlan_tci_outer = m->vlan_tci_outer;
+
+	/* lengths: this segment only */
+	mi->data_len = m->data_len;
+	mi->pkt_len = mi->data_len;
+
+	/* payload */
+	rte_memcpy(rte_pktmbuf_mtod(mi, void *),
+		   rte_pktmbuf_mtod(m, void *),
+		   rte_pktmbuf_data_len(mi));
+
+	__rte_mbuf_sanity_check(mi, 1);
+	__rte_mbuf_sanity_check(m, 0);
+}
+
+/*
+ * Build a copy of the packet chain starting at md, taking every new
+ * segment from mp and filling it with tcpdump_pktmbuf_duplicate().
+ *
+ * Returns the head of the new chain, or NULL when any allocation fails
+ * (segments copied so far are freed again).
+ */
+static inline struct rte_mbuf *
+tcpdump_pktmbuf_clone(struct rte_mbuf *md, struct rte_mempool *mp)
+{
+	struct rte_mbuf *head, *seg, **link;
+	uint32_t total_len;
+	uint8_t seg_cnt = 0;
+
+	head = rte_pktmbuf_alloc(mp);
+	if (unlikely(head == NULL))
+		return NULL;
+
+	total_len = md->pkt_len;
+	seg = head;
+	link = &seg->next;
+
+	for (;;) {
+		seg_cnt++;
+		tcpdump_pktmbuf_duplicate(seg, md);
+		/* hook the segment in and move the link to its next pointer */
+		*link = seg;
+		link = &seg->next;
+
+		md = md->next;
+		if (md == NULL)
+			break;
+
+		seg = rte_pktmbuf_alloc(mp);
+		if (seg == NULL)
+			break;
+	}
+
+	/* terminate the chain and restore whole-packet totals on the head */
+	*link = NULL;
+	head->nb_segs = seg_cnt;
+	head->pkt_len = total_len;
+
+	/* a segment allocation failed part-way: drop the partial copy */
+	if (unlikely(seg == NULL)) {
+		rte_pktmbuf_free(head);
+		return NULL;
+	}
+
+	__rte_mbuf_sanity_check(head, 1);
+	return head;
+}
+
+/*
+ * Test a packet against the configured source-address filter.
+ *
+ * Only untagged Ethernet frames carrying IPv4 can match: the frame must be
+ * long enough to hold both headers in its first segment and its ether_type
+ * must be IPv4, otherwise the bytes at the IPv4 offset are not an address
+ * and must not be compared.
+ *
+ * Returns 0 when the IPv4 source address equals src_ip_filter, -1 otherwise.
+ */
+static int
+compare_filter(struct rte_mbuf *pkt)
+{
+	const struct ether_hdr *eth;
+	const struct ipv4_hdr *ip;
+
+	if (rte_pktmbuf_data_len(pkt) < sizeof(*eth) + sizeof(*ip))
+		return -1;
+
+	eth = rte_pktmbuf_mtod(pkt, const struct ether_hdr *);
+	if (eth->ether_type != htons(ETHER_TYPE_IPv4))
+		return -1;
+
+	ip = (const struct ipv4_hdr *)(eth + 1);
+	if (ip->src_addr != src_ip_filter)
+		return -1;
+
+	return 0;
+}
+
+/*
+ * RX callback: copy every received packet that passes compare_filter()
+ * and enqueue the copies on prim_to_sec_rx.
+ *
+ * The burst itself is never modified; the function always returns nb_pkts.
+ * Copies that cannot be allocated are skipped, and copies that do not fit
+ * on the ring are freed.
+ */
+static uint16_t
+tcpdump_rx(uint8_t port __rte_unused, uint16_t qidx __rte_unused,
+		struct rte_mbuf **pkts, uint16_t nb_pkts,
+		uint16_t max_pkts __rte_unused, void *_ __rte_unused)
+{
+	unsigned i;
+	uint16_t filtered_pkts = 0;
+	int ring_enq = 0;
+
+	/* an empty burst must not reach the variable-length array below */
+	if (nb_pkts == 0)
+		return nb_pkts;
+
+	struct rte_mbuf *dup_bufs[nb_pkts];
+
+	for (i = 0; i < nb_pkts; i++) {
+		struct rte_mbuf *copy;
+
+		if (compare_filter(pkts[i]) != 0)
+			continue;
+
+		copy = tcpdump_pktmbuf_clone(pkts[i], tcpdump_pktmbuf_pool);
+		/* pool exhausted: drop the capture, never enqueue NULL */
+		if (unlikely(copy == NULL))
+			continue;
+
+		dup_bufs[filtered_pkts++] = copy;
+	}
+
+	if (filtered_pkts == 0)
+		return nb_pkts;
+
+	ring_enq = rte_ring_enqueue_burst(prim_to_sec_rx, (void *)dup_bufs,
+			filtered_pkts);
+
+	/* release the copies the ring had no room for */
+	for (; ring_enq < filtered_pkts; ring_enq++)
+		rte_pktmbuf_free(dup_bufs[ring_enq]);
+
+	return nb_pkts;
+}
+
+/*
+ * TX callback: take an extra reference on every outgoing packet that
+ * passes compare_filter() and enqueue those packets on prim_to_sec_tx.
+ *
+ * The burst itself is never modified; the function always returns nb_pkts.
+ */
+static uint16_t
+tcpdump_tx(uint8_t port __rte_unused, uint16_t qidx __rte_unused,
+		struct rte_mbuf **pkts, uint16_t nb_pkts,
+		void *_ __rte_unused)
+{
+	int i;
+	int ring_enq = 0;
+	uint16_t filtered_pkts = 0;
+
+	/* an empty burst must not reach the variable-length array below */
+	if (nb_pkts == 0)
+		return nb_pkts;
+
+	struct rte_mbuf *dup_bufs[nb_pkts];
+
+	/*
+	 * Increment reference count of mbuf to avoid accidental return of mbuf
+	 * to pool while tcpdump processing is still on.
+	 */
+	for (i = 0; i < nb_pkts; i++) {
+		if (compare_filter(pkts[i]) == 0) {
+			rte_pktmbuf_refcnt_update(pkts[i], 1);
+			dup_bufs[filtered_pkts++] = pkts[i];
+		}
+	}
+
+	if (filtered_pkts == 0)
+		return nb_pkts;
+
+	ring_enq = rte_ring_enqueue_burst(prim_to_sec_tx, (void *)dup_bufs,
+			filtered_pkts);
+
+	/* give back the extra reference of packets the ring had no room for */
+	for (; ring_enq < filtered_pkts; ring_enq++)
+		rte_pktmbuf_free(dup_bufs[ring_enq]);
+
+	return nb_pkts;
+}
+
+/*
+ * Create the mbuf pool used for RX packet copies and the two rings the
+ * RX/TX callbacks enqueue on. Any failure terminates the process through
+ * rte_exit(), because the callbacks dereference these objects unchecked.
+ */
+static void
+tcpdump_create_mpool_n_rings(void)
+{
+	/* Create the mbuf pool */
+	tcpdump_pktmbuf_pool = rte_pktmbuf_pool_create("tcpdump_pktmbuf_pool",
+			NUM_MBUFS, MBUF_CACHE_SIZE, 0,
+			RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
+	if (tcpdump_pktmbuf_pool == NULL)
+		rte_exit(EXIT_FAILURE, "Could not initialize tcpdump_pktmbuf_pool\n");
+
+	/* Create rings */
+	prim_to_sec_rx = rte_ring_create("prim_to_sec_rx", RING_SIZE,
+			rte_socket_id(), RING_F_SC_DEQ);
+	if (prim_to_sec_rx == NULL)
+		rte_exit(EXIT_FAILURE, "Could not create prim_to_sec_rx ring\n");
+
+	prim_to_sec_tx = rte_ring_create("prim_to_sec_tx", RING_SIZE,
+			rte_socket_id(), RING_F_SC_DEQ);
+	if (prim_to_sec_tx == NULL)
+		rte_exit(EXIT_FAILURE, "Could not create prim_to_sec_tx ring\n");
+}
+
+/*
+ * Install tcpdump_rx/tcpdump_tx on the given port and queue and remember
+ * the returned handles in the first free entry of cbs[].
+ *
+ * The request is rejected (and logged) when port or queue do not fit the
+ * entry's field types or when every entry is already in use, so the table
+ * can never be written past its end.
+ */
+static void
+tcpdump_register_rxtx_callbacks(int port, int queue)
+{
+	struct rxtx_cbs *slot = NULL;
+	unsigned i;
+
+	if (port < 0 || port > UINT8_MAX || queue < 0 || queue > UINT16_MAX) {
+		RTE_LOG(ERR, EAL, "tcpdump: invalid port %d or queue %d\n",
+			port, queue);
+		return;
+	}
+
+	/* an entry is free while it holds no callback handle */
+	for (i = 0; i < RTE_DIM(cbs); i++) {
+		if (cbs[i].rx_cb == NULL && cbs[i].tx_cb == NULL) {
+			slot = &cbs[i];
+			break;
+		}
+	}
+	if (slot == NULL) {
+		RTE_LOG(ERR, EAL, "tcpdump: callback table full, cannot "
+			"register port %d queue %d\n", port, queue);
+		return;
+	}
+
+	slot->port = port;
+	slot->queue = queue;
+	slot->rx_cb = rte_eth_add_rx_callback(port, queue, tcpdump_rx, NULL);
+	slot->tx_cb = rte_eth_add_tx_callback(port, queue, tcpdump_tx, NULL);
+
+	if (slot->rx_cb == NULL || slot->tx_cb == NULL)
+		RTE_LOG(ERR, EAL, "tcpdump: failed to add rx/tx callback on "
+			"port %d queue %d\n", port, queue);
+}
+
+/*
+ * Remove the callbacks previously recorded in cbs[] for the given port
+ * and queue.
+ *
+ * Entries without a handle are skipped, so untouched (zeroed) entries are
+ * not mistaken for port 0 / queue 0. Handles are cleared once removed so
+ * that a repeated request does not try to remove them a second time.
+ * NOTE(review): the callback objects themselves are not released here;
+ * confirm against the ethdev documentation when that is safe to do.
+ */
+static void
+tcpdump_remove_rxtx_callbacks(int port, int queue)
+{
+	unsigned i;
+
+	for (i = 0; i < RTE_DIM(cbs); i++) {
+		if (cbs[i].port != port || cbs[i].queue != queue)
+			continue;
+
+		if (cbs[i].rx_cb != NULL) {
+			rte_eth_remove_rx_callback(port, queue, cbs[i].rx_cb);
+			cbs[i].rx_cb = NULL;
+		}
+		if (cbs[i].tx_cb != NULL) {
+			rte_eth_remove_tx_callback(port, queue, cbs[i].tx_cb);
+			cbs[i].tx_cb = NULL;
+		}
+	}
+}
+
+/*
+ * Receive one request from a connected socket and act on it.
+ *
+ * The message is scattered into three parts: a one-byte message type, a
+ * text buffer and the source-IP filter (written straight into
+ * src_ip_filter). The text buffer is scanned for "<port> <queue>" number
+ * pairs; for each pair the callbacks are registered (type 2) or removed
+ * (type 1).
+ *
+ * Returns 0 on success, -1 when nothing could be read.
+ */
+static int
+tcpdump_receive_request(int socket)
+{
+	char buffer[256];
+	char *buf;
+	/* exactly one byte is received, so keep the type one byte wide */
+	uint8_t msg_type = 0;
+
+	int port, queue;
+	int rval;
+	int buf_offset = 0;
+
+	struct msghdr reg_cbs_msg;
+	struct iovec msg[3];
+
+	memset(&reg_cbs_msg, 0, sizeof(reg_cbs_msg));
+	/* a short message must not leave stale stack bytes for sscanf() */
+	memset(buffer, 0, sizeof(buffer));
+	reg_cbs_msg.msg_iov = msg;
+	reg_cbs_msg.msg_iovlen = 3;
+
+	msg[0].iov_base = (char *) &msg_type;
+	msg[0].iov_len = sizeof(msg_type);
+
+	msg[1].iov_base = (char *) buffer;
+	msg[1].iov_len = sizeof(buffer);
+
+	msg[2].iov_base = (char *) &src_ip_filter;
+	msg[2].iov_len = sizeof(uint32_t);
+
+	rval = recvmsg(socket, &reg_cbs_msg, 0);
+	if (rval < 0) {
+		RTE_LOG(ERR, EAL, "Error reading from file descriptor %d: %s\n",
+				socket,
+				strerror(errno));
+		return -1;
+	} else if (rval == 0) {
+		RTE_LOG(ERR, EAL, "Read nothing from file "
+				"descriptor %d\n", socket);
+		return -1;
+	}
+
+	/* the peer controls all 256 bytes: force a terminator for sscanf() */
+	buffer[sizeof(buffer) - 1] = '\0';
+	buf = buffer;
+
+	/* Update port and queue */
+	while (sscanf(buf, "%*[^0123456789]%d%*[^0123456789]%d%n", &port,
+				&queue, &buf_offset) == 2) {
+		if (msg_type == 2)
+			tcpdump_register_rxtx_callbacks(port, queue);
+		else if (msg_type == 1)
+			tcpdump_remove_rxtx_callbacks(port, queue);
+		buf += buf_offset;
+	}
+
+	return 0;
+}
+
+/*
+ * Accept one pending connection on the listening socket, process the
+ * request it carries and close the connection again.
+ *
+ * accept() is retried only when it was interrupted by a signal; any other
+ * failure is logged and the function returns, so that a persistent error
+ * cannot spin here forever.
+ */
+static void
+tcpdump_socket_ready(int socket)
+{
+	int conn_sock;
+	struct sockaddr_un addr;
+	socklen_t sockaddr_len;
+	struct linger l;
+
+	do {
+		sockaddr_len = sizeof(addr);
+		/* this is a blocking call */
+		conn_sock = accept(socket, (struct sockaddr *) &addr,
+				&sockaddr_len);
+	} while (conn_sock == -1 && errno == EINTR);
+
+	if (conn_sock == -1) {
+		RTE_LOG(ERR, EAL, "Failed to accept tcpdump connection: %s\n",
+			strerror(errno));
+		return;
+	}
+
+	/* set socket to linger after close */
+	l.l_onoff = 1;
+	l.l_linger = 60;
+	if (setsockopt(conn_sock, SOL_SOCKET, SO_LINGER, &l, sizeof(l)) < 0)
+		RTE_LOG(WARNING, EAL, "Failed to set SO_LINGER on tcpdump "
+			"connection: %s\n", strerror(errno));
+
+	tcpdump_receive_request(conn_sock);
+	close(conn_sock);
+}
+
static int
eal_intr_process_interrupts(struct epoll_event *events, int nfds)
{
@@ -655,6 +937,13 @@ eal_intr_process_interrupts(struct epoll_event *events, int nfds)
for (n = 0; n < nfds; n++) {
+ if (internal_config.process_type == RTE_PROC_PRIMARY) {
+
+ /** tcpdump socket fd */
+ if (events[n].data.fd == tcpdump_socket_fd)
+ tcpdump_socket_ready(tcpdump_socket_fd);
+ }
+
/**
* if the pipe fd is ready to read, return out to
* rebuild the wait list.
@@ -786,6 +1075,61 @@ eal_intr_handle_interrupts(int pfd, unsigned totalfds)
}
}
+/*
+ * Build the path of the tcpdump socket into buffer (at most bufsz bytes).
+ *
+ * The socket lives in a "tcpdump_socket" directory, located under /var/run
+ * when running as root (or when HOME is not set) and under $HOME
+ * otherwise. The directory is created with owner-only permissions if it
+ * does not exist yet. Failures are logged; the function cannot report
+ * them to its caller.
+ */
+static void
+tcpdump_get_socket_path(char *buffer, int bufsz)
+{
+	char dir[sizeof(((struct sockaddr_un *)0)->sun_path)];
+	const char *home_dir = getenv("HOME");
+	int len;
+
+	if (getuid() != 0 && home_dir != NULL)
+		len = snprintf(dir, sizeof(dir), "%s/tcpdump_socket", home_dir);
+	else
+		len = snprintf(dir, sizeof(dir), "%s", "/var/run/tcpdump_socket");
+	if (len < 0 || (size_t)len >= sizeof(dir))
+		RTE_LOG(ERR, EAL, "tcpdump socket directory path truncated\n");
+
+	/* mode is octal: rwx for the owner only; an existing dir is fine */
+	if (mkdir(dir, 0700) < 0 && errno != EEXIST)
+		RTE_LOG(ERR, EAL, "Failed to create directory %s: %s\n",
+			dir, strerror(errno));
+
+	/* use current prefix as file path */
+	len = snprintf(buffer, bufsz, TCPDUMP_SOCKET_PATH, dir);
+	if (len < 0 || len >= bufsz)
+		RTE_LOG(ERR, EAL, "tcpdump socket path truncated\n");
+}
+
+/*
+ * Create the UNIX SOCK_SEQPACKET socket the primary process listens on
+ * for tcpdump requests, bind it to the path produced by
+ * tcpdump_get_socket_path() and start listening.
+ *
+ * On success the descriptor is stored in tcpdump_socket_fd and 0 is
+ * returned; on any failure the descriptor is closed and -1 is returned.
+ */
+static int
+tcpdump_create_primary_socket(void)
+{
+	struct sockaddr_un addr;
+	int fd;
+
+	/* set up a socket */
+	fd = socket(AF_UNIX, SOCK_SEQPACKET, 0);
+	if (fd < 0) {
+		RTE_LOG(ERR, EAL, "Failed to create socket!\n");
+		return -1;
+	}
+
+	tcpdump_get_socket_path(addr.sun_path, sizeof(addr.sun_path));
+	addr.sun_family = AF_UNIX;
+
+	/* a stale socket file from an earlier run would make bind() fail */
+	unlink(addr.sun_path);
+
+	if (bind(fd, (struct sockaddr *) &addr,
+			(socklen_t)sizeof(struct sockaddr_un)) != 0) {
+		RTE_LOG(ERR, EAL, "Failed to bind socket: %s!\n", strerror(errno));
+		goto fail;
+	}
+
+	if (listen(fd, 1) != 0) {
+		RTE_LOG(ERR, EAL, "Failed to listen: %s!\n", strerror(errno));
+		goto fail;
+	}
+
+	/* save the socket in local configuration */
+	tcpdump_socket_fd = fd;
+	return 0;
+
+fail:
+	close(fd);
+	return -1;
+}
+
/**
* It builds/rebuilds up the epoll file descriptor with all the
* file descriptors being waited on. Then handles the interrupts.
@@ -800,9 +1144,9 @@ static __attribute__((noreturn)) void *
eal_intr_thread_main(__rte_unused void *arg)
{
struct epoll_event ev;
-
/* host thread, never break out */
for (;;) {
+
/* build up the epoll fd with all descriptors we are to
* wait on then pass it to the handle_interrupts function
*/
@@ -829,6 +1173,23 @@ eal_intr_thread_main(__rte_unused void *arg)
}
numfds++;
+ /* build up the epoll fd with tcpdump descriptor.
+ */
+ static struct epoll_event tcpdump_event = {
+ .events = EPOLLIN | EPOLLPRI,
+ };
+
+ if (internal_config.process_type == RTE_PROC_PRIMARY) {
+ tcpdump_event.data.fd = tcpdump_socket_fd;
+ if (epoll_ctl(pfd, EPOLL_CTL_ADD, tcpdump_socket_fd,
+ &tcpdump_event) < 0) {
+ rte_panic("Error adding tcpdump socket fd to %d "
+ "epoll_ctl, %s\n",
+ tcpdump_socket_fd, strerror(errno));
+ }
+ numfds++;
+ }
+
rte_spinlock_lock(&intr_lock);
TAILQ_FOREACH(src, &intr_sources, next) {
@@ -877,6 +1238,16 @@ rte_eal_intr_init(void)
if (pipe(intr_pipe.pipefd) < 0)
return -1;
+ /* if primary, try to open tcpdump socket */
+ if (internal_config.process_type == RTE_PROC_PRIMARY) {
+ if (tcpdump_create_primary_socket() < 0) {
+ RTE_LOG(ERR, EAL, "Failed to set up tcpdump_socket_fd for "
+ "tcpdump in primary\n");
+ return -1;
+ }
+ tcpdump_create_mpool_n_rings();
+ }
+
/* create the host thread to wait/handle the interrupt */
ret = pthread_create(&intr_thread, NULL,
eal_intr_thread_main, NULL);