[dpdk-dev,v2,4/5] lib/librte_eal: add tcpdump support in primary process

Message ID 1455289045-25915-5-git-send-email-reshma.pattan@intel.com (mailing list archive)
State Superseded, archived
Headers

Commit Message

Pattan, Reshma Feb. 12, 2016, 2:57 p.m. UTC
Added tcpdump functionality to eal interrupt thread.

Enhanced interrupt thread to support tcpdump socket
and message processing from secondary.

Created new mempool and rings to handle packets of tcpdump.

Added rte_eth_rxtx_callbacks for ingress/egress packets processing
for tcpdump.

Added functionality to remove registered rte_eth_rxtx_callbacks
once secondary process is terminated.

Signed-off-by: Reshma Pattan <reshma.pattan@intel.com>
---
 lib/librte_eal/linuxapp/eal/Makefile         |    5 +-
 lib/librte_eal/linuxapp/eal/eal_interrupts.c |  375 +++++++++++++++++++++++++-
 2 files changed, 377 insertions(+), 3 deletions(-)
  

Comments

Pavel Fedin Feb. 17, 2016, 9:57 a.m. UTC | #1
Hello!

> +static int
> +compare_filter(struct rte_mbuf *pkt)
> +{
> +	struct ipv4_hdr *pkt_hdr = rte_pktmbuf_mtod_offset(pkt, struct ipv4_hdr *,
> +						sizeof(struct ether_hdr));
> +	if (pkt_hdr->src_addr != src_ip_filter)
> +		return -1;
> +
> +	return 0;
> +}

 Some critics to this...
 What if i want to capture packets coming from more than one host?
 What if i want to capture all packets?
 What if it's not IPv4 at all?

 May be this function should always return 0 if src_ip_filter == 0? This would at least be a quick way to disable filtering.

Kind regards,
Pavel Fedin
Senior Engineer
Samsung Electronics Research center Russia
  

Patch

diff --git a/lib/librte_eal/linuxapp/eal/Makefile b/lib/librte_eal/linuxapp/eal/Makefile
index 6e26250..425152c 100644
--- a/lib/librte_eal/linuxapp/eal/Makefile
+++ b/lib/librte_eal/linuxapp/eal/Makefile
@@ -1,6 +1,6 @@ 
 #   BSD LICENSE
 #
-#   Copyright(c) 2010-2015 Intel Corporation. All rights reserved.
+#   Copyright(c) 2010-2016 Intel Corporation. All rights reserved.
 #   All rights reserved.
 #
 #   Redistribution and use in source and binary forms, with or without
@@ -47,6 +47,9 @@  CFLAGS += -I$(RTE_SDK)/lib/librte_eal/common/include
 CFLAGS += -I$(RTE_SDK)/lib/librte_ring
 CFLAGS += -I$(RTE_SDK)/lib/librte_mempool
 CFLAGS += -I$(RTE_SDK)/lib/librte_ivshmem
+CFLAGS += -I$(RTE_SDK)/lib/librte_mbuf
+CFLAGS += -I$(RTE_SDK)/lib/librte_ether
+CFLAGS += -I$(RTE_SDK)/lib/librte_net
 CFLAGS += $(WERROR_FLAGS) -O3
 
 # specific to linuxapp exec-env
diff --git a/lib/librte_eal/linuxapp/eal/eal_interrupts.c b/lib/librte_eal/linuxapp/eal/eal_interrupts.c
index 06b26a9..dafe0bb 100644
--- a/lib/librte_eal/linuxapp/eal/eal_interrupts.c
+++ b/lib/librte_eal/linuxapp/eal/eal_interrupts.c
@@ -1,7 +1,7 @@ 
 /*-
  *   BSD LICENSE
  *
- *   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
+ *   Copyright(c) 2010-2016 Intel Corporation. All rights reserved.
  *   All rights reserved.
  *
  *   Redistribution and use in source and binary forms, with or without
@@ -45,7 +45,11 @@ 
 #include <sys/signalfd.h>
 #include <sys/ioctl.h>
 #include <sys/eventfd.h>
+#include <sys/socket.h>
+#include <sys/un.h>
 #include <assert.h>
+#include <arpa/inet.h>
+#include <sys/stat.h>
 
 #include <rte_common.h>
 #include <rte_interrupts.h>
@@ -65,15 +69,40 @@ 
 #include <rte_malloc.h>
 #include <rte_errno.h>
 #include <rte_spinlock.h>
+#include <rte_memcpy.h>
+#include <rte_mbuf.h>
+#include <rte_ethdev.h>
+#include <rte_ether.h>
+#include <rte_ip.h>
 
 #include "eal_private.h"
 #include "eal_vfio.h"
 #include "eal_thread.h"
+#include "eal_internal_cfg.h"
 
 #define EAL_INTR_EPOLL_WAIT_FOREVER (-1)
 #define NB_OTHER_INTR               1
+#define TCPDUMP_SOCKET_PATH "%s/tcpdump_mp_socket"
+#define TCPDUMP_SOCKET_ERR 0xFF
+#define TCPDUMP_REQ 0x1
+#define RING_SIZE 1024
+#define BURST_SIZE 32
+#define NUM_MBUFS 65536
+#define MBUF_CACHE_SIZE 250
+#define MAX_CBS 54
 
 static RTE_DEFINE_PER_LCORE(int, _epfd) = -1; /**< epoll fd per thread */
+static uint32_t src_ip_filter;
+static int tcpdump_socket_fd;
+struct rte_ring *prim_to_sec_rx;
+struct rte_ring *prim_to_sec_tx;
+struct rte_mempool *tcpdump_pktmbuf_pool;
+static struct rxtx_cbs {
+	uint8_t port;
+	uint16_t queue;
+	struct rte_eth_rxtx_callback *rx_cb;
+	struct rte_eth_rxtx_callback *tx_cb;
+} cbs[54];
 
 /**
  * union for pipe fds.
@@ -644,6 +673,259 @@  rte_intr_disable(struct rte_intr_handle *intr_handle)
 	return 0;
 }
 
+static inline void
+tcpdump_pktmbuf_duplicate(struct rte_mbuf *mi, struct rte_mbuf *m)
+{
+
+	mi->data_len = m->data_len;
+	mi->port = m->port;
+	mi->vlan_tci = m->vlan_tci;
+	mi->vlan_tci_outer = m->vlan_tci_outer;
+	mi->tx_offload = m->tx_offload;
+	mi->hash = m->hash;
+
+	mi->pkt_len = mi->data_len;
+	mi->ol_flags = m->ol_flags;
+	mi->packet_type = m->packet_type;
+
+	rte_memcpy(rte_pktmbuf_mtod(mi, void *),
+			rte_pktmbuf_mtod(m, void *),
+			rte_pktmbuf_data_len(mi));
+
+	__rte_mbuf_sanity_check(mi, 1);
+	__rte_mbuf_sanity_check(m, 0);
+}
+
+static inline struct rte_mbuf *
+tcpdump_pktmbuf_clone(struct rte_mbuf *md, struct rte_mempool *mp)
+{
+	struct rte_mbuf *mc, *mi, **prev;
+	uint32_t pktlen;
+	uint8_t nseg;
+
+	mc = rte_pktmbuf_alloc(mp);
+	if (unlikely(mc == NULL))
+		return NULL;
+
+	mi = mc;
+	prev = &mi->next;
+	pktlen = md->pkt_len;
+	nseg = 0;
+
+	do {
+		nseg++;
+		tcpdump_pktmbuf_duplicate(mi, md);
+		*prev = mi;
+		prev = &mi->next;
+	} while ((md = md->next) != NULL &&
+			(mi = rte_pktmbuf_alloc(mp)) != NULL);
+
+	*prev = NULL;
+	mc->nb_segs = nseg;
+	mc->pkt_len = pktlen;
+
+	/* Allocation of new indirect segment failed */
+	if (unlikely(mi == NULL)) {
+		rte_pktmbuf_free(mc);
+		return NULL;
+	}
+
+	__rte_mbuf_sanity_check(mc, 1);
+	return mc;
+
+}
+
+static int
+compare_filter(struct rte_mbuf *pkt)
+{
+	struct ipv4_hdr *pkt_hdr = rte_pktmbuf_mtod_offset(pkt, struct ipv4_hdr *,
+						sizeof(struct ether_hdr));
+	if (pkt_hdr->src_addr != src_ip_filter)
+		return -1;
+
+	return 0;
+}
+
+static uint16_t
+tcpdump_rx(uint8_t port __rte_unused, uint16_t qidx __rte_unused,
+	struct rte_mbuf **pkts, uint16_t nb_pkts,
+	uint16_t max_pkts __rte_unused, void *_ __rte_unused)
+{
+	unsigned i;
+	uint16_t filtered_pkts = 0;
+	int ring_enq = 0;
+	struct rte_mbuf *dup_bufs[nb_pkts];
+
+	for (i = 0; i < nb_pkts; i++) {
+		if (compare_filter(pkts[i]) == 0)
+			dup_bufs[filtered_pkts++] = tcpdump_pktmbuf_clone(pkts[i],
+							tcpdump_pktmbuf_pool);
+	}
+
+	ring_enq = rte_ring_enqueue_burst(prim_to_sec_rx, (void *)dup_bufs,
+						filtered_pkts);
+	if (unlikely(ring_enq < filtered_pkts)) {
+		do {
+			rte_pktmbuf_free(dup_bufs[ring_enq]);
+		} while (++ring_enq < filtered_pkts);
+	}
+	return nb_pkts;
+}
+
+static uint16_t
+tcpdump_tx(uint8_t port __rte_unused, uint16_t qidx __rte_unused,
+		struct rte_mbuf **pkts, uint16_t nb_pkts,
+		void *_ __rte_unused)
+{
+	int i;
+	int ring_enq = 0;
+	uint16_t filtered_pkts = 0;
+	struct rte_mbuf *dup_bufs[nb_pkts];
+
+	/*
+	 * Increment reference count of mbuf to avoid accidental returrn of mbuf
+	 * to pool while tcpdump processing is still on.
+	 */
+	for (i = 0; i < nb_pkts; i++) {
+		if (compare_filter(pkts[i]) == 0) {
+			rte_pktmbuf_refcnt_update(pkts[i], 1);
+			dup_bufs[filtered_pkts++] = pkts[i];
+		}
+	}
+
+	ring_enq = rte_ring_enqueue_burst(prim_to_sec_tx, (void *)dup_bufs,
+						filtered_pkts);
+	if (unlikely(ring_enq < filtered_pkts)) {
+		do {
+			rte_pktmbuf_free(dup_bufs[ring_enq]);
+		} while (++ring_enq < filtered_pkts);
+	}
+	return nb_pkts;
+}
+
+static void
+tcpdump_create_mpool_n_rings(void)
+{
+	/* Create the mbuf pool */
+	tcpdump_pktmbuf_pool = rte_pktmbuf_pool_create("tcpdump_pktmbuf_pool", NUM_MBUFS,
+			MBUF_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
+	if (tcpdump_pktmbuf_pool == NULL)
+		rte_exit(EXIT_FAILURE, "Could not initialize tcpdump_pktmbuf_pool\n");
+
+	/* Create rings */
+	prim_to_sec_rx = rte_ring_create("prim_to_sec_rx", RING_SIZE, rte_socket_id(),
+						RING_F_SC_DEQ);
+	prim_to_sec_tx = rte_ring_create("prim_to_sec_tx", RING_SIZE, rte_socket_id(),
+						RING_F_SC_DEQ);
+}
+
+static void
+tcpdump_register_rxtx_callbacks(int port, int queue)
+{
+	static int cnt;
+
+	cbs[cnt].port = port;
+	cbs[cnt].queue = queue;
+	cbs[cnt].rx_cb = rte_eth_add_rx_callback(port, queue, tcpdump_rx, NULL);
+	cbs[cnt].tx_cb = rte_eth_add_tx_callback(port, queue, tcpdump_tx, NULL);
+	cnt++;
+}
+
+static void
+tcpdump_remove_rxtx_callbacks(int port, int queue)
+{
+	int i;
+
+	for (i = 0; i < MAX_CBS; i++) {
+		if ((cbs[i].port == port) && (cbs[i].queue == queue)) {
+			rte_eth_remove_rx_callback(port, queue, cbs[i].rx_cb);
+			rte_eth_remove_tx_callback(port, queue, cbs[i].tx_cb);
+		}
+	}
+}
+
+/* receive a request and return it */
+static int
+tcpdump_receive_request(int socket)
+{
+	char buffer[256];
+	char *buf;
+	int msg_type;
+
+	int port, queue;
+	int rval;
+	int buf_offset;
+
+	struct msghdr reg_cbs_msg;
+	struct iovec msg[3];
+
+	memset(&reg_cbs_msg, 0, sizeof(reg_cbs_msg));
+	reg_cbs_msg.msg_iov = msg;
+	reg_cbs_msg.msg_iovlen = 3;
+
+	msg[0].iov_base = (char *) &msg_type;
+	msg[0].iov_len = 1;
+
+	msg[1].iov_base = (char *) buffer;
+	msg[1].iov_len = sizeof(buffer);
+
+	msg[2].iov_base	= (char *) &src_ip_filter;
+	msg[2].iov_len = sizeof(uint32_t);
+
+	rval = recvmsg(socket, &reg_cbs_msg, 0);
+	if (rval < 0) {
+		RTE_LOG(ERR, EAL, "Error reading from file descriptor %d: %s\n",
+				socket,
+				strerror(errno));
+		return -1;
+	} else if (rval == 0) {
+		RTE_LOG(ERR, EAL, "Read nothing from file "
+				"descriptor %d\n", socket);
+		return -1;
+	}
+
+	buf = buffer;
+
+	/* Update port and queue */
+	while (sscanf(buf, "%*[^0123456789]%d%*[^0123456789]%d%n", &port,
+				&queue, &buf_offset) == 2) {
+		if (msg_type == 2)
+			tcpdump_register_rxtx_callbacks(port, queue);
+		else if (msg_type == 1)
+			tcpdump_remove_rxtx_callbacks(port, queue);
+		buf += buf_offset;
+	}
+
+	return 0;
+}
+
+static void
+tcpdump_socket_ready(int socket)
+{
+	for (;;) {
+		int conn_sock;
+		struct sockaddr_un addr;
+
+		socklen_t sockaddr_len = sizeof(addr);
+		/* this is a blocking call */
+		conn_sock = accept(socket, (struct sockaddr *) &addr, &sockaddr_len);
+		/* just restart on error */
+		if (conn_sock == -1)
+			continue;
+
+		/* set socket to linger after close */
+		struct linger l;
+
+		l.l_onoff = 1;
+		l.l_linger = 60;
+		setsockopt(conn_sock, SOL_SOCKET, SO_LINGER, &l, sizeof(l));
+
+		tcpdump_receive_request(conn_sock);
+		close(conn_sock);
+		break;
+	}
+}
+
 static int
 eal_intr_process_interrupts(struct epoll_event *events, int nfds)
 {
@@ -655,6 +937,13 @@  eal_intr_process_interrupts(struct epoll_event *events, int nfds)
 
 	for (n = 0; n < nfds; n++) {
 
+		if (internal_config.process_type == RTE_PROC_PRIMARY) {
+
+			/** tcpdump socket fd */
+			if (events[n].data.fd ==  tcpdump_socket_fd)
+				tcpdump_socket_ready(tcpdump_socket_fd);
+		}
+
 		/**
 		 * if the pipe fd is ready to read, return out to
 		 * rebuild the wait list.
@@ -786,6 +1075,61 @@  eal_intr_handle_interrupts(int pfd, unsigned totalfds)
 	}
 }
 
+/* get socket path (/var/run if root, $HOME otherwise) */
+	static void
+tcpdump_get_socket_path(char *buffer, int bufsz)
+{
+	const char *dir = "/var/run/tcpdump_socket";
+	const char *home_dir = getenv("HOME/tcpdump_socket");
+
+	if (getuid() != 0 && home_dir != NULL)
+		dir = home_dir;
+	mkdir(dir, 700);
+	/* use current prefix as file path */
+	snprintf(buffer, bufsz, TCPDUMP_SOCKET_PATH, dir);
+}
+
+static int
+tcpdump_create_primary_socket(void)
+{
+	int ret, socket_fd;
+	struct sockaddr_un addr;
+	socklen_t sockaddr_len;
+
+	/* set up a socket */
+	socket_fd = socket(AF_UNIX, SOCK_SEQPACKET, 0);
+	if (socket_fd < 0) {
+		RTE_LOG(ERR, EAL, "Failed to create socket!\n");
+		return -1;
+	}
+
+	tcpdump_get_socket_path(addr.sun_path, sizeof(addr.sun_path));
+	addr.sun_family = AF_UNIX;
+	sockaddr_len = sizeof(struct sockaddr_un);
+
+	/* unlink() before bind() to remove the socket if it already exists */
+	unlink(addr.sun_path);
+
+	ret = bind(socket_fd, (struct sockaddr *) &addr, sockaddr_len);
+	if (ret) {
+		RTE_LOG(ERR, EAL, "Failed to bind socket: %s!\n", strerror(errno));
+		close(socket_fd);
+		return -1;
+	}
+
+	ret = listen(socket_fd, 1);
+	if (ret) {
+		RTE_LOG(ERR, EAL, "Failed to listen: %s!\n", strerror(errno));
+		close(socket_fd);
+		return -1;
+	}
+
+	/* save the socket in local configuration */
+	tcpdump_socket_fd = socket_fd;
+
+	return 0;
+}
+
 /**
  * It builds/rebuilds up the epoll file descriptor with all the
  * file descriptors being waited on. Then handles the interrupts.
@@ -800,9 +1144,9 @@  static __attribute__((noreturn)) void *
 eal_intr_thread_main(__rte_unused void *arg)
 {
 	struct epoll_event ev;
-
 	/* host thread, never break out */
 	for (;;) {
+
 		/* build up the epoll fd with all descriptors we are to
 		 * wait on then pass it to the handle_interrupts function
 		 */
@@ -829,6 +1173,23 @@  eal_intr_thread_main(__rte_unused void *arg)
 		}
 		numfds++;
 
+		/* build up the epoll fd with tcpdump descriptor.
+		 */
+		static struct epoll_event tcpdump_event = {
+			.events = EPOLLIN | EPOLLPRI,
+		};
+
+		if (internal_config.process_type == RTE_PROC_PRIMARY) {
+			tcpdump_event.data.fd = tcpdump_socket_fd;
+			if (epoll_ctl(pfd, EPOLL_CTL_ADD, tcpdump_socket_fd,
+						&tcpdump_event) < 0) {
+				rte_panic("Error adding tcpdump socket fd to %d "
+						"epoll_ctl, %s\n",
+						tcpdump_socket_fd, strerror(errno));
+			}
+			numfds++;
+		}
+
 		rte_spinlock_lock(&intr_lock);
 
 		TAILQ_FOREACH(src, &intr_sources, next) {
@@ -877,6 +1238,16 @@  rte_eal_intr_init(void)
 	if (pipe(intr_pipe.pipefd) < 0)
 		return -1;
 
+	/* if primary, try to open tcpdump socket */
+	if (internal_config.process_type == RTE_PROC_PRIMARY) {
+		if (tcpdump_create_primary_socket() < 0) {
+			RTE_LOG(ERR, EAL, "Failed to set up tcpdump_socket_fd for "
+					"tcpdump in primary\n");
+			return -1;
+		}
+		tcpdump_create_mpool_n_rings();
+	}
+
 	/* create the host thread to wait/handle the interrupt */
 	ret = pthread_create(&intr_thread, NULL,
 			eal_intr_thread_main, NULL);