Message ID | 1412073577-12248-1-git-send-email-reshma.pattan@intel.com (mailing list archive) |
---|---|
State | Superseded, archived |
Headers |
Return-Path: <dev-bounces@dpdk.org> X-Original-To: patchwork@dpdk.org Delivered-To: patchwork@dpdk.org Received: from [92.243.14.124] (localhost [IPv6:::1]) by dpdk.org (Postfix) with ESMTP id 147CC7E23; Tue, 30 Sep 2014 12:33:08 +0200 (CEST) Received: from mga02.intel.com (mga02.intel.com [134.134.136.20]) by dpdk.org (Postfix) with ESMTP id 7C4E87E20 for <dev@dpdk.org>; Tue, 30 Sep 2014 12:33:05 +0200 (CEST) Received: from orsmga002.jf.intel.com ([10.7.209.21]) by orsmga101.jf.intel.com with ESMTP; 30 Sep 2014 03:39:44 -0700 X-ExtLoop1: 1 X-IronPort-AV: E=Sophos;i="5.04,625,1406617200"; d="scan'208";a="610799671" Received: from irvmail001.ir.intel.com ([163.33.26.43]) by orsmga002.jf.intel.com with ESMTP; 30 Sep 2014 03:39:42 -0700 Received: from sivswdev02.ir.intel.com (sivswdev02.ir.intel.com [10.237.217.46]) by irvmail001.ir.intel.com (8.14.3/8.13.6/MailSET/Hub) with ESMTP id s8UAdfaB015575; Tue, 30 Sep 2014 11:39:41 +0100 Received: from sivswdev02.ir.intel.com (localhost [127.0.0.1]) by sivswdev02.ir.intel.com with ESMTP id s8UAdfBe012288; Tue, 30 Sep 2014 11:39:41 +0100 Received: (from reshmapa@localhost) by sivswdev02.ir.intel.com with id s8UAdeW3012284; Tue, 30 Sep 2014 11:39:40 +0100 From: reshmapa <reshma.pattan@intel.com> To: dev@dpdk.org Date: Tue, 30 Sep 2014 11:39:37 +0100 Message-Id: <1412073577-12248-1-git-send-email-reshma.pattan@intel.com> X-Mailer: git-send-email 1.7.4.1 In-Reply-To: <1411568210-2555-1-git-send-email-reshma.pattan@intel.com> References: <1411568210-2555-1-git-send-email-reshma.pattan@intel.com> Subject: [dpdk-dev] [PATCH v3] distributor_app: new sample app X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: patches and discussions about DPDK <dev.dpdk.org> List-Unsubscribe: <http://dpdk.org/ml/options/dev>, <mailto:dev-request@dpdk.org?subject=unsubscribe> List-Archive: <http://dpdk.org/ml/archives/dev/> List-Post: <mailto:dev@dpdk.org> List-Help: <mailto:dev-request@dpdk.org?subject=help> List-Subscribe: <http://dpdk.org/ml/listinfo/dev>, <mailto:dev-request@dpdk.org?subject=subscribe> Errors-To: dev-bounces@dpdk.org Sender: "dev" <dev-bounces@dpdk.org> |
Commit Message
Pattan, Reshma
Sept. 30, 2014, 10:39 a.m. UTC
From: Reshma Pattan <reshma.pattan@intel.com> A new sample app that shows the usage of the distributor library. This app works as follows: * An RX thread runs which pulls packets from each ethernet port in turn and passes those packets to worker using a distributor component. * The workers take the packets in turn, and determine the output port for those packets using basic l2forwarding doing an xor on the source port id. * The RX thread takes the returned packets from the workers and enqueue those packets into an rte_ring structure. * A TX thread pulls the packets off the rte_ring structure and then sends each packet out the output port specified previously by the worker * Command-line option support provided only for portmask. Signed-off-by: Bruce Richardson <bruce.richardson@intel.com> Signed-off-by: Reshma Pattan <reshma.pattan@intel.com> --- examples/Makefile | 1 + examples/distributor_app/Makefile | 57 ++++ examples/distributor_app/main.c | 600 +++++++++++++++++++++++++++++++++++++ examples/distributor_app/main.h | 46 +++ 4 files changed, 704 insertions(+), 0 deletions(-) create mode 100644 examples/distributor_app/Makefile create mode 100644 examples/distributor_app/main.c create mode 100644 examples/distributor_app/main.h
Comments
On Tue, Sep 30, 2014 at 11:39:37AM +0100, reshmapa wrote: > From: Reshma Pattan <reshma.pattan@intel.com> > > A new sample app that shows the usage of the distributor library. This > app works as follows: > > * An RX thread runs which pulls packets from each ethernet port in turn > and passes those packets to worker using a distributor component. > * The workers take the packets in turn, and determine the output port > for those packets using basic l2forwarding doing an xor on the source > port id. > * The RX thread takes the returned packets from the workers and enqueue > those packets into an rte_ring structure. > * A TX thread pulls the packets off the rte_ring structure and then > sends each packet out the output port specified previously by the worker > * Command-line option support provided only for portmask. > > Signed-off-by: Bruce Richardson <bruce.richardson@intel.com> > Signed-off-by: Reshma Pattan <reshma.pattan@intel.com> > --- > examples/Makefile | 1 + > examples/distributor_app/Makefile | 57 ++++ > examples/distributor_app/main.c | 600 +++++++++++++++++++++++++++++++++++++ > examples/distributor_app/main.h | 46 +++ > 4 files changed, 704 insertions(+), 0 deletions(-) > create mode 100644 examples/distributor_app/Makefile > create mode 100644 examples/distributor_app/main.c > create mode 100644 examples/distributor_app/main.h > > diff --git a/examples/Makefile b/examples/Makefile > index 6245f83..2ba82b0 100644 > --- a/examples/Makefile > +++ b/examples/Makefile > @@ -66,5 +66,6 @@ DIRS-y += vhost > DIRS-$(CONFIG_RTE_LIBRTE_XEN_DOM0) += vhost_xen > DIRS-y += vmdq > DIRS-y += vmdq_dcb > +DIRS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) += distributor_app > > include $(RTE_SDK)/mk/rte.extsubdir.mk > diff --git a/examples/distributor_app/Makefile b/examples/distributor_app/Makefile > new file mode 100644 > index 0000000..6a5bada > --- /dev/null > +++ b/examples/distributor_app/Makefile > @@ -0,0 +1,57 @@ > +# BSD LICENSE > +# > +# Copyright(c) 2010-2014 Intel Corporation. All rights reserved. > +# All rights reserved. > +# > +# Redistribution and use in source and binary forms, with or without > +# modification, are permitted provided that the following conditions > +# are met: > +# > +# * Redistributions of source code must retain the above copyright > +# notice, this list of conditions and the following disclaimer. > +# * Redistributions in binary form must reproduce the above copyright > +# notice, this list of conditions and the following disclaimer in > +# the documentation and/or other materials provided with the > +# distribution. > +# * Neither the name of Intel Corporation nor the names of its > +# contributors may be used to endorse or promote products derived > +# from this software without specific prior written permission. > +# > +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS > +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT > +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR > +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT > +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, > +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT > +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, > +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY > +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT > +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE > +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. > + > +ifeq ($(RTE_SDK),) > +$(error "Please define RTE_SDK environment variable") > +endif > + > +# Default target, can be overriden by command line or environment > +RTE_TARGET ?= x86_64-native-linuxapp-gcc > + > +include $(RTE_SDK)/mk/rte.vars.mk > + > +# binary name > +APP = distributor_app > + > +# all source are stored in SRCS-y > +SRCS-y := main.c > + > +CFLAGS += $(WERROR_FLAGS) > + > +# workaround for a gcc bug with noreturn attribute > +# http://gcc.gnu.org/bugzilla/show_bug.cgi?id=12603 > +ifeq ($(CONFIG_RTE_TOOLCHAIN_GCC),y) > +CFLAGS_main.o += -Wno-return-type > +endif > + > +EXTRA_CFLAGS += -O3 -Wfatal-errors > + > +include $(RTE_SDK)/mk/rte.extapp.mk > diff --git a/examples/distributor_app/main.c b/examples/distributor_app/main.c > new file mode 100644 > index 0000000..f555d93 > --- /dev/null > +++ b/examples/distributor_app/main.c > @@ -0,0 +1,600 @@ > +/*- > + * BSD LICENSE > + * > + * Copyright(c) 2010-2014 Intel Corporation. All rights reserved. > + * All rights reserved. > + * > + * Redistribution and use in source and binary forms, with or without > + * modification, are permitted provided that the following conditions > + * are met: > + * > + * * Redistributions of source code must retain the above copyright > + * notice, this list of conditions and the following disclaimer. > + * * Redistributions in binary form must reproduce the above copyright > + * notice, this list of conditions and the following disclaimer in > + * the documentation and/or other materials provided with the > + * distribution. > + * * Neither the name of Intel Corporation nor the names of its > + * contributors may be used to endorse or promote products derived > + * from this software without specific prior written permission. > + * > + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS > + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT > + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR > + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT > + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, > + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT > + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, > + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY > + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT > + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE > + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. > + */ > + > +#include <stdint.h> > +#include <inttypes.h> > +#include <unistd.h> > +#include <signal.h> > +#include <getopt.h> > + > +#include <rte_eal.h> > +#include <rte_ethdev.h> > +#include <rte_cycles.h> > +#include <rte_malloc.h> > +#include <rte_debug.h> > +#include <rte_distributor.h> > + > +#include "main.h" > + > +#define RX_RING_SIZE 256 > +#define RX_FREE_THRESH 32 > +#define RX_PTHRESH 8 > +#define RX_HTHRESH 8 > +#define RX_WTHRESH 0 > + > +#define TX_RING_SIZE 512 > +#define TX_FREE_THRESH 32 > +#define TX_PTHRESH 32 > +#define TX_HTHRESH 0 > +#define TX_WTHRESH 0 > +#define TX_RSBIT_THRESH 32 > +#define TX_Q_FLAGS (ETH_TXQ_FLAGS_NOMULTSEGS | ETH_TXQ_FLAGS_NOVLANOFFL |\ > + ETH_TXQ_FLAGS_NOXSUMSCTP | ETH_TXQ_FLAGS_NOXSUMUDP | \ > + ETH_TXQ_FLAGS_NOXSUMTCP) > + > +#define NUM_MBUFS ((64*1024)-1) > +#define MBUF_SIZE (2048 + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM) > +#define MBUF_CACHE_SIZE 250 > +#define BURST_SIZE 32 > +#define RTE_RING_SZ 1024 > + > +/* uncommnet below line to enable debug logs */ > +/* #define DEBUG */ > + > +#ifdef DEBUG > +#define LOG_LEVEL RTE_LOG_DEBUG > +#define LOG_DEBUG(log_type, fmt, args...) do { \ > + RTE_LOG(DEBUG, log_type, fmt, ##args) \ > +} while (0) > +#else > +#define LOG_LEVEL RTE_LOG_INFO > +#define LOG_DEBUG(log_type, fmt, args...) do {} while (0) > +#endif > + > +#define RTE_LOGTYPE_DISTRAPP RTE_LOGTYPE_USER1 > + > +/* mask of enabled ports */ > +static uint32_t enabled_port_mask = 0; > + > +static volatile struct app_stats { > + struct { > + uint64_t rx_pkts; > + uint64_t returned_pkts; > + uint64_t enqueued_pkts; > + } rx __rte_cache_aligned; > + > + struct { > + uint64_t dequeue_pkts; > + uint64_t tx_pkts; > + } tx __rte_cache_aligned; > +} app_stats; > + > +static const struct rte_eth_conf port_conf_default = { > + .rxmode = { > + .mq_mode = ETH_MQ_RX_RSS, > + .max_rx_pkt_len = ETHER_MAX_LEN, > + .split_hdr_size = 0, > + .header_split = 0, /**< Header Split disabled */ > + .hw_ip_checksum = 0, /**< IP checksum offload enabled */ > + .hw_vlan_filter = 0, /**< VLAN filtering disabled */ > + .jumbo_frame = 0, /**< Jumbo Frame Support disabled */ > + .hw_strip_crc = 0, /**< CRC stripped by hardware */ > + }, > + .txmode = { > + .mq_mode = ETH_MQ_TX_NONE, > + }, > + .lpbk_mode = 0, > + .rx_adv_conf = { > + .rss_conf = { > + .rss_hf = ETH_RSS_IPV4 | ETH_RSS_IPV6 | > + ETH_RSS_IPV4_TCP | ETH_RSS_IPV4_UDP | > + ETH_RSS_IPV6_TCP | ETH_RSS_IPV6_UDP, > + } > + }, > +}; > + > +static const struct rte_eth_rxconf rx_conf_default = { > + .rx_thresh = { > + .pthresh = RX_PTHRESH, > + .hthresh = RX_HTHRESH, > + .wthresh = RX_WTHRESH, > + }, > + .rx_free_thresh = RX_FREE_THRESH, > + .rx_drop_en = 0, > +}; > + > +static const struct rte_eth_txconf tx_conf_default = { > + .tx_thresh = { > + .pthresh = TX_PTHRESH, > + .hthresh = TX_HTHRESH, > + .wthresh = TX_WTHRESH, > + }, > + .tx_free_thresh = TX_FREE_THRESH, > + .tx_rs_thresh = TX_RSBIT_THRESH, > + .txq_flags = TX_Q_FLAGS > + > +}; > + > +struct output_buffer { > + unsigned count; > + struct rte_mbuf *mbufs[BURST_SIZE]; > +}; > + > +/* > + * Initialises a given port using global settings and with the rx buffers > + * coming from the mbuf_pool passed as parameter > + */ > +static inline int > +port_init(uint8_t port, struct rte_mempool *mbuf_pool) > +{ > + struct rte_eth_conf port_conf = port_conf_default; > + const uint16_t rxRings = 1, txRings = rte_lcore_count() - 1; > + int retval; > + uint16_t q; > + > + if (port >= rte_eth_dev_count()) > + return -1; > + > + retval = rte_eth_dev_configure(port, rxRings, txRings, &port_conf); > + if (retval != 0) > + return retval; > + > + for (q = 0; q < rxRings; q++) { > + retval = rte_eth_rx_queue_setup(port, q, RX_RING_SIZE, > + rte_eth_dev_socket_id(port), > + &rx_conf_default, mbuf_pool); > + if (retval < 0) > + return retval; > + } > + > + for (q = 0; q < txRings; q++) { > + retval = rte_eth_tx_queue_setup(port, q, TX_RING_SIZE, > + rte_eth_dev_socket_id(port), > + &tx_conf_default); > + if (retval < 0) > + return retval; > + } > + > + retval = rte_eth_dev_start(port); > + if (retval < 0) > + return retval; > + > + struct rte_eth_link link; > + rte_eth_link_get_nowait(port, &link); > + if (!link.link_status) { > + sleep(1); > + rte_eth_link_get_nowait(port, &link); > + } > + > + if (!link.link_status) { > + printf("Link down on port %"PRIu8"\n", port); > + return 0; > + } > + > + struct ether_addr addr; > + rte_eth_macaddr_get(port, &addr); > + printf("Port %u MAC: %02"PRIx8" %02"PRIx8" %02"PRIx8 > + " %02"PRIx8" %02"PRIx8" %02"PRIx8"\n", > + (unsigned)port, > + addr.addr_bytes[0], addr.addr_bytes[1], > + addr.addr_bytes[2], addr.addr_bytes[3], > + addr.addr_bytes[4], addr.addr_bytes[5]); > + > + rte_eth_promiscuous_enable(port); > + > + return 0; > +} > + > +struct lcore_params { > + unsigned worker_id; > + struct rte_distributor *d; > + struct rte_ring *r; > +}; > + > +static __attribute__((noreturn)) void > +lcore_rx(struct lcore_params *p) > +{ > + struct rte_distributor *d = p->d; > + struct rte_ring *r = p->r; > + const uint8_t nb_ports = rte_eth_dev_count(); > + const int socket_id = rte_socket_id(); > + uint8_t port; > + > + for (port = 0; port < nb_ports; port++) { > + /* skip ports that are not enabled */ > + if ((enabled_port_mask & (1 << port)) == 0) > + continue; > + > + if (rte_eth_dev_socket_id(port) > 0 && > + rte_eth_dev_socket_id(port) != socket_id) > + printf("WARNING, port %u is on remote NUMA node to " > + "RX thread.\n\tPerformance will not " > + "be optimal.\n", port); > + } > + > + printf("\nCore %u doing packet RX.\n", rte_lcore_id()); > + port = 0; > + for (;;) { > + /* skip ports that are not enabled */ > + if ((enabled_port_mask & (1 << port)) == 0) { > + if (++port == nb_ports) > + port = 0; > + continue; > + } > + struct rte_mbuf *bufs[BURST_SIZE*2]; > + const uint16_t nb_rx = rte_eth_rx_burst(port, 0, bufs, > + BURST_SIZE); > + app_stats.rx.rx_pkts += nb_rx; > + > + rte_distributor_process(d, bufs, nb_rx); > + const uint16_t nb_ret = rte_distributor_returned_pkts(d, > + bufs, BURST_SIZE*2); > + app_stats.rx.returned_pkts += nb_ret; > + if (unlikely(nb_ret == 0)) > + continue; > + > + uint16_t sent = rte_ring_enqueue_burst(r, (void *)bufs, nb_ret); > + app_stats.rx.enqueued_pkts += sent; > + if (unlikely(sent < nb_ret)) { > + LOG_DEBUG(DISTRAPP, "%s:Packet loss due to full ring\n", __func__); > + while (sent < nb_ret) > + rte_pktmbuf_free(bufs[sent++]); > + } > + if (++port == nb_ports) > + port = 0; > + } > +} > + > +static inline void > +flush_one_port(struct output_buffer *outbuf, uint8_t outp) > +{ > + unsigned nb_tx = rte_eth_tx_burst(outp, 0, outbuf->mbufs, > + outbuf->count); > + app_stats.tx.tx_pkts += nb_tx; > + > + if (unlikely(nb_tx < outbuf->count)) { > + LOG_DEBUG(DISTRAPP, "%s:Packet loss with tx_burst\n", __func__); > + do { > + rte_pktmbuf_free(outbuf->mbufs[nb_tx]); > + } while (++nb_tx < outbuf->count); > + } > + outbuf->count = 0; > +} > + > +static inline void > +flush_all_ports(struct output_buffer *tx_buffers, uint8_t nb_ports) > +{ > + uint8_t outp; > + for (outp = 0; outp < nb_ports; outp++) { > + /* skip ports that are not enabled */ > + if ((enabled_port_mask & (1 << outp)) == 0) > + continue; > + > + if (tx_buffers[outp].count == 0) > + continue; > + > + flush_one_port(&tx_buffers[outp], outp); > + } > +} > + > +static __attribute__((noreturn)) void > +lcore_tx(struct rte_ring *in_r) > +{ > + static struct output_buffer tx_buffers[RTE_MAX_ETHPORTS]; > + const uint8_t nb_ports = rte_eth_dev_count(); > + const int socket_id = rte_socket_id(); > + uint8_t port; > + > + for (port = 0; port < nb_ports; port++) { > + /* skip ports that are not enabled */ > + if ((enabled_port_mask & (1 << port)) == 0) > + continue; > + > + if (rte_eth_dev_socket_id(port) > 0 && > + rte_eth_dev_socket_id(port) != socket_id) > + printf("WARNING, port %u is on remote NUMA node to " > + "TX thread.\n\tPerformance will not " > + "be optimal.\n", port); > + } > + > + printf("\nCore %u doing packet TX.\n", rte_lcore_id()); > + for (;;) { > + for (port = 0; port < nb_ports; port++) { > + /* skip ports that are not enabled */ > + if ((enabled_port_mask & (1 << port)) == 0) > + continue; > + > + struct rte_mbuf *bufs[BURST_SIZE]; > + const uint16_t nb_rx = rte_ring_dequeue_burst(in_r, > + (void *)bufs, BURST_SIZE); > + app_stats.tx.dequeue_pkts += nb_rx; > + > + /* if we get no traffic, flush anything we have */ > + if (unlikely(nb_rx == 0)) { > + flush_all_ports(tx_buffers, nb_ports); > + continue; > + } > + > + /* for traffic we receive, queue it up for transmit */ > + uint16_t i; > + _mm_prefetch(bufs[0], 0); > + _mm_prefetch(bufs[1], 0); > + _mm_prefetch(bufs[2], 0); > + for (i = 0; i < nb_rx; i++) { > + struct output_buffer *outbuf; > + uint8_t outp; > + _mm_prefetch(bufs[i + 3], 0); > + /* workers should update in_port to hold the > + * output port value */ > + outp = bufs[i]->port; > + /* skip ports that are not enabled */ > + if ((enabled_port_mask & (1 << outp)) == 0) > + continue; > + > + outbuf = &tx_buffers[outp]; > + outbuf->mbufs[outbuf->count++] = bufs[i]; > + if (outbuf->count == BURST_SIZE) > + flush_one_port(outbuf, outp); > + } > + } > + } > +} > + > + > +static __attribute__((noreturn)) void > +lcore_worker(struct lcore_params *p) > +{ > + struct rte_distributor *d = p->d; > + const unsigned id = p->worker_id; > + /* for single port, xor_val will be zero so we won't modify the output > + * port, otherwise we send traffic from 0 to 1, 2 to 3, and vice versa > + */ > + const unsigned xor_val = (rte_eth_dev_count() > 1); > + struct rte_mbuf *buf = NULL; > + > + printf("\nCore %u acting as worker core.\n", rte_lcore_id()); > + for (;;) { > + buf = rte_distributor_get_pkt(d, id, buf); > + buf->port ^= xor_val; > + } > +} > + > +static void > +int_handler(int sig_num) > +{ > + struct rte_eth_stats eth_stats; > + unsigned i; > + > + printf("Exiting on signal %d\n", sig_num); > + > + printf("\nRX thread stats:\n"); > + printf(" - Received: %"PRIu64"\n", app_stats.rx.rx_pkts); > + printf(" - Processed: %"PRIu64"\n", app_stats.rx.returned_pkts); > + printf(" - Enqueued: %"PRIu64"\n", app_stats.rx.enqueued_pkts); > + > + printf("\nTX thread stats:\n"); > + printf(" - Dequeued: %"PRIu64"\n", app_stats.tx.dequeue_pkts); > + printf(" - Transmitted: %"PRIu64"\n", app_stats.tx.tx_pkts); > + > + for (i = 0; i < rte_eth_dev_count(); i++) { > + rte_eth_stats_get(i, ð_stats); > + printf("\nPort %u stats:\n", i); > + printf(" - Pkts in: %"PRIu64"\n", eth_stats.ipackets); > + printf(" - Pkts out: %"PRIu64"\n", eth_stats.opackets); > + printf(" - In Errs: %"PRIu64"\n", eth_stats.ierrors); > + printf(" - Out Errs: %"PRIu64"\n", eth_stats.oerrors); > + printf(" - Mbuf Errs: %"PRIu64"\n", eth_stats.rx_nombuf); > + } > + exit(0); rte_exit here? Also, this is a pretty ungraceful exit strategy as all the threads you've created and memory you've allocated are just forgotten here. Given that dpdk mempools are shared, this has the potential to leak lots of memory if other apps are using the dpdk at the same time that you run this. You probably want to use the sigint handler to raise a flag to the tx/rx threads to shutdown gracefully, and then free your allocated memory and mempool. Neil
On Tue, Sep 30, 2014 at 07:34:45AM -0400, Neil Horman wrote: > On Tue, Sep 30, 2014 at 11:39:37AM +0100, reshmapa wrote: > > From: Reshma Pattan <reshma.pattan@intel.com> > > > > A new sample app that shows the usage of the distributor library. This > > app works as follows: > > > > * An RX thread runs which pulls packets from each ethernet port in turn > > and passes those packets to worker using a distributor component. > > * The workers take the packets in turn, and determine the output port > > for those packets using basic l2forwarding doing an xor on the source > > port id. > > * The RX thread takes the returned packets from the workers and enqueue > > those packets into an rte_ring structure. > > * A TX thread pulls the packets off the rte_ring structure and then > > sends each packet out the output port specified previously by the worker > > * Command-line option support provided only for portmask. > > > > Signed-off-by: Bruce Richardson <bruce.richardson@intel.com> > > Signed-off-by: Reshma Pattan <reshma.pattan@intel.com> > > --- > > examples/Makefile | 1 + > > examples/distributor_app/Makefile | 57 ++++ > > examples/distributor_app/main.c | 600 +++++++++++++++++++++++++++++++++++++ > > examples/distributor_app/main.h | 46 +++ > > 4 files changed, 704 insertions(+), 0 deletions(-) > > create mode 100644 examples/distributor_app/Makefile > > create mode 100644 examples/distributor_app/main.c > > create mode 100644 examples/distributor_app/main.h > > > > diff --git a/examples/Makefile b/examples/Makefile > > index 6245f83..2ba82b0 100644 > > --- a/examples/Makefile > > +++ b/examples/Makefile > > @@ -66,5 +66,6 @@ DIRS-y += vhost > > DIRS-$(CONFIG_RTE_LIBRTE_XEN_DOM0) += vhost_xen > > DIRS-y += vmdq > > DIRS-y += vmdq_dcb > > +DIRS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) += distributor_app > > > > include $(RTE_SDK)/mk/rte.extsubdir.mk > > diff --git a/examples/distributor_app/Makefile b/examples/distributor_app/Makefile > > new file mode 100644 > > index 0000000..6a5bada > > --- /dev/null > > +++ b/examples/distributor_app/Makefile > > @@ -0,0 +1,57 @@ > > +# BSD LICENSE > > +# > > +# Copyright(c) 2010-2014 Intel Corporation. All rights reserved. > > +# All rights reserved. > > +# > > +# Redistribution and use in source and binary forms, with or without > > +# modification, are permitted provided that the following conditions > > +# are met: > > +# > > +# * Redistributions of source code must retain the above copyright > > +# notice, this list of conditions and the following disclaimer. > > +# * Redistributions in binary form must reproduce the above copyright > > +# notice, this list of conditions and the following disclaimer in > > +# the documentation and/or other materials provided with the > > +# distribution. > > +# * Neither the name of Intel Corporation nor the names of its > > +# contributors may be used to endorse or promote products derived > > +# from this software without specific prior written permission. > > +# > > +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS > > +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT > > +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR > > +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT > > +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, > > +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT > > +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, > > +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY > > +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT > > +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE > > +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. > > + > > +ifeq ($(RTE_SDK),) > > +$(error "Please define RTE_SDK environment variable") > > +endif > > + > > +# Default target, can be overriden by command line or environment > > +RTE_TARGET ?= x86_64-native-linuxapp-gcc > > + > > +include $(RTE_SDK)/mk/rte.vars.mk > > + > > +# binary name > > +APP = distributor_app > > + > > +# all source are stored in SRCS-y > > +SRCS-y := main.c > > + > > +CFLAGS += $(WERROR_FLAGS) > > + > > +# workaround for a gcc bug with noreturn attribute > > +# http://gcc.gnu.org/bugzilla/show_bug.cgi?id=12603 > > +ifeq ($(CONFIG_RTE_TOOLCHAIN_GCC),y) > > +CFLAGS_main.o += -Wno-return-type > > +endif > > + > > +EXTRA_CFLAGS += -O3 -Wfatal-errors > > + > > +include $(RTE_SDK)/mk/rte.extapp.mk > > diff --git a/examples/distributor_app/main.c b/examples/distributor_app/main.c > > new file mode 100644 > > index 0000000..f555d93 > > --- /dev/null > > +++ b/examples/distributor_app/main.c > > @@ -0,0 +1,600 @@ > > +/*- > > + * BSD LICENSE > > + * > > + * Copyright(c) 2010-2014 Intel Corporation. All rights reserved. > > + * All rights reserved. > > + * > > + * Redistribution and use in source and binary forms, with or without > > + * modification, are permitted provided that the following conditions > > + * are met: > > + * > > + * * Redistributions of source code must retain the above copyright > > + * notice, this list of conditions and the following disclaimer. > > + * * Redistributions in binary form must reproduce the above copyright > > + * notice, this list of conditions and the following disclaimer in > > + * the documentation and/or other materials provided with the > > + * distribution. > > + * * Neither the name of Intel Corporation nor the names of its > > + * contributors may be used to endorse or promote products derived > > + * from this software without specific prior written permission. > > + * > > + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS > > + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT > > + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR > > + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT > > + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, > > + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT > > + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, > > + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY > > + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT > > + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE > > + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. > > + */ > > + > > +#include <stdint.h> > > +#include <inttypes.h> > > +#include <unistd.h> > > +#include <signal.h> > > +#include <getopt.h> > > + > > +#include <rte_eal.h> > > +#include <rte_ethdev.h> > > +#include <rte_cycles.h> > > +#include <rte_malloc.h> > > +#include <rte_debug.h> > > +#include <rte_distributor.h> > > + > > +#include "main.h" > > + > > +#define RX_RING_SIZE 256 > > +#define RX_FREE_THRESH 32 > > +#define RX_PTHRESH 8 > > +#define RX_HTHRESH 8 > > +#define RX_WTHRESH 0 > > + > > +#define TX_RING_SIZE 512 > > +#define TX_FREE_THRESH 32 > > +#define TX_PTHRESH 32 > > +#define TX_HTHRESH 0 > > +#define TX_WTHRESH 0 > > +#define TX_RSBIT_THRESH 32 > > +#define TX_Q_FLAGS (ETH_TXQ_FLAGS_NOMULTSEGS | ETH_TXQ_FLAGS_NOVLANOFFL |\ > > + ETH_TXQ_FLAGS_NOXSUMSCTP | ETH_TXQ_FLAGS_NOXSUMUDP | \ > > + ETH_TXQ_FLAGS_NOXSUMTCP) > > + > > +#define NUM_MBUFS ((64*1024)-1) > > +#define MBUF_SIZE (2048 + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM) > > +#define MBUF_CACHE_SIZE 250 > > +#define BURST_SIZE 32 > > +#define RTE_RING_SZ 1024 > > + > > +/* uncommnet below line to enable debug logs */ > > +/* #define DEBUG */ > > + > > +#ifdef DEBUG > > +#define LOG_LEVEL RTE_LOG_DEBUG > > +#define LOG_DEBUG(log_type, fmt, args...) do { \ > > + RTE_LOG(DEBUG, log_type, fmt, ##args) \ > > +} while (0) > > +#else > > +#define LOG_LEVEL RTE_LOG_INFO > > +#define LOG_DEBUG(log_type, fmt, args...) do {} while (0) > > +#endif > > + > > +#define RTE_LOGTYPE_DISTRAPP RTE_LOGTYPE_USER1 > > + > > +/* mask of enabled ports */ > > +static uint32_t enabled_port_mask = 0; > > + > > +static volatile struct app_stats { > > + struct { > > + uint64_t rx_pkts; > > + uint64_t returned_pkts; > > + uint64_t enqueued_pkts; > > + } rx __rte_cache_aligned; > > + > > + struct { > > + uint64_t dequeue_pkts; > > + uint64_t tx_pkts; > > + } tx __rte_cache_aligned; > > +} app_stats; > > + > > +static const struct rte_eth_conf port_conf_default = { > > + .rxmode = { > > + .mq_mode = ETH_MQ_RX_RSS, > > + .max_rx_pkt_len = ETHER_MAX_LEN, > > + .split_hdr_size = 0, > > + .header_split = 0, /**< Header Split disabled */ > > + .hw_ip_checksum = 0, /**< IP checksum offload enabled */ > > + .hw_vlan_filter = 0, /**< VLAN filtering disabled */ > > + .jumbo_frame = 0, /**< Jumbo Frame Support disabled */ > > + .hw_strip_crc = 0, /**< CRC stripped by hardware */ > > + }, > > + .txmode = { > > + .mq_mode = ETH_MQ_TX_NONE, > > + }, > > + .lpbk_mode = 0, > > + .rx_adv_conf = { > > + .rss_conf = { > > + .rss_hf = ETH_RSS_IPV4 | ETH_RSS_IPV6 | > > + ETH_RSS_IPV4_TCP | ETH_RSS_IPV4_UDP | > > + ETH_RSS_IPV6_TCP | ETH_RSS_IPV6_UDP, > > + } > > + }, > > +}; > > + > > +static const struct rte_eth_rxconf rx_conf_default = { > > + .rx_thresh = { > > + .pthresh = RX_PTHRESH, > > + .hthresh = RX_HTHRESH, > > + .wthresh = RX_WTHRESH, > > + }, > > + .rx_free_thresh = RX_FREE_THRESH, > > + .rx_drop_en = 0, > > +}; > > + > > +static const struct rte_eth_txconf tx_conf_default = { > > + .tx_thresh = { > > + .pthresh = TX_PTHRESH, > > + .hthresh = TX_HTHRESH, > > + .wthresh = TX_WTHRESH, > > + }, > > + .tx_free_thresh = TX_FREE_THRESH, > > + .tx_rs_thresh = TX_RSBIT_THRESH, > > + .txq_flags = TX_Q_FLAGS > > + > > +}; > > + > > +struct output_buffer { > > + unsigned count; > > + struct rte_mbuf *mbufs[BURST_SIZE]; > > +}; > > + > > +/* > > + * Initialises a given port using global settings and with the rx buffers > > + * coming from the mbuf_pool passed as parameter > > + */ > > +static inline int > > +port_init(uint8_t port, struct rte_mempool *mbuf_pool) > > +{ > > + struct rte_eth_conf port_conf = port_conf_default; > > + const uint16_t rxRings = 1, txRings = rte_lcore_count() - 1; > > + int retval; > > + uint16_t q; > > + > > + if (port >= rte_eth_dev_count()) > > + return -1; > > + > > + retval = rte_eth_dev_configure(port, rxRings, txRings, &port_conf); > > + if (retval != 0) > > + return retval; > > + > > + for (q = 0; q < rxRings; q++) { > > + retval = rte_eth_rx_queue_setup(port, q, RX_RING_SIZE, > > + rte_eth_dev_socket_id(port), > > + &rx_conf_default, mbuf_pool); > > + if (retval < 0) > > + return retval; > > + } > > + > > + for (q = 0; q < txRings; q++) { > > + retval = rte_eth_tx_queue_setup(port, q, TX_RING_SIZE, > > + rte_eth_dev_socket_id(port), > > + &tx_conf_default); > > + if (retval < 0) > > + return retval; > > + } > > + > > + retval = rte_eth_dev_start(port); > > + if (retval < 0) > > + return retval; > > + > > + struct rte_eth_link link; > > + rte_eth_link_get_nowait(port, &link); > > + if (!link.link_status) { > > + sleep(1); > > + rte_eth_link_get_nowait(port, &link); > > + } > > + > > + if (!link.link_status) { > > + printf("Link down on port %"PRIu8"\n", port); > > + return 0; > > + } > > + > > + struct ether_addr addr; > > + rte_eth_macaddr_get(port, &addr); > > + printf("Port %u MAC: %02"PRIx8" %02"PRIx8" %02"PRIx8 > > + " %02"PRIx8" %02"PRIx8" %02"PRIx8"\n", > > + (unsigned)port, > > + addr.addr_bytes[0], addr.addr_bytes[1], > > + addr.addr_bytes[2], addr.addr_bytes[3], > > + addr.addr_bytes[4], addr.addr_bytes[5]); > > + > > + rte_eth_promiscuous_enable(port); > > + > > + return 0; > > +} > > + > > +struct lcore_params { > > + unsigned worker_id; > > + struct rte_distributor *d; > > + struct rte_ring *r; > > +}; > > + > > +static __attribute__((noreturn)) void > > +lcore_rx(struct lcore_params *p) > > +{ > > + struct rte_distributor *d = p->d; > > + struct rte_ring *r = p->r; > > + const uint8_t nb_ports = rte_eth_dev_count(); > > + const int socket_id = rte_socket_id(); > > + uint8_t port; > > + > > + for (port = 0; port < nb_ports; port++) { > > + /* skip ports that are not enabled */ > > + if ((enabled_port_mask & (1 << port)) == 0) > > + continue; > > + > > + if (rte_eth_dev_socket_id(port) > 0 && > > + rte_eth_dev_socket_id(port) != socket_id) > > + printf("WARNING, port %u is on remote NUMA node to " > > + "RX thread.\n\tPerformance will not " > > + "be optimal.\n", port); > > + } > > + > > + printf("\nCore %u doing packet RX.\n", rte_lcore_id()); > > + port = 0; > > + for (;;) { > > + /* skip ports that are not enabled */ > > + if ((enabled_port_mask & (1 << port)) == 0) { > > + if (++port == nb_ports) > > + port = 0; > > + continue; > > + } > > + struct rte_mbuf *bufs[BURST_SIZE*2]; > > + const uint16_t nb_rx = rte_eth_rx_burst(port, 0, bufs, > > + BURST_SIZE); > > + app_stats.rx.rx_pkts += nb_rx; > > + > > + rte_distributor_process(d, bufs, nb_rx); > > + const uint16_t nb_ret = rte_distributor_returned_pkts(d, > > + bufs, BURST_SIZE*2); > > + app_stats.rx.returned_pkts += nb_ret; > > + if (unlikely(nb_ret == 0)) > > + continue; > > + > > + uint16_t sent = rte_ring_enqueue_burst(r, (void *)bufs, nb_ret); > > + app_stats.rx.enqueued_pkts += sent; > > + if (unlikely(sent < nb_ret)) { > > + LOG_DEBUG(DISTRAPP, "%s:Packet loss due to full ring\n", __func__); > > + while (sent < nb_ret) > > + rte_pktmbuf_free(bufs[sent++]); > > + } > > + if (++port == nb_ports) > > + port = 0; > > + } > > +} > > + > > +static inline void > > +flush_one_port(struct output_buffer *outbuf, uint8_t outp) > > +{ > > + unsigned nb_tx = rte_eth_tx_burst(outp, 0, outbuf->mbufs, > > + outbuf->count); > > + app_stats.tx.tx_pkts += nb_tx; > > + > > + if (unlikely(nb_tx < outbuf->count)) { > > + LOG_DEBUG(DISTRAPP, "%s:Packet loss with tx_burst\n", __func__); > > + do { > > + rte_pktmbuf_free(outbuf->mbufs[nb_tx]); > > + } while (++nb_tx < outbuf->count); > > + } > > + outbuf->count = 0; > > +} > > + > > +static inline void > > +flush_all_ports(struct output_buffer *tx_buffers, uint8_t nb_ports) > > +{ > > + uint8_t outp; > > + for (outp = 0; outp < nb_ports; outp++) { > > + /* skip ports that are not enabled */ > > + if ((enabled_port_mask & (1 << outp)) == 0) > > + continue; > > + > > + if (tx_buffers[outp].count == 0) > > + continue; > > + > > + flush_one_port(&tx_buffers[outp], outp); > > + } > > +} > > + > > +static __attribute__((noreturn)) void > > +lcore_tx(struct rte_ring *in_r) > > +{ > > + static struct output_buffer tx_buffers[RTE_MAX_ETHPORTS]; > > + const uint8_t nb_ports = rte_eth_dev_count(); > > + const int socket_id = rte_socket_id(); > > + uint8_t port; > > + > > + for (port = 0; port < nb_ports; port++) { > > + /* skip ports that are not enabled */ > > + if ((enabled_port_mask & (1 << port)) == 0) > > + continue; > > + > > + if (rte_eth_dev_socket_id(port) > 0 && > > + rte_eth_dev_socket_id(port) != socket_id) > > + printf("WARNING, port %u is on remote NUMA node to " > > + "TX thread.\n\tPerformance will not " > > + "be optimal.\n", port); > > + } > > + > > + printf("\nCore %u doing packet TX.\n", rte_lcore_id()); > > + for (;;) { > > + for (port = 0; port < nb_ports; port++) { > > + /* skip ports that are not enabled */ > > + if ((enabled_port_mask & (1 << port)) == 0) > > + continue; > > + > > + struct rte_mbuf *bufs[BURST_SIZE]; > > + const uint16_t nb_rx = rte_ring_dequeue_burst(in_r, > > + (void *)bufs, BURST_SIZE); > > + app_stats.tx.dequeue_pkts += nb_rx; > > + > > + /* if we get no traffic, flush anything we have */ > > + if (unlikely(nb_rx == 0)) { > > + flush_all_ports(tx_buffers, nb_ports); > > + continue; > > + } > > + > > + /* for traffic we receive, queue it up for transmit */ > > + uint16_t i; > > + _mm_prefetch(bufs[0], 0); > > + _mm_prefetch(bufs[1], 0); > > + _mm_prefetch(bufs[2], 0); > > + for (i = 0; i < nb_rx; i++) { > > + struct output_buffer *outbuf; > > + uint8_t outp; > > + _mm_prefetch(bufs[i + 3], 0); > > + /* workers should update in_port to hold the > > + * output port value */ > > + outp = bufs[i]->port; > > + /* skip ports that are not enabled */ > > + if ((enabled_port_mask & (1 << outp)) == 0) > > + continue; > > + > > + outbuf = &tx_buffers[outp]; > > + outbuf->mbufs[outbuf->count++] = bufs[i]; > > + if (outbuf->count == BURST_SIZE) > > + flush_one_port(outbuf, outp); > > + } > > + } > > + } > > +} > > + > > + > > +static __attribute__((noreturn)) void > > +lcore_worker(struct lcore_params *p) > > +{ > > + struct rte_distributor *d = p->d; > > + const unsigned id = p->worker_id; > > + /* for single port, xor_val will be zero so we won't modify the output > > + * port, otherwise we send traffic from 0 to 1, 2 to 3, and vice versa > > + */ > > + const unsigned xor_val = (rte_eth_dev_count() > 1); > > + struct rte_mbuf *buf = NULL; > > + > > + printf("\nCore %u acting as worker core.\n", rte_lcore_id()); > > + for (;;) { > > + buf = rte_distributor_get_pkt(d, id, buf); > > + buf->port ^= xor_val; > > + } > > +} > > + > > +static void > > +int_handler(int sig_num) > > +{ > > + struct rte_eth_stats eth_stats; > > + unsigned i; > > + > > + printf("Exiting on signal %d\n", sig_num); > > + > > + printf("\nRX thread stats:\n"); > > + printf(" - Received: %"PRIu64"\n", app_stats.rx.rx_pkts); > > + printf(" - Processed: %"PRIu64"\n", app_stats.rx.returned_pkts); > > + printf(" - Enqueued: %"PRIu64"\n", app_stats.rx.enqueued_pkts); > > + > > + printf("\nTX thread stats:\n"); > > + printf(" - Dequeued: %"PRIu64"\n", app_stats.tx.dequeue_pkts); > > + printf(" - Transmitted: %"PRIu64"\n", app_stats.tx.tx_pkts); > > + > > + for (i = 0; i < rte_eth_dev_count(); i++) { > > + rte_eth_stats_get(i, ð_stats); > > + printf("\nPort %u stats:\n", i); > > + printf(" - Pkts in: %"PRIu64"\n", eth_stats.ipackets); > > + printf(" - Pkts out: %"PRIu64"\n", eth_stats.opackets); > > + printf(" - In Errs: %"PRIu64"\n", eth_stats.ierrors); > > + printf(" - Out Errs: %"PRIu64"\n", eth_stats.oerrors); > > + printf(" - Mbuf Errs: %"PRIu64"\n", eth_stats.rx_nombuf); > > + } > > + exit(0); > rte_exit here? Also, this is a pretty ungraceful exit strategy as all the > threads you've created and memory you've allocated are just forgotten here. > Given that dpdk mempools are shared, this has the potential to leak lots of > memory if other apps are using the dpdk at the same time that you run this. You > probably want to use the sigint handler to raise a flag to the tx/rx threads to > shutdown gracefully, and then free your allocated memory and mempool. > > Neil > Unless the different processes are explicitly cooperating as primary/secondary, the mempools are not shared. I just don't see the need for this app to do more cleanup on ctrl-c signal, as it's not intended to be a multiprocess app, and there is little that any secondary process could do to work with this app, except possibly some resource monitoring, which would be completely unaffected by it exiting the way it does. /Bruce
On Tue, Sep 30, 2014 at 01:18:28PM +0100, Bruce Richardson wrote: > On Tue, Sep 30, 2014 at 07:34:45AM -0400, Neil Horman wrote: > > On Tue, Sep 30, 2014 at 11:39:37AM +0100, reshmapa wrote: > > > From: Reshma Pattan <reshma.pattan@intel.com> > > > > > > A new sample app that shows the usage of the distributor library. This > > > app works as follows: > > > > > > * An RX thread runs which pulls packets from each ethernet port in turn > > > and passes those packets to worker using a distributor component. > > > * The workers take the packets in turn, and determine the output port > > > for those packets using basic l2forwarding doing an xor on the source > > > port id. > > > * The RX thread takes the returned packets from the workers and enqueue > > > those packets into an rte_ring structure. > > > * A TX thread pulls the packets off the rte_ring structure and then > > > sends each packet out the output port specified previously by the worker > > > * Command-line option support provided only for portmask. > > > > > > Signed-off-by: Bruce Richardson <bruce.richardson@intel.com> > > > Signed-off-by: Reshma Pattan <reshma.pattan@intel.com> > > > --- > > > examples/Makefile | 1 + > > > examples/distributor_app/Makefile | 57 ++++ > > > examples/distributor_app/main.c | 600 +++++++++++++++++++++++++++++++++++++ > > > examples/distributor_app/main.h | 46 +++ > > > 4 files changed, 704 insertions(+), 0 deletions(-) > > > create mode 100644 examples/distributor_app/Makefile > > > create mode 100644 examples/distributor_app/main.c > > > create mode 100644 examples/distributor_app/main.h > > > > > > diff --git a/examples/Makefile b/examples/Makefile > > > index 6245f83..2ba82b0 100644 > > > --- a/examples/Makefile > > > +++ b/examples/Makefile > > > @@ -66,5 +66,6 @@ DIRS-y += vhost > > > DIRS-$(CONFIG_RTE_LIBRTE_XEN_DOM0) += vhost_xen > > > DIRS-y += vmdq > > > DIRS-y += vmdq_dcb > > > +DIRS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) += distributor_app > > > > > > include $(RTE_SDK)/mk/rte.extsubdir.mk > > > diff --git a/examples/distributor_app/Makefile b/examples/distributor_app/Makefile > > > new file mode 100644 > > > index 0000000..6a5bada > > > --- /dev/null > > > +++ b/examples/distributor_app/Makefile > > > @@ -0,0 +1,57 @@ > > > +# BSD LICENSE > > > +# > > > +# Copyright(c) 2010-2014 Intel Corporation. All rights reserved. > > > +# All rights reserved. > > > +# > > > +# Redistribution and use in source and binary forms, with or without > > > +# modification, are permitted provided that the following conditions > > > +# are met: > > > +# > > > +# * Redistributions of source code must retain the above copyright > > > +# notice, this list of conditions and the following disclaimer. > > > +# * Redistributions in binary form must reproduce the above copyright > > > +# notice, this list of conditions and the following disclaimer in > > > +# the documentation and/or other materials provided with the > > > +# distribution. > > > +# * Neither the name of Intel Corporation nor the names of its > > > +# contributors may be used to endorse or promote products derived > > > +# from this software without specific prior written permission. > > > +# > > > +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS > > > +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT > > > +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR > > > +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT > > > +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, > > > +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT > > > +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, > > > +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY > > > +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT > > > +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE > > > +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. > > > + > > > +ifeq ($(RTE_SDK),) > > > +$(error "Please define RTE_SDK environment variable") > > > +endif > > > + > > > +# Default target, can be overriden by command line or environment > > > +RTE_TARGET ?= x86_64-native-linuxapp-gcc > > > + > > > +include $(RTE_SDK)/mk/rte.vars.mk > > > + > > > +# binary name > > > +APP = distributor_app > > > + > > > +# all source are stored in SRCS-y > > > +SRCS-y := main.c > > > + > > > +CFLAGS += $(WERROR_FLAGS) > > > + > > > +# workaround for a gcc bug with noreturn attribute > > > +# http://gcc.gnu.org/bugzilla/show_bug.cgi?id=12603 > > > +ifeq ($(CONFIG_RTE_TOOLCHAIN_GCC),y) > > > +CFLAGS_main.o += -Wno-return-type > > > +endif > > > + > > > +EXTRA_CFLAGS += -O3 -Wfatal-errors > > > + > > > +include $(RTE_SDK)/mk/rte.extapp.mk > > > diff --git a/examples/distributor_app/main.c b/examples/distributor_app/main.c > > > new file mode 100644 > > > index 0000000..f555d93 > > > --- /dev/null > > > +++ b/examples/distributor_app/main.c > > > @@ -0,0 +1,600 @@ > > > +/*- > > > + * BSD LICENSE > > > + * > > > + * Copyright(c) 2010-2014 Intel Corporation. All rights reserved. > > > + * All rights reserved. > > > + * > > > + * Redistribution and use in source and binary forms, with or without > > > + * modification, are permitted provided that the following conditions > > > + * are met: > > > + * > > > + * * Redistributions of source code must retain the above copyright > > > + * notice, this list of conditions and the following disclaimer. > > > + * * Redistributions in binary form must reproduce the above copyright > > > + * notice, this list of conditions and the following disclaimer in > > > + * the documentation and/or other materials provided with the > > > + * distribution. > > > + * * Neither the name of Intel Corporation nor the names of its > > > + * contributors may be used to endorse or promote products derived > > > + * from this software without specific prior written permission. > > > + * > > > + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS > > > + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT > > > + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR > > > + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT > > > + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, > > > + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT > > > + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, > > > + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY > > > + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT > > > + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE > > > + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. > > > + */ > > > + > > > +#include <stdint.h> > > > +#include <inttypes.h> > > > +#include <unistd.h> > > > +#include <signal.h> > > > +#include <getopt.h> > > > + > > > +#include <rte_eal.h> > > > +#include <rte_ethdev.h> > > > +#include <rte_cycles.h> > > > +#include <rte_malloc.h> > > > +#include <rte_debug.h> > > > +#include <rte_distributor.h> > > > + > > > +#include "main.h" > > > + > > > +#define RX_RING_SIZE 256 > > > +#define RX_FREE_THRESH 32 > > > +#define RX_PTHRESH 8 > > > +#define RX_HTHRESH 8 > > > +#define RX_WTHRESH 0 > > > + > > > +#define TX_RING_SIZE 512 > > > +#define TX_FREE_THRESH 32 > > > +#define TX_PTHRESH 32 > > > +#define TX_HTHRESH 0 > > > +#define TX_WTHRESH 0 > > > +#define TX_RSBIT_THRESH 32 > > > +#define TX_Q_FLAGS (ETH_TXQ_FLAGS_NOMULTSEGS | ETH_TXQ_FLAGS_NOVLANOFFL |\ > > > + ETH_TXQ_FLAGS_NOXSUMSCTP | ETH_TXQ_FLAGS_NOXSUMUDP | \ > > > + ETH_TXQ_FLAGS_NOXSUMTCP) > > > + > > > +#define NUM_MBUFS ((64*1024)-1) > > > +#define MBUF_SIZE (2048 + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM) > > > +#define MBUF_CACHE_SIZE 250 > > > +#define BURST_SIZE 32 > > > +#define RTE_RING_SZ 1024 > > > + > > > +/* uncommnet below line to enable debug logs */ > > > +/* #define DEBUG */ > > > + > > > +#ifdef DEBUG > > > +#define LOG_LEVEL RTE_LOG_DEBUG > > > +#define LOG_DEBUG(log_type, fmt, args...) do { \ > > > + RTE_LOG(DEBUG, log_type, fmt, ##args) \ > > > +} while (0) > > > +#else > > > +#define LOG_LEVEL RTE_LOG_INFO > > > +#define LOG_DEBUG(log_type, fmt, args...) do {} while (0) > > > +#endif > > > + > > > +#define RTE_LOGTYPE_DISTRAPP RTE_LOGTYPE_USER1 > > > + > > > +/* mask of enabled ports */ > > > +static uint32_t enabled_port_mask = 0; > > > + > > > +static volatile struct app_stats { > > > + struct { > > > + uint64_t rx_pkts; > > > + uint64_t returned_pkts; > > > + uint64_t enqueued_pkts; > > > + } rx __rte_cache_aligned; > > > + > > > + struct { > > > + uint64_t dequeue_pkts; > > > + uint64_t tx_pkts; > > > + } tx __rte_cache_aligned; > > > +} app_stats; > > > + > > > +static const struct rte_eth_conf port_conf_default = { > > > + .rxmode = { > > > + .mq_mode = ETH_MQ_RX_RSS, > > > + .max_rx_pkt_len = ETHER_MAX_LEN, > > > + .split_hdr_size = 0, > > > + .header_split = 0, /**< Header Split disabled */ > > > + .hw_ip_checksum = 0, /**< IP checksum offload enabled */ > > > + .hw_vlan_filter = 0, /**< VLAN filtering disabled */ > > > + .jumbo_frame = 0, /**< Jumbo Frame Support disabled */ > > > + .hw_strip_crc = 0, /**< CRC stripped by hardware */ > > > + }, > > > + .txmode = { > > > + .mq_mode = ETH_MQ_TX_NONE, > > > + }, > > > + .lpbk_mode = 0, > > > + .rx_adv_conf = { > > > + .rss_conf = { > > > + .rss_hf = ETH_RSS_IPV4 | ETH_RSS_IPV6 | > > > + ETH_RSS_IPV4_TCP | ETH_RSS_IPV4_UDP | > > > + ETH_RSS_IPV6_TCP | ETH_RSS_IPV6_UDP, > > > + } > > > + }, > > > +}; > > > + > > > +static const struct rte_eth_rxconf rx_conf_default = { > > > + .rx_thresh = { > > > + .pthresh = RX_PTHRESH, > > > + .hthresh = RX_HTHRESH, > > > + .wthresh = RX_WTHRESH, > > > + }, > > > + .rx_free_thresh = RX_FREE_THRESH, > > > + .rx_drop_en = 0, > > > +}; > > > + > > > +static const struct rte_eth_txconf tx_conf_default = { > > > + .tx_thresh = { > > > + .pthresh = TX_PTHRESH, > > > + .hthresh = TX_HTHRESH, > > > + .wthresh = TX_WTHRESH, > > > + }, > > > + .tx_free_thresh = TX_FREE_THRESH, > > > + .tx_rs_thresh = TX_RSBIT_THRESH, > > > + .txq_flags = TX_Q_FLAGS > > > + > > > +}; > > > + > > > +struct output_buffer { > > > + unsigned count; > > > + struct rte_mbuf *mbufs[BURST_SIZE]; > > > +}; > > > + > > > +/* > > > + * Initialises a given port using global settings and with the rx buffers > > > + * coming from the mbuf_pool passed as parameter > > > + */ > > > +static inline int > > > +port_init(uint8_t port, struct rte_mempool *mbuf_pool) > > > +{ > > > + struct rte_eth_conf port_conf = port_conf_default; > > > + const uint16_t rxRings = 1, txRings = rte_lcore_count() - 1; > > > + int retval; > > > + uint16_t q; > > > + > > > + if (port >= rte_eth_dev_count()) > > > + return -1; > > > + > > > + retval = rte_eth_dev_configure(port, rxRings, txRings, &port_conf); > > > + if (retval != 0) > > > + return retval; > > > + > > > + for (q = 0; q < rxRings; q++) { > > > + retval = rte_eth_rx_queue_setup(port, q, RX_RING_SIZE, > > > + rte_eth_dev_socket_id(port), > > > + &rx_conf_default, mbuf_pool); > > > + if (retval < 0) > > > + return retval; > > > + } > > > + > > > + for (q = 0; q < txRings; q++) { > > > + retval = rte_eth_tx_queue_setup(port, q, TX_RING_SIZE, > > > + rte_eth_dev_socket_id(port), > > > + &tx_conf_default); > > > + if (retval < 0) > > > + return retval; > > > + } > > > + > > > + retval = rte_eth_dev_start(port); > > > + if (retval < 0) > > > + return retval; > > > + > > > + struct rte_eth_link link; > > > + rte_eth_link_get_nowait(port, &link); > > > + if (!link.link_status) { > > > + sleep(1); > > > + rte_eth_link_get_nowait(port, &link); > > > + } > > > + > > > + if (!link.link_status) { > > > + printf("Link down on port %"PRIu8"\n", port); > > > + return 0; > > > + } > > > + > > > + struct ether_addr addr; > > > + rte_eth_macaddr_get(port, &addr); > > > + printf("Port %u MAC: %02"PRIx8" %02"PRIx8" %02"PRIx8 > > > + " %02"PRIx8" %02"PRIx8" %02"PRIx8"\n", > > > + (unsigned)port, > > > + addr.addr_bytes[0], addr.addr_bytes[1], > > > + addr.addr_bytes[2], addr.addr_bytes[3], > > > + addr.addr_bytes[4], addr.addr_bytes[5]); > > > + > > > + rte_eth_promiscuous_enable(port); > > > + > > > + return 0; > > > +} > > > + > > > +struct lcore_params { > > > + unsigned worker_id; > > > + struct rte_distributor *d; > > > + struct rte_ring *r; > > > +}; > > > + > > > +static __attribute__((noreturn)) void > > > +lcore_rx(struct lcore_params *p) > > > +{ > > > + struct rte_distributor *d = p->d; > > > + struct rte_ring *r = p->r; > > > + const uint8_t nb_ports = rte_eth_dev_count(); > > > + const int socket_id = rte_socket_id(); > > > + uint8_t port; > > > + > > > + for (port = 0; port < nb_ports; port++) { > > > + /* skip ports that are not enabled */ > > > + if ((enabled_port_mask & (1 << port)) == 0) > > > + continue; > > > + > > > + if (rte_eth_dev_socket_id(port) > 0 && > > > + rte_eth_dev_socket_id(port) != socket_id) > > > + printf("WARNING, port %u is on remote NUMA node to " > > > + "RX thread.\n\tPerformance will not " > > > + "be optimal.\n", port); > > > + } > > > + > > > + printf("\nCore %u doing packet RX.\n", rte_lcore_id()); > > > + port = 0; > > > + for (;;) { > > > + /* skip ports that are not enabled */ > > > + if ((enabled_port_mask & (1 << port)) == 0) { > > > + if (++port == nb_ports) > > > + port = 0; > > > + continue; > > > + } > > > + struct rte_mbuf *bufs[BURST_SIZE*2]; > > > + const uint16_t nb_rx = rte_eth_rx_burst(port, 0, bufs, > > > + BURST_SIZE); > > > + app_stats.rx.rx_pkts += nb_rx; > > > + > > > + rte_distributor_process(d, bufs, nb_rx); > > > + const uint16_t nb_ret = rte_distributor_returned_pkts(d, > > > + bufs, BURST_SIZE*2); > > > + app_stats.rx.returned_pkts += nb_ret; > > > + if (unlikely(nb_ret == 0)) > > > + continue; > > > + > > > + uint16_t sent = rte_ring_enqueue_burst(r, (void *)bufs, nb_ret); > > > + app_stats.rx.enqueued_pkts += sent; > > > + if (unlikely(sent < nb_ret)) { > > > + LOG_DEBUG(DISTRAPP, "%s:Packet loss due to full ring\n", __func__); > > > + while (sent < nb_ret) > > > + rte_pktmbuf_free(bufs[sent++]); > > > + } > > > + if (++port == nb_ports) > > > + port = 0; > > > + } > > > +} > > > + > > > +static inline void > > > +flush_one_port(struct output_buffer *outbuf, uint8_t outp) > > > +{ > > > + unsigned nb_tx = rte_eth_tx_burst(outp, 0, outbuf->mbufs, > > > + outbuf->count); > > > + app_stats.tx.tx_pkts += nb_tx; > > > + > > > + if (unlikely(nb_tx < outbuf->count)) { > > > + LOG_DEBUG(DISTRAPP, "%s:Packet loss with tx_burst\n", __func__); > > > + do { > > > + rte_pktmbuf_free(outbuf->mbufs[nb_tx]); > > > + } while (++nb_tx < outbuf->count); > > > + } > > > + outbuf->count = 0; > > > +} > > > + > > > +static inline void > > > +flush_all_ports(struct output_buffer *tx_buffers, uint8_t nb_ports) > > > +{ > > > + uint8_t outp; > > > + for (outp = 0; outp < nb_ports; outp++) { > > > + /* skip ports that are not enabled */ > > > + if ((enabled_port_mask & (1 << outp)) == 0) > > > + continue; > > > + > > > + if (tx_buffers[outp].count == 0) > > > + continue; > > > + > > > + flush_one_port(&tx_buffers[outp], outp); > > > + } > > > +} > > > + > > > +static __attribute__((noreturn)) void > > > +lcore_tx(struct rte_ring *in_r) > > > +{ > > > + static struct output_buffer tx_buffers[RTE_MAX_ETHPORTS]; > > > + const uint8_t nb_ports = rte_eth_dev_count(); > > > + const int socket_id = rte_socket_id(); > > > + uint8_t port; > > > + > > > + for (port = 0; port < nb_ports; port++) { > > > + /* skip ports that are not enabled */ > > > + if ((enabled_port_mask & (1 << port)) == 0) > > > + continue; > > > + > > > + if (rte_eth_dev_socket_id(port) > 0 && > > > + rte_eth_dev_socket_id(port) != socket_id) > > > + printf("WARNING, port %u is on remote NUMA node to " > > > + "TX thread.\n\tPerformance will not " > > > + "be optimal.\n", port); > > > + } > > > + > > > + printf("\nCore %u doing packet TX.\n", rte_lcore_id()); > > > + for (;;) { > > > + for (port = 0; port < nb_ports; port++) { > > > + /* skip ports that are not enabled */ > > > + if ((enabled_port_mask & (1 << port)) == 0) > > > + continue; > > > + > > > + struct rte_mbuf *bufs[BURST_SIZE]; > > > + const uint16_t nb_rx = rte_ring_dequeue_burst(in_r, > > > + (void *)bufs, BURST_SIZE); > > > + app_stats.tx.dequeue_pkts += nb_rx; > > > + > > > + /* if we get no traffic, flush anything we have */ > > > + if (unlikely(nb_rx == 0)) { > > > + flush_all_ports(tx_buffers, nb_ports); > > > + continue; > > > + } > > > + > > > + /* for traffic we receive, queue it up for transmit */ > > > + uint16_t i; > > > + _mm_prefetch(bufs[0], 0); > > > + _mm_prefetch(bufs[1], 0); > > > + _mm_prefetch(bufs[2], 0); > > > + for (i = 0; i < nb_rx; i++) { > > > + struct output_buffer *outbuf; > > > + uint8_t outp; > > > + _mm_prefetch(bufs[i + 3], 0); > > > + /* workers should update in_port to hold the > > > + * output port value */ > > > + outp = bufs[i]->port; > > > + /* skip ports that are not enabled */ > > > + if ((enabled_port_mask & (1 << outp)) == 0) > > > + continue; > > > + > > > + outbuf = &tx_buffers[outp]; > > > + outbuf->mbufs[outbuf->count++] = bufs[i]; > > > + if (outbuf->count == BURST_SIZE) > > > + flush_one_port(outbuf, outp); > > > + } > > > + } > > > + } > > > +} > > > + > > > + > > > +static __attribute__((noreturn)) void > > > +lcore_worker(struct lcore_params *p) > > > +{ > > > + struct rte_distributor *d = p->d; > > > + const unsigned id = p->worker_id; > > > + /* for single port, xor_val will be zero so we won't modify the output > > > + * port, otherwise we send traffic from 0 to 1, 2 to 3, and vice versa > > > + */ > > > + const unsigned xor_val = (rte_eth_dev_count() > 1); > > > + struct rte_mbuf *buf = NULL; > > > + > > > + printf("\nCore %u acting as worker core.\n", rte_lcore_id()); > > > + for (;;) { > > > + buf = rte_distributor_get_pkt(d, id, buf); > > > + buf->port ^= xor_val; > > > + } > > > +} > > > + > > > +static void > > > +int_handler(int sig_num) > > > +{ > > > + struct rte_eth_stats eth_stats; > > > + unsigned i; > > > + > > > + printf("Exiting on signal %d\n", sig_num); > > > + > > > + printf("\nRX thread stats:\n"); > > > + printf(" - Received: %"PRIu64"\n", app_stats.rx.rx_pkts); > > > + printf(" - Processed: %"PRIu64"\n", app_stats.rx.returned_pkts); > > > + printf(" - Enqueued: %"PRIu64"\n", app_stats.rx.enqueued_pkts); > > > + > > > + printf("\nTX thread stats:\n"); > > > + printf(" - Dequeued: %"PRIu64"\n", app_stats.tx.dequeue_pkts); > > > + printf(" - Transmitted: %"PRIu64"\n", app_stats.tx.tx_pkts); > > > + > > > + for (i = 0; i < rte_eth_dev_count(); i++) { > > > + rte_eth_stats_get(i, ð_stats); > > > + printf("\nPort %u stats:\n", i); > > > + printf(" - Pkts in: %"PRIu64"\n", eth_stats.ipackets); > > > + printf(" - Pkts out: %"PRIu64"\n", eth_stats.opackets); > > > + printf(" - In Errs: %"PRIu64"\n", eth_stats.ierrors); > > > + printf(" - Out Errs: %"PRIu64"\n", eth_stats.oerrors); > > > + printf(" - Mbuf Errs: %"PRIu64"\n", eth_stats.rx_nombuf); > > > + } > > > + exit(0); > > rte_exit here? Also, this is a pretty ungraceful exit strategy as all the > > threads you've created and memory you've allocated are just forgotten here. > > Given that dpdk mempools are shared, this has the potential to leak lots of > > memory if other apps are using the dpdk at the same time that you run this. You > > probably want to use the sigint handler to raise a flag to the tx/rx threads to > > shutdown gracefully, and then free your allocated memory and mempool. > > > > Neil > > > > Unless the different processes are explicitly cooperating as > primary/secondary, the mempools are not shared. I just don't see the need > for this app to do more cleanup on ctrl-c signal, as it's not intended to be > a multiprocess app, and there is little that any secondary process could do > to work with this app, except possibly some resource monitoring, which would > be completely unaffected by it exiting the way it does. > Ah, ok, so we don't use a common shared pool between isolated processes then, thats good. Still though, this is a sample application, I think its lazy programming practice to illustrate to application developers that its generally ok to exit programs without freeing your resources. Its about 20 lines of additional code to change the sigint handler to flag an exit condition, and have all the other threads join on it. Neil > /Bruce >
> -----Original Message----- > From: Neil Horman [mailto:nhorman@tuxdriver.com] > Sent: Tuesday, September 30, 2014 2:40 PM > To: Richardson, Bruce > Cc: Pattan, Reshma; dev@dpdk.org > Subject: Re: [dpdk-dev] [PATCH v3] distributor_app: new sample app > > On Tue, Sep 30, 2014 at 01:18:28PM +0100, Bruce Richardson wrote: > > On Tue, Sep 30, 2014 at 07:34:45AM -0400, Neil Horman wrote: > > > On Tue, Sep 30, 2014 at 11:39:37AM +0100, reshmapa wrote: > > > > From: Reshma Pattan <reshma.pattan@intel.com> > > > > > > > > A new sample app that shows the usage of the distributor library. > > > > This app works as follows: > > > > > > > > * An RX thread runs which pulls packets from each ethernet port in turn > > > > and passes those packets to worker using a distributor component. > > > > * The workers take the packets in turn, and determine the output port > > > > for those packets using basic l2forwarding doing an xor on the source > > > > port id. > > > > * The RX thread takes the returned packets from the workers and enqueue > > > > those packets into an rte_ring structure. > > > > * A TX thread pulls the packets off the rte_ring structure and then > > > > sends each packet out the output port specified previously by > > > > the worker > > > > * Command-line option support provided only for portmask. > > > > > > > > Signed-off-by: Bruce Richardson <bruce.richardson@intel.com> > > > > Signed-off-by: Reshma Pattan <reshma.pattan@intel.com> > > > > --- > > > > examples/Makefile | 1 + > > > > examples/distributor_app/Makefile | 57 ++++ > > > > examples/distributor_app/main.c | 600 > +++++++++++++++++++++++++++++++++++++ > > > > examples/distributor_app/main.h | 46 +++ > > > > 4 files changed, 704 insertions(+), 0 deletions(-) create mode > > > > 100644 examples/distributor_app/Makefile create mode 100644 > > > > examples/distributor_app/main.c create mode 100644 > > > > examples/distributor_app/main.h > > > > > > > > diff --git a/examples/Makefile b/examples/Makefile index > > > > 6245f83..2ba82b0 100644 > > > > --- a/examples/Makefile > > > > +++ b/examples/Makefile > > > > @@ -66,5 +66,6 @@ DIRS-y += vhost > > > > DIRS-$(CONFIG_RTE_LIBRTE_XEN_DOM0) += vhost_xen DIRS-y += vmdq > > > > DIRS-y += vmdq_dcb > > > > +DIRS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) += distributor_app > > > > > > > > include $(RTE_SDK)/mk/rte.extsubdir.mk diff --git > > > > a/examples/distributor_app/Makefile > > > > b/examples/distributor_app/Makefile > > > > new file mode 100644 > > > > index 0000000..6a5bada > > > > --- /dev/null > > > > +++ b/examples/distributor_app/Makefile > > > > @@ -0,0 +1,57 @@ > > > > +# BSD LICENSE > > > > +# > > > > +# Copyright(c) 2010-2014 Intel Corporation. All rights reserved. > > > > +# All rights reserved. > > > > +# > > > > +# Redistribution and use in source and binary forms, with or without > > > > +# modification, are permitted provided that the following conditions > > > > +# are met: > > > > +# > > > > +# * Redistributions of source code must retain the above copyright > > > > +# notice, this list of conditions and the following disclaimer. > > > > +# * Redistributions in binary form must reproduce the above copyright > > > > +# notice, this list of conditions and the following disclaimer in > > > > +# the documentation and/or other materials provided with the > > > > +# distribution. > > > > +# * Neither the name of Intel Corporation nor the names of its > > > > +# contributors may be used to endorse or promote products derived > > > > +# from this software without specific prior written permission. > > > > +# > > > > +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND > CONTRIBUTORS > > > > +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT > NOT > > > > +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND > FITNESS FOR > > > > +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE > COPYRIGHT > > > > +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, > INCIDENTAL, > > > > +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, > BUT NOT > > > > +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; > LOSS OF USE, > > > > +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED > AND ON ANY > > > > +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR > TORT > > > > +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT > OF THE USE > > > > +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH > DAMAGE. > > > > + > > > > +ifeq ($(RTE_SDK),) > > > > +$(error "Please define RTE_SDK environment variable") endif > > > > + > > > > +# Default target, can be overriden by command line or environment > > > > +RTE_TARGET ?= x86_64-native-linuxapp-gcc > > > > + > > > > +include $(RTE_SDK)/mk/rte.vars.mk > > > > + > > > > +# binary name > > > > +APP = distributor_app > > > > + > > > > +# all source are stored in SRCS-y SRCS-y := main.c > > > > + > > > > +CFLAGS += $(WERROR_FLAGS) > > > > + > > > > +# workaround for a gcc bug with noreturn attribute # > > > > +http://gcc.gnu.org/bugzilla/show_bug.cgi?id=12603 > > > > +ifeq ($(CONFIG_RTE_TOOLCHAIN_GCC),y) CFLAGS_main.o += > > > > +-Wno-return-type endif > > > > + > > > > +EXTRA_CFLAGS += -O3 -Wfatal-errors > > > > + > > > > +include $(RTE_SDK)/mk/rte.extapp.mk > > > > diff --git a/examples/distributor_app/main.c > > > > b/examples/distributor_app/main.c new file mode 100644 index > > > > 0000000..f555d93 > > > > --- /dev/null > > > > +++ b/examples/distributor_app/main.c > > > > @@ -0,0 +1,600 @@ > > > > +/*- > > > > + * BSD LICENSE > > > > + * > > > > + * Copyright(c) 2010-2014 Intel Corporation. All rights reserved. > > > > + * All rights reserved. > > > > + * > > > > + * Redistribution and use in source and binary forms, with or without > > > > + * modification, are permitted provided that the following conditions > > > > + * are met: > > > > + * > > > > + * * Redistributions of source code must retain the above copyright > > > > + * notice, this list of conditions and the following disclaimer. > > > > + * * Redistributions in binary form must reproduce the above copyright > > > > + * notice, this list of conditions and the following disclaimer in > > > > + * the documentation and/or other materials provided with the > > > > + * distribution. > > > > + * * Neither the name of Intel Corporation nor the names of its > > > > + * contributors may be used to endorse or promote products derived > > > > + * from this software without specific prior written permission. > > > > + * > > > > + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND > CONTRIBUTORS > > > > + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, > BUT NOT > > > > + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND > FITNESS FOR > > > > + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE > COPYRIGHT > > > > + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, > INCIDENTAL, > > > > + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, > BUT NOT > > > > + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; > LOSS OF USE, > > > > + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED > AND ON ANY > > > > + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR > TORT > > > > + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT > OF THE USE > > > > + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH > DAMAGE. > > > > + */ > > > > + > > > > +#include <stdint.h> > > > > +#include <inttypes.h> > > > > +#include <unistd.h> > > > > +#include <signal.h> > > > > +#include <getopt.h> > > > > + > > > > +#include <rte_eal.h> > > > > +#include <rte_ethdev.h> > > > > +#include <rte_cycles.h> > > > > +#include <rte_malloc.h> > > > > +#include <rte_debug.h> > > > > +#include <rte_distributor.h> > > > > + > > > > +#include "main.h" > > > > + > > > > +#define RX_RING_SIZE 256 > > > > +#define RX_FREE_THRESH 32 > > > > +#define RX_PTHRESH 8 > > > > +#define RX_HTHRESH 8 > > > > +#define RX_WTHRESH 0 > > > > + > > > > +#define TX_RING_SIZE 512 > > > > +#define TX_FREE_THRESH 32 > > > > +#define TX_PTHRESH 32 > > > > +#define TX_HTHRESH 0 > > > > +#define TX_WTHRESH 0 > > > > +#define TX_RSBIT_THRESH 32 > > > > +#define TX_Q_FLAGS (ETH_TXQ_FLAGS_NOMULTSEGS | > ETH_TXQ_FLAGS_NOVLANOFFL |\ > > > > + ETH_TXQ_FLAGS_NOXSUMSCTP | ETH_TXQ_FLAGS_NOXSUMUDP | \ > > > > + ETH_TXQ_FLAGS_NOXSUMTCP) > > > > + > > > > +#define NUM_MBUFS ((64*1024)-1) > > > > +#define MBUF_SIZE (2048 + sizeof(struct rte_mbuf) + > > > > +RTE_PKTMBUF_HEADROOM) #define MBUF_CACHE_SIZE 250 #define > > > > +BURST_SIZE 32 #define RTE_RING_SZ 1024 > > > > + > > > > +/* uncommnet below line to enable debug logs */ > > > > +/* #define DEBUG */ > > > > + > > > > +#ifdef DEBUG > > > > +#define LOG_LEVEL RTE_LOG_DEBUG > > > > +#define LOG_DEBUG(log_type, fmt, args...) do { \ > > > > + RTE_LOG(DEBUG, log_type, fmt, ##args) \ > > > > +} while (0) > > > > +#else > > > > +#define LOG_LEVEL RTE_LOG_INFO > > > > +#define LOG_DEBUG(log_type, fmt, args...) do {} while (0) #endif > > > > + > > > > +#define RTE_LOGTYPE_DISTRAPP RTE_LOGTYPE_USER1 > > > > + > > > > +/* mask of enabled ports */ > > > > +static uint32_t enabled_port_mask = 0; > > > > + > > > > +static volatile struct app_stats { > > > > + struct { > > > > + uint64_t rx_pkts; > > > > + uint64_t returned_pkts; > > > > + uint64_t enqueued_pkts; > > > > + } rx __rte_cache_aligned; > > > > + > > > > + struct { > > > > + uint64_t dequeue_pkts; > > > > + uint64_t tx_pkts; > > > > + } tx __rte_cache_aligned; > > > > +} app_stats; > > > > + > > > > +static const struct rte_eth_conf port_conf_default = { > > > > + .rxmode = { > > > > + .mq_mode = ETH_MQ_RX_RSS, > > > > + .max_rx_pkt_len = ETHER_MAX_LEN, > > > > + .split_hdr_size = 0, > > > > + .header_split = 0, /**< Header Split disabled */ > > > > + .hw_ip_checksum = 0, /**< IP checksum offload enabled */ > > > > + .hw_vlan_filter = 0, /**< VLAN filtering disabled */ > > > > + .jumbo_frame = 0, /**< Jumbo Frame Support disabled */ > > > > + .hw_strip_crc = 0, /**< CRC stripped by hardware */ > > > > + }, > > > > + .txmode = { > > > > + .mq_mode = ETH_MQ_TX_NONE, > > > > + }, > > > > + .lpbk_mode = 0, > > > > + .rx_adv_conf = { > > > > + .rss_conf = { > > > > + .rss_hf = ETH_RSS_IPV4 | ETH_RSS_IPV6 | > > > > + ETH_RSS_IPV4_TCP | > ETH_RSS_IPV4_UDP | > > > > + ETH_RSS_IPV6_TCP | > ETH_RSS_IPV6_UDP, > > > > + } > > > > + }, > > > > +}; > > > > + > > > > +static const struct rte_eth_rxconf rx_conf_default = { > > > > + .rx_thresh = { > > > > + .pthresh = RX_PTHRESH, > > > > + .hthresh = RX_HTHRESH, > > > > + .wthresh = RX_WTHRESH, > > > > + }, > > > > + .rx_free_thresh = RX_FREE_THRESH, > > > > + .rx_drop_en = 0, > > > > +}; > > > > + > > > > +static const struct rte_eth_txconf tx_conf_default = { > > > > + .tx_thresh = { > > > > + .pthresh = TX_PTHRESH, > > > > + .hthresh = TX_HTHRESH, > > > > + .wthresh = TX_WTHRESH, > > > > + }, > > > > + .tx_free_thresh = TX_FREE_THRESH, > > > > + .tx_rs_thresh = TX_RSBIT_THRESH, > > > > + .txq_flags = TX_Q_FLAGS > > > > + > > > > +}; > > > > + > > > > +struct output_buffer { > > > > + unsigned count; > > > > + struct rte_mbuf *mbufs[BURST_SIZE]; }; > > > > + > > > > +/* > > > > + * Initialises a given port using global settings and with the rx > > > > +buffers > > > > + * coming from the mbuf_pool passed as parameter */ static > > > > +inline int port_init(uint8_t port, struct rte_mempool *mbuf_pool) > > > > +{ > > > > + struct rte_eth_conf port_conf = port_conf_default; > > > > + const uint16_t rxRings = 1, txRings = rte_lcore_count() - 1; > > > > + int retval; > > > > + uint16_t q; > > > > + > > > > + if (port >= rte_eth_dev_count()) > > > > + return -1; > > > > + > > > > + retval = rte_eth_dev_configure(port, rxRings, txRings, &port_conf); > > > > + if (retval != 0) > > > > + return retval; > > > > + > > > > + for (q = 0; q < rxRings; q++) { > > > > + retval = rte_eth_rx_queue_setup(port, q, RX_RING_SIZE, > > > > + rte_eth_dev_socket_id(port), > > > > + &rx_conf_default, mbuf_pool); > > > > + if (retval < 0) > > > > + return retval; > > > > + } > > > > + > > > > + for (q = 0; q < txRings; q++) { > > > > + retval = rte_eth_tx_queue_setup(port, q, TX_RING_SIZE, > > > > + rte_eth_dev_socket_id(port), > > > > + &tx_conf_default); > > > > + if (retval < 0) > > > > + return retval; > > > > + } > > > > + > > > > + retval = rte_eth_dev_start(port); > > > > + if (retval < 0) > > > > + return retval; > > > > + > > > > + struct rte_eth_link link; > > > > + rte_eth_link_get_nowait(port, &link); > > > > + if (!link.link_status) { > > > > + sleep(1); > > > > + rte_eth_link_get_nowait(port, &link); > > > > + } > > > > + > > > > + if (!link.link_status) { > > > > + printf("Link down on port %"PRIu8"\n", port); > > > > + return 0; > > > > + } > > > > + > > > > + struct ether_addr addr; > > > > + rte_eth_macaddr_get(port, &addr); > > > > + printf("Port %u MAC: %02"PRIx8" %02"PRIx8" %02"PRIx8 > > > > + " %02"PRIx8" %02"PRIx8" %02"PRIx8"\n", > > > > + (unsigned)port, > > > > + addr.addr_bytes[0], addr.addr_bytes[1], > > > > + addr.addr_bytes[2], addr.addr_bytes[3], > > > > + addr.addr_bytes[4], addr.addr_bytes[5]); > > > > + > > > > + rte_eth_promiscuous_enable(port); > > > > + > > > > + return 0; > > > > +} > > > > + > > > > +struct lcore_params { > > > > + unsigned worker_id; > > > > + struct rte_distributor *d; > > > > + struct rte_ring *r; > > > > +}; > > > > + > > > > +static __attribute__((noreturn)) void lcore_rx(struct > > > > +lcore_params *p) { > > > > + struct rte_distributor *d = p->d; > > > > + struct rte_ring *r = p->r; > > > > + const uint8_t nb_ports = rte_eth_dev_count(); > > > > + const int socket_id = rte_socket_id(); > > > > + uint8_t port; > > > > + > > > > + for (port = 0; port < nb_ports; port++) { > > > > + /* skip ports that are not enabled */ > > > > + if ((enabled_port_mask & (1 << port)) == 0) > > > > + continue; > > > > + > > > > + if (rte_eth_dev_socket_id(port) > 0 && > > > > + rte_eth_dev_socket_id(port) != socket_id) > > > > + printf("WARNING, port %u is on remote NUMA node to > " > > > > + "RX thread.\n\tPerformance will not " > > > > + "be optimal.\n", port); > > > > + } > > > > + > > > > + printf("\nCore %u doing packet RX.\n", rte_lcore_id()); > > > > + port = 0; > > > > + for (;;) { > > > > + /* skip ports that are not enabled */ > > > > + if ((enabled_port_mask & (1 << port)) == 0) { > > > > + if (++port == nb_ports) > > > > + port = 0; > > > > + continue; > > > > + } > > > > + struct rte_mbuf *bufs[BURST_SIZE*2]; > > > > + const uint16_t nb_rx = rte_eth_rx_burst(port, 0, bufs, > > > > + BURST_SIZE); > > > > + app_stats.rx.rx_pkts += nb_rx; > > > > + > > > > + rte_distributor_process(d, bufs, nb_rx); > > > > + const uint16_t nb_ret = rte_distributor_returned_pkts(d, > > > > + bufs, BURST_SIZE*2); > > > > + app_stats.rx.returned_pkts += nb_ret; > > > > + if (unlikely(nb_ret == 0)) > > > > + continue; > > > > + > > > > + uint16_t sent = rte_ring_enqueue_burst(r, (void *)bufs, nb_ret); > > > > + app_stats.rx.enqueued_pkts += sent; > > > > + if (unlikely(sent < nb_ret)) { > > > > + LOG_DEBUG(DISTRAPP, "%s:Packet loss due to full > ring\n", __func__); > > > > + while (sent < nb_ret) > > > > + rte_pktmbuf_free(bufs[sent++]); > > > > + } > > > > + if (++port == nb_ports) > > > > + port = 0; > > > > + } > > > > +} > > > > + > > > > +static inline void > > > > +flush_one_port(struct output_buffer *outbuf, uint8_t outp) { > > > > + unsigned nb_tx = rte_eth_tx_burst(outp, 0, outbuf->mbufs, > > > > + outbuf->count); > > > > + app_stats.tx.tx_pkts += nb_tx; > > > > + > > > > + if (unlikely(nb_tx < outbuf->count)) { > > > > + LOG_DEBUG(DISTRAPP, "%s:Packet loss with tx_burst\n", > __func__); > > > > + do { > > > > + rte_pktmbuf_free(outbuf->mbufs[nb_tx]); > > > > + } while (++nb_tx < outbuf->count); > > > > + } > > > > + outbuf->count = 0; > > > > +} > > > > + > > > > +static inline void > > > > +flush_all_ports(struct output_buffer *tx_buffers, uint8_t > > > > +nb_ports) { > > > > + uint8_t outp; > > > > + for (outp = 0; outp < nb_ports; outp++) { > > > > + /* skip ports that are not enabled */ > > > > + if ((enabled_port_mask & (1 << outp)) == 0) > > > > + continue; > > > > + > > > > + if (tx_buffers[outp].count == 0) > > > > + continue; > > > > + > > > > + flush_one_port(&tx_buffers[outp], outp); > > > > + } > > > > +} > > > > + > > > > +static __attribute__((noreturn)) void lcore_tx(struct rte_ring > > > > +*in_r) { > > > > + static struct output_buffer tx_buffers[RTE_MAX_ETHPORTS]; > > > > + const uint8_t nb_ports = rte_eth_dev_count(); > > > > + const int socket_id = rte_socket_id(); > > > > + uint8_t port; > > > > + > > > > + for (port = 0; port < nb_ports; port++) { > > > > + /* skip ports that are not enabled */ > > > > + if ((enabled_port_mask & (1 << port)) == 0) > > > > + continue; > > > > + > > > > + if (rte_eth_dev_socket_id(port) > 0 && > > > > + rte_eth_dev_socket_id(port) != socket_id) > > > > + printf("WARNING, port %u is on remote NUMA node to > " > > > > + "TX thread.\n\tPerformance will not " > > > > + "be optimal.\n", port); > > > > + } > > > > + > > > > + printf("\nCore %u doing packet TX.\n", rte_lcore_id()); > > > > + for (;;) { > > > > + for (port = 0; port < nb_ports; port++) { > > > > + /* skip ports that are not enabled */ > > > > + if ((enabled_port_mask & (1 << port)) == 0) > > > > + continue; > > > > + > > > > + struct rte_mbuf *bufs[BURST_SIZE]; > > > > + const uint16_t nb_rx = rte_ring_dequeue_burst(in_r, > > > > + (void *)bufs, BURST_SIZE); > > > > + app_stats.tx.dequeue_pkts += nb_rx; > > > > + > > > > + /* if we get no traffic, flush anything we have */ > > > > + if (unlikely(nb_rx == 0)) { > > > > + flush_all_ports(tx_buffers, nb_ports); > > > > + continue; > > > > + } > > > > + > > > > + /* for traffic we receive, queue it up for transmit */ > > > > + uint16_t i; > > > > + _mm_prefetch(bufs[0], 0); > > > > + _mm_prefetch(bufs[1], 0); > > > > + _mm_prefetch(bufs[2], 0); > > > > + for (i = 0; i < nb_rx; i++) { > > > > + struct output_buffer *outbuf; > > > > + uint8_t outp; > > > > + _mm_prefetch(bufs[i + 3], 0); > > > > + /* workers should update in_port to hold the > > > > + * output port value */ > > > > + outp = bufs[i]->port; > > > > + /* skip ports that are not enabled */ > > > > + if ((enabled_port_mask & (1 << outp)) == 0) > > > > + continue; > > > > + > > > > + outbuf = &tx_buffers[outp]; > > > > + outbuf->mbufs[outbuf->count++] = bufs[i]; > > > > + if (outbuf->count == BURST_SIZE) > > > > + flush_one_port(outbuf, outp); > > > > + } > > > > + } > > > > + } > > > > +} > > > > + > > > > + > > > > +static __attribute__((noreturn)) void lcore_worker(struct > > > > +lcore_params *p) { > > > > + struct rte_distributor *d = p->d; > > > > + const unsigned id = p->worker_id; > > > > + /* for single port, xor_val will be zero so we won't modify the output > > > > + * port, otherwise we send traffic from 0 to 1, 2 to 3, and vice versa > > > > + */ > > > > + const unsigned xor_val = (rte_eth_dev_count() > 1); > > > > + struct rte_mbuf *buf = NULL; > > > > + > > > > + printf("\nCore %u acting as worker core.\n", rte_lcore_id()); > > > > + for (;;) { > > > > + buf = rte_distributor_get_pkt(d, id, buf); > > > > + buf->port ^= xor_val; > > > > + } > > > > +} > > > > + > > > > +static void > > > > +int_handler(int sig_num) > > > > +{ > > > > + struct rte_eth_stats eth_stats; > > > > + unsigned i; > > > > + > > > > + printf("Exiting on signal %d\n", sig_num); > > > > + > > > > + printf("\nRX thread stats:\n"); > > > > + printf(" - Received: %"PRIu64"\n", app_stats.rx.rx_pkts); > > > > + printf(" - Processed: %"PRIu64"\n", app_stats.rx.returned_pkts); > > > > + printf(" - Enqueued: %"PRIu64"\n", app_stats.rx.enqueued_pkts); > > > > + > > > > + printf("\nTX thread stats:\n"); > > > > + printf(" - Dequeued: %"PRIu64"\n", app_stats.tx.dequeue_pkts); > > > > + printf(" - Transmitted: %"PRIu64"\n", app_stats.tx.tx_pkts); > > > > + > > > > + for (i = 0; i < rte_eth_dev_count(); i++) { > > > > + rte_eth_stats_get(i, ð_stats); > > > > + printf("\nPort %u stats:\n", i); > > > > + printf(" - Pkts in: %"PRIu64"\n", eth_stats.ipackets); > > > > + printf(" - Pkts out: %"PRIu64"\n", eth_stats.opackets); > > > > + printf(" - In Errs: %"PRIu64"\n", eth_stats.ierrors); > > > > + printf(" - Out Errs: %"PRIu64"\n", eth_stats.oerrors); > > > > + printf(" - Mbuf Errs: %"PRIu64"\n", eth_stats.rx_nombuf); > > > > + } > > > > + exit(0); > > > rte_exit here? Also, this is a pretty ungraceful exit strategy as > > > all the threads you've created and memory you've allocated are just > forgotten here. > > > Given that dpdk mempools are shared, this has the potential to leak > > > lots of memory if other apps are using the dpdk at the same time > > > that you run this. You probably want to use the sigint handler to > > > raise a flag to the tx/rx threads to shutdown gracefully, and then free your > allocated memory and mempool. > > > > > > Neil > > > > > > > Unless the different processes are explicitly cooperating as > > primary/secondary, the mempools are not shared. I just don't see the > > need for this app to do more cleanup on ctrl-c signal, as it's not > > intended to be a multiprocess app, and there is little that any > > secondary process could do to work with this app, except possibly some > > resource monitoring, which would be completely unaffected by it exiting the > way it does. > > > Ah, ok, so we don't use a common shared pool between isolated processes > then, thats good. Still though, this is a sample application, I think its lazy > programming practice to illustrate to application developers that its generally ok > to exit programs without freeing your resources. Its about 20 lines of additional > code to change the sigint handler to flag an exit condition, and have all the > other threads join on it. > > Neil 1)I had sent v5 patch which handles graceful shutdown of rx and tx threads upon SIGINT 2)Worker thread graceful shutdown was not handled as of now as it needs some change in lcore_worker logic , which will be done in future enhancements. 3)Freeing of mempool is also not handled , as the framework support is not available. 4)Cleaning of rx/tx queues not done, as it needs some extensive logic which we haven't planned as of now. Will check the possibility of doing it in future enhancements i.e in next version of sample application. /Reshma > > > /Bruce > > -------------------------------------------------------------- Intel Shannon Limited Registered in Ireland Registered Office: Collinstown Industrial Park, Leixlip, County Kildare Registered Number: 308263 Business address: Dromore House, East Park, Shannon, Co. Clare This e-mail and any attachments may contain confidential material for the sole use of the intended recipient(s). Any review or distribution by others is strictly prohibited. If you are not the intended recipient, please contact the sender and delete all copies.
On Wed, Oct 01, 2014 at 02:47:00PM +0000, Pattan, Reshma wrote: > > > > -----Original Message----- > > From: Neil Horman [mailto:nhorman@tuxdriver.com] > > Sent: Tuesday, September 30, 2014 2:40 PM > > To: Richardson, Bruce > > Cc: Pattan, Reshma; dev@dpdk.org > > Subject: Re: [dpdk-dev] [PATCH v3] distributor_app: new sample app > > > > On Tue, Sep 30, 2014 at 01:18:28PM +0100, Bruce Richardson wrote: > > > On Tue, Sep 30, 2014 at 07:34:45AM -0400, Neil Horman wrote: > > > > On Tue, Sep 30, 2014 at 11:39:37AM +0100, reshmapa wrote: > > > > > From: Reshma Pattan <reshma.pattan@intel.com> > > > > > > > > > > A new sample app that shows the usage of the distributor library. > > > > > This app works as follows: > > > > > > > > > > * An RX thread runs which pulls packets from each ethernet port in turn > > > > > and passes those packets to worker using a distributor component. > > > > > * The workers take the packets in turn, and determine the output port > > > > > for those packets using basic l2forwarding doing an xor on the source > > > > > port id. > > > > > * The RX thread takes the returned packets from the workers and enqueue > > > > > those packets into an rte_ring structure. > > > > > * A TX thread pulls the packets off the rte_ring structure and then > > > > > sends each packet out the output port specified previously by > > > > > the worker > > > > > * Command-line option support provided only for portmask. > > > > > > > > > > Signed-off-by: Bruce Richardson <bruce.richardson@intel.com> > > > > > Signed-off-by: Reshma Pattan <reshma.pattan@intel.com> > > > > > --- > > > > > examples/Makefile | 1 + > > > > > examples/distributor_app/Makefile | 57 ++++ > > > > > examples/distributor_app/main.c | 600 > > +++++++++++++++++++++++++++++++++++++ > > > > > examples/distributor_app/main.h | 46 +++ > > > > > 4 files changed, 704 insertions(+), 0 deletions(-) create mode > > > > > 100644 examples/distributor_app/Makefile create mode 100644 > > > > > examples/distributor_app/main.c create mode 100644 > > > > > examples/distributor_app/main.h > > > > > > > > > > diff --git a/examples/Makefile b/examples/Makefile index > > > > > 6245f83..2ba82b0 100644 > > > > > --- a/examples/Makefile > > > > > +++ b/examples/Makefile > > > > > @@ -66,5 +66,6 @@ DIRS-y += vhost > > > > > DIRS-$(CONFIG_RTE_LIBRTE_XEN_DOM0) += vhost_xen DIRS-y += vmdq > > > > > DIRS-y += vmdq_dcb > > > > > +DIRS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) += distributor_app > > > > > > > > > > include $(RTE_SDK)/mk/rte.extsubdir.mk diff --git > > > > > a/examples/distributor_app/Makefile > > > > > b/examples/distributor_app/Makefile > > > > > new file mode 100644 > > > > > index 0000000..6a5bada > > > > > --- /dev/null > > > > > +++ b/examples/distributor_app/Makefile > > > > > @@ -0,0 +1,57 @@ > > > > > +# BSD LICENSE > > > > > +# > > > > > +# Copyright(c) 2010-2014 Intel Corporation. All rights reserved. > > > > > +# All rights reserved. > > > > > +# > > > > > +# Redistribution and use in source and binary forms, with or without > > > > > +# modification, are permitted provided that the following conditions > > > > > +# are met: > > > > > +# > > > > > +# * Redistributions of source code must retain the above copyright > > > > > +# notice, this list of conditions and the following disclaimer. > > > > > +# * Redistributions in binary form must reproduce the above copyright > > > > > +# notice, this list of conditions and the following disclaimer in > > > > > +# the documentation and/or other materials provided with the > > > > > +# distribution. > > > > > +# * Neither the name of Intel Corporation nor the names of its > > > > > +# contributors may be used to endorse or promote products derived > > > > > +# from this software without specific prior written permission. > > > > > +# > > > > > +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND > > CONTRIBUTORS > > > > > +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT > > NOT > > > > > +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND > > FITNESS FOR > > > > > +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE > > COPYRIGHT > > > > > +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, > > INCIDENTAL, > > > > > +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, > > BUT NOT > > > > > +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; > > LOSS OF USE, > > > > > +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED > > AND ON ANY > > > > > +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR > > TORT > > > > > +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT > > OF THE USE > > > > > +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH > > DAMAGE. > > > > > + > > > > > +ifeq ($(RTE_SDK),) > > > > > +$(error "Please define RTE_SDK environment variable") endif > > > > > + > > > > > +# Default target, can be overriden by command line or environment > > > > > +RTE_TARGET ?= x86_64-native-linuxapp-gcc > > > > > + > > > > > +include $(RTE_SDK)/mk/rte.vars.mk > > > > > + > > > > > +# binary name > > > > > +APP = distributor_app > > > > > + > > > > > +# all source are stored in SRCS-y SRCS-y := main.c > > > > > + > > > > > +CFLAGS += $(WERROR_FLAGS) > > > > > + > > > > > +# workaround for a gcc bug with noreturn attribute # > > > > > +http://gcc.gnu.org/bugzilla/show_bug.cgi?id=12603 > > > > > +ifeq ($(CONFIG_RTE_TOOLCHAIN_GCC),y) CFLAGS_main.o += > > > > > +-Wno-return-type endif > > > > > + > > > > > +EXTRA_CFLAGS += -O3 -Wfatal-errors > > > > > + > > > > > +include $(RTE_SDK)/mk/rte.extapp.mk > > > > > diff --git a/examples/distributor_app/main.c > > > > > b/examples/distributor_app/main.c new file mode 100644 index > > > > > 0000000..f555d93 > > > > > --- /dev/null > > > > > +++ b/examples/distributor_app/main.c > > > > > @@ -0,0 +1,600 @@ > > > > > +/*- > > > > > + * BSD LICENSE > > > > > + * > > > > > + * Copyright(c) 2010-2014 Intel Corporation. All rights reserved. > > > > > + * All rights reserved. > > > > > + * > > > > > + * Redistribution and use in source and binary forms, with or without > > > > > + * modification, are permitted provided that the following conditions > > > > > + * are met: > > > > > + * > > > > > + * * Redistributions of source code must retain the above copyright > > > > > + * notice, this list of conditions and the following disclaimer. > > > > > + * * Redistributions in binary form must reproduce the above copyright > > > > > + * notice, this list of conditions and the following disclaimer in > > > > > + * the documentation and/or other materials provided with the > > > > > + * distribution. > > > > > + * * Neither the name of Intel Corporation nor the names of its > > > > > + * contributors may be used to endorse or promote products derived > > > > > + * from this software without specific prior written permission. > > > > > + * > > > > > + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND > > CONTRIBUTORS > > > > > + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, > > BUT NOT > > > > > + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND > > FITNESS FOR > > > > > + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE > > COPYRIGHT > > > > > + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, > > INCIDENTAL, > > > > > + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, > > BUT NOT > > > > > + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; > > LOSS OF USE, > > > > > + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED > > AND ON ANY > > > > > + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR > > TORT > > > > > + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT > > OF THE USE > > > > > + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH > > DAMAGE. > > > > > + */ > > > > > + > > > > > +#include <stdint.h> > > > > > +#include <inttypes.h> > > > > > +#include <unistd.h> > > > > > +#include <signal.h> > > > > > +#include <getopt.h> > > > > > + > > > > > +#include <rte_eal.h> > > > > > +#include <rte_ethdev.h> > > > > > +#include <rte_cycles.h> > > > > > +#include <rte_malloc.h> > > > > > +#include <rte_debug.h> > > > > > +#include <rte_distributor.h> > > > > > + > > > > > +#include "main.h" > > > > > + > > > > > +#define RX_RING_SIZE 256 > > > > > +#define RX_FREE_THRESH 32 > > > > > +#define RX_PTHRESH 8 > > > > > +#define RX_HTHRESH 8 > > > > > +#define RX_WTHRESH 0 > > > > > + > > > > > +#define TX_RING_SIZE 512 > > > > > +#define TX_FREE_THRESH 32 > > > > > +#define TX_PTHRESH 32 > > > > > +#define TX_HTHRESH 0 > > > > > +#define TX_WTHRESH 0 > > > > > +#define TX_RSBIT_THRESH 32 > > > > > +#define TX_Q_FLAGS (ETH_TXQ_FLAGS_NOMULTSEGS | > > ETH_TXQ_FLAGS_NOVLANOFFL |\ > > > > > + ETH_TXQ_FLAGS_NOXSUMSCTP | ETH_TXQ_FLAGS_NOXSUMUDP | \ > > > > > + ETH_TXQ_FLAGS_NOXSUMTCP) > > > > > + > > > > > +#define NUM_MBUFS ((64*1024)-1) > > > > > +#define MBUF_SIZE (2048 + sizeof(struct rte_mbuf) + > > > > > +RTE_PKTMBUF_HEADROOM) #define MBUF_CACHE_SIZE 250 #define > > > > > +BURST_SIZE 32 #define RTE_RING_SZ 1024 > > > > > + > > > > > +/* uncommnet below line to enable debug logs */ > > > > > +/* #define DEBUG */ > > > > > + > > > > > +#ifdef DEBUG > > > > > +#define LOG_LEVEL RTE_LOG_DEBUG > > > > > +#define LOG_DEBUG(log_type, fmt, args...) do { \ > > > > > + RTE_LOG(DEBUG, log_type, fmt, ##args) \ > > > > > +} while (0) > > > > > +#else > > > > > +#define LOG_LEVEL RTE_LOG_INFO > > > > > +#define LOG_DEBUG(log_type, fmt, args...) do {} while (0) #endif > > > > > + > > > > > +#define RTE_LOGTYPE_DISTRAPP RTE_LOGTYPE_USER1 > > > > > + > > > > > +/* mask of enabled ports */ > > > > > +static uint32_t enabled_port_mask = 0; > > > > > + > > > > > +static volatile struct app_stats { > > > > > + struct { > > > > > + uint64_t rx_pkts; > > > > > + uint64_t returned_pkts; > > > > > + uint64_t enqueued_pkts; > > > > > + } rx __rte_cache_aligned; > > > > > + > > > > > + struct { > > > > > + uint64_t dequeue_pkts; > > > > > + uint64_t tx_pkts; > > > > > + } tx __rte_cache_aligned; > > > > > +} app_stats; > > > > > + > > > > > +static const struct rte_eth_conf port_conf_default = { > > > > > + .rxmode = { > > > > > + .mq_mode = ETH_MQ_RX_RSS, > > > > > + .max_rx_pkt_len = ETHER_MAX_LEN, > > > > > + .split_hdr_size = 0, > > > > > + .header_split = 0, /**< Header Split disabled */ > > > > > + .hw_ip_checksum = 0, /**< IP checksum offload enabled */ > > > > > + .hw_vlan_filter = 0, /**< VLAN filtering disabled */ > > > > > + .jumbo_frame = 0, /**< Jumbo Frame Support disabled */ > > > > > + .hw_strip_crc = 0, /**< CRC stripped by hardware */ > > > > > + }, > > > > > + .txmode = { > > > > > + .mq_mode = ETH_MQ_TX_NONE, > > > > > + }, > > > > > + .lpbk_mode = 0, > > > > > + .rx_adv_conf = { > > > > > + .rss_conf = { > > > > > + .rss_hf = ETH_RSS_IPV4 | ETH_RSS_IPV6 | > > > > > + ETH_RSS_IPV4_TCP | > > ETH_RSS_IPV4_UDP | > > > > > + ETH_RSS_IPV6_TCP | > > ETH_RSS_IPV6_UDP, > > > > > + } > > > > > + }, > > > > > +}; > > > > > + > > > > > +static const struct rte_eth_rxconf rx_conf_default = { > > > > > + .rx_thresh = { > > > > > + .pthresh = RX_PTHRESH, > > > > > + .hthresh = RX_HTHRESH, > > > > > + .wthresh = RX_WTHRESH, > > > > > + }, > > > > > + .rx_free_thresh = RX_FREE_THRESH, > > > > > + .rx_drop_en = 0, > > > > > +}; > > > > > + > > > > > +static const struct rte_eth_txconf tx_conf_default = { > > > > > + .tx_thresh = { > > > > > + .pthresh = TX_PTHRESH, > > > > > + .hthresh = TX_HTHRESH, > > > > > + .wthresh = TX_WTHRESH, > > > > > + }, > > > > > + .tx_free_thresh = TX_FREE_THRESH, > > > > > + .tx_rs_thresh = TX_RSBIT_THRESH, > > > > > + .txq_flags = TX_Q_FLAGS > > > > > + > > > > > +}; > > > > > + > > > > > +struct output_buffer { > > > > > + unsigned count; > > > > > + struct rte_mbuf *mbufs[BURST_SIZE]; }; > > > > > + > > > > > +/* > > > > > + * Initialises a given port using global settings and with the rx > > > > > +buffers > > > > > + * coming from the mbuf_pool passed as parameter */ static > > > > > +inline int port_init(uint8_t port, struct rte_mempool *mbuf_pool) > > > > > +{ > > > > > + struct rte_eth_conf port_conf = port_conf_default; > > > > > + const uint16_t rxRings = 1, txRings = rte_lcore_count() - 1; > > > > > + int retval; > > > > > + uint16_t q; > > > > > + > > > > > + if (port >= rte_eth_dev_count()) > > > > > + return -1; > > > > > + > > > > > + retval = rte_eth_dev_configure(port, rxRings, txRings, &port_conf); > > > > > + if (retval != 0) > > > > > + return retval; > > > > > + > > > > > + for (q = 0; q < rxRings; q++) { > > > > > + retval = rte_eth_rx_queue_setup(port, q, RX_RING_SIZE, > > > > > + rte_eth_dev_socket_id(port), > > > > > + &rx_conf_default, mbuf_pool); > > > > > + if (retval < 0) > > > > > + return retval; > > > > > + } > > > > > + > > > > > + for (q = 0; q < txRings; q++) { > > > > > + retval = rte_eth_tx_queue_setup(port, q, TX_RING_SIZE, > > > > > + rte_eth_dev_socket_id(port), > > > > > + &tx_conf_default); > > > > > + if (retval < 0) > > > > > + return retval; > > > > > + } > > > > > + > > > > > + retval = rte_eth_dev_start(port); > > > > > + if (retval < 0) > > > > > + return retval; > > > > > + > > > > > + struct rte_eth_link link; > > > > > + rte_eth_link_get_nowait(port, &link); > > > > > + if (!link.link_status) { > > > > > + sleep(1); > > > > > + rte_eth_link_get_nowait(port, &link); > > > > > + } > > > > > + > > > > > + if (!link.link_status) { > > > > > + printf("Link down on port %"PRIu8"\n", port); > > > > > + return 0; > > > > > + } > > > > > + > > > > > + struct ether_addr addr; > > > > > + rte_eth_macaddr_get(port, &addr); > > > > > + printf("Port %u MAC: %02"PRIx8" %02"PRIx8" %02"PRIx8 > > > > > + " %02"PRIx8" %02"PRIx8" %02"PRIx8"\n", > > > > > + (unsigned)port, > > > > > + addr.addr_bytes[0], addr.addr_bytes[1], > > > > > + addr.addr_bytes[2], addr.addr_bytes[3], > > > > > + addr.addr_bytes[4], addr.addr_bytes[5]); > > > > > + > > > > > + rte_eth_promiscuous_enable(port); > > > > > + > > > > > + return 0; > > > > > +} > > > > > + > > > > > +struct lcore_params { > > > > > + unsigned worker_id; > > > > > + struct rte_distributor *d; > > > > > + struct rte_ring *r; > > > > > +}; > > > > > + > > > > > +static __attribute__((noreturn)) void lcore_rx(struct > > > > > +lcore_params *p) { > > > > > + struct rte_distributor *d = p->d; > > > > > + struct rte_ring *r = p->r; > > > > > + const uint8_t nb_ports = rte_eth_dev_count(); > > > > > + const int socket_id = rte_socket_id(); > > > > > + uint8_t port; > > > > > + > > > > > + for (port = 0; port < nb_ports; port++) { > > > > > + /* skip ports that are not enabled */ > > > > > + if ((enabled_port_mask & (1 << port)) == 0) > > > > > + continue; > > > > > + > > > > > + if (rte_eth_dev_socket_id(port) > 0 && > > > > > + rte_eth_dev_socket_id(port) != socket_id) > > > > > + printf("WARNING, port %u is on remote NUMA node to > > " > > > > > + "RX thread.\n\tPerformance will not " > > > > > + "be optimal.\n", port); > > > > > + } > > > > > + > > > > > + printf("\nCore %u doing packet RX.\n", rte_lcore_id()); > > > > > + port = 0; > > > > > + for (;;) { > > > > > + /* skip ports that are not enabled */ > > > > > + if ((enabled_port_mask & (1 << port)) == 0) { > > > > > + if (++port == nb_ports) > > > > > + port = 0; > > > > > + continue; > > > > > + } > > > > > + struct rte_mbuf *bufs[BURST_SIZE*2]; > > > > > + const uint16_t nb_rx = rte_eth_rx_burst(port, 0, bufs, > > > > > + BURST_SIZE); > > > > > + app_stats.rx.rx_pkts += nb_rx; > > > > > + > > > > > + rte_distributor_process(d, bufs, nb_rx); > > > > > + const uint16_t nb_ret = rte_distributor_returned_pkts(d, > > > > > + bufs, BURST_SIZE*2); > > > > > + app_stats.rx.returned_pkts += nb_ret; > > > > > + if (unlikely(nb_ret == 0)) > > > > > + continue; > > > > > + > > > > > + uint16_t sent = rte_ring_enqueue_burst(r, (void *)bufs, nb_ret); > > > > > + app_stats.rx.enqueued_pkts += sent; > > > > > + if (unlikely(sent < nb_ret)) { > > > > > + LOG_DEBUG(DISTRAPP, "%s:Packet loss due to full > > ring\n", __func__); > > > > > + while (sent < nb_ret) > > > > > + rte_pktmbuf_free(bufs[sent++]); > > > > > + } > > > > > + if (++port == nb_ports) > > > > > + port = 0; > > > > > + } > > > > > +} > > > > > + > > > > > +static inline void > > > > > +flush_one_port(struct output_buffer *outbuf, uint8_t outp) { > > > > > + unsigned nb_tx = rte_eth_tx_burst(outp, 0, outbuf->mbufs, > > > > > + outbuf->count); > > > > > + app_stats.tx.tx_pkts += nb_tx; > > > > > + > > > > > + if (unlikely(nb_tx < outbuf->count)) { > > > > > + LOG_DEBUG(DISTRAPP, "%s:Packet loss with tx_burst\n", > > __func__); > > > > > + do { > > > > > + rte_pktmbuf_free(outbuf->mbufs[nb_tx]); > > > > > + } while (++nb_tx < outbuf->count); > > > > > + } > > > > > + outbuf->count = 0; > > > > > +} > > > > > + > > > > > +static inline void > > > > > +flush_all_ports(struct output_buffer *tx_buffers, uint8_t > > > > > +nb_ports) { > > > > > + uint8_t outp; > > > > > + for (outp = 0; outp < nb_ports; outp++) { > > > > > + /* skip ports that are not enabled */ > > > > > + if ((enabled_port_mask & (1 << outp)) == 0) > > > > > + continue; > > > > > + > > > > > + if (tx_buffers[outp].count == 0) > > > > > + continue; > > > > > + > > > > > + flush_one_port(&tx_buffers[outp], outp); > > > > > + } > > > > > +} > > > > > + > > > > > +static __attribute__((noreturn)) void lcore_tx(struct rte_ring > > > > > +*in_r) { > > > > > + static struct output_buffer tx_buffers[RTE_MAX_ETHPORTS]; > > > > > + const uint8_t nb_ports = rte_eth_dev_count(); > > > > > + const int socket_id = rte_socket_id(); > > > > > + uint8_t port; > > > > > + > > > > > + for (port = 0; port < nb_ports; port++) { > > > > > + /* skip ports that are not enabled */ > > > > > + if ((enabled_port_mask & (1 << port)) == 0) > > > > > + continue; > > > > > + > > > > > + if (rte_eth_dev_socket_id(port) > 0 && > > > > > + rte_eth_dev_socket_id(port) != socket_id) > > > > > + printf("WARNING, port %u is on remote NUMA node to > > " > > > > > + "TX thread.\n\tPerformance will not " > > > > > + "be optimal.\n", port); > > > > > + } > > > > > + > > > > > + printf("\nCore %u doing packet TX.\n", rte_lcore_id()); > > > > > + for (;;) { > > > > > + for (port = 0; port < nb_ports; port++) { > > > > > + /* skip ports that are not enabled */ > > > > > + if ((enabled_port_mask & (1 << port)) == 0) > > > > > + continue; > > > > > + > > > > > + struct rte_mbuf *bufs[BURST_SIZE]; > > > > > + const uint16_t nb_rx = rte_ring_dequeue_burst(in_r, > > > > > + (void *)bufs, BURST_SIZE); > > > > > + app_stats.tx.dequeue_pkts += nb_rx; > > > > > + > > > > > + /* if we get no traffic, flush anything we have */ > > > > > + if (unlikely(nb_rx == 0)) { > > > > > + flush_all_ports(tx_buffers, nb_ports); > > > > > + continue; > > > > > + } > > > > > + > > > > > + /* for traffic we receive, queue it up for transmit */ > > > > > + uint16_t i; > > > > > + _mm_prefetch(bufs[0], 0); > > > > > + _mm_prefetch(bufs[1], 0); > > > > > + _mm_prefetch(bufs[2], 0); > > > > > + for (i = 0; i < nb_rx; i++) { > > > > > + struct output_buffer *outbuf; > > > > > + uint8_t outp; > > > > > + _mm_prefetch(bufs[i + 3], 0); > > > > > + /* workers should update in_port to hold the > > > > > + * output port value */ > > > > > + outp = bufs[i]->port; > > > > > + /* skip ports that are not enabled */ > > > > > + if ((enabled_port_mask & (1 << outp)) == 0) > > > > > + continue; > > > > > + > > > > > + outbuf = &tx_buffers[outp]; > > > > > + outbuf->mbufs[outbuf->count++] = bufs[i]; > > > > > + if (outbuf->count == BURST_SIZE) > > > > > + flush_one_port(outbuf, outp); > > > > > + } > > > > > + } > > > > > + } > > > > > +} > > > > > + > > > > > + > > > > > +static __attribute__((noreturn)) void lcore_worker(struct > > > > > +lcore_params *p) { > > > > > + struct rte_distributor *d = p->d; > > > > > + const unsigned id = p->worker_id; > > > > > + /* for single port, xor_val will be zero so we won't modify the output > > > > > + * port, otherwise we send traffic from 0 to 1, 2 to 3, and vice versa > > > > > + */ > > > > > + const unsigned xor_val = (rte_eth_dev_count() > 1); > > > > > + struct rte_mbuf *buf = NULL; > > > > > + > > > > > + printf("\nCore %u acting as worker core.\n", rte_lcore_id()); > > > > > + for (;;) { > > > > > + buf = rte_distributor_get_pkt(d, id, buf); > > > > > + buf->port ^= xor_val; > > > > > + } > > > > > +} > > > > > + > > > > > +static void > > > > > +int_handler(int sig_num) > > > > > +{ > > > > > + struct rte_eth_stats eth_stats; > > > > > + unsigned i; > > > > > + > > > > > + printf("Exiting on signal %d\n", sig_num); > > > > > + > > > > > + printf("\nRX thread stats:\n"); > > > > > + printf(" - Received: %"PRIu64"\n", app_stats.rx.rx_pkts); > > > > > + printf(" - Processed: %"PRIu64"\n", app_stats.rx.returned_pkts); > > > > > + printf(" - Enqueued: %"PRIu64"\n", app_stats.rx.enqueued_pkts); > > > > > + > > > > > + printf("\nTX thread stats:\n"); > > > > > + printf(" - Dequeued: %"PRIu64"\n", app_stats.tx.dequeue_pkts); > > > > > + printf(" - Transmitted: %"PRIu64"\n", app_stats.tx.tx_pkts); > > > > > + > > > > > + for (i = 0; i < rte_eth_dev_count(); i++) { > > > > > + rte_eth_stats_get(i, ð_stats); > > > > > + printf("\nPort %u stats:\n", i); > > > > > + printf(" - Pkts in: %"PRIu64"\n", eth_stats.ipackets); > > > > > + printf(" - Pkts out: %"PRIu64"\n", eth_stats.opackets); > > > > > + printf(" - In Errs: %"PRIu64"\n", eth_stats.ierrors); > > > > > + printf(" - Out Errs: %"PRIu64"\n", eth_stats.oerrors); > > > > > + printf(" - Mbuf Errs: %"PRIu64"\n", eth_stats.rx_nombuf); > > > > > + } > > > > > + exit(0); > > > > rte_exit here? Also, this is a pretty ungraceful exit strategy as > > > > all the threads you've created and memory you've allocated are just > > forgotten here. > > > > Given that dpdk mempools are shared, this has the potential to leak > > > > lots of memory if other apps are using the dpdk at the same time > > > > that you run this. You probably want to use the sigint handler to > > > > raise a flag to the tx/rx threads to shutdown gracefully, and then free your > > allocated memory and mempool. > > > > > > > > Neil > > > > > > > > > > Unless the different processes are explicitly cooperating as > > > primary/secondary, the mempools are not shared. I just don't see the > > > need for this app to do more cleanup on ctrl-c signal, as it's not > > > intended to be a multiprocess app, and there is little that any > > > secondary process could do to work with this app, except possibly some > > > resource monitoring, which would be completely unaffected by it exiting the > > way it does. > > > > > Ah, ok, so we don't use a common shared pool between isolated processes > > then, thats good. Still though, this is a sample application, I think its lazy > > programming practice to illustrate to application developers that its generally ok > > to exit programs without freeing your resources. Its about 20 lines of additional > > code to change the sigint handler to flag an exit condition, and have all the > > other threads join on it. > > > > Neil > > 1)I had sent v5 patch which handles graceful shutdown of rx and tx threads upon SIGINT I see it and will take a look shortly, thanks. > 2)Worker thread graceful shutdown was not handled as of now as it needs some change in lcore_worker logic , which will be done in future enhancements. Not sure I understand what you mean here. Can you elaborate? > 3)Freeing of mempool is also not handled , as the framework support is not available. Ew, I hadn't noticed that, freeing of mempools seems like something we should implement. > 4)Cleaning of rx/tx queues not done, as it needs some extensive logic which we haven't planned as of now. Will check the possibility of doing it in future enhancements i.e in next version of sample application. We can't just flush the queues after we shutdown the workers? I presume a queue flush operation exists, yes? Neil > > /Reshma > > > > > > /Bruce > > > > > > -------------------------------------------------------------- > Intel Shannon Limited > Registered in Ireland > Registered Office: Collinstown Industrial Park, Leixlip, County Kildare > Registered Number: 308263 > Business address: Dromore House, East Park, Shannon, Co. Clare > > This e-mail and any attachments may contain confidential material for the sole use of the intended recipient(s). Any review or distribution by others is strictly prohibited. If you are not the intended recipient, please contact the sender and delete all copies. > > >
On Wed, Oct 01, 2014 at 10:56:20AM -0400, Neil Horman wrote: > On Wed, Oct 01, 2014 at 02:47:00PM +0000, Pattan, Reshma wrote: > > > > > > > -----Original Message----- > > > From: Neil Horman [mailto:nhorman@tuxdriver.com] > > > Sent: Tuesday, September 30, 2014 2:40 PM > > > To: Richardson, Bruce > > > Cc: Pattan, Reshma; dev@dpdk.org > > > Subject: Re: [dpdk-dev] [PATCH v3] distributor_app: new sample app > > > > > > On Tue, Sep 30, 2014 at 01:18:28PM +0100, Bruce Richardson wrote: > > > > On Tue, Sep 30, 2014 at 07:34:45AM -0400, Neil Horman wrote: > > > > > On Tue, Sep 30, 2014 at 11:39:37AM +0100, reshmapa wrote: > > > > > > From: Reshma Pattan <reshma.pattan@intel.com> > > > > > > > > > > > > A new sample app that shows the usage of the distributor library. > > > > > > This app works as follows: > > > > > > > > > > > > * An RX thread runs which pulls packets from each ethernet port in turn > > > > > > and passes those packets to worker using a distributor component. > > > > > > * The workers take the packets in turn, and determine the output port > > > > > > for those packets using basic l2forwarding doing an xor on the source > > > > > > port id. > > > > > > * The RX thread takes the returned packets from the workers and enqueue > > > > > > those packets into an rte_ring structure. > > > > > > * A TX thread pulls the packets off the rte_ring structure and then > > > > > > sends each packet out the output port specified previously by > > > > > > the worker > > > > > > * Command-line option support provided only for portmask. > > > > > > > > > > > > Signed-off-by: Bruce Richardson <bruce.richardson@intel.com> > > > > > > Signed-off-by: Reshma Pattan <reshma.pattan@intel.com> > > > > > > --- > > > > > > examples/Makefile | 1 + > > > > > > examples/distributor_app/Makefile | 57 ++++ > > > > > > examples/distributor_app/main.c | 600 > > > +++++++++++++++++++++++++++++++++++++ > > > > > > examples/distributor_app/main.h | 46 +++ > > > > > > 4 files changed, 704 insertions(+), 0 deletions(-) create mode > > > > > > 100644 examples/distributor_app/Makefile create mode 100644 > > > > > > examples/distributor_app/main.c create mode 100644 > > > > > > examples/distributor_app/main.h > > > > > > > > > > > > diff --git a/examples/Makefile b/examples/Makefile index > > > > > > 6245f83..2ba82b0 100644 > > > > > > --- a/examples/Makefile > > > > > > +++ b/examples/Makefile > > > > > > @@ -66,5 +66,6 @@ DIRS-y += vhost > > > > > > DIRS-$(CONFIG_RTE_LIBRTE_XEN_DOM0) += vhost_xen DIRS-y += vmdq > > > > > > DIRS-y += vmdq_dcb > > > > > > +DIRS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) += distributor_app > > > > > > > > > > > > include $(RTE_SDK)/mk/rte.extsubdir.mk diff --git > > > > > > a/examples/distributor_app/Makefile > > > > > > b/examples/distributor_app/Makefile > > > > > > new file mode 100644 > > > > > > index 0000000..6a5bada > > > > > > --- /dev/null > > > > > > +++ b/examples/distributor_app/Makefile > > > > > > @@ -0,0 +1,57 @@ > > > > > > +# BSD LICENSE > > > > > > +# > > > > > > +# Copyright(c) 2010-2014 Intel Corporation. All rights reserved. > > > > > > +# All rights reserved. > > > > > > +# > > > > > > +# Redistribution and use in source and binary forms, with or without > > > > > > +# modification, are permitted provided that the following conditions > > > > > > +# are met: > > > > > > +# > > > > > > +# * Redistributions of source code must retain the above copyright > > > > > > +# notice, this list of conditions and the following disclaimer. > > > > > > +# * Redistributions in binary form must reproduce the above copyright > > > > > > +# notice, this list of conditions and the following disclaimer in > > > > > > +# the documentation and/or other materials provided with the > > > > > > +# distribution. > > > > > > +# * Neither the name of Intel Corporation nor the names of its > > > > > > +# contributors may be used to endorse or promote products derived > > > > > > +# from this software without specific prior written permission. > > > > > > +# > > > > > > +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND > > > CONTRIBUTORS > > > > > > +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT > > > NOT > > > > > > +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND > > > FITNESS FOR > > > > > > +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE > > > COPYRIGHT > > > > > > +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, > > > INCIDENTAL, > > > > > > +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, > > > BUT NOT > > > > > > +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; > > > LOSS OF USE, > > > > > > +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED > > > AND ON ANY > > > > > > +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR > > > TORT > > > > > > +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT > > > OF THE USE > > > > > > +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH > > > DAMAGE. > > > > > > + > > > > > > +ifeq ($(RTE_SDK),) > > > > > > +$(error "Please define RTE_SDK environment variable") endif > > > > > > + > > > > > > +# Default target, can be overriden by command line or environment > > > > > > +RTE_TARGET ?= x86_64-native-linuxapp-gcc > > > > > > + > > > > > > +include $(RTE_SDK)/mk/rte.vars.mk > > > > > > + > > > > > > +# binary name > > > > > > +APP = distributor_app > > > > > > + > > > > > > +# all source are stored in SRCS-y SRCS-y := main.c > > > > > > + > > > > > > +CFLAGS += $(WERROR_FLAGS) > > > > > > + > > > > > > +# workaround for a gcc bug with noreturn attribute # > > > > > > +http://gcc.gnu.org/bugzilla/show_bug.cgi?id=12603 > > > > > > +ifeq ($(CONFIG_RTE_TOOLCHAIN_GCC),y) CFLAGS_main.o += > > > > > > +-Wno-return-type endif > > > > > > + > > > > > > +EXTRA_CFLAGS += -O3 -Wfatal-errors > > > > > > + > > > > > > +include $(RTE_SDK)/mk/rte.extapp.mk > > > > > > diff --git a/examples/distributor_app/main.c > > > > > > b/examples/distributor_app/main.c new file mode 100644 index > > > > > > 0000000..f555d93 > > > > > > --- /dev/null > > > > > > +++ b/examples/distributor_app/main.c > > > > > > @@ -0,0 +1,600 @@ > > > > > > +/*- > > > > > > + * BSD LICENSE > > > > > > + * > > > > > > + * Copyright(c) 2010-2014 Intel Corporation. All rights reserved. > > > > > > + * All rights reserved. > > > > > > + * > > > > > > + * Redistribution and use in source and binary forms, with or without > > > > > > + * modification, are permitted provided that the following conditions > > > > > > + * are met: > > > > > > + * > > > > > > + * * Redistributions of source code must retain the above copyright > > > > > > + * notice, this list of conditions and the following disclaimer. > > > > > > + * * Redistributions in binary form must reproduce the above copyright > > > > > > + * notice, this list of conditions and the following disclaimer in > > > > > > + * the documentation and/or other materials provided with the > > > > > > + * distribution. > > > > > > + * * Neither the name of Intel Corporation nor the names of its > > > > > > + * contributors may be used to endorse or promote products derived > > > > > > + * from this software without specific prior written permission. > > > > > > + * > > > > > > + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND > > > CONTRIBUTORS > > > > > > + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, > > > BUT NOT > > > > > > + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND > > > FITNESS FOR > > > > > > + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE > > > COPYRIGHT > > > > > > + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, > > > INCIDENTAL, > > > > > > + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, > > > BUT NOT > > > > > > + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; > > > LOSS OF USE, > > > > > > + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED > > > AND ON ANY > > > > > > + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR > > > TORT > > > > > > + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT > > > OF THE USE > > > > > > + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH > > > DAMAGE. > > > > > > + */ > > > > > > + > > > > > > +#include <stdint.h> > > > > > > +#include <inttypes.h> > > > > > > +#include <unistd.h> > > > > > > +#include <signal.h> > > > > > > +#include <getopt.h> > > > > > > + > > > > > > +#include <rte_eal.h> > > > > > > +#include <rte_ethdev.h> > > > > > > +#include <rte_cycles.h> > > > > > > +#include <rte_malloc.h> > > > > > > +#include <rte_debug.h> > > > > > > +#include <rte_distributor.h> > > > > > > + > > > > > > +#include "main.h" > > > > > > + > > > > > > +#define RX_RING_SIZE 256 > > > > > > +#define RX_FREE_THRESH 32 > > > > > > +#define RX_PTHRESH 8 > > > > > > +#define RX_HTHRESH 8 > > > > > > +#define RX_WTHRESH 0 > > > > > > + > > > > > > +#define TX_RING_SIZE 512 > > > > > > +#define TX_FREE_THRESH 32 > > > > > > +#define TX_PTHRESH 32 > > > > > > +#define TX_HTHRESH 0 > > > > > > +#define TX_WTHRESH 0 > > > > > > +#define TX_RSBIT_THRESH 32 > > > > > > +#define TX_Q_FLAGS (ETH_TXQ_FLAGS_NOMULTSEGS | > > > ETH_TXQ_FLAGS_NOVLANOFFL |\ > > > > > > + ETH_TXQ_FLAGS_NOXSUMSCTP | ETH_TXQ_FLAGS_NOXSUMUDP | \ > > > > > > + ETH_TXQ_FLAGS_NOXSUMTCP) > > > > > > + > > > > > > +#define NUM_MBUFS ((64*1024)-1) > > > > > > +#define MBUF_SIZE (2048 + sizeof(struct rte_mbuf) + > > > > > > +RTE_PKTMBUF_HEADROOM) #define MBUF_CACHE_SIZE 250 #define > > > > > > +BURST_SIZE 32 #define RTE_RING_SZ 1024 > > > > > > + > > > > > > +/* uncommnet below line to enable debug logs */ > > > > > > +/* #define DEBUG */ > > > > > > + > > > > > > +#ifdef DEBUG > > > > > > +#define LOG_LEVEL RTE_LOG_DEBUG > > > > > > +#define LOG_DEBUG(log_type, fmt, args...) do { \ > > > > > > + RTE_LOG(DEBUG, log_type, fmt, ##args) \ > > > > > > +} while (0) > > > > > > +#else > > > > > > +#define LOG_LEVEL RTE_LOG_INFO > > > > > > +#define LOG_DEBUG(log_type, fmt, args...) do {} while (0) #endif > > > > > > + > > > > > > +#define RTE_LOGTYPE_DISTRAPP RTE_LOGTYPE_USER1 > > > > > > + > > > > > > +/* mask of enabled ports */ > > > > > > +static uint32_t enabled_port_mask = 0; > > > > > > + > > > > > > +static volatile struct app_stats { > > > > > > + struct { > > > > > > + uint64_t rx_pkts; > > > > > > + uint64_t returned_pkts; > > > > > > + uint64_t enqueued_pkts; > > > > > > + } rx __rte_cache_aligned; > > > > > > + > > > > > > + struct { > > > > > > + uint64_t dequeue_pkts; > > > > > > + uint64_t tx_pkts; > > > > > > + } tx __rte_cache_aligned; > > > > > > +} app_stats; > > > > > > + > > > > > > +static const struct rte_eth_conf port_conf_default = { > > > > > > + .rxmode = { > > > > > > + .mq_mode = ETH_MQ_RX_RSS, > > > > > > + .max_rx_pkt_len = ETHER_MAX_LEN, > > > > > > + .split_hdr_size = 0, > > > > > > + .header_split = 0, /**< Header Split disabled */ > > > > > > + .hw_ip_checksum = 0, /**< IP checksum offload enabled */ > > > > > > + .hw_vlan_filter = 0, /**< VLAN filtering disabled */ > > > > > > + .jumbo_frame = 0, /**< Jumbo Frame Support disabled */ > > > > > > + .hw_strip_crc = 0, /**< CRC stripped by hardware */ > > > > > > + }, > > > > > > + .txmode = { > > > > > > + .mq_mode = ETH_MQ_TX_NONE, > > > > > > + }, > > > > > > + .lpbk_mode = 0, > > > > > > + .rx_adv_conf = { > > > > > > + .rss_conf = { > > > > > > + .rss_hf = ETH_RSS_IPV4 | ETH_RSS_IPV6 | > > > > > > + ETH_RSS_IPV4_TCP | > > > ETH_RSS_IPV4_UDP | > > > > > > + ETH_RSS_IPV6_TCP | > > > ETH_RSS_IPV6_UDP, > > > > > > + } > > > > > > + }, > > > > > > +}; > > > > > > + > > > > > > +static const struct rte_eth_rxconf rx_conf_default = { > > > > > > + .rx_thresh = { > > > > > > + .pthresh = RX_PTHRESH, > > > > > > + .hthresh = RX_HTHRESH, > > > > > > + .wthresh = RX_WTHRESH, > > > > > > + }, > > > > > > + .rx_free_thresh = RX_FREE_THRESH, > > > > > > + .rx_drop_en = 0, > > > > > > +}; > > > > > > + > > > > > > +static const struct rte_eth_txconf tx_conf_default = { > > > > > > + .tx_thresh = { > > > > > > + .pthresh = TX_PTHRESH, > > > > > > + .hthresh = TX_HTHRESH, > > > > > > + .wthresh = TX_WTHRESH, > > > > > > + }, > > > > > > + .tx_free_thresh = TX_FREE_THRESH, > > > > > > + .tx_rs_thresh = TX_RSBIT_THRESH, > > > > > > + .txq_flags = TX_Q_FLAGS > > > > > > + > > > > > > +}; > > > > > > + > > > > > > +struct output_buffer { > > > > > > + unsigned count; > > > > > > + struct rte_mbuf *mbufs[BURST_SIZE]; }; > > > > > > + > > > > > > +/* > > > > > > + * Initialises a given port using global settings and with the rx > > > > > > +buffers > > > > > > + * coming from the mbuf_pool passed as parameter */ static > > > > > > +inline int port_init(uint8_t port, struct rte_mempool *mbuf_pool) > > > > > > +{ > > > > > > + struct rte_eth_conf port_conf = port_conf_default; > > > > > > + const uint16_t rxRings = 1, txRings = rte_lcore_count() - 1; > > > > > > + int retval; > > > > > > + uint16_t q; > > > > > > + > > > > > > + if (port >= rte_eth_dev_count()) > > > > > > + return -1; > > > > > > + > > > > > > + retval = rte_eth_dev_configure(port, rxRings, txRings, &port_conf); > > > > > > + if (retval != 0) > > > > > > + return retval; > > > > > > + > > > > > > + for (q = 0; q < rxRings; q++) { > > > > > > + retval = rte_eth_rx_queue_setup(port, q, RX_RING_SIZE, > > > > > > + rte_eth_dev_socket_id(port), > > > > > > + &rx_conf_default, mbuf_pool); > > > > > > + if (retval < 0) > > > > > > + return retval; > > > > > > + } > > > > > > + > > > > > > + for (q = 0; q < txRings; q++) { > > > > > > + retval = rte_eth_tx_queue_setup(port, q, TX_RING_SIZE, > > > > > > + rte_eth_dev_socket_id(port), > > > > > > + &tx_conf_default); > > > > > > + if (retval < 0) > > > > > > + return retval; > > > > > > + } > > > > > > + > > > > > > + retval = rte_eth_dev_start(port); > > > > > > + if (retval < 0) > > > > > > + return retval; > > > > > > + > > > > > > + struct rte_eth_link link; > > > > > > + rte_eth_link_get_nowait(port, &link); > > > > > > + if (!link.link_status) { > > > > > > + sleep(1); > > > > > > + rte_eth_link_get_nowait(port, &link); > > > > > > + } > > > > > > + > > > > > > + if (!link.link_status) { > > > > > > + printf("Link down on port %"PRIu8"\n", port); > > > > > > + return 0; > > > > > > + } > > > > > > + > > > > > > + struct ether_addr addr; > > > > > > + rte_eth_macaddr_get(port, &addr); > > > > > > + printf("Port %u MAC: %02"PRIx8" %02"PRIx8" %02"PRIx8 > > > > > > + " %02"PRIx8" %02"PRIx8" %02"PRIx8"\n", > > > > > > + (unsigned)port, > > > > > > + addr.addr_bytes[0], addr.addr_bytes[1], > > > > > > + addr.addr_bytes[2], addr.addr_bytes[3], > > > > > > + addr.addr_bytes[4], addr.addr_bytes[5]); > > > > > > + > > > > > > + rte_eth_promiscuous_enable(port); > > > > > > + > > > > > > + return 0; > > > > > > +} > > > > > > + > > > > > > +struct lcore_params { > > > > > > + unsigned worker_id; > > > > > > + struct rte_distributor *d; > > > > > > + struct rte_ring *r; > > > > > > +}; > > > > > > + > > > > > > +static __attribute__((noreturn)) void lcore_rx(struct > > > > > > +lcore_params *p) { > > > > > > + struct rte_distributor *d = p->d; > > > > > > + struct rte_ring *r = p->r; > > > > > > + const uint8_t nb_ports = rte_eth_dev_count(); > > > > > > + const int socket_id = rte_socket_id(); > > > > > > + uint8_t port; > > > > > > + > > > > > > + for (port = 0; port < nb_ports; port++) { > > > > > > + /* skip ports that are not enabled */ > > > > > > + if ((enabled_port_mask & (1 << port)) == 0) > > > > > > + continue; > > > > > > + > > > > > > + if (rte_eth_dev_socket_id(port) > 0 && > > > > > > + rte_eth_dev_socket_id(port) != socket_id) > > > > > > + printf("WARNING, port %u is on remote NUMA node to > > > " > > > > > > + "RX thread.\n\tPerformance will not " > > > > > > + "be optimal.\n", port); > > > > > > + } > > > > > > + > > > > > > + printf("\nCore %u doing packet RX.\n", rte_lcore_id()); > > > > > > + port = 0; > > > > > > + for (;;) { > > > > > > + /* skip ports that are not enabled */ > > > > > > + if ((enabled_port_mask & (1 << port)) == 0) { > > > > > > + if (++port == nb_ports) > > > > > > + port = 0; > > > > > > + continue; > > > > > > + } > > > > > > + struct rte_mbuf *bufs[BURST_SIZE*2]; > > > > > > + const uint16_t nb_rx = rte_eth_rx_burst(port, 0, bufs, > > > > > > + BURST_SIZE); > > > > > > + app_stats.rx.rx_pkts += nb_rx; > > > > > > + > > > > > > + rte_distributor_process(d, bufs, nb_rx); > > > > > > + const uint16_t nb_ret = rte_distributor_returned_pkts(d, > > > > > > + bufs, BURST_SIZE*2); > > > > > > + app_stats.rx.returned_pkts += nb_ret; > > > > > > + if (unlikely(nb_ret == 0)) > > > > > > + continue; > > > > > > + > > > > > > + uint16_t sent = rte_ring_enqueue_burst(r, (void *)bufs, nb_ret); > > > > > > + app_stats.rx.enqueued_pkts += sent; > > > > > > + if (unlikely(sent < nb_ret)) { > > > > > > + LOG_DEBUG(DISTRAPP, "%s:Packet loss due to full > > > ring\n", __func__); > > > > > > + while (sent < nb_ret) > > > > > > + rte_pktmbuf_free(bufs[sent++]); > > > > > > + } > > > > > > + if (++port == nb_ports) > > > > > > + port = 0; > > > > > > + } > > > > > > +} > > > > > > + > > > > > > +static inline void > > > > > > +flush_one_port(struct output_buffer *outbuf, uint8_t outp) { > > > > > > + unsigned nb_tx = rte_eth_tx_burst(outp, 0, outbuf->mbufs, > > > > > > + outbuf->count); > > > > > > + app_stats.tx.tx_pkts += nb_tx; > > > > > > + > > > > > > + if (unlikely(nb_tx < outbuf->count)) { > > > > > > + LOG_DEBUG(DISTRAPP, "%s:Packet loss with tx_burst\n", > > > __func__); > > > > > > + do { > > > > > > + rte_pktmbuf_free(outbuf->mbufs[nb_tx]); > > > > > > + } while (++nb_tx < outbuf->count); > > > > > > + } > > > > > > + outbuf->count = 0; > > > > > > +} > > > > > > + > > > > > > +static inline void > > > > > > +flush_all_ports(struct output_buffer *tx_buffers, uint8_t > > > > > > +nb_ports) { > > > > > > + uint8_t outp; > > > > > > + for (outp = 0; outp < nb_ports; outp++) { > > > > > > + /* skip ports that are not enabled */ > > > > > > + if ((enabled_port_mask & (1 << outp)) == 0) > > > > > > + continue; > > > > > > + > > > > > > + if (tx_buffers[outp].count == 0) > > > > > > + continue; > > > > > > + > > > > > > + flush_one_port(&tx_buffers[outp], outp); > > > > > > + } > > > > > > +} > > > > > > + > > > > > > +static __attribute__((noreturn)) void lcore_tx(struct rte_ring > > > > > > +*in_r) { > > > > > > + static struct output_buffer tx_buffers[RTE_MAX_ETHPORTS]; > > > > > > + const uint8_t nb_ports = rte_eth_dev_count(); > > > > > > + const int socket_id = rte_socket_id(); > > > > > > + uint8_t port; > > > > > > + > > > > > > + for (port = 0; port < nb_ports; port++) { > > > > > > + /* skip ports that are not enabled */ > > > > > > + if ((enabled_port_mask & (1 << port)) == 0) > > > > > > + continue; > > > > > > + > > > > > > + if (rte_eth_dev_socket_id(port) > 0 && > > > > > > + rte_eth_dev_socket_id(port) != socket_id) > > > > > > + printf("WARNING, port %u is on remote NUMA node to > > > " > > > > > > + "TX thread.\n\tPerformance will not " > > > > > > + "be optimal.\n", port); > > > > > > + } > > > > > > + > > > > > > + printf("\nCore %u doing packet TX.\n", rte_lcore_id()); > > > > > > + for (;;) { > > > > > > + for (port = 0; port < nb_ports; port++) { > > > > > > + /* skip ports that are not enabled */ > > > > > > + if ((enabled_port_mask & (1 << port)) == 0) > > > > > > + continue; > > > > > > + > > > > > > + struct rte_mbuf *bufs[BURST_SIZE]; > > > > > > + const uint16_t nb_rx = rte_ring_dequeue_burst(in_r, > > > > > > + (void *)bufs, BURST_SIZE); > > > > > > + app_stats.tx.dequeue_pkts += nb_rx; > > > > > > + > > > > > > + /* if we get no traffic, flush anything we have */ > > > > > > + if (unlikely(nb_rx == 0)) { > > > > > > + flush_all_ports(tx_buffers, nb_ports); > > > > > > + continue; > > > > > > + } > > > > > > + > > > > > > + /* for traffic we receive, queue it up for transmit */ > > > > > > + uint16_t i; > > > > > > + _mm_prefetch(bufs[0], 0); > > > > > > + _mm_prefetch(bufs[1], 0); > > > > > > + _mm_prefetch(bufs[2], 0); > > > > > > + for (i = 0; i < nb_rx; i++) { > > > > > > + struct output_buffer *outbuf; > > > > > > + uint8_t outp; > > > > > > + _mm_prefetch(bufs[i + 3], 0); > > > > > > + /* workers should update in_port to hold the > > > > > > + * output port value */ > > > > > > + outp = bufs[i]->port; > > > > > > + /* skip ports that are not enabled */ > > > > > > + if ((enabled_port_mask & (1 << outp)) == 0) > > > > > > + continue; > > > > > > + > > > > > > + outbuf = &tx_buffers[outp]; > > > > > > + outbuf->mbufs[outbuf->count++] = bufs[i]; > > > > > > + if (outbuf->count == BURST_SIZE) > > > > > > + flush_one_port(outbuf, outp); > > > > > > + } > > > > > > + } > > > > > > + } > > > > > > +} > > > > > > + > > > > > > + > > > > > > +static __attribute__((noreturn)) void lcore_worker(struct > > > > > > +lcore_params *p) { > > > > > > + struct rte_distributor *d = p->d; > > > > > > + const unsigned id = p->worker_id; > > > > > > + /* for single port, xor_val will be zero so we won't modify the output > > > > > > + * port, otherwise we send traffic from 0 to 1, 2 to 3, and vice versa > > > > > > + */ > > > > > > + const unsigned xor_val = (rte_eth_dev_count() > 1); > > > > > > + struct rte_mbuf *buf = NULL; > > > > > > + > > > > > > + printf("\nCore %u acting as worker core.\n", rte_lcore_id()); > > > > > > + for (;;) { > > > > > > + buf = rte_distributor_get_pkt(d, id, buf); > > > > > > + buf->port ^= xor_val; > > > > > > + } > > > > > > +} > > > > > > + > > > > > > +static void > > > > > > +int_handler(int sig_num) > > > > > > +{ > > > > > > + struct rte_eth_stats eth_stats; > > > > > > + unsigned i; > > > > > > + > > > > > > + printf("Exiting on signal %d\n", sig_num); > > > > > > + > > > > > > + printf("\nRX thread stats:\n"); > > > > > > + printf(" - Received: %"PRIu64"\n", app_stats.rx.rx_pkts); > > > > > > + printf(" - Processed: %"PRIu64"\n", app_stats.rx.returned_pkts); > > > > > > + printf(" - Enqueued: %"PRIu64"\n", app_stats.rx.enqueued_pkts); > > > > > > + > > > > > > + printf("\nTX thread stats:\n"); > > > > > > + printf(" - Dequeued: %"PRIu64"\n", app_stats.tx.dequeue_pkts); > > > > > > + printf(" - Transmitted: %"PRIu64"\n", app_stats.tx.tx_pkts); > > > > > > + > > > > > > + for (i = 0; i < rte_eth_dev_count(); i++) { > > > > > > + rte_eth_stats_get(i, ð_stats); > > > > > > + printf("\nPort %u stats:\n", i); > > > > > > + printf(" - Pkts in: %"PRIu64"\n", eth_stats.ipackets); > > > > > > + printf(" - Pkts out: %"PRIu64"\n", eth_stats.opackets); > > > > > > + printf(" - In Errs: %"PRIu64"\n", eth_stats.ierrors); > > > > > > + printf(" - Out Errs: %"PRIu64"\n", eth_stats.oerrors); > > > > > > + printf(" - Mbuf Errs: %"PRIu64"\n", eth_stats.rx_nombuf); > > > > > > + } > > > > > > + exit(0); > > > > > rte_exit here? Also, this is a pretty ungraceful exit strategy as > > > > > all the threads you've created and memory you've allocated are just > > > forgotten here. > > > > > Given that dpdk mempools are shared, this has the potential to leak > > > > > lots of memory if other apps are using the dpdk at the same time > > > > > that you run this. You probably want to use the sigint handler to > > > > > raise a flag to the tx/rx threads to shutdown gracefully, and then free your > > > allocated memory and mempool. > > > > > > > > > > Neil > > > > > > > > > > > > > Unless the different processes are explicitly cooperating as > > > > primary/secondary, the mempools are not shared. I just don't see the > > > > need for this app to do more cleanup on ctrl-c signal, as it's not > > > > intended to be a multiprocess app, and there is little that any > > > > secondary process could do to work with this app, except possibly some > > > > resource monitoring, which would be completely unaffected by it exiting the > > > way it does. > > > > > > > Ah, ok, so we don't use a common shared pool between isolated processes > > > then, thats good. Still though, this is a sample application, I think its lazy > > > programming practice to illustrate to application developers that its generally ok > > > to exit programs without freeing your resources. Its about 20 lines of additional > > > code to change the sigint handler to flag an exit condition, and have all the > > > other threads join on it. > > > > > > Neil > > > > 1)I had sent v5 patch which handles graceful shutdown of rx and tx threads upon SIGINT > I see it and will take a look shortly, thanks. > > > 2)Worker thread graceful shutdown was not handled as of now as it needs some change in lcore_worker logic , which will be done in future enhancements. > Not sure I understand what you mean here. Can you elaborate? > > > 3)Freeing of mempool is also not handled , as the framework support is not available. > Ew, I hadn't noticed that, freeing of mempools seems like something we should > implement. > > > 4)Cleaning of rx/tx queues not done, as it needs some extensive logic which we haven't planned as of now. Will check the possibility of doing it in future enhancements i.e in next version of sample application. > We can't just flush the queues after we shutdown the workers? I presume a queue > flush operation exists, yes? > Neil Other than code hygiene, which does have some value in itself, I can't really see what the practical point of such cleanup would be. If traffic is going through the system, and the process is killed packets will be dropped, whatever we do, as packet reception will stop. If traffic is not going through the system, then there are no packets in flight and therefore no relevant cleanup to be done. [And if the traffic is stopped just before shutting down the app, at a throughput rate of a couple of million packets per second, the app should be flushed of packets within tiny fractions of a second]. So overall, I just don't see complicated jumping through hoops for flushing and cleaning up things being worth the effort. These apps are designed to run in a forever loop, and, in the exception case, when killed the cleanup done by kernel is sufficient. regards, /Bruce
> > > > > > 1)I had sent v5 patch which handles graceful shutdown of rx and tx threads upon SIGINT > > I see it and will take a look shortly, thanks. > > > > > 2)Worker thread graceful shutdown was not handled as of now as it needs some change in lcore_worker logic , which will be done in future enhancements. > > Not sure I understand what you mean here. Can you elaborate? > > > > > 3)Freeing of mempool is also not handled , as the framework support is not available. > > Ew, I hadn't noticed that, freeing of mempools seems like something we should > > implement. > > > > > 4)Cleaning of rx/tx queues not done, as it needs some extensive logic which we haven't planned as of now. Will check the possibility of doing it in future enhancements i.e in next version of sample application. > > We can't just flush the queues after we shutdown the workers? I presume a queue > > flush operation exists, yes? > > Neil > > Other than code hygiene, which does have some value in itself, I can't > really see what the practical point of such cleanup would be. > This is really the only assertion I'm trying to make. I understand this application won't suffer from exiting uncleanly, and that makes the need for preforming cleanup little more than overhead. But that said, hygine is exactly the point I'm driving at here. These are example applications, that presumably people look at when writing their own apps. If you don't do things properly, people looking at your code are less likely to do them as well. Even if it doesn't hurt for you to exit uncleanly, it will hurt someone, and if they look to these examples as a source of best practices, it seems to me that it would be in everyones interest, if best practices were demonstrated. Neil
> -----Original Message----- > From: dev [mailto:dev-bounces@dpdk.org] On Behalf Of Bruce Richardson > Sent: Wednesday, October 01, 2014 4:38 PM > To: Neil Horman > Cc: dev@dpdk.org > Subject: Re: [dpdk-dev] [PATCH v3] distributor_app: new sample app > > On Wed, Oct 01, 2014 at 10:56:20AM -0400, Neil Horman wrote: > > On Wed, Oct 01, 2014 at 02:47:00PM +0000, Pattan, Reshma wrote: > > > > > > > > > > -----Original Message----- > > > > From: Neil Horman [mailto:nhorman@tuxdriver.com] > > > > Sent: Tuesday, September 30, 2014 2:40 PM > > > > To: Richardson, Bruce > > > > Cc: Pattan, Reshma; dev@dpdk.org > > > > Subject: Re: [dpdk-dev] [PATCH v3] distributor_app: new sample app > > > > > > > > On Tue, Sep 30, 2014 at 01:18:28PM +0100, Bruce Richardson wrote: > > > > > On Tue, Sep 30, 2014 at 07:34:45AM -0400, Neil Horman wrote: > > > > > > On Tue, Sep 30, 2014 at 11:39:37AM +0100, reshmapa wrote: > > > > > > > From: Reshma Pattan <reshma.pattan@intel.com> > > > > > > > > > > > > > > A new sample app that shows the usage of the distributor library. > > > > > > > This app works as follows: > > > > > > > > > > > > > > * An RX thread runs which pulls packets from each ethernet port in turn > > > > > > > and passes those packets to worker using a distributor component. > > > > > > > * The workers take the packets in turn, and determine the output port > > > > > > > for those packets using basic l2forwarding doing an xor on the source > > > > > > > port id. > > > > > > > * The RX thread takes the returned packets from the workers and enqueue > > > > > > > those packets into an rte_ring structure. > > > > > > > * A TX thread pulls the packets off the rte_ring structure and then > > > > > > > sends each packet out the output port specified previously by > > > > > > > the worker > > > > > > > * Command-line option support provided only for portmask. > > > > > > > > > > > > > > Signed-off-by: Bruce Richardson <bruce.richardson@intel.com> > > > > > > > Signed-off-by: Reshma Pattan <reshma.pattan@intel.com> > > > > > > > --- > > > > > > > examples/Makefile | 1 + > > > > > > > examples/distributor_app/Makefile | 57 ++++ > > > > > > > examples/distributor_app/main.c | 600 > > > > +++++++++++++++++++++++++++++++++++++ > > > > > > > examples/distributor_app/main.h | 46 +++ > > > > > > > 4 files changed, 704 insertions(+), 0 deletions(-) create mode > > > > > > > 100644 examples/distributor_app/Makefile create mode 100644 > > > > > > > examples/distributor_app/main.c create mode 100644 > > > > > > > examples/distributor_app/main.h > > > > > > > > > > > > > > diff --git a/examples/Makefile b/examples/Makefile index > > > > > > > 6245f83..2ba82b0 100644 > > > > > > > --- a/examples/Makefile > > > > > > > +++ b/examples/Makefile > > > > > > > @@ -66,5 +66,6 @@ DIRS-y += vhost > > > > > > > DIRS-$(CONFIG_RTE_LIBRTE_XEN_DOM0) += vhost_xen DIRS-y += vmdq > > > > > > > DIRS-y += vmdq_dcb > > > > > > > +DIRS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) += distributor_app > > > > > > > > > > > > > > include $(RTE_SDK)/mk/rte.extsubdir.mk diff --git > > > > > > > a/examples/distributor_app/Makefile > > > > > > > b/examples/distributor_app/Makefile > > > > > > > new file mode 100644 > > > > > > > index 0000000..6a5bada > > > > > > > --- /dev/null > > > > > > > +++ b/examples/distributor_app/Makefile > > > > > > > @@ -0,0 +1,57 @@ > > > > > > > +# BSD LICENSE > > > > > > > +# > > > > > > > +# Copyright(c) 2010-2014 Intel Corporation. All rights reserved. > > > > > > > +# All rights reserved. > > > > > > > +# > > > > > > > +# Redistribution and use in source and binary forms, with or without > > > > > > > +# modification, are permitted provided that the following conditions > > > > > > > +# are met: > > > > > > > +# > > > > > > > +# * Redistributions of source code must retain the above copyright > > > > > > > +# notice, this list of conditions and the following disclaimer. > > > > > > > +# * Redistributions in binary form must reproduce the above copyright > > > > > > > +# notice, this list of conditions and the following disclaimer in > > > > > > > +# the documentation and/or other materials provided with the > > > > > > > +# distribution. > > > > > > > +# * Neither the name of Intel Corporation nor the names of its > > > > > > > +# contributors may be used to endorse or promote products derived > > > > > > > +# from this software without specific prior written permission. > > > > > > > +# > > > > > > > +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND > > > > CONTRIBUTORS > > > > > > > +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT > > > > NOT > > > > > > > +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND > > > > FITNESS FOR > > > > > > > +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE > > > > COPYRIGHT > > > > > > > +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, > > > > INCIDENTAL, > > > > > > > +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, > > > > BUT NOT > > > > > > > +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; > > > > LOSS OF USE, > > > > > > > +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED > > > > AND ON ANY > > > > > > > +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR > > > > TORT > > > > > > > +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT > > > > OF THE USE > > > > > > > +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH > > > > DAMAGE. > > > > > > > + > > > > > > > +ifeq ($(RTE_SDK),) > > > > > > > +$(error "Please define RTE_SDK environment variable") endif > > > > > > > + > > > > > > > +# Default target, can be overriden by command line or environment > > > > > > > +RTE_TARGET ?= x86_64-native-linuxapp-gcc > > > > > > > + > > > > > > > +include $(RTE_SDK)/mk/rte.vars.mk > > > > > > > + > > > > > > > +# binary name > > > > > > > +APP = distributor_app > > > > > > > + > > > > > > > +# all source are stored in SRCS-y SRCS-y := main.c > > > > > > > + > > > > > > > +CFLAGS += $(WERROR_FLAGS) > > > > > > > + > > > > > > > +# workaround for a gcc bug with noreturn attribute # > > > > > > > +http://gcc.gnu.org/bugzilla/show_bug.cgi?id=12603 > > > > > > > +ifeq ($(CONFIG_RTE_TOOLCHAIN_GCC),y) CFLAGS_main.o += > > > > > > > +-Wno-return-type endif > > > > > > > + > > > > > > > +EXTRA_CFLAGS += -O3 -Wfatal-errors > > > > > > > + > > > > > > > +include $(RTE_SDK)/mk/rte.extapp.mk > > > > > > > diff --git a/examples/distributor_app/main.c > > > > > > > b/examples/distributor_app/main.c new file mode 100644 index > > > > > > > 0000000..f555d93 > > > > > > > --- /dev/null > > > > > > > +++ b/examples/distributor_app/main.c > > > > > > > @@ -0,0 +1,600 @@ > > > > > > > +/*- > > > > > > > + * BSD LICENSE > > > > > > > + * > > > > > > > + * Copyright(c) 2010-2014 Intel Corporation. All rights reserved. > > > > > > > + * All rights reserved. > > > > > > > + * > > > > > > > + * Redistribution and use in source and binary forms, with or without > > > > > > > + * modification, are permitted provided that the following conditions > > > > > > > + * are met: > > > > > > > + * > > > > > > > + * * Redistributions of source code must retain the above copyright > > > > > > > + * notice, this list of conditions and the following disclaimer. > > > > > > > + * * Redistributions in binary form must reproduce the above copyright > > > > > > > + * notice, this list of conditions and the following disclaimer in > > > > > > > + * the documentation and/or other materials provided with the > > > > > > > + * distribution. > > > > > > > + * * Neither the name of Intel Corporation nor the names of its > > > > > > > + * contributors may be used to endorse or promote products derived > > > > > > > + * from this software without specific prior written permission. > > > > > > > + * > > > > > > > + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND > > > > CONTRIBUTORS > > > > > > > + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, > > > > BUT NOT > > > > > > > + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND > > > > FITNESS FOR > > > > > > > + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE > > > > COPYRIGHT > > > > > > > + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, > > > > INCIDENTAL, > > > > > > > + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, > > > > BUT NOT > > > > > > > + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; > > > > LOSS OF USE, > > > > > > > + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED > > > > AND ON ANY > > > > > > > + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR > > > > TORT > > > > > > > + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT > > > > OF THE USE > > > > > > > + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH > > > > DAMAGE. > > > > > > > + */ > > > > > > > + > > > > > > > +#include <stdint.h> > > > > > > > +#include <inttypes.h> > > > > > > > +#include <unistd.h> > > > > > > > +#include <signal.h> > > > > > > > +#include <getopt.h> > > > > > > > + > > > > > > > +#include <rte_eal.h> > > > > > > > +#include <rte_ethdev.h> > > > > > > > +#include <rte_cycles.h> > > > > > > > +#include <rte_malloc.h> > > > > > > > +#include <rte_debug.h> > > > > > > > +#include <rte_distributor.h> > > > > > > > + > > > > > > > +#include "main.h" > > > > > > > + > > > > > > > +#define RX_RING_SIZE 256 > > > > > > > +#define RX_FREE_THRESH 32 > > > > > > > +#define RX_PTHRESH 8 > > > > > > > +#define RX_HTHRESH 8 > > > > > > > +#define RX_WTHRESH 0 > > > > > > > + > > > > > > > +#define TX_RING_SIZE 512 > > > > > > > +#define TX_FREE_THRESH 32 > > > > > > > +#define TX_PTHRESH 32 > > > > > > > +#define TX_HTHRESH 0 > > > > > > > +#define TX_WTHRESH 0 > > > > > > > +#define TX_RSBIT_THRESH 32 > > > > > > > +#define TX_Q_FLAGS (ETH_TXQ_FLAGS_NOMULTSEGS | > > > > ETH_TXQ_FLAGS_NOVLANOFFL |\ > > > > > > > + ETH_TXQ_FLAGS_NOXSUMSCTP | ETH_TXQ_FLAGS_NOXSUMUDP | \ > > > > > > > + ETH_TXQ_FLAGS_NOXSUMTCP) > > > > > > > + > > > > > > > +#define NUM_MBUFS ((64*1024)-1) > > > > > > > +#define MBUF_SIZE (2048 + sizeof(struct rte_mbuf) + > > > > > > > +RTE_PKTMBUF_HEADROOM) #define MBUF_CACHE_SIZE 250 #define > > > > > > > +BURST_SIZE 32 #define RTE_RING_SZ 1024 > > > > > > > + > > > > > > > +/* uncommnet below line to enable debug logs */ > > > > > > > +/* #define DEBUG */ > > > > > > > + > > > > > > > +#ifdef DEBUG > > > > > > > +#define LOG_LEVEL RTE_LOG_DEBUG > > > > > > > +#define LOG_DEBUG(log_type, fmt, args...) do { \ > > > > > > > + RTE_LOG(DEBUG, log_type, fmt, ##args) \ > > > > > > > +} while (0) > > > > > > > +#else > > > > > > > +#define LOG_LEVEL RTE_LOG_INFO > > > > > > > +#define LOG_DEBUG(log_type, fmt, args...) do {} while (0) #endif > > > > > > > + > > > > > > > +#define RTE_LOGTYPE_DISTRAPP RTE_LOGTYPE_USER1 > > > > > > > + > > > > > > > +/* mask of enabled ports */ > > > > > > > +static uint32_t enabled_port_mask = 0; > > > > > > > + > > > > > > > +static volatile struct app_stats { > > > > > > > + struct { > > > > > > > + uint64_t rx_pkts; > > > > > > > + uint64_t returned_pkts; > > > > > > > + uint64_t enqueued_pkts; > > > > > > > + } rx __rte_cache_aligned; > > > > > > > + > > > > > > > + struct { > > > > > > > + uint64_t dequeue_pkts; > > > > > > > + uint64_t tx_pkts; > > > > > > > + } tx __rte_cache_aligned; > > > > > > > +} app_stats; > > > > > > > + > > > > > > > +static const struct rte_eth_conf port_conf_default = { > > > > > > > + .rxmode = { > > > > > > > + .mq_mode = ETH_MQ_RX_RSS, > > > > > > > + .max_rx_pkt_len = ETHER_MAX_LEN, > > > > > > > + .split_hdr_size = 0, > > > > > > > + .header_split = 0, /**< Header Split disabled */ > > > > > > > + .hw_ip_checksum = 0, /**< IP checksum offload enabled */ > > > > > > > + .hw_vlan_filter = 0, /**< VLAN filtering disabled */ > > > > > > > + .jumbo_frame = 0, /**< Jumbo Frame Support disabled */ > > > > > > > + .hw_strip_crc = 0, /**< CRC stripped by hardware */ > > > > > > > + }, > > > > > > > + .txmode = { > > > > > > > + .mq_mode = ETH_MQ_TX_NONE, > > > > > > > + }, > > > > > > > + .lpbk_mode = 0, > > > > > > > + .rx_adv_conf = { > > > > > > > + .rss_conf = { > > > > > > > + .rss_hf = ETH_RSS_IPV4 | ETH_RSS_IPV6 | > > > > > > > + ETH_RSS_IPV4_TCP | > > > > ETH_RSS_IPV4_UDP | > > > > > > > + ETH_RSS_IPV6_TCP | > > > > ETH_RSS_IPV6_UDP, > > > > > > > + } > > > > > > > + }, > > > > > > > +}; > > > > > > > + > > > > > > > +static const struct rte_eth_rxconf rx_conf_default = { > > > > > > > + .rx_thresh = { > > > > > > > + .pthresh = RX_PTHRESH, > > > > > > > + .hthresh = RX_HTHRESH, > > > > > > > + .wthresh = RX_WTHRESH, > > > > > > > + }, > > > > > > > + .rx_free_thresh = RX_FREE_THRESH, > > > > > > > + .rx_drop_en = 0, > > > > > > > +}; > > > > > > > + > > > > > > > +static const struct rte_eth_txconf tx_conf_default = { > > > > > > > + .tx_thresh = { > > > > > > > + .pthresh = TX_PTHRESH, > > > > > > > + .hthresh = TX_HTHRESH, > > > > > > > + .wthresh = TX_WTHRESH, > > > > > > > + }, > > > > > > > + .tx_free_thresh = TX_FREE_THRESH, > > > > > > > + .tx_rs_thresh = TX_RSBIT_THRESH, > > > > > > > + .txq_flags = TX_Q_FLAGS > > > > > > > + > > > > > > > +}; > > > > > > > + > > > > > > > +struct output_buffer { > > > > > > > + unsigned count; > > > > > > > + struct rte_mbuf *mbufs[BURST_SIZE]; }; > > > > > > > + > > > > > > > +/* > > > > > > > + * Initialises a given port using global settings and with the rx > > > > > > > +buffers > > > > > > > + * coming from the mbuf_pool passed as parameter */ static > > > > > > > +inline int port_init(uint8_t port, struct rte_mempool *mbuf_pool) > > > > > > > +{ > > > > > > > + struct rte_eth_conf port_conf = port_conf_default; > > > > > > > + const uint16_t rxRings = 1, txRings = rte_lcore_count() - 1; > > > > > > > + int retval; > > > > > > > + uint16_t q; > > > > > > > + > > > > > > > + if (port >= rte_eth_dev_count()) > > > > > > > + return -1; > > > > > > > + > > > > > > > + retval = rte_eth_dev_configure(port, rxRings, txRings, &port_conf); > > > > > > > + if (retval != 0) > > > > > > > + return retval; > > > > > > > + > > > > > > > + for (q = 0; q < rxRings; q++) { > > > > > > > + retval = rte_eth_rx_queue_setup(port, q, RX_RING_SIZE, > > > > > > > + rte_eth_dev_socket_id(port), > > > > > > > + &rx_conf_default, mbuf_pool); > > > > > > > + if (retval < 0) > > > > > > > + return retval; > > > > > > > + } > > > > > > > + > > > > > > > + for (q = 0; q < txRings; q++) { > > > > > > > + retval = rte_eth_tx_queue_setup(port, q, TX_RING_SIZE, > > > > > > > + rte_eth_dev_socket_id(port), > > > > > > > + &tx_conf_default); > > > > > > > + if (retval < 0) > > > > > > > + return retval; > > > > > > > + } > > > > > > > + > > > > > > > + retval = rte_eth_dev_start(port); > > > > > > > + if (retval < 0) > > > > > > > + return retval; > > > > > > > + > > > > > > > + struct rte_eth_link link; > > > > > > > + rte_eth_link_get_nowait(port, &link); > > > > > > > + if (!link.link_status) { > > > > > > > + sleep(1); > > > > > > > + rte_eth_link_get_nowait(port, &link); > > > > > > > + } > > > > > > > + > > > > > > > + if (!link.link_status) { > > > > > > > + printf("Link down on port %"PRIu8"\n", port); > > > > > > > + return 0; > > > > > > > + } > > > > > > > + > > > > > > > + struct ether_addr addr; > > > > > > > + rte_eth_macaddr_get(port, &addr); > > > > > > > + printf("Port %u MAC: %02"PRIx8" %02"PRIx8" %02"PRIx8 > > > > > > > + " %02"PRIx8" %02"PRIx8" %02"PRIx8"\n", > > > > > > > + (unsigned)port, > > > > > > > + addr.addr_bytes[0], addr.addr_bytes[1], > > > > > > > + addr.addr_bytes[2], addr.addr_bytes[3], > > > > > > > + addr.addr_bytes[4], addr.addr_bytes[5]); > > > > > > > + > > > > > > > + rte_eth_promiscuous_enable(port); > > > > > > > + > > > > > > > + return 0; > > > > > > > +} > > > > > > > + > > > > > > > +struct lcore_params { > > > > > > > + unsigned worker_id; > > > > > > > + struct rte_distributor *d; > > > > > > > + struct rte_ring *r; > > > > > > > +}; > > > > > > > + > > > > > > > +static __attribute__((noreturn)) void lcore_rx(struct > > > > > > > +lcore_params *p) { > > > > > > > + struct rte_distributor *d = p->d; > > > > > > > + struct rte_ring *r = p->r; > > > > > > > + const uint8_t nb_ports = rte_eth_dev_count(); > > > > > > > + const int socket_id = rte_socket_id(); > > > > > > > + uint8_t port; > > > > > > > + > > > > > > > + for (port = 0; port < nb_ports; port++) { > > > > > > > + /* skip ports that are not enabled */ > > > > > > > + if ((enabled_port_mask & (1 << port)) == 0) > > > > > > > + continue; > > > > > > > + > > > > > > > + if (rte_eth_dev_socket_id(port) > 0 && > > > > > > > + rte_eth_dev_socket_id(port) != socket_id) > > > > > > > + printf("WARNING, port %u is on remote NUMA node to > > > > " > > > > > > > + "RX thread.\n\tPerformance will not " > > > > > > > + "be optimal.\n", port); > > > > > > > + } > > > > > > > + > > > > > > > + printf("\nCore %u doing packet RX.\n", rte_lcore_id()); > > > > > > > + port = 0; > > > > > > > + for (;;) { > > > > > > > + /* skip ports that are not enabled */ > > > > > > > + if ((enabled_port_mask & (1 << port)) == 0) { > > > > > > > + if (++port == nb_ports) > > > > > > > + port = 0; > > > > > > > + continue; > > > > > > > + } > > > > > > > + struct rte_mbuf *bufs[BURST_SIZE*2]; > > > > > > > + const uint16_t nb_rx = rte_eth_rx_burst(port, 0, bufs, > > > > > > > + BURST_SIZE); > > > > > > > + app_stats.rx.rx_pkts += nb_rx; > > > > > > > + > > > > > > > + rte_distributor_process(d, bufs, nb_rx); > > > > > > > + const uint16_t nb_ret = rte_distributor_returned_pkts(d, > > > > > > > + bufs, BURST_SIZE*2); > > > > > > > + app_stats.rx.returned_pkts += nb_ret; > > > > > > > + if (unlikely(nb_ret == 0)) > > > > > > > + continue; > > > > > > > + > > > > > > > + uint16_t sent = rte_ring_enqueue_burst(r, (void *)bufs, nb_ret); > > > > > > > + app_stats.rx.enqueued_pkts += sent; > > > > > > > + if (unlikely(sent < nb_ret)) { > > > > > > > + LOG_DEBUG(DISTRAPP, "%s:Packet loss due to full > > > > ring\n", __func__); > > > > > > > + while (sent < nb_ret) > > > > > > > + rte_pktmbuf_free(bufs[sent++]); > > > > > > > + } > > > > > > > + if (++port == nb_ports) > > > > > > > + port = 0; > > > > > > > + } > > > > > > > +} > > > > > > > + > > > > > > > +static inline void > > > > > > > +flush_one_port(struct output_buffer *outbuf, uint8_t outp) { > > > > > > > + unsigned nb_tx = rte_eth_tx_burst(outp, 0, outbuf->mbufs, > > > > > > > + outbuf->count); > > > > > > > + app_stats.tx.tx_pkts += nb_tx; > > > > > > > + > > > > > > > + if (unlikely(nb_tx < outbuf->count)) { > > > > > > > + LOG_DEBUG(DISTRAPP, "%s:Packet loss with tx_burst\n", > > > > __func__); > > > > > > > + do { > > > > > > > + rte_pktmbuf_free(outbuf->mbufs[nb_tx]); > > > > > > > + } while (++nb_tx < outbuf->count); > > > > > > > + } > > > > > > > + outbuf->count = 0; > > > > > > > +} > > > > > > > + > > > > > > > +static inline void > > > > > > > +flush_all_ports(struct output_buffer *tx_buffers, uint8_t > > > > > > > +nb_ports) { > > > > > > > + uint8_t outp; > > > > > > > + for (outp = 0; outp < nb_ports; outp++) { > > > > > > > + /* skip ports that are not enabled */ > > > > > > > + if ((enabled_port_mask & (1 << outp)) == 0) > > > > > > > + continue; > > > > > > > + > > > > > > > + if (tx_buffers[outp].count == 0) > > > > > > > + continue; > > > > > > > + > > > > > > > + flush_one_port(&tx_buffers[outp], outp); > > > > > > > + } > > > > > > > +} > > > > > > > + > > > > > > > +static __attribute__((noreturn)) void lcore_tx(struct rte_ring > > > > > > > +*in_r) { > > > > > > > + static struct output_buffer tx_buffers[RTE_MAX_ETHPORTS]; > > > > > > > + const uint8_t nb_ports = rte_eth_dev_count(); > > > > > > > + const int socket_id = rte_socket_id(); > > > > > > > + uint8_t port; > > > > > > > + > > > > > > > + for (port = 0; port < nb_ports; port++) { > > > > > > > + /* skip ports that are not enabled */ > > > > > > > + if ((enabled_port_mask & (1 << port)) == 0) > > > > > > > + continue; > > > > > > > + > > > > > > > + if (rte_eth_dev_socket_id(port) > 0 && > > > > > > > + rte_eth_dev_socket_id(port) != socket_id) > > > > > > > + printf("WARNING, port %u is on remote NUMA node to > > > > " > > > > > > > + "TX thread.\n\tPerformance will not " > > > > > > > + "be optimal.\n", port); > > > > > > > + } > > > > > > > + > > > > > > > + printf("\nCore %u doing packet TX.\n", rte_lcore_id()); > > > > > > > + for (;;) { > > > > > > > + for (port = 0; port < nb_ports; port++) { > > > > > > > + /* skip ports that are not enabled */ > > > > > > > + if ((enabled_port_mask & (1 << port)) == 0) > > > > > > > + continue; > > > > > > > + > > > > > > > + struct rte_mbuf *bufs[BURST_SIZE]; > > > > > > > + const uint16_t nb_rx = rte_ring_dequeue_burst(in_r, > > > > > > > + (void *)bufs, BURST_SIZE); > > > > > > > + app_stats.tx.dequeue_pkts += nb_rx; > > > > > > > + > > > > > > > + /* if we get no traffic, flush anything we have */ > > > > > > > + if (unlikely(nb_rx == 0)) { > > > > > > > + flush_all_ports(tx_buffers, nb_ports); > > > > > > > + continue; > > > > > > > + } > > > > > > > + > > > > > > > + /* for traffic we receive, queue it up for transmit */ > > > > > > > + uint16_t i; > > > > > > > + _mm_prefetch(bufs[0], 0); > > > > > > > + _mm_prefetch(bufs[1], 0); > > > > > > > + _mm_prefetch(bufs[2], 0); > > > > > > > + for (i = 0; i < nb_rx; i++) { > > > > > > > + struct output_buffer *outbuf; > > > > > > > + uint8_t outp; > > > > > > > + _mm_prefetch(bufs[i + 3], 0); > > > > > > > + /* workers should update in_port to hold the > > > > > > > + * output port value */ > > > > > > > + outp = bufs[i]->port; > > > > > > > + /* skip ports that are not enabled */ > > > > > > > + if ((enabled_port_mask & (1 << outp)) == 0) > > > > > > > + continue; > > > > > > > + > > > > > > > + outbuf = &tx_buffers[outp]; > > > > > > > + outbuf->mbufs[outbuf->count++] = bufs[i]; > > > > > > > + if (outbuf->count == BURST_SIZE) > > > > > > > + flush_one_port(outbuf, outp); > > > > > > > + } > > > > > > > + } > > > > > > > + } > > > > > > > +} > > > > > > > + > > > > > > > + > > > > > > > +static __attribute__((noreturn)) void lcore_worker(struct > > > > > > > +lcore_params *p) { > > > > > > > + struct rte_distributor *d = p->d; > > > > > > > + const unsigned id = p->worker_id; > > > > > > > + /* for single port, xor_val will be zero so we won't modify the output > > > > > > > + * port, otherwise we send traffic from 0 to 1, 2 to 3, and vice versa > > > > > > > + */ > > > > > > > + const unsigned xor_val = (rte_eth_dev_count() > 1); > > > > > > > + struct rte_mbuf *buf = NULL; > > > > > > > + > > > > > > > + printf("\nCore %u acting as worker core.\n", rte_lcore_id()); > > > > > > > + for (;;) { > > > > > > > + buf = rte_distributor_get_pkt(d, id, buf); > > > > > > > + buf->port ^= xor_val; > > > > > > > + } > > > > > > > +} > > > > > > > + > > > > > > > +static void > > > > > > > +int_handler(int sig_num) > > > > > > > +{ > > > > > > > + struct rte_eth_stats eth_stats; > > > > > > > + unsigned i; > > > > > > > + > > > > > > > + printf("Exiting on signal %d\n", sig_num); > > > > > > > + > > > > > > > + printf("\nRX thread stats:\n"); > > > > > > > + printf(" - Received: %"PRIu64"\n", app_stats.rx.rx_pkts); > > > > > > > + printf(" - Processed: %"PRIu64"\n", app_stats.rx.returned_pkts); > > > > > > > + printf(" - Enqueued: %"PRIu64"\n", app_stats.rx.enqueued_pkts); > > > > > > > + > > > > > > > + printf("\nTX thread stats:\n"); > > > > > > > + printf(" - Dequeued: %"PRIu64"\n", app_stats.tx.dequeue_pkts); > > > > > > > + printf(" - Transmitted: %"PRIu64"\n", app_stats.tx.tx_pkts); > > > > > > > + > > > > > > > + for (i = 0; i < rte_eth_dev_count(); i++) { > > > > > > > + rte_eth_stats_get(i, ð_stats); > > > > > > > + printf("\nPort %u stats:\n", i); > > > > > > > + printf(" - Pkts in: %"PRIu64"\n", eth_stats.ipackets); > > > > > > > + printf(" - Pkts out: %"PRIu64"\n", eth_stats.opackets); > > > > > > > + printf(" - In Errs: %"PRIu64"\n", eth_stats.ierrors); > > > > > > > + printf(" - Out Errs: %"PRIu64"\n", eth_stats.oerrors); > > > > > > > + printf(" - Mbuf Errs: %"PRIu64"\n", eth_stats.rx_nombuf); > > > > > > > + } > > > > > > > + exit(0); > > > > > > rte_exit here? Also, this is a pretty ungraceful exit strategy as > > > > > > all the threads you've created and memory you've allocated are just > > > > forgotten here. > > > > > > Given that dpdk mempools are shared, this has the potential to leak > > > > > > lots of memory if other apps are using the dpdk at the same time > > > > > > that you run this. You probably want to use the sigint handler to > > > > > > raise a flag to the tx/rx threads to shutdown gracefully, and then free your > > > > allocated memory and mempool. > > > > > > > > > > > > Neil > > > > > > > > > > > > > > > > Unless the different processes are explicitly cooperating as > > > > > primary/secondary, the mempools are not shared. I just don't see the > > > > > need for this app to do more cleanup on ctrl-c signal, as it's not > > > > > intended to be a multiprocess app, and there is little that any > > > > > secondary process could do to work with this app, except possibly some > > > > > resource monitoring, which would be completely unaffected by it exiting the > > > > way it does. > > > > > > > > > Ah, ok, so we don't use a common shared pool between isolated processes > > > > then, thats good. Still though, this is a sample application, I think its lazy > > > > programming practice to illustrate to application developers that its generally ok > > > > to exit programs without freeing your resources. Its about 20 lines of additional > > > > code to change the sigint handler to flag an exit condition, and have all the > > > > other threads join on it. > > > > > > > > Neil > > > > > > 1)I had sent v5 patch which handles graceful shutdown of rx and tx threads upon SIGINT > > I see it and will take a look shortly, thanks. > > > > > 2)Worker thread graceful shutdown was not handled as of now as it needs some change in lcore_worker logic , which will be done > in future enhancements. > > Not sure I understand what you mean here. Can you elaborate? > > > > > 3)Freeing of mempool is also not handled , as the framework support is not available. > > Ew, I hadn't noticed that, freeing of mempools seems like something we should > > implement. > > > > > 4)Cleaning of rx/tx queues not done, as it needs some extensive logic which we haven't planned as of now. Will check the > possibility of doing it in future enhancements i.e in next version of sample application. > > We can't just flush the queues after we shutdown the workers? I presume a queue > > flush operation exists, yes? > > Neil > > Other than code hygiene, which does have some value in itself, I can't > really see what the practical point of such cleanup would be. > > If traffic is going through the system, and the process is killed packets > will be dropped, whatever we do, as packet reception will stop. If traffic > is not going through the system, then there are no packets in flight and > therefore no relevant cleanup to be done. [And if the traffic is stopped > just before shutting down the app, at a throughput rate of a couple of > million packets per second, the app should be flushed of packets within tiny > fractions of a second]. I think that in theory not resetting HW at process termination can cause a problem. Something like that: - DPDK app has HW RX/TX queues active with armed RXDs, but the link is idle (no packets are flying). - DPDK app terminates abnormally. - user deletes DPDK hugepages files. - Hugepage memory that was used by DPDK for RXDs/data buffers are given to other app (or kernel). - Packet arrives -a s HW is still active - it will do a write to the RXD and data-buffer. - Silent memory corruption. Saying that, I don't think we can completely eliminate that problem from user-space code - as not process signals can be handled. >From other side - our whole concept is to move away from custom kernel modules... Though we probably can make it less possible to happen - create a termination handler, that would try to reset all active HW. And make sure it is called atexit and all catchable signals that cause process termination. But I don' t think it is a good idea to duplicate such code in each and every sample app. I think it should be in the librte_eal, and yes - it is a subject of a separate patch/discussion :) Konstantin > So overall, I just don't see complicated jumping through hoops for flushing > and cleaning up things being worth the effort. These apps are designed to > run in a forever loop, and, in the exception case, when killed the cleanup > done by kernel is sufficient. > > regards, > /Bruce
> -----Original Message----- > From: Neil Horman [mailto:nhorman@tuxdriver.com] > Sent: Wednesday, October 1, 2014 5:08 PM > To: Richardson, Bruce > Cc: Pattan, Reshma; dev@dpdk.org > Subject: Re: [dpdk-dev] [PATCH v3] distributor_app: new sample app > > > > > > > > > 1)I had sent v5 patch which handles graceful shutdown of rx and tx > > > > threads upon SIGINT > > > I see it and will take a look shortly, thanks. > > > > > > > 2)Worker thread graceful shutdown was not handled as of now as it needs > some change in lcore_worker logic , which will be done in future enhancements. > > > Not sure I understand what you mean here. Can you elaborate? > > > rte_distributor_process which runs as part of rx thread will process incoming packets and checks for any requests for the packets from worker threads . If request is seen, it adds the packet/work to particular workers back log and proceed with processing of next packet. If no request seen the packet index will not be incremented and the while loop which is conditionally based on packet indexing runs in a continuous loop without breaking and rx thread will not proceed with next statement execution until unless rte_distributor_process comes out of while loop. This issue happens only when we enable graceful shutdown logic for both rx/worker threads, as workers threads gets killed and no request seen by rx thread and it stucks. Hence as of now graceful shutdown logic is provided only for rx thread. For worker threads will check what can be done in next enhancements. Thanks, Reshma > > > > 3)Freeing of mempool is also not handled , as the framework support is not > available. > > > Ew, I hadn't noticed that, freeing of mempools seems like something > > > we should implement. > > > > > > > 4)Cleaning of rx/tx queues not done, as it needs some extensive logic > which we haven't planned as of now. Will check the possibility of doing it in > future enhancements i.e in next version of sample application. > > > We can't just flush the queues after we shutdown the workers? I > > > presume a queue flush operation exists, yes? > > > Neil > > > > Other than code hygiene, which does have some value in itself, I can't > > really see what the practical point of such cleanup would be. > > > This is really the only assertion I'm trying to make. I understand this application > won't suffer from exiting uncleanly, and that makes the need for preforming > cleanup little more than overhead. > > But that said, hygine is exactly the point I'm driving at here. These are example > applications, that presumably people look at when writing their own apps. If > you don't do things properly, people looking at your code are less likely to do > them as well. Even if it doesn't hurt for you to exit uncleanly, it will hurt > someone, and if they look to these examples as a source of best practices, it > seems to me that it would be in everyones interest, if best practices were > demonstrated. > > Neil -------------------------------------------------------------- Intel Shannon Limited Registered in Ireland Registered Office: Collinstown Industrial Park, Leixlip, County Kildare Registered Number: 308263 Business address: Dromore House, East Park, Shannon, Co. Clare This e-mail and any attachments may contain confidential material for the sole use of the intended recipient(s). Any review or distribution by others is strictly prohibited. If you are not the intended recipient, please contact the sender and delete all copies.
On Mon, Oct 06, 2014 at 02:16:22PM +0000, Pattan, Reshma wrote: > > > > -----Original Message----- > > From: Neil Horman [mailto:nhorman@tuxdriver.com] > > Sent: Wednesday, October 1, 2014 5:08 PM > > To: Richardson, Bruce > > Cc: Pattan, Reshma; dev@dpdk.org > > Subject: Re: [dpdk-dev] [PATCH v3] distributor_app: new sample app > > > > > > > > > > > > 1)I had sent v5 patch which handles graceful shutdown of rx and tx > > > > > threads upon SIGINT > > > > I see it and will take a look shortly, thanks. > > > > > > > > > 2)Worker thread graceful shutdown was not handled as of now as it needs > > some change in lcore_worker logic , which will be done in future enhancements. > > > > Not sure I understand what you mean here. Can you elaborate? > > > > > rte_distributor_process which runs as part of rx thread will process incoming packets and checks for any requests for the packets from worker threads . > If request is seen, it adds the packet/work to particular workers back log and proceed with processing of next packet. > If no request seen the packet index will not be incremented and the while loop which is conditionally based on packet indexing runs in a continuous loop without breaking and rx thread will not proceed with next statement execution until unless rte_distributor_process comes out of while loop. > This issue happens only when we enable graceful shutdown logic for both rx/worker threads, as workers threads gets killed and no request seen by rx thread and it stucks. > Hence as of now graceful shutdown logic is provided only for rx thread. For worker threads will check what can be done in next enhancements. > > Thanks, > Reshma > I see what you're saying, Once you make a call to rte_distributor_get_pkt, you have no way to gracefully shut down use of the rte_distributor_library. Not just this application, but any application. Thats just not sane, and suggests that we integrated the rte_distributor library too soon. I would suggest that you prefix this patch with an update to the rte distributor library to allow rte_distributor_get_pkt and friends to return NULL if the queue is emtpy. Applications should be checking the return value for NULL anyway, and can preform the rte_pause operation. Then update this patch to do a clean exit. To say that "we will check what can be done in the next enhancements" is to say that this won't be addressed again until a paying custmoer gripes about it. Neil > > > > > 3)Freeing of mempool is also not handled , as the framework support is not > > available. > > > > Ew, I hadn't noticed that, freeing of mempools seems like something > > > > we should implement. > > > > > > > > > 4)Cleaning of rx/tx queues not done, as it needs some extensive logic > > which we haven't planned as of now. Will check the possibility of doing it in > > future enhancements i.e in next version of sample application. > > > > We can't just flush the queues after we shutdown the workers? I > > > > presume a queue flush operation exists, yes? > > > > Neil > > > > > > Other than code hygiene, which does have some value in itself, I can't > > > really see what the practical point of such cleanup would be. > > > > > This is really the only assertion I'm trying to make. I understand this application > > won't suffer from exiting uncleanly, and that makes the need for preforming > > cleanup little more than overhead. > > > > But that said, hygine is exactly the point I'm driving at here. These are example > > applications, that presumably people look at when writing their own apps. If > > you don't do things properly, people looking at your code are less likely to do > > them as well. Even if it doesn't hurt for you to exit uncleanly, it will hurt > > someone, and if they look to these examples as a source of best practices, it > > seems to me that it would be in everyones interest, if best practices were > > demonstrated. > > > > Neil > > -------------------------------------------------------------- > Intel Shannon Limited > Registered in Ireland > Registered Office: Collinstown Industrial Park, Leixlip, County Kildare > Registered Number: 308263 > Business address: Dromore House, East Park, Shannon, Co. Clare > > This e-mail and any attachments may contain confidential material for the sole use of the intended recipient(s). Any review or distribution by others is strictly prohibited. If you are not the intended recipient, please contact the sender and delete all copies. > > >
> -----Original Message----- > From: Neil Horman [mailto:nhorman@tuxdriver.com] > Sent: Monday, October 6, 2014 3:45 PM > To: Pattan, Reshma > Cc: dev@dpdk.org; Richardson, Bruce > Subject: Re: [dpdk-dev] [PATCH v3] distributor_app: new sample app > > On Mon, Oct 06, 2014 at 02:16:22PM +0000, Pattan, Reshma wrote: > > > > > > > -----Original Message----- > > > From: Neil Horman [mailto:nhorman@tuxdriver.com] > > > Sent: Wednesday, October 1, 2014 5:08 PM > > > To: Richardson, Bruce > > > Cc: Pattan, Reshma; dev@dpdk.org > > > Subject: Re: [dpdk-dev] [PATCH v3] distributor_app: new sample app > > > > > > > > > > > > > > > 1)I had sent v5 patch which handles graceful shutdown of rx > > > > > > and tx threads upon SIGINT > > > > > I see it and will take a look shortly, thanks. > > > > > > > > > > > 2)Worker thread graceful shutdown was not handled as of now as > > > > > > it needs > > > some change in lcore_worker logic , which will be done in future > enhancements. > > > > > Not sure I understand what you mean here. Can you elaborate? > > > > > > > rte_distributor_process which runs as part of rx thread will process incoming > packets and checks for any requests for the packets from worker threads . > > If request is seen, it adds the packet/work to particular workers back log and > proceed with processing of next packet. > > If no request seen the packet index will not be incremented and the while loop > which is conditionally based on packet indexing runs in a continuous loop > without breaking and rx thread will not proceed with next statement execution > until unless rte_distributor_process comes out of while loop. > > This issue happens only when we enable graceful shutdown logic for both > rx/worker threads, as workers threads gets killed and no request seen by rx > thread and it stucks. > > Hence as of now graceful shutdown logic is provided only for rx thread. For > worker threads will check what can be done in next enhancements. > > > > Thanks, > > Reshma > > > > I see what you're saying, Once you make a call to rte_distributor_get_pkt, you > have no way to gracefully shut down use of the rte_distributor_library. Not just > this application, but any application. Thats just not sane, and suggests that we > integrated the rte_distributor library too soon. I would suggest that you prefix > this patch with an update to the rte distributor library to allow > rte_distributor_get_pkt and friends to return NULL if the queue is emtpy. > Applications should be checking the return value for NULL anyway, and can > preform the rte_pause operation. Then update this patch to do a clean exit. To > say that "we will check what can be done in the next enhancements" is to say > that this won't be addressed again until a paying custmoer gripes about it. > Neil > Hi Neil, We have rte_distributor_request_pkt and rte_distributor_poll_pkt() in dpdk.org, which can be used together(in place of rte_distributor_get_pkt) to check empty queue condition I believe. So no separate changes needed. Though we replace to rte_distributor_get_pkt with above two, the issue will not be solved as the thread that gets blocked is rx thread but not worker thread. Rx thread gets blocked in rte_distributore_process due to non-availability of requests from worker. How about sending dummy request_pkts upon SIGINT and allow rte_distributore_process to get to completion? Thanks, Reshma > > > > > > 3)Freeing of mempool is also not handled , as the framework > > > > > > support is not > > > available. > > > > > Ew, I hadn't noticed that, freeing of mempools seems like > > > > > something we should implement. > > > > > > > > > > > 4)Cleaning of rx/tx queues not done, as it needs some > > > > > > extensive logic > > > which we haven't planned as of now. Will check the possibility of doing it in > > > future enhancements i.e in next version of sample application. > > > > > We can't just flush the queues after we shutdown the workers? I > > > > > presume a queue flush operation exists, yes? > > > > > Neil > > > > > > > > Other than code hygiene, which does have some value in itself, I > > > > can't really see what the practical point of such cleanup would be. > > > > > > > This is really the only assertion I'm trying to make. I understand > > > this application won't suffer from exiting uncleanly, and that makes > > > the need for preforming cleanup little more than overhead. > > > > > > But that said, hygine is exactly the point I'm driving at here. > > > These are example applications, that presumably people look at when > > > writing their own apps. If you don't do things properly, people > > > looking at your code are less likely to do them as well. Even if it > > > doesn't hurt for you to exit uncleanly, it will hurt someone, and if > > > they look to these examples as a source of best practices, it seems > > > to me that it would be in everyones interest, if best practices were > demonstrated. > > > > > > Neil > > > > -------------------------------------------------------------- > > Intel Shannon Limited > > Registered in Ireland > > Registered Office: Collinstown Industrial Park, Leixlip, County > > Kildare Registered Number: 308263 Business address: Dromore House, > > East Park, Shannon, Co. Clare > > > > This e-mail and any attachments may contain confidential material for the sole > use of the intended recipient(s). Any review or distribution by others is strictly > prohibited. If you are not the intended recipient, please contact the sender and > delete all copies. > > > > > > -------------------------------------------------------------- Intel Shannon Limited Registered in Ireland Registered Office: Collinstown Industrial Park, Leixlip, County Kildare Registered Number: 308263 Business address: Dromore House, East Park, Shannon, Co. Clare This e-mail and any attachments may contain confidential material for the sole use of the intended recipient(s). Any review or distribution by others is strictly prohibited. If you are not the intended recipient, please contact the sender and delete all copies.
On Mon, Oct 06, 2014 at 05:34:13PM +0000, Pattan, Reshma wrote: > > > > -----Original Message----- > > From: Neil Horman [mailto:nhorman@tuxdriver.com] > > Sent: Monday, October 6, 2014 3:45 PM > > To: Pattan, Reshma > > Cc: dev@dpdk.org; Richardson, Bruce > > Subject: Re: [dpdk-dev] [PATCH v3] distributor_app: new sample app > > > > On Mon, Oct 06, 2014 at 02:16:22PM +0000, Pattan, Reshma wrote: > > > > > > > > > > -----Original Message----- > > > > From: Neil Horman [mailto:nhorman@tuxdriver.com] > > > > Sent: Wednesday, October 1, 2014 5:08 PM > > > > To: Richardson, Bruce > > > > Cc: Pattan, Reshma; dev@dpdk.org > > > > Subject: Re: [dpdk-dev] [PATCH v3] distributor_app: new sample app > > > > > > > > > > > > > > > > > > 1)I had sent v5 patch which handles graceful shutdown of rx > > > > > > > and tx threads upon SIGINT > > > > > > I see it and will take a look shortly, thanks. > > > > > > > > > > > > > 2)Worker thread graceful shutdown was not handled as of now as > > > > > > > it needs > > > > some change in lcore_worker logic , which will be done in future > > enhancements. > > > > > > Not sure I understand what you mean here. Can you elaborate? > > > > > > > > > rte_distributor_process which runs as part of rx thread will process incoming > > packets and checks for any requests for the packets from worker threads . > > > If request is seen, it adds the packet/work to particular workers back log and > > proceed with processing of next packet. > > > If no request seen the packet index will not be incremented and the while loop > > which is conditionally based on packet indexing runs in a continuous loop > > without breaking and rx thread will not proceed with next statement execution > > until unless rte_distributor_process comes out of while loop. > > > This issue happens only when we enable graceful shutdown logic for both > > rx/worker threads, as workers threads gets killed and no request seen by rx > > thread and it stucks. > > > Hence as of now graceful shutdown logic is provided only for rx thread. For > > worker threads will check what can be done in next enhancements. > > > > > > Thanks, > > > Reshma > > > > > > > I see what you're saying, Once you make a call to rte_distributor_get_pkt, you > > have no way to gracefully shut down use of the rte_distributor_library. Not just > > this application, but any application. Thats just not sane, and suggests that we > > integrated the rte_distributor library too soon. I would suggest that you prefix > > this patch with an update to the rte distributor library to allow > > rte_distributor_get_pkt and friends to return NULL if the queue is emtpy. > > Applications should be checking the return value for NULL anyway, and can > > preform the rte_pause operation. Then update this patch to do a clean exit. To > > say that "we will check what can be done in the next enhancements" is to say > > that this won't be addressed again until a paying custmoer gripes about it. > > Neil > > > > Hi Neil, > > We have rte_distributor_request_pkt and rte_distributor_poll_pkt() in dpdk.org, which can be used together(in place of rte_distributor_get_pkt) > to check empty queue condition I believe. So no separate changes needed. > Though we replace to rte_distributor_get_pkt with above two, the issue will not be solved as the thread that gets blocked is rx thread but not worker thread. > Rx thread gets blocked in rte_distributore_process due to non-availability of requests from worker. > How about sending dummy request_pkts upon SIGINT and allow rte_distributore_process to get to completion? > I suppose creating a flagged dummy packet to ensure that the worker threads can exit their spin loops works fine, yes. Neil > Thanks, > Reshma > > > > > > > > 3)Freeing of mempool is also not handled , as the framework > > > > > > > support is not > > > > available. > > > > > > Ew, I hadn't noticed that, freeing of mempools seems like > > > > > > something we should implement. > > > > > > > > > > > > > 4)Cleaning of rx/tx queues not done, as it needs some > > > > > > > extensive logic > > > > which we haven't planned as of now. Will check the possibility of doing it in > > > > future enhancements i.e in next version of sample application. > > > > > > We can't just flush the queues after we shutdown the workers? I > > > > > > presume a queue flush operation exists, yes? > > > > > > Neil > > > > > > > > > > Other than code hygiene, which does have some value in itself, I > > > > > can't really see what the practical point of such cleanup would be. > > > > > > > > > This is really the only assertion I'm trying to make. I understand > > > > this application won't suffer from exiting uncleanly, and that makes > > > > the need for preforming cleanup little more than overhead. > > > > > > > > But that said, hygine is exactly the point I'm driving at here. > > > > These are example applications, that presumably people look at when > > > > writing their own apps. If you don't do things properly, people > > > > looking at your code are less likely to do them as well. Even if it > > > > doesn't hurt for you to exit uncleanly, it will hurt someone, and if > > > > they look to these examples as a source of best practices, it seems > > > > to me that it would be in everyones interest, if best practices were > > demonstrated. > > > > > > > > Neil > > > > > > -------------------------------------------------------------- > > > Intel Shannon Limited > > > Registered in Ireland > > > Registered Office: Collinstown Industrial Park, Leixlip, County > > > Kildare Registered Number: 308263 Business address: Dromore House, > > > East Park, Shannon, Co. Clare > > > > > > This e-mail and any attachments may contain confidential material for the sole > > use of the intended recipient(s). Any review or distribution by others is strictly > > prohibited. If you are not the intended recipient, please contact the sender and > > delete all copies. > > > > > > > > > > -------------------------------------------------------------- > Intel Shannon Limited > Registered in Ireland > Registered Office: Collinstown Industrial Park, Leixlip, County Kildare > Registered Number: 308263 > Business address: Dromore House, East Park, Shannon, Co. Clare > > This e-mail and any attachments may contain confidential material for the sole use of the intended recipient(s). Any review or distribution by others is strictly prohibited. If you are not the intended recipient, please contact the sender and delete all copies. > > >
diff --git a/examples/Makefile b/examples/Makefile index 6245f83..2ba82b0 100644 --- a/examples/Makefile +++ b/examples/Makefile @@ -66,5 +66,6 @@ DIRS-y += vhost DIRS-$(CONFIG_RTE_LIBRTE_XEN_DOM0) += vhost_xen DIRS-y += vmdq DIRS-y += vmdq_dcb +DIRS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) += distributor_app include $(RTE_SDK)/mk/rte.extsubdir.mk diff --git a/examples/distributor_app/Makefile b/examples/distributor_app/Makefile new file mode 100644 index 0000000..6a5bada --- /dev/null +++ b/examples/distributor_app/Makefile @@ -0,0 +1,57 @@ +# BSD LICENSE +# +# Copyright(c) 2010-2014 Intel Corporation. All rights reserved. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in +# the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Intel Corporation nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +ifeq ($(RTE_SDK),) +$(error "Please define RTE_SDK environment variable") +endif + +# Default target, can be overriden by command line or environment +RTE_TARGET ?= x86_64-native-linuxapp-gcc + +include $(RTE_SDK)/mk/rte.vars.mk + +# binary name +APP = distributor_app + +# all source are stored in SRCS-y +SRCS-y := main.c + +CFLAGS += $(WERROR_FLAGS) + +# workaround for a gcc bug with noreturn attribute +# http://gcc.gnu.org/bugzilla/show_bug.cgi?id=12603 +ifeq ($(CONFIG_RTE_TOOLCHAIN_GCC),y) +CFLAGS_main.o += -Wno-return-type +endif + +EXTRA_CFLAGS += -O3 -Wfatal-errors + +include $(RTE_SDK)/mk/rte.extapp.mk diff --git a/examples/distributor_app/main.c b/examples/distributor_app/main.c new file mode 100644 index 0000000..f555d93 --- /dev/null +++ b/examples/distributor_app/main.c @@ -0,0 +1,600 @@ +/*- + * BSD LICENSE + * + * Copyright(c) 2010-2014 Intel Corporation. All rights reserved. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Intel Corporation nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include <stdint.h> +#include <inttypes.h> +#include <unistd.h> +#include <signal.h> +#include <getopt.h> + +#include <rte_eal.h> +#include <rte_ethdev.h> +#include <rte_cycles.h> +#include <rte_malloc.h> +#include <rte_debug.h> +#include <rte_distributor.h> + +#include "main.h" + +#define RX_RING_SIZE 256 +#define RX_FREE_THRESH 32 +#define RX_PTHRESH 8 +#define RX_HTHRESH 8 +#define RX_WTHRESH 0 + +#define TX_RING_SIZE 512 +#define TX_FREE_THRESH 32 +#define TX_PTHRESH 32 +#define TX_HTHRESH 0 +#define TX_WTHRESH 0 +#define TX_RSBIT_THRESH 32 +#define TX_Q_FLAGS (ETH_TXQ_FLAGS_NOMULTSEGS | ETH_TXQ_FLAGS_NOVLANOFFL |\ + ETH_TXQ_FLAGS_NOXSUMSCTP | ETH_TXQ_FLAGS_NOXSUMUDP | \ + ETH_TXQ_FLAGS_NOXSUMTCP) + +#define NUM_MBUFS ((64*1024)-1) +#define MBUF_SIZE (2048 + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM) +#define MBUF_CACHE_SIZE 250 +#define BURST_SIZE 32 +#define RTE_RING_SZ 1024 + +/* uncommnet below line to enable debug logs */ +/* #define DEBUG */ + +#ifdef DEBUG +#define LOG_LEVEL RTE_LOG_DEBUG +#define LOG_DEBUG(log_type, fmt, args...) do { \ + RTE_LOG(DEBUG, log_type, fmt, ##args) \ +} while (0) +#else +#define LOG_LEVEL RTE_LOG_INFO +#define LOG_DEBUG(log_type, fmt, args...) do {} while (0) +#endif + +#define RTE_LOGTYPE_DISTRAPP RTE_LOGTYPE_USER1 + +/* mask of enabled ports */ +static uint32_t enabled_port_mask = 0; + +static volatile struct app_stats { + struct { + uint64_t rx_pkts; + uint64_t returned_pkts; + uint64_t enqueued_pkts; + } rx __rte_cache_aligned; + + struct { + uint64_t dequeue_pkts; + uint64_t tx_pkts; + } tx __rte_cache_aligned; +} app_stats; + +static const struct rte_eth_conf port_conf_default = { + .rxmode = { + .mq_mode = ETH_MQ_RX_RSS, + .max_rx_pkt_len = ETHER_MAX_LEN, + .split_hdr_size = 0, + .header_split = 0, /**< Header Split disabled */ + .hw_ip_checksum = 0, /**< IP checksum offload enabled */ + .hw_vlan_filter = 0, /**< VLAN filtering disabled */ + .jumbo_frame = 0, /**< Jumbo Frame Support disabled */ + .hw_strip_crc = 0, /**< CRC stripped by hardware */ + }, + .txmode = { + .mq_mode = ETH_MQ_TX_NONE, + }, + .lpbk_mode = 0, + .rx_adv_conf = { + .rss_conf = { + .rss_hf = ETH_RSS_IPV4 | ETH_RSS_IPV6 | + ETH_RSS_IPV4_TCP | ETH_RSS_IPV4_UDP | + ETH_RSS_IPV6_TCP | ETH_RSS_IPV6_UDP, + } + }, +}; + +static const struct rte_eth_rxconf rx_conf_default = { + .rx_thresh = { + .pthresh = RX_PTHRESH, + .hthresh = RX_HTHRESH, + .wthresh = RX_WTHRESH, + }, + .rx_free_thresh = RX_FREE_THRESH, + .rx_drop_en = 0, +}; + +static const struct rte_eth_txconf tx_conf_default = { + .tx_thresh = { + .pthresh = TX_PTHRESH, + .hthresh = TX_HTHRESH, + .wthresh = TX_WTHRESH, + }, + .tx_free_thresh = TX_FREE_THRESH, + .tx_rs_thresh = TX_RSBIT_THRESH, + .txq_flags = TX_Q_FLAGS + +}; + +struct output_buffer { + unsigned count; + struct rte_mbuf *mbufs[BURST_SIZE]; +}; + +/* + * Initialises a given port using global settings and with the rx buffers + * coming from the mbuf_pool passed as parameter + */ +static inline int +port_init(uint8_t port, struct rte_mempool *mbuf_pool) +{ + struct rte_eth_conf port_conf = port_conf_default; + const uint16_t rxRings = 1, txRings = rte_lcore_count() - 1; + int retval; + uint16_t q; + + if (port >= rte_eth_dev_count()) + return -1; + + retval = rte_eth_dev_configure(port, rxRings, txRings, &port_conf); + if (retval != 0) + return retval; + + for (q = 0; q < rxRings; q++) { + retval = rte_eth_rx_queue_setup(port, q, RX_RING_SIZE, + rte_eth_dev_socket_id(port), + &rx_conf_default, mbuf_pool); + if (retval < 0) + return retval; + } + + for (q = 0; q < txRings; q++) { + retval = rte_eth_tx_queue_setup(port, q, TX_RING_SIZE, + rte_eth_dev_socket_id(port), + &tx_conf_default); + if (retval < 0) + return retval; + } + + retval = rte_eth_dev_start(port); + if (retval < 0) + return retval; + + struct rte_eth_link link; + rte_eth_link_get_nowait(port, &link); + if (!link.link_status) { + sleep(1); + rte_eth_link_get_nowait(port, &link); + } + + if (!link.link_status) { + printf("Link down on port %"PRIu8"\n", port); + return 0; + } + + struct ether_addr addr; + rte_eth_macaddr_get(port, &addr); + printf("Port %u MAC: %02"PRIx8" %02"PRIx8" %02"PRIx8 + " %02"PRIx8" %02"PRIx8" %02"PRIx8"\n", + (unsigned)port, + addr.addr_bytes[0], addr.addr_bytes[1], + addr.addr_bytes[2], addr.addr_bytes[3], + addr.addr_bytes[4], addr.addr_bytes[5]); + + rte_eth_promiscuous_enable(port); + + return 0; +} + +struct lcore_params { + unsigned worker_id; + struct rte_distributor *d; + struct rte_ring *r; +}; + +static __attribute__((noreturn)) void +lcore_rx(struct lcore_params *p) +{ + struct rte_distributor *d = p->d; + struct rte_ring *r = p->r; + const uint8_t nb_ports = rte_eth_dev_count(); + const int socket_id = rte_socket_id(); + uint8_t port; + + for (port = 0; port < nb_ports; port++) { + /* skip ports that are not enabled */ + if ((enabled_port_mask & (1 << port)) == 0) + continue; + + if (rte_eth_dev_socket_id(port) > 0 && + rte_eth_dev_socket_id(port) != socket_id) + printf("WARNING, port %u is on remote NUMA node to " + "RX thread.\n\tPerformance will not " + "be optimal.\n", port); + } + + printf("\nCore %u doing packet RX.\n", rte_lcore_id()); + port = 0; + for (;;) { + /* skip ports that are not enabled */ + if ((enabled_port_mask & (1 << port)) == 0) { + if (++port == nb_ports) + port = 0; + continue; + } + struct rte_mbuf *bufs[BURST_SIZE*2]; + const uint16_t nb_rx = rte_eth_rx_burst(port, 0, bufs, + BURST_SIZE); + app_stats.rx.rx_pkts += nb_rx; + + rte_distributor_process(d, bufs, nb_rx); + const uint16_t nb_ret = rte_distributor_returned_pkts(d, + bufs, BURST_SIZE*2); + app_stats.rx.returned_pkts += nb_ret; + if (unlikely(nb_ret == 0)) + continue; + + uint16_t sent = rte_ring_enqueue_burst(r, (void *)bufs, nb_ret); + app_stats.rx.enqueued_pkts += sent; + if (unlikely(sent < nb_ret)) { + LOG_DEBUG(DISTRAPP, "%s:Packet loss due to full ring\n", __func__); + while (sent < nb_ret) + rte_pktmbuf_free(bufs[sent++]); + } + if (++port == nb_ports) + port = 0; + } +} + +static inline void +flush_one_port(struct output_buffer *outbuf, uint8_t outp) +{ + unsigned nb_tx = rte_eth_tx_burst(outp, 0, outbuf->mbufs, + outbuf->count); + app_stats.tx.tx_pkts += nb_tx; + + if (unlikely(nb_tx < outbuf->count)) { + LOG_DEBUG(DISTRAPP, "%s:Packet loss with tx_burst\n", __func__); + do { + rte_pktmbuf_free(outbuf->mbufs[nb_tx]); + } while (++nb_tx < outbuf->count); + } + outbuf->count = 0; +} + +static inline void +flush_all_ports(struct output_buffer *tx_buffers, uint8_t nb_ports) +{ + uint8_t outp; + for (outp = 0; outp < nb_ports; outp++) { + /* skip ports that are not enabled */ + if ((enabled_port_mask & (1 << outp)) == 0) + continue; + + if (tx_buffers[outp].count == 0) + continue; + + flush_one_port(&tx_buffers[outp], outp); + } +} + +static __attribute__((noreturn)) void +lcore_tx(struct rte_ring *in_r) +{ + static struct output_buffer tx_buffers[RTE_MAX_ETHPORTS]; + const uint8_t nb_ports = rte_eth_dev_count(); + const int socket_id = rte_socket_id(); + uint8_t port; + + for (port = 0; port < nb_ports; port++) { + /* skip ports that are not enabled */ + if ((enabled_port_mask & (1 << port)) == 0) + continue; + + if (rte_eth_dev_socket_id(port) > 0 && + rte_eth_dev_socket_id(port) != socket_id) + printf("WARNING, port %u is on remote NUMA node to " + "TX thread.\n\tPerformance will not " + "be optimal.\n", port); + } + + printf("\nCore %u doing packet TX.\n", rte_lcore_id()); + for (;;) { + for (port = 0; port < nb_ports; port++) { + /* skip ports that are not enabled */ + if ((enabled_port_mask & (1 << port)) == 0) + continue; + + struct rte_mbuf *bufs[BURST_SIZE]; + const uint16_t nb_rx = rte_ring_dequeue_burst(in_r, + (void *)bufs, BURST_SIZE); + app_stats.tx.dequeue_pkts += nb_rx; + + /* if we get no traffic, flush anything we have */ + if (unlikely(nb_rx == 0)) { + flush_all_ports(tx_buffers, nb_ports); + continue; + } + + /* for traffic we receive, queue it up for transmit */ + uint16_t i; + _mm_prefetch(bufs[0], 0); + _mm_prefetch(bufs[1], 0); + _mm_prefetch(bufs[2], 0); + for (i = 0; i < nb_rx; i++) { + struct output_buffer *outbuf; + uint8_t outp; + _mm_prefetch(bufs[i + 3], 0); + /* workers should update in_port to hold the + * output port value */ + outp = bufs[i]->port; + /* skip ports that are not enabled */ + if ((enabled_port_mask & (1 << outp)) == 0) + continue; + + outbuf = &tx_buffers[outp]; + outbuf->mbufs[outbuf->count++] = bufs[i]; + if (outbuf->count == BURST_SIZE) + flush_one_port(outbuf, outp); + } + } + } +} + + +static __attribute__((noreturn)) void +lcore_worker(struct lcore_params *p) +{ + struct rte_distributor *d = p->d; + const unsigned id = p->worker_id; + /* for single port, xor_val will be zero so we won't modify the output + * port, otherwise we send traffic from 0 to 1, 2 to 3, and vice versa + */ + const unsigned xor_val = (rte_eth_dev_count() > 1); + struct rte_mbuf *buf = NULL; + + printf("\nCore %u acting as worker core.\n", rte_lcore_id()); + for (;;) { + buf = rte_distributor_get_pkt(d, id, buf); + buf->port ^= xor_val; + } +} + +static void +int_handler(int sig_num) +{ + struct rte_eth_stats eth_stats; + unsigned i; + + printf("Exiting on signal %d\n", sig_num); + + printf("\nRX thread stats:\n"); + printf(" - Received: %"PRIu64"\n", app_stats.rx.rx_pkts); + printf(" - Processed: %"PRIu64"\n", app_stats.rx.returned_pkts); + printf(" - Enqueued: %"PRIu64"\n", app_stats.rx.enqueued_pkts); + + printf("\nTX thread stats:\n"); + printf(" - Dequeued: %"PRIu64"\n", app_stats.tx.dequeue_pkts); + printf(" - Transmitted: %"PRIu64"\n", app_stats.tx.tx_pkts); + + for (i = 0; i < rte_eth_dev_count(); i++) { + rte_eth_stats_get(i, ð_stats); + printf("\nPort %u stats:\n", i); + printf(" - Pkts in: %"PRIu64"\n", eth_stats.ipackets); + printf(" - Pkts out: %"PRIu64"\n", eth_stats.opackets); + printf(" - In Errs: %"PRIu64"\n", eth_stats.ierrors); + printf(" - Out Errs: %"PRIu64"\n", eth_stats.oerrors); + printf(" - Mbuf Errs: %"PRIu64"\n", eth_stats.rx_nombuf); + } + exit(0); +} + +/* display usage */ +static void +print_usage(const char *prgname) +{ + printf("%s [EAL options] -- -p PORTMASK\n" + " -p PORTMASK: hexadecimal bitmask of ports to configure\n", + prgname); +} + +static int +parse_portmask(const char *portmask) +{ + char *end = NULL; + unsigned long pm; + + /* parse hexadecimal string */ + pm = strtoul(portmask, &end, 16); + if ((portmask[0] == '\0') || (end == NULL) || (*end != '\0')) + return -1; + + if (pm == 0) + return -1; + + return pm; +} + +/* Parse the argument given in the command line of the application */ +static int +parse_args(int argc, char **argv) +{ + int opt; + char **argvopt; + int option_index; + char *prgname = argv[0]; + static struct option lgopts[] = { + {NULL, 0, 0, 0} + }; + + argvopt = argv; + + while ((opt = getopt_long(argc, argvopt, "p:", + lgopts, &option_index)) != EOF) { + + switch (opt) { + /* portmask */ + case 'p': + enabled_port_mask = parse_portmask(optarg); + if (enabled_port_mask == 0) { + printf("invalid portmask\n"); + print_usage(prgname); + return -1; + } + break; + + default: + print_usage(prgname); + return -1; + } + } + +if (optind <= 1) { + print_usage(prgname); + return -1; +} + + argv[optind-1] = prgname; + + optind = 0; /* reset getopt lib */ + return 0; +} + +/* Main function, does initialization and calls the per-lcore functions */ +int +MAIN(int argc, char *argv[]) +{ + struct rte_mempool *mbuf_pool; + struct rte_distributor *d; + struct rte_ring *output_ring; + unsigned lcore_id, worker_id = 0; + unsigned nb_ports; + uint8_t portid; + uint8_t nb_ports_available; + + /* catch ctrl-c so we can print on exit */ + signal(SIGINT, int_handler); + + /* init EAL */ + int ret = rte_eal_init(argc, argv); + if (ret < 0) + rte_exit(EXIT_FAILURE, "Error with EAL initialization\n"); + argc -= ret; + argv += ret; + + /* parse application arguments (after the EAL ones) */ + ret = parse_args(argc, argv); + if (ret < 0) + rte_exit(EXIT_FAILURE, "Invalid distributor parameters\n"); + + if (rte_lcore_count() < 3) + rte_exit(EXIT_FAILURE, "Error, This application needs at " + "least 3 logical cores to run:\n" + "1 lcore for packet RX and distribution\n" + "1 lcore for packet TX\n" + "and at least 1 lcore for worker threads\n"); + + if (rte_eal_pci_probe() != 0) + rte_exit(EXIT_FAILURE, "Error with PCI probing\n"); + + nb_ports = rte_eth_dev_count(); + if (nb_ports == 0) + rte_exit(EXIT_FAILURE, "Error: no ethernet ports detected\n"); + if (nb_ports != 1 && (nb_ports & 1)) + rte_exit(EXIT_FAILURE, "Error: number of ports must be even, except " + "when using a single port\n"); + + mbuf_pool = rte_mempool_create("MBUF_POOL", NUM_MBUFS * nb_ports, + MBUF_SIZE, MBUF_CACHE_SIZE, + sizeof(struct rte_pktmbuf_pool_private), + rte_pktmbuf_pool_init, NULL, + rte_pktmbuf_init, NULL, + rte_socket_id(), 0); + if (mbuf_pool == NULL) + rte_exit(EXIT_FAILURE, "Cannot create mbuf pool\n"); + + nb_ports_available = nb_ports; + + /* initialize all ports */ + for (portid = 0; portid < nb_ports; portid++) { + /* skip ports that are not enabled */ + if ((enabled_port_mask & (1 << portid)) == 0) { + printf("\nSkipping disabled port %d\n", portid); + nb_ports_available--; + continue; + } + /* init port */ + printf("Initializing port %u... done\n", (unsigned) portid); + + if (port_init(portid, mbuf_pool) != 0) + rte_exit(EXIT_FAILURE, "Cannot initialize port %"PRIu8"\n", + portid); + } + + if (!nb_ports_available) { + rte_exit(EXIT_FAILURE, + "All available ports are disabled. Please set portmask.\n"); + } + + d = rte_distributor_create("PKT_DIST", rte_socket_id(), + rte_lcore_count() - 2); + if (d == NULL) + rte_exit(EXIT_FAILURE, "Cannot create distributor\n"); + + /* scheduler ring is read only by the transmitter core, but written to + * by multiple threads + */ + output_ring = rte_ring_create("Output_ring", RTE_RING_SZ, + rte_socket_id(), RING_F_SC_DEQ); + if (output_ring == NULL) + rte_exit(EXIT_FAILURE, "Cannot create output ring\n"); + + RTE_LCORE_FOREACH_SLAVE(lcore_id) { + if (worker_id == rte_lcore_count() - 2) + rte_eal_remote_launch((lcore_function_t *)lcore_tx, + output_ring, lcore_id); + else { + struct lcore_params *p = + rte_malloc(NULL, sizeof(*p), 0); + if (!p) + rte_panic("malloc failure\n"); + *p = (struct lcore_params){worker_id, d, output_ring}; + rte_eal_remote_launch((lcore_function_t *)lcore_worker, + p, lcore_id); + } + worker_id++; + } + /* call lcore_main on master core only */ + struct lcore_params p = { 0, d, output_ring }; + lcore_rx(&p); + return 0; +} diff --git a/examples/distributor_app/main.h b/examples/distributor_app/main.h new file mode 100644 index 0000000..2682d15 --- /dev/null +++ b/examples/distributor_app/main.h @@ -0,0 +1,46 @@ +/*- + * BSD LICENSE + * + * Copyright(c) 2010-2014 Intel Corporation. All rights reserved. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Intel Corporation nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef _MAIN_H_ +#define _MAIN_H_ + + +#ifdef RTE_EXEC_ENV_BAREMETAL +#define MAIN _main +#else +#define MAIN main +#endif + +int MAIN(int argc, char *argv[]); + +#endif /* ifndef _MAIN_H_ */