[dpdk-dev,v3] distributor_app: new sample app

Message ID 1412073577-12248-1-git-send-email-reshma.pattan@intel.com (mailing list archive)
State Superseded, archived
Headers

Commit Message

Pattan, Reshma Sept. 30, 2014, 10:39 a.m. UTC
From: Reshma Pattan <reshma.pattan@intel.com>

A new sample app that shows the usage of the distributor library. This
app works as follows:

* An RX thread runs which pulls packets from each ethernet port in turn
  and passes those packets to worker using a distributor component.
* The workers take the packets in turn, and determine the output port
  for those packets using basic l2forwarding doing an xor on the source
  port id.
* The RX thread takes the returned packets from the workers and enqueue
  those packets into an rte_ring structure.
* A TX thread pulls the packets off the rte_ring structure and then
  sends each packet out the output port specified previously by the worker
* Command-line option support provided only for portmask.

Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
Signed-off-by: Reshma Pattan    <reshma.pattan@intel.com>
---
 examples/Makefile                 |    1 +
 examples/distributor_app/Makefile |   57 ++++
 examples/distributor_app/main.c   |  600 +++++++++++++++++++++++++++++++++++++
 examples/distributor_app/main.h   |   46 +++
 4 files changed, 704 insertions(+), 0 deletions(-)
 create mode 100644 examples/distributor_app/Makefile
 create mode 100644 examples/distributor_app/main.c
 create mode 100644 examples/distributor_app/main.h
  

Comments

Neil Horman Sept. 30, 2014, 11:34 a.m. UTC | #1
On Tue, Sep 30, 2014 at 11:39:37AM +0100, reshmapa wrote:
> From: Reshma Pattan <reshma.pattan@intel.com>
> 
> A new sample app that shows the usage of the distributor library. This
> app works as follows:
> 
> * An RX thread runs which pulls packets from each ethernet port in turn
>   and passes those packets to worker using a distributor component.
> * The workers take the packets in turn, and determine the output port
>   for those packets using basic l2forwarding doing an xor on the source
>   port id.
> * The RX thread takes the returned packets from the workers and enqueue
>   those packets into an rte_ring structure.
> * A TX thread pulls the packets off the rte_ring structure and then
>   sends each packet out the output port specified previously by the worker
> * Command-line option support provided only for portmask.
> 
> Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
> Signed-off-by: Reshma Pattan    <reshma.pattan@intel.com>
> ---
>  examples/Makefile                 |    1 +
>  examples/distributor_app/Makefile |   57 ++++
>  examples/distributor_app/main.c   |  600 +++++++++++++++++++++++++++++++++++++
>  examples/distributor_app/main.h   |   46 +++
>  4 files changed, 704 insertions(+), 0 deletions(-)
>  create mode 100644 examples/distributor_app/Makefile
>  create mode 100644 examples/distributor_app/main.c
>  create mode 100644 examples/distributor_app/main.h
> 
> diff --git a/examples/Makefile b/examples/Makefile
> index 6245f83..2ba82b0 100644
> --- a/examples/Makefile
> +++ b/examples/Makefile
> @@ -66,5 +66,6 @@ DIRS-y += vhost
>  DIRS-$(CONFIG_RTE_LIBRTE_XEN_DOM0) += vhost_xen
>  DIRS-y += vmdq
>  DIRS-y += vmdq_dcb
> +DIRS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) += distributor_app
>  
>  include $(RTE_SDK)/mk/rte.extsubdir.mk
> diff --git a/examples/distributor_app/Makefile b/examples/distributor_app/Makefile
> new file mode 100644
> index 0000000..6a5bada
> --- /dev/null
> +++ b/examples/distributor_app/Makefile
> @@ -0,0 +1,57 @@
> +#   BSD LICENSE
> +#
> +#   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
> +#   All rights reserved.
> +#
> +#   Redistribution and use in source and binary forms, with or without
> +#   modification, are permitted provided that the following conditions
> +#   are met:
> +#
> +#     * Redistributions of source code must retain the above copyright
> +#       notice, this list of conditions and the following disclaimer.
> +#     * Redistributions in binary form must reproduce the above copyright
> +#       notice, this list of conditions and the following disclaimer in
> +#       the documentation and/or other materials provided with the
> +#       distribution.
> +#     * Neither the name of Intel Corporation nor the names of its
> +#       contributors may be used to endorse or promote products derived
> +#       from this software without specific prior written permission.
> +#
> +#   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
> +#   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
> +#   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
> +#   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
> +#   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
> +#   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
> +#   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
> +#   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
> +#   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
> +#   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
> +#   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
> +
> +ifeq ($(RTE_SDK),)
> +$(error "Please define RTE_SDK environment variable")
> +endif
> +
> +# Default target, can be overriden by command line or environment
> +RTE_TARGET ?= x86_64-native-linuxapp-gcc
> +
> +include $(RTE_SDK)/mk/rte.vars.mk
> +
> +# binary name
> +APP = distributor_app
> +
> +# all source are stored in SRCS-y
> +SRCS-y := main.c
> +
> +CFLAGS += $(WERROR_FLAGS)
> +
> +# workaround for a gcc bug with noreturn attribute
> +# http://gcc.gnu.org/bugzilla/show_bug.cgi?id=12603
> +ifeq ($(CONFIG_RTE_TOOLCHAIN_GCC),y)
> +CFLAGS_main.o += -Wno-return-type
> +endif
> +
> +EXTRA_CFLAGS += -O3 -Wfatal-errors
> +
> +include $(RTE_SDK)/mk/rte.extapp.mk
> diff --git a/examples/distributor_app/main.c b/examples/distributor_app/main.c
> new file mode 100644
> index 0000000..f555d93
> --- /dev/null
> +++ b/examples/distributor_app/main.c
> @@ -0,0 +1,600 @@
> +/*-
> + *   BSD LICENSE
> + *
> + *   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
> + *   All rights reserved.
> + *
> + *   Redistribution and use in source and binary forms, with or without
> + *   modification, are permitted provided that the following conditions
> + *   are met:
> + *
> + *     * Redistributions of source code must retain the above copyright
> + *       notice, this list of conditions and the following disclaimer.
> + *     * Redistributions in binary form must reproduce the above copyright
> + *       notice, this list of conditions and the following disclaimer in
> + *       the documentation and/or other materials provided with the
> + *       distribution.
> + *     * Neither the name of Intel Corporation nor the names of its
> + *       contributors may be used to endorse or promote products derived
> + *       from this software without specific prior written permission.
> + *
> + *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
> + *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
> + *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
> + *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
> + *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
> + *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
> + *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
> + *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
> + *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
> + *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
> + *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
> + */
> +
> +#include <stdint.h>
> +#include <inttypes.h>
> +#include <unistd.h>
> +#include <signal.h>
> +#include <getopt.h>
> +
> +#include <rte_eal.h>
> +#include <rte_ethdev.h>
> +#include <rte_cycles.h>
> +#include <rte_malloc.h>
> +#include <rte_debug.h>
> +#include <rte_distributor.h>
> +
> +#include "main.h"
> +
> +#define RX_RING_SIZE 256
> +#define RX_FREE_THRESH 32
> +#define RX_PTHRESH 8
> +#define RX_HTHRESH 8
> +#define RX_WTHRESH 0
> +
> +#define TX_RING_SIZE 512
> +#define TX_FREE_THRESH 32
> +#define TX_PTHRESH 32
> +#define TX_HTHRESH 0
> +#define TX_WTHRESH 0
> +#define TX_RSBIT_THRESH 32
> +#define TX_Q_FLAGS (ETH_TXQ_FLAGS_NOMULTSEGS | ETH_TXQ_FLAGS_NOVLANOFFL |\
> +	ETH_TXQ_FLAGS_NOXSUMSCTP | ETH_TXQ_FLAGS_NOXSUMUDP | \
> +	ETH_TXQ_FLAGS_NOXSUMTCP)
> +
> +#define NUM_MBUFS ((64*1024)-1)
> +#define MBUF_SIZE (2048 + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM)
> +#define MBUF_CACHE_SIZE 250
> +#define BURST_SIZE 32
> +#define RTE_RING_SZ 1024
> +
> +/* uncommnet below line to enable debug logs */
> +/* #define DEBUG */
> +
> +#ifdef DEBUG
> +#define LOG_LEVEL RTE_LOG_DEBUG
> +#define LOG_DEBUG(log_type, fmt, args...) do {	\
> +	RTE_LOG(DEBUG, log_type, fmt, ##args)		\
> +} while (0)
> +#else
> +#define LOG_LEVEL RTE_LOG_INFO
> +#define LOG_DEBUG(log_type, fmt, args...) do {} while (0)
> +#endif
> +
> +#define RTE_LOGTYPE_DISTRAPP RTE_LOGTYPE_USER1
> +
> +/* mask of enabled ports */
> +static uint32_t enabled_port_mask = 0;
> +
> +static volatile struct app_stats {
> +	struct {
> +		uint64_t rx_pkts;
> +		uint64_t returned_pkts;
> +		uint64_t enqueued_pkts;
> +	} rx __rte_cache_aligned;
> +
> +	struct {
> +		uint64_t dequeue_pkts;
> +		uint64_t tx_pkts;
> +	} tx __rte_cache_aligned;
> +} app_stats;
> +
> +static const struct rte_eth_conf port_conf_default = {
> +	.rxmode = {
> +		.mq_mode = ETH_MQ_RX_RSS,
> +		.max_rx_pkt_len = ETHER_MAX_LEN,
> +		.split_hdr_size = 0,
> +		.header_split   = 0, /**< Header Split disabled */
> +		.hw_ip_checksum = 0, /**< IP checksum offload enabled */
> +		.hw_vlan_filter = 0, /**< VLAN filtering disabled */
> +		.jumbo_frame    = 0, /**< Jumbo Frame Support disabled */
> +		.hw_strip_crc   = 0, /**< CRC stripped by hardware */
> +	},
> +	.txmode = {
> +		.mq_mode = ETH_MQ_TX_NONE,
> +	},
> +	.lpbk_mode = 0,
> +	.rx_adv_conf = {
> +			.rss_conf = {
> +				.rss_hf = ETH_RSS_IPV4 | ETH_RSS_IPV6 |
> +					ETH_RSS_IPV4_TCP | ETH_RSS_IPV4_UDP |
> +					ETH_RSS_IPV6_TCP | ETH_RSS_IPV6_UDP,
> +			}
> +	},
> +};
> +
> +static const struct rte_eth_rxconf rx_conf_default = {
> +	.rx_thresh = {
> +		.pthresh = RX_PTHRESH,
> +		.hthresh = RX_HTHRESH,
> +		.wthresh = RX_WTHRESH,
> +	},
> +	.rx_free_thresh = RX_FREE_THRESH,
> +	.rx_drop_en = 0,
> +};
> +
> +static const struct rte_eth_txconf tx_conf_default = {
> +	.tx_thresh = {
> +		.pthresh = TX_PTHRESH,
> +		.hthresh = TX_HTHRESH,
> +		.wthresh = TX_WTHRESH,
> +	},
> +	.tx_free_thresh = TX_FREE_THRESH,
> +	.tx_rs_thresh = TX_RSBIT_THRESH,
> +	.txq_flags = TX_Q_FLAGS
> +
> +};
> +
> +struct output_buffer {
> +	unsigned count;
> +	struct rte_mbuf *mbufs[BURST_SIZE];
> +};
> +
> +/*
> + * Initialises a given port using global settings and with the rx buffers
> + * coming from the mbuf_pool passed as parameter
> + */
> +static inline int
> +port_init(uint8_t port, struct rte_mempool *mbuf_pool)
> +{
> +	struct rte_eth_conf port_conf = port_conf_default;
> +	const uint16_t rxRings = 1, txRings = rte_lcore_count() - 1;
> +	int retval;
> +	uint16_t q;
> +
> +	if (port >= rte_eth_dev_count())
> +		return -1;
> +
> +	retval = rte_eth_dev_configure(port, rxRings, txRings, &port_conf);
> +	if (retval != 0)
> +		return retval;
> +
> +	for (q = 0; q < rxRings; q++) {
> +		retval = rte_eth_rx_queue_setup(port, q, RX_RING_SIZE,
> +						rte_eth_dev_socket_id(port),
> +						&rx_conf_default, mbuf_pool);
> +		if (retval < 0)
> +			return retval;
> +	}
> +
> +	for (q = 0; q < txRings; q++) {
> +		retval = rte_eth_tx_queue_setup(port, q, TX_RING_SIZE,
> +						rte_eth_dev_socket_id(port),
> +						&tx_conf_default);
> +		if (retval < 0)
> +			return retval;
> +	}
> +
> +	retval  = rte_eth_dev_start(port);
> +	if (retval < 0)
> +		return retval;
> +
> +	struct rte_eth_link link;
> +	rte_eth_link_get_nowait(port, &link);
> +	if (!link.link_status) {
> +		sleep(1);
> +		rte_eth_link_get_nowait(port, &link);
> +	}
> +
> +	if (!link.link_status) {
> +		printf("Link down on port %"PRIu8"\n", port);
> +		return 0;
> +	}
> +
> +	struct ether_addr addr;
> +	rte_eth_macaddr_get(port, &addr);
> +	printf("Port %u MAC: %02"PRIx8" %02"PRIx8" %02"PRIx8
> +			" %02"PRIx8" %02"PRIx8" %02"PRIx8"\n",
> +			(unsigned)port,
> +			addr.addr_bytes[0], addr.addr_bytes[1],
> +			addr.addr_bytes[2], addr.addr_bytes[3],
> +			addr.addr_bytes[4], addr.addr_bytes[5]);
> +
> +	rte_eth_promiscuous_enable(port);
> +
> +	return 0;
> +}
> +
> +struct lcore_params {
> +	unsigned worker_id;
> +	struct rte_distributor *d;
> +	struct rte_ring *r;
> +};
> +
> +static __attribute__((noreturn)) void
> +lcore_rx(struct lcore_params *p)
> +{
> +	struct rte_distributor *d = p->d;
> +	struct rte_ring *r = p->r;
> +	const uint8_t nb_ports = rte_eth_dev_count();
> +	const int socket_id = rte_socket_id();
> +	uint8_t port;
> +
> +	for (port = 0; port < nb_ports; port++) {
> +		/* skip ports that are not enabled */
> +		if ((enabled_port_mask & (1 << port)) == 0)
> +			continue;
> +
> +		if (rte_eth_dev_socket_id(port) > 0 &&
> +				rte_eth_dev_socket_id(port) != socket_id)
> +			printf("WARNING, port %u is on remote NUMA node to "
> +					"RX thread.\n\tPerformance will not "
> +					"be optimal.\n", port);
> +	}
> +
> +	printf("\nCore %u doing packet RX.\n", rte_lcore_id());
> +	port = 0;
> +	for (;;) {
> +		/* skip ports that are not enabled */
> +		if ((enabled_port_mask & (1 << port)) == 0) {
> +			if (++port == nb_ports)
> +				port = 0;
> +			continue;
> +		}
> +		struct rte_mbuf *bufs[BURST_SIZE*2];
> +		const uint16_t nb_rx = rte_eth_rx_burst(port, 0, bufs,
> +				BURST_SIZE);
> +		app_stats.rx.rx_pkts += nb_rx;
> +
> +		rte_distributor_process(d, bufs, nb_rx);
> +		const uint16_t nb_ret = rte_distributor_returned_pkts(d,
> +				bufs, BURST_SIZE*2);
> +		app_stats.rx.returned_pkts += nb_ret;
> +		if (unlikely(nb_ret == 0))
> +			continue;
> +
> +		uint16_t sent = rte_ring_enqueue_burst(r, (void *)bufs, nb_ret);
> +		app_stats.rx.enqueued_pkts += sent;
> +		if (unlikely(sent < nb_ret)) {
> +			LOG_DEBUG(DISTRAPP, "%s:Packet loss due to full ring\n", __func__);
> +			while (sent < nb_ret)
> +				rte_pktmbuf_free(bufs[sent++]);
> +		}
> +		if (++port == nb_ports)
> +			port = 0;
> +	}
> +}
> +
> +static inline void
> +flush_one_port(struct output_buffer *outbuf, uint8_t outp)
> +{
> +	unsigned nb_tx = rte_eth_tx_burst(outp, 0, outbuf->mbufs,
> +			outbuf->count);
> +	app_stats.tx.tx_pkts += nb_tx;
> +
> +	if (unlikely(nb_tx < outbuf->count)) {
> +		LOG_DEBUG(DISTRAPP, "%s:Packet loss with tx_burst\n", __func__);
> +		do {
> +			rte_pktmbuf_free(outbuf->mbufs[nb_tx]);
> +		} while (++nb_tx < outbuf->count);
> +	}
> +	outbuf->count = 0;
> +}
> +
> +static inline void
> +flush_all_ports(struct output_buffer *tx_buffers, uint8_t nb_ports)
> +{
> +	uint8_t outp;
> +	for (outp = 0; outp < nb_ports; outp++) {
> +		/* skip ports that are not enabled */
> +		if ((enabled_port_mask & (1 << outp)) == 0)
> +			continue;
> +
> +		if (tx_buffers[outp].count == 0)
> +			continue;
> +
> +		flush_one_port(&tx_buffers[outp], outp);
> +	}
> +}
> +
> +static __attribute__((noreturn)) void
> +lcore_tx(struct rte_ring *in_r)
> +{
> +	static struct output_buffer tx_buffers[RTE_MAX_ETHPORTS];
> +	const uint8_t nb_ports = rte_eth_dev_count();
> +	const int socket_id = rte_socket_id();
> +	uint8_t port;
> +
> +	for (port = 0; port < nb_ports; port++) {
> +		/* skip ports that are not enabled */
> +		if ((enabled_port_mask & (1 << port)) == 0)
> +			continue;
> +
> +		if (rte_eth_dev_socket_id(port) > 0 &&
> +				rte_eth_dev_socket_id(port) != socket_id)
> +			printf("WARNING, port %u is on remote NUMA node to "
> +					"TX thread.\n\tPerformance will not "
> +					"be optimal.\n", port);
> +	}
> +
> +	printf("\nCore %u doing packet TX.\n", rte_lcore_id());
> +	for (;;) {
> +		for (port = 0; port < nb_ports; port++) {
> +			/* skip ports that are not enabled */
> +			if ((enabled_port_mask & (1 << port)) == 0)
> +				continue;
> +
> +			struct rte_mbuf *bufs[BURST_SIZE];
> +			const uint16_t nb_rx = rte_ring_dequeue_burst(in_r,
> +					(void *)bufs, BURST_SIZE);
> +			app_stats.tx.dequeue_pkts += nb_rx;
> +
> +			/* if we get no traffic, flush anything we have */
> +			if (unlikely(nb_rx == 0)) {
> +				flush_all_ports(tx_buffers, nb_ports);
> +				continue;
> +			}
> +
> +			/* for traffic we receive, queue it up for transmit */
> +			uint16_t i;
> +			_mm_prefetch(bufs[0], 0);
> +			_mm_prefetch(bufs[1], 0);
> +			_mm_prefetch(bufs[2], 0);
> +			for (i = 0; i < nb_rx; i++) {
> +				struct output_buffer *outbuf;
> +				uint8_t outp;
> +				_mm_prefetch(bufs[i + 3], 0);
> +				/* workers should update in_port to hold the
> +				 * output port value */
> +				outp = bufs[i]->port;
> +				/* skip ports that are not enabled */
> +				if ((enabled_port_mask & (1 << outp)) == 0)
> +					continue;
> +
> +				outbuf = &tx_buffers[outp];
> +				outbuf->mbufs[outbuf->count++] = bufs[i];
> +				if (outbuf->count == BURST_SIZE)
> +					flush_one_port(outbuf, outp);
> +			}
> +		}
> +	}
> +}
> +
> +
> +static __attribute__((noreturn)) void
> +lcore_worker(struct lcore_params *p)
> +{
> +	struct rte_distributor *d = p->d;
> +	const unsigned id = p->worker_id;
> +	/* for single port, xor_val will be zero so we won't modify the output
> +	 * port, otherwise we send traffic from 0 to 1, 2 to 3, and vice versa
> +	 */
> +	const unsigned xor_val = (rte_eth_dev_count() > 1);
> +	struct rte_mbuf *buf = NULL;
> +
> +	printf("\nCore %u acting as worker core.\n", rte_lcore_id());
> +	for (;;) {
> +		buf = rte_distributor_get_pkt(d, id, buf);
> +		buf->port ^= xor_val;
> +	}
> +}
> +
> +static void
> +int_handler(int sig_num)
> +{
> +	struct rte_eth_stats eth_stats;
> +	unsigned i;
> +
> +	printf("Exiting on signal %d\n", sig_num);
> +
> +	printf("\nRX thread stats:\n");
> +	printf(" - Received:    %"PRIu64"\n", app_stats.rx.rx_pkts);
> +	printf(" - Processed:   %"PRIu64"\n", app_stats.rx.returned_pkts);
> +	printf(" - Enqueued:    %"PRIu64"\n", app_stats.rx.enqueued_pkts);
> +
> +	printf("\nTX thread stats:\n");
> +	printf(" - Dequeued:    %"PRIu64"\n", app_stats.tx.dequeue_pkts);
> +	printf(" - Transmitted: %"PRIu64"\n", app_stats.tx.tx_pkts);
> +
> +	for (i = 0; i < rte_eth_dev_count(); i++) {
> +		rte_eth_stats_get(i, &eth_stats);
> +		printf("\nPort %u stats:\n", i);
> +		printf(" - Pkts in:   %"PRIu64"\n", eth_stats.ipackets);
> +		printf(" - Pkts out:  %"PRIu64"\n", eth_stats.opackets);
> +		printf(" - In Errs:   %"PRIu64"\n", eth_stats.ierrors);
> +		printf(" - Out Errs:  %"PRIu64"\n", eth_stats.oerrors);
> +		printf(" - Mbuf Errs: %"PRIu64"\n", eth_stats.rx_nombuf);
> +	}
> +	exit(0);
rte_exit here?  Also, this is a pretty ungraceful exit strategy as all the
threads you've created and memory you've allocated are just forgotten here.
Given that dpdk mempools are shared, this has the potential to leak lots of
memory if other apps are using the dpdk at the same time that you run this.  You
probably want to use the sigint handler to raise a flag to the tx/rx threads to
shutdown gracefully, and then free your allocated memory and mempool.

Neil
  
Bruce Richardson Sept. 30, 2014, 12:18 p.m. UTC | #2
On Tue, Sep 30, 2014 at 07:34:45AM -0400, Neil Horman wrote:
> On Tue, Sep 30, 2014 at 11:39:37AM +0100, reshmapa wrote:
> > From: Reshma Pattan <reshma.pattan@intel.com>
> > 
> > A new sample app that shows the usage of the distributor library. This
> > app works as follows:
> > 
> > * An RX thread runs which pulls packets from each ethernet port in turn
> >   and passes those packets to worker using a distributor component.
> > * The workers take the packets in turn, and determine the output port
> >   for those packets using basic l2forwarding doing an xor on the source
> >   port id.
> > * The RX thread takes the returned packets from the workers and enqueue
> >   those packets into an rte_ring structure.
> > * A TX thread pulls the packets off the rte_ring structure and then
> >   sends each packet out the output port specified previously by the worker
> > * Command-line option support provided only for portmask.
> > 
> > Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
> > Signed-off-by: Reshma Pattan    <reshma.pattan@intel.com>
> > ---
> >  examples/Makefile                 |    1 +
> >  examples/distributor_app/Makefile |   57 ++++
> >  examples/distributor_app/main.c   |  600 +++++++++++++++++++++++++++++++++++++
> >  examples/distributor_app/main.h   |   46 +++
> >  4 files changed, 704 insertions(+), 0 deletions(-)
> >  create mode 100644 examples/distributor_app/Makefile
> >  create mode 100644 examples/distributor_app/main.c
> >  create mode 100644 examples/distributor_app/main.h
> > 
> > diff --git a/examples/Makefile b/examples/Makefile
> > index 6245f83..2ba82b0 100644
> > --- a/examples/Makefile
> > +++ b/examples/Makefile
> > @@ -66,5 +66,6 @@ DIRS-y += vhost
> >  DIRS-$(CONFIG_RTE_LIBRTE_XEN_DOM0) += vhost_xen
> >  DIRS-y += vmdq
> >  DIRS-y += vmdq_dcb
> > +DIRS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) += distributor_app
> >  
> >  include $(RTE_SDK)/mk/rte.extsubdir.mk
> > diff --git a/examples/distributor_app/Makefile b/examples/distributor_app/Makefile
> > new file mode 100644
> > index 0000000..6a5bada
> > --- /dev/null
> > +++ b/examples/distributor_app/Makefile
> > @@ -0,0 +1,57 @@
> > +#   BSD LICENSE
> > +#
> > +#   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
> > +#   All rights reserved.
> > +#
> > +#   Redistribution and use in source and binary forms, with or without
> > +#   modification, are permitted provided that the following conditions
> > +#   are met:
> > +#
> > +#     * Redistributions of source code must retain the above copyright
> > +#       notice, this list of conditions and the following disclaimer.
> > +#     * Redistributions in binary form must reproduce the above copyright
> > +#       notice, this list of conditions and the following disclaimer in
> > +#       the documentation and/or other materials provided with the
> > +#       distribution.
> > +#     * Neither the name of Intel Corporation nor the names of its
> > +#       contributors may be used to endorse or promote products derived
> > +#       from this software without specific prior written permission.
> > +#
> > +#   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
> > +#   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
> > +#   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
> > +#   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
> > +#   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
> > +#   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
> > +#   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
> > +#   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
> > +#   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
> > +#   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
> > +#   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
> > +
> > +ifeq ($(RTE_SDK),)
> > +$(error "Please define RTE_SDK environment variable")
> > +endif
> > +
> > +# Default target, can be overriden by command line or environment
> > +RTE_TARGET ?= x86_64-native-linuxapp-gcc
> > +
> > +include $(RTE_SDK)/mk/rte.vars.mk
> > +
> > +# binary name
> > +APP = distributor_app
> > +
> > +# all source are stored in SRCS-y
> > +SRCS-y := main.c
> > +
> > +CFLAGS += $(WERROR_FLAGS)
> > +
> > +# workaround for a gcc bug with noreturn attribute
> > +# http://gcc.gnu.org/bugzilla/show_bug.cgi?id=12603
> > +ifeq ($(CONFIG_RTE_TOOLCHAIN_GCC),y)
> > +CFLAGS_main.o += -Wno-return-type
> > +endif
> > +
> > +EXTRA_CFLAGS += -O3 -Wfatal-errors
> > +
> > +include $(RTE_SDK)/mk/rte.extapp.mk
> > diff --git a/examples/distributor_app/main.c b/examples/distributor_app/main.c
> > new file mode 100644
> > index 0000000..f555d93
> > --- /dev/null
> > +++ b/examples/distributor_app/main.c
> > @@ -0,0 +1,600 @@
> > +/*-
> > + *   BSD LICENSE
> > + *
> > + *   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
> > + *   All rights reserved.
> > + *
> > + *   Redistribution and use in source and binary forms, with or without
> > + *   modification, are permitted provided that the following conditions
> > + *   are met:
> > + *
> > + *     * Redistributions of source code must retain the above copyright
> > + *       notice, this list of conditions and the following disclaimer.
> > + *     * Redistributions in binary form must reproduce the above copyright
> > + *       notice, this list of conditions and the following disclaimer in
> > + *       the documentation and/or other materials provided with the
> > + *       distribution.
> > + *     * Neither the name of Intel Corporation nor the names of its
> > + *       contributors may be used to endorse or promote products derived
> > + *       from this software without specific prior written permission.
> > + *
> > + *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
> > + *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
> > + *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
> > + *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
> > + *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
> > + *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
> > + *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
> > + *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
> > + *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
> > + *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
> > + *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
> > + */
> > +
> > +#include <stdint.h>
> > +#include <inttypes.h>
> > +#include <unistd.h>
> > +#include <signal.h>
> > +#include <getopt.h>
> > +
> > +#include <rte_eal.h>
> > +#include <rte_ethdev.h>
> > +#include <rte_cycles.h>
> > +#include <rte_malloc.h>
> > +#include <rte_debug.h>
> > +#include <rte_distributor.h>
> > +
> > +#include "main.h"
> > +
> > +#define RX_RING_SIZE 256
> > +#define RX_FREE_THRESH 32
> > +#define RX_PTHRESH 8
> > +#define RX_HTHRESH 8
> > +#define RX_WTHRESH 0
> > +
> > +#define TX_RING_SIZE 512
> > +#define TX_FREE_THRESH 32
> > +#define TX_PTHRESH 32
> > +#define TX_HTHRESH 0
> > +#define TX_WTHRESH 0
> > +#define TX_RSBIT_THRESH 32
> > +#define TX_Q_FLAGS (ETH_TXQ_FLAGS_NOMULTSEGS | ETH_TXQ_FLAGS_NOVLANOFFL |\
> > +	ETH_TXQ_FLAGS_NOXSUMSCTP | ETH_TXQ_FLAGS_NOXSUMUDP | \
> > +	ETH_TXQ_FLAGS_NOXSUMTCP)
> > +
> > +#define NUM_MBUFS ((64*1024)-1)
> > +#define MBUF_SIZE (2048 + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM)
> > +#define MBUF_CACHE_SIZE 250
> > +#define BURST_SIZE 32
> > +#define RTE_RING_SZ 1024
> > +
> > +/* uncommnet below line to enable debug logs */
> > +/* #define DEBUG */
> > +
> > +#ifdef DEBUG
> > +#define LOG_LEVEL RTE_LOG_DEBUG
> > +#define LOG_DEBUG(log_type, fmt, args...) do {	\
> > +	RTE_LOG(DEBUG, log_type, fmt, ##args)		\
> > +} while (0)
> > +#else
> > +#define LOG_LEVEL RTE_LOG_INFO
> > +#define LOG_DEBUG(log_type, fmt, args...) do {} while (0)
> > +#endif
> > +
> > +#define RTE_LOGTYPE_DISTRAPP RTE_LOGTYPE_USER1
> > +
> > +/* mask of enabled ports */
> > +static uint32_t enabled_port_mask = 0;
> > +
> > +static volatile struct app_stats {
> > +	struct {
> > +		uint64_t rx_pkts;
> > +		uint64_t returned_pkts;
> > +		uint64_t enqueued_pkts;
> > +	} rx __rte_cache_aligned;
> > +
> > +	struct {
> > +		uint64_t dequeue_pkts;
> > +		uint64_t tx_pkts;
> > +	} tx __rte_cache_aligned;
> > +} app_stats;
> > +
> > +static const struct rte_eth_conf port_conf_default = {
> > +	.rxmode = {
> > +		.mq_mode = ETH_MQ_RX_RSS,
> > +		.max_rx_pkt_len = ETHER_MAX_LEN,
> > +		.split_hdr_size = 0,
> > +		.header_split   = 0, /**< Header Split disabled */
> > +		.hw_ip_checksum = 0, /**< IP checksum offload enabled */
> > +		.hw_vlan_filter = 0, /**< VLAN filtering disabled */
> > +		.jumbo_frame    = 0, /**< Jumbo Frame Support disabled */
> > +		.hw_strip_crc   = 0, /**< CRC stripped by hardware */
> > +	},
> > +	.txmode = {
> > +		.mq_mode = ETH_MQ_TX_NONE,
> > +	},
> > +	.lpbk_mode = 0,
> > +	.rx_adv_conf = {
> > +			.rss_conf = {
> > +				.rss_hf = ETH_RSS_IPV4 | ETH_RSS_IPV6 |
> > +					ETH_RSS_IPV4_TCP | ETH_RSS_IPV4_UDP |
> > +					ETH_RSS_IPV6_TCP | ETH_RSS_IPV6_UDP,
> > +			}
> > +	},
> > +};
> > +
> > +static const struct rte_eth_rxconf rx_conf_default = {
> > +	.rx_thresh = {
> > +		.pthresh = RX_PTHRESH,
> > +		.hthresh = RX_HTHRESH,
> > +		.wthresh = RX_WTHRESH,
> > +	},
> > +	.rx_free_thresh = RX_FREE_THRESH,
> > +	.rx_drop_en = 0,
> > +};
> > +
> > +static const struct rte_eth_txconf tx_conf_default = {
> > +	.tx_thresh = {
> > +		.pthresh = TX_PTHRESH,
> > +		.hthresh = TX_HTHRESH,
> > +		.wthresh = TX_WTHRESH,
> > +	},
> > +	.tx_free_thresh = TX_FREE_THRESH,
> > +	.tx_rs_thresh = TX_RSBIT_THRESH,
> > +	.txq_flags = TX_Q_FLAGS
> > +
> > +};
> > +
> > +struct output_buffer {
> > +	unsigned count;
> > +	struct rte_mbuf *mbufs[BURST_SIZE];
> > +};
> > +
> > +/*
> > + * Initialises a given port using global settings and with the rx buffers
> > + * coming from the mbuf_pool passed as parameter
> > + */
> > +static inline int
> > +port_init(uint8_t port, struct rte_mempool *mbuf_pool)
> > +{
> > +	struct rte_eth_conf port_conf = port_conf_default;
> > +	const uint16_t rxRings = 1, txRings = rte_lcore_count() - 1;
> > +	int retval;
> > +	uint16_t q;
> > +
> > +	if (port >= rte_eth_dev_count())
> > +		return -1;
> > +
> > +	retval = rte_eth_dev_configure(port, rxRings, txRings, &port_conf);
> > +	if (retval != 0)
> > +		return retval;
> > +
> > +	for (q = 0; q < rxRings; q++) {
> > +		retval = rte_eth_rx_queue_setup(port, q, RX_RING_SIZE,
> > +						rte_eth_dev_socket_id(port),
> > +						&rx_conf_default, mbuf_pool);
> > +		if (retval < 0)
> > +			return retval;
> > +	}
> > +
> > +	for (q = 0; q < txRings; q++) {
> > +		retval = rte_eth_tx_queue_setup(port, q, TX_RING_SIZE,
> > +						rte_eth_dev_socket_id(port),
> > +						&tx_conf_default);
> > +		if (retval < 0)
> > +			return retval;
> > +	}
> > +
> > +	retval  = rte_eth_dev_start(port);
> > +	if (retval < 0)
> > +		return retval;
> > +
> > +	struct rte_eth_link link;
> > +	rte_eth_link_get_nowait(port, &link);
> > +	if (!link.link_status) {
> > +		sleep(1);
> > +		rte_eth_link_get_nowait(port, &link);
> > +	}
> > +
> > +	if (!link.link_status) {
> > +		printf("Link down on port %"PRIu8"\n", port);
> > +		return 0;
> > +	}
> > +
> > +	struct ether_addr addr;
> > +	rte_eth_macaddr_get(port, &addr);
> > +	printf("Port %u MAC: %02"PRIx8" %02"PRIx8" %02"PRIx8
> > +			" %02"PRIx8" %02"PRIx8" %02"PRIx8"\n",
> > +			(unsigned)port,
> > +			addr.addr_bytes[0], addr.addr_bytes[1],
> > +			addr.addr_bytes[2], addr.addr_bytes[3],
> > +			addr.addr_bytes[4], addr.addr_bytes[5]);
> > +
> > +	rte_eth_promiscuous_enable(port);
> > +
> > +	return 0;
> > +}
> > +
> > +struct lcore_params {
> > +	unsigned worker_id;
> > +	struct rte_distributor *d;
> > +	struct rte_ring *r;
> > +};
> > +
> > +static __attribute__((noreturn)) void
> > +lcore_rx(struct lcore_params *p)
> > +{
> > +	struct rte_distributor *d = p->d;
> > +	struct rte_ring *r = p->r;
> > +	const uint8_t nb_ports = rte_eth_dev_count();
> > +	const int socket_id = rte_socket_id();
> > +	uint8_t port;
> > +
> > +	for (port = 0; port < nb_ports; port++) {
> > +		/* skip ports that are not enabled */
> > +		if ((enabled_port_mask & (1 << port)) == 0)
> > +			continue;
> > +
> > +		if (rte_eth_dev_socket_id(port) > 0 &&
> > +				rte_eth_dev_socket_id(port) != socket_id)
> > +			printf("WARNING, port %u is on remote NUMA node to "
> > +					"RX thread.\n\tPerformance will not "
> > +					"be optimal.\n", port);
> > +	}
> > +
> > +	printf("\nCore %u doing packet RX.\n", rte_lcore_id());
> > +	port = 0;
> > +	for (;;) {
> > +		/* skip ports that are not enabled */
> > +		if ((enabled_port_mask & (1 << port)) == 0) {
> > +			if (++port == nb_ports)
> > +				port = 0;
> > +			continue;
> > +		}
> > +		struct rte_mbuf *bufs[BURST_SIZE*2];
> > +		const uint16_t nb_rx = rte_eth_rx_burst(port, 0, bufs,
> > +				BURST_SIZE);
> > +		app_stats.rx.rx_pkts += nb_rx;
> > +
> > +		rte_distributor_process(d, bufs, nb_rx);
> > +		const uint16_t nb_ret = rte_distributor_returned_pkts(d,
> > +				bufs, BURST_SIZE*2);
> > +		app_stats.rx.returned_pkts += nb_ret;
> > +		if (unlikely(nb_ret == 0))
> > +			continue;
> > +
> > +		uint16_t sent = rte_ring_enqueue_burst(r, (void *)bufs, nb_ret);
> > +		app_stats.rx.enqueued_pkts += sent;
> > +		if (unlikely(sent < nb_ret)) {
> > +			LOG_DEBUG(DISTRAPP, "%s:Packet loss due to full ring\n", __func__);
> > +			while (sent < nb_ret)
> > +				rte_pktmbuf_free(bufs[sent++]);
> > +		}
> > +		if (++port == nb_ports)
> > +			port = 0;
> > +	}
> > +}
> > +
> > +static inline void
> > +flush_one_port(struct output_buffer *outbuf, uint8_t outp)
> > +{
> > +	unsigned nb_tx = rte_eth_tx_burst(outp, 0, outbuf->mbufs,
> > +			outbuf->count);
> > +	app_stats.tx.tx_pkts += nb_tx;
> > +
> > +	if (unlikely(nb_tx < outbuf->count)) {
> > +		LOG_DEBUG(DISTRAPP, "%s:Packet loss with tx_burst\n", __func__);
> > +		do {
> > +			rte_pktmbuf_free(outbuf->mbufs[nb_tx]);
> > +		} while (++nb_tx < outbuf->count);
> > +	}
> > +	outbuf->count = 0;
> > +}
> > +
> > +static inline void
> > +flush_all_ports(struct output_buffer *tx_buffers, uint8_t nb_ports)
> > +{
> > +	uint8_t outp;
> > +	for (outp = 0; outp < nb_ports; outp++) {
> > +		/* skip ports that are not enabled */
> > +		if ((enabled_port_mask & (1 << outp)) == 0)
> > +			continue;
> > +
> > +		if (tx_buffers[outp].count == 0)
> > +			continue;
> > +
> > +		flush_one_port(&tx_buffers[outp], outp);
> > +	}
> > +}
> > +
> > +static __attribute__((noreturn)) void
> > +lcore_tx(struct rte_ring *in_r)
> > +{
> > +	static struct output_buffer tx_buffers[RTE_MAX_ETHPORTS];
> > +	const uint8_t nb_ports = rte_eth_dev_count();
> > +	const int socket_id = rte_socket_id();
> > +	uint8_t port;
> > +
> > +	for (port = 0; port < nb_ports; port++) {
> > +		/* skip ports that are not enabled */
> > +		if ((enabled_port_mask & (1 << port)) == 0)
> > +			continue;
> > +
> > +		if (rte_eth_dev_socket_id(port) > 0 &&
> > +				rte_eth_dev_socket_id(port) != socket_id)
> > +			printf("WARNING, port %u is on remote NUMA node to "
> > +					"TX thread.\n\tPerformance will not "
> > +					"be optimal.\n", port);
> > +	}
> > +
> > +	printf("\nCore %u doing packet TX.\n", rte_lcore_id());
> > +	for (;;) {
> > +		for (port = 0; port < nb_ports; port++) {
> > +			/* skip ports that are not enabled */
> > +			if ((enabled_port_mask & (1 << port)) == 0)
> > +				continue;
> > +
> > +			struct rte_mbuf *bufs[BURST_SIZE];
> > +			const uint16_t nb_rx = rte_ring_dequeue_burst(in_r,
> > +					(void *)bufs, BURST_SIZE);
> > +			app_stats.tx.dequeue_pkts += nb_rx;
> > +
> > +			/* if we get no traffic, flush anything we have */
> > +			if (unlikely(nb_rx == 0)) {
> > +				flush_all_ports(tx_buffers, nb_ports);
> > +				continue;
> > +			}
> > +
> > +			/* for traffic we receive, queue it up for transmit */
> > +			uint16_t i;
> > +			_mm_prefetch(bufs[0], 0);
> > +			_mm_prefetch(bufs[1], 0);
> > +			_mm_prefetch(bufs[2], 0);
> > +			for (i = 0; i < nb_rx; i++) {
> > +				struct output_buffer *outbuf;
> > +				uint8_t outp;
> > +				_mm_prefetch(bufs[i + 3], 0);
> > +				/* workers should update in_port to hold the
> > +				 * output port value */
> > +				outp = bufs[i]->port;
> > +				/* skip ports that are not enabled */
> > +				if ((enabled_port_mask & (1 << outp)) == 0)
> > +					continue;
> > +
> > +				outbuf = &tx_buffers[outp];
> > +				outbuf->mbufs[outbuf->count++] = bufs[i];
> > +				if (outbuf->count == BURST_SIZE)
> > +					flush_one_port(outbuf, outp);
> > +			}
> > +		}
> > +	}
> > +}
> > +
> > +
> > +static __attribute__((noreturn)) void
> > +lcore_worker(struct lcore_params *p)
> > +{
> > +	struct rte_distributor *d = p->d;
> > +	const unsigned id = p->worker_id;
> > +	/* for single port, xor_val will be zero so we won't modify the output
> > +	 * port, otherwise we send traffic from 0 to 1, 2 to 3, and vice versa
> > +	 */
> > +	const unsigned xor_val = (rte_eth_dev_count() > 1);
> > +	struct rte_mbuf *buf = NULL;
> > +
> > +	printf("\nCore %u acting as worker core.\n", rte_lcore_id());
> > +	for (;;) {
> > +		buf = rte_distributor_get_pkt(d, id, buf);
> > +		buf->port ^= xor_val;
> > +	}
> > +}
> > +
> > +static void
> > +int_handler(int sig_num)
> > +{
> > +	struct rte_eth_stats eth_stats;
> > +	unsigned i;
> > +
> > +	printf("Exiting on signal %d\n", sig_num);
> > +
> > +	printf("\nRX thread stats:\n");
> > +	printf(" - Received:    %"PRIu64"\n", app_stats.rx.rx_pkts);
> > +	printf(" - Processed:   %"PRIu64"\n", app_stats.rx.returned_pkts);
> > +	printf(" - Enqueued:    %"PRIu64"\n", app_stats.rx.enqueued_pkts);
> > +
> > +	printf("\nTX thread stats:\n");
> > +	printf(" - Dequeued:    %"PRIu64"\n", app_stats.tx.dequeue_pkts);
> > +	printf(" - Transmitted: %"PRIu64"\n", app_stats.tx.tx_pkts);
> > +
> > +	for (i = 0; i < rte_eth_dev_count(); i++) {
> > +		rte_eth_stats_get(i, &eth_stats);
> > +		printf("\nPort %u stats:\n", i);
> > +		printf(" - Pkts in:   %"PRIu64"\n", eth_stats.ipackets);
> > +		printf(" - Pkts out:  %"PRIu64"\n", eth_stats.opackets);
> > +		printf(" - In Errs:   %"PRIu64"\n", eth_stats.ierrors);
> > +		printf(" - Out Errs:  %"PRIu64"\n", eth_stats.oerrors);
> > +		printf(" - Mbuf Errs: %"PRIu64"\n", eth_stats.rx_nombuf);
> > +	}
> > +	exit(0);
> rte_exit here?  Also, this is a pretty ungraceful exit strategy as all the
> threads you've created and memory you've allocated are just forgotten here.
> Given that dpdk mempools are shared, this has the potential to leak lots of
> memory if other apps are using the dpdk at the same time that you run this.  You
> probably want to use the sigint handler to raise a flag to the tx/rx threads to
> shutdown gracefully, and then free your allocated memory and mempool.
> 
> Neil
>

Unless the different processes are explicitly cooperating as 
primary/secondary, the mempools are not shared. I just don't see the need 
for this app to do more cleanup on ctrl-c signal, as it's not intended to be 
a multiprocess app, and there is little that any secondary process could do 
to work with this app, except possibly some resource monitoring, which would 
be completely unaffected by it exiting the way it does.

/Bruce
  
Neil Horman Sept. 30, 2014, 1:39 p.m. UTC | #3
On Tue, Sep 30, 2014 at 01:18:28PM +0100, Bruce Richardson wrote:
> On Tue, Sep 30, 2014 at 07:34:45AM -0400, Neil Horman wrote:
> > On Tue, Sep 30, 2014 at 11:39:37AM +0100, reshmapa wrote:
> > > From: Reshma Pattan <reshma.pattan@intel.com>
> > > 
> > > A new sample app that shows the usage of the distributor library. This
> > > app works as follows:
> > > 
> > > * An RX thread runs which pulls packets from each ethernet port in turn
> > >   and passes those packets to worker using a distributor component.
> > > * The workers take the packets in turn, and determine the output port
> > >   for those packets using basic l2forwarding doing an xor on the source
> > >   port id.
> > > * The RX thread takes the returned packets from the workers and enqueue
> > >   those packets into an rte_ring structure.
> > > * A TX thread pulls the packets off the rte_ring structure and then
> > >   sends each packet out the output port specified previously by the worker
> > > * Command-line option support provided only for portmask.
> > > 
> > > Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
> > > Signed-off-by: Reshma Pattan    <reshma.pattan@intel.com>
> > > ---
> > >  examples/Makefile                 |    1 +
> > >  examples/distributor_app/Makefile |   57 ++++
> > >  examples/distributor_app/main.c   |  600 +++++++++++++++++++++++++++++++++++++
> > >  examples/distributor_app/main.h   |   46 +++
> > >  4 files changed, 704 insertions(+), 0 deletions(-)
> > >  create mode 100644 examples/distributor_app/Makefile
> > >  create mode 100644 examples/distributor_app/main.c
> > >  create mode 100644 examples/distributor_app/main.h
> > > 
> > > diff --git a/examples/Makefile b/examples/Makefile
> > > index 6245f83..2ba82b0 100644
> > > --- a/examples/Makefile
> > > +++ b/examples/Makefile
> > > @@ -66,5 +66,6 @@ DIRS-y += vhost
> > >  DIRS-$(CONFIG_RTE_LIBRTE_XEN_DOM0) += vhost_xen
> > >  DIRS-y += vmdq
> > >  DIRS-y += vmdq_dcb
> > > +DIRS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) += distributor_app
> > >  
> > >  include $(RTE_SDK)/mk/rte.extsubdir.mk
> > > diff --git a/examples/distributor_app/Makefile b/examples/distributor_app/Makefile
> > > new file mode 100644
> > > index 0000000..6a5bada
> > > --- /dev/null
> > > +++ b/examples/distributor_app/Makefile
> > > @@ -0,0 +1,57 @@
> > > +#   BSD LICENSE
> > > +#
> > > +#   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
> > > +#   All rights reserved.
> > > +#
> > > +#   Redistribution and use in source and binary forms, with or without
> > > +#   modification, are permitted provided that the following conditions
> > > +#   are met:
> > > +#
> > > +#     * Redistributions of source code must retain the above copyright
> > > +#       notice, this list of conditions and the following disclaimer.
> > > +#     * Redistributions in binary form must reproduce the above copyright
> > > +#       notice, this list of conditions and the following disclaimer in
> > > +#       the documentation and/or other materials provided with the
> > > +#       distribution.
> > > +#     * Neither the name of Intel Corporation nor the names of its
> > > +#       contributors may be used to endorse or promote products derived
> > > +#       from this software without specific prior written permission.
> > > +#
> > > +#   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
> > > +#   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
> > > +#   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
> > > +#   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
> > > +#   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
> > > +#   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
> > > +#   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
> > > +#   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
> > > +#   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
> > > +#   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
> > > +#   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
> > > +
> > > +ifeq ($(RTE_SDK),)
> > > +$(error "Please define RTE_SDK environment variable")
> > > +endif
> > > +
> > > +# Default target, can be overriden by command line or environment
> > > +RTE_TARGET ?= x86_64-native-linuxapp-gcc
> > > +
> > > +include $(RTE_SDK)/mk/rte.vars.mk
> > > +
> > > +# binary name
> > > +APP = distributor_app
> > > +
> > > +# all source are stored in SRCS-y
> > > +SRCS-y := main.c
> > > +
> > > +CFLAGS += $(WERROR_FLAGS)
> > > +
> > > +# workaround for a gcc bug with noreturn attribute
> > > +# http://gcc.gnu.org/bugzilla/show_bug.cgi?id=12603
> > > +ifeq ($(CONFIG_RTE_TOOLCHAIN_GCC),y)
> > > +CFLAGS_main.o += -Wno-return-type
> > > +endif
> > > +
> > > +EXTRA_CFLAGS += -O3 -Wfatal-errors
> > > +
> > > +include $(RTE_SDK)/mk/rte.extapp.mk
> > > diff --git a/examples/distributor_app/main.c b/examples/distributor_app/main.c
> > > new file mode 100644
> > > index 0000000..f555d93
> > > --- /dev/null
> > > +++ b/examples/distributor_app/main.c
> > > @@ -0,0 +1,600 @@
> > > +/*-
> > > + *   BSD LICENSE
> > > + *
> > > + *   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
> > > + *   All rights reserved.
> > > + *
> > > + *   Redistribution and use in source and binary forms, with or without
> > > + *   modification, are permitted provided that the following conditions
> > > + *   are met:
> > > + *
> > > + *     * Redistributions of source code must retain the above copyright
> > > + *       notice, this list of conditions and the following disclaimer.
> > > + *     * Redistributions in binary form must reproduce the above copyright
> > > + *       notice, this list of conditions and the following disclaimer in
> > > + *       the documentation and/or other materials provided with the
> > > + *       distribution.
> > > + *     * Neither the name of Intel Corporation nor the names of its
> > > + *       contributors may be used to endorse or promote products derived
> > > + *       from this software without specific prior written permission.
> > > + *
> > > + *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
> > > + *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
> > > + *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
> > > + *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
> > > + *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
> > > + *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
> > > + *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
> > > + *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
> > > + *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
> > > + *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
> > > + *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
> > > + */
> > > +
> > > +#include <stdint.h>
> > > +#include <inttypes.h>
> > > +#include <unistd.h>
> > > +#include <signal.h>
> > > +#include <getopt.h>
> > > +
> > > +#include <rte_eal.h>
> > > +#include <rte_ethdev.h>
> > > +#include <rte_cycles.h>
> > > +#include <rte_malloc.h>
> > > +#include <rte_debug.h>
> > > +#include <rte_distributor.h>
> > > +
> > > +#include "main.h"
> > > +
> > > +#define RX_RING_SIZE 256
> > > +#define RX_FREE_THRESH 32
> > > +#define RX_PTHRESH 8
> > > +#define RX_HTHRESH 8
> > > +#define RX_WTHRESH 0
> > > +
> > > +#define TX_RING_SIZE 512
> > > +#define TX_FREE_THRESH 32
> > > +#define TX_PTHRESH 32
> > > +#define TX_HTHRESH 0
> > > +#define TX_WTHRESH 0
> > > +#define TX_RSBIT_THRESH 32
> > > +#define TX_Q_FLAGS (ETH_TXQ_FLAGS_NOMULTSEGS | ETH_TXQ_FLAGS_NOVLANOFFL |\
> > > +	ETH_TXQ_FLAGS_NOXSUMSCTP | ETH_TXQ_FLAGS_NOXSUMUDP | \
> > > +	ETH_TXQ_FLAGS_NOXSUMTCP)
> > > +
> > > +#define NUM_MBUFS ((64*1024)-1)
> > > +#define MBUF_SIZE (2048 + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM)
> > > +#define MBUF_CACHE_SIZE 250
> > > +#define BURST_SIZE 32
> > > +#define RTE_RING_SZ 1024
> > > +
> > > +/* uncommnet below line to enable debug logs */
> > > +/* #define DEBUG */
> > > +
> > > +#ifdef DEBUG
> > > +#define LOG_LEVEL RTE_LOG_DEBUG
> > > +#define LOG_DEBUG(log_type, fmt, args...) do {	\
> > > +	RTE_LOG(DEBUG, log_type, fmt, ##args)		\
> > > +} while (0)
> > > +#else
> > > +#define LOG_LEVEL RTE_LOG_INFO
> > > +#define LOG_DEBUG(log_type, fmt, args...) do {} while (0)
> > > +#endif
> > > +
> > > +#define RTE_LOGTYPE_DISTRAPP RTE_LOGTYPE_USER1
> > > +
> > > +/* mask of enabled ports */
> > > +static uint32_t enabled_port_mask = 0;
> > > +
> > > +static volatile struct app_stats {
> > > +	struct {
> > > +		uint64_t rx_pkts;
> > > +		uint64_t returned_pkts;
> > > +		uint64_t enqueued_pkts;
> > > +	} rx __rte_cache_aligned;
> > > +
> > > +	struct {
> > > +		uint64_t dequeue_pkts;
> > > +		uint64_t tx_pkts;
> > > +	} tx __rte_cache_aligned;
> > > +} app_stats;
> > > +
> > > +static const struct rte_eth_conf port_conf_default = {
> > > +	.rxmode = {
> > > +		.mq_mode = ETH_MQ_RX_RSS,
> > > +		.max_rx_pkt_len = ETHER_MAX_LEN,
> > > +		.split_hdr_size = 0,
> > > +		.header_split   = 0, /**< Header Split disabled */
> > > +		.hw_ip_checksum = 0, /**< IP checksum offload enabled */
> > > +		.hw_vlan_filter = 0, /**< VLAN filtering disabled */
> > > +		.jumbo_frame    = 0, /**< Jumbo Frame Support disabled */
> > > +		.hw_strip_crc   = 0, /**< CRC stripped by hardware */
> > > +	},
> > > +	.txmode = {
> > > +		.mq_mode = ETH_MQ_TX_NONE,
> > > +	},
> > > +	.lpbk_mode = 0,
> > > +	.rx_adv_conf = {
> > > +			.rss_conf = {
> > > +				.rss_hf = ETH_RSS_IPV4 | ETH_RSS_IPV6 |
> > > +					ETH_RSS_IPV4_TCP | ETH_RSS_IPV4_UDP |
> > > +					ETH_RSS_IPV6_TCP | ETH_RSS_IPV6_UDP,
> > > +			}
> > > +	},
> > > +};
> > > +
> > > +static const struct rte_eth_rxconf rx_conf_default = {
> > > +	.rx_thresh = {
> > > +		.pthresh = RX_PTHRESH,
> > > +		.hthresh = RX_HTHRESH,
> > > +		.wthresh = RX_WTHRESH,
> > > +	},
> > > +	.rx_free_thresh = RX_FREE_THRESH,
> > > +	.rx_drop_en = 0,
> > > +};
> > > +
> > > +static const struct rte_eth_txconf tx_conf_default = {
> > > +	.tx_thresh = {
> > > +		.pthresh = TX_PTHRESH,
> > > +		.hthresh = TX_HTHRESH,
> > > +		.wthresh = TX_WTHRESH,
> > > +	},
> > > +	.tx_free_thresh = TX_FREE_THRESH,
> > > +	.tx_rs_thresh = TX_RSBIT_THRESH,
> > > +	.txq_flags = TX_Q_FLAGS
> > > +
> > > +};
> > > +
> > > +struct output_buffer {
> > > +	unsigned count;
> > > +	struct rte_mbuf *mbufs[BURST_SIZE];
> > > +};
> > > +
> > > +/*
> > > + * Initialises a given port using global settings and with the rx buffers
> > > + * coming from the mbuf_pool passed as parameter
> > > + */
> > > +static inline int
> > > +port_init(uint8_t port, struct rte_mempool *mbuf_pool)
> > > +{
> > > +	struct rte_eth_conf port_conf = port_conf_default;
> > > +	const uint16_t rxRings = 1, txRings = rte_lcore_count() - 1;
> > > +	int retval;
> > > +	uint16_t q;
> > > +
> > > +	if (port >= rte_eth_dev_count())
> > > +		return -1;
> > > +
> > > +	retval = rte_eth_dev_configure(port, rxRings, txRings, &port_conf);
> > > +	if (retval != 0)
> > > +		return retval;
> > > +
> > > +	for (q = 0; q < rxRings; q++) {
> > > +		retval = rte_eth_rx_queue_setup(port, q, RX_RING_SIZE,
> > > +						rte_eth_dev_socket_id(port),
> > > +						&rx_conf_default, mbuf_pool);
> > > +		if (retval < 0)
> > > +			return retval;
> > > +	}
> > > +
> > > +	for (q = 0; q < txRings; q++) {
> > > +		retval = rte_eth_tx_queue_setup(port, q, TX_RING_SIZE,
> > > +						rte_eth_dev_socket_id(port),
> > > +						&tx_conf_default);
> > > +		if (retval < 0)
> > > +			return retval;
> > > +	}
> > > +
> > > +	retval  = rte_eth_dev_start(port);
> > > +	if (retval < 0)
> > > +		return retval;
> > > +
> > > +	struct rte_eth_link link;
> > > +	rte_eth_link_get_nowait(port, &link);
> > > +	if (!link.link_status) {
> > > +		sleep(1);
> > > +		rte_eth_link_get_nowait(port, &link);
> > > +	}
> > > +
> > > +	if (!link.link_status) {
> > > +		printf("Link down on port %"PRIu8"\n", port);
> > > +		return 0;
> > > +	}
> > > +
> > > +	struct ether_addr addr;
> > > +	rte_eth_macaddr_get(port, &addr);
> > > +	printf("Port %u MAC: %02"PRIx8" %02"PRIx8" %02"PRIx8
> > > +			" %02"PRIx8" %02"PRIx8" %02"PRIx8"\n",
> > > +			(unsigned)port,
> > > +			addr.addr_bytes[0], addr.addr_bytes[1],
> > > +			addr.addr_bytes[2], addr.addr_bytes[3],
> > > +			addr.addr_bytes[4], addr.addr_bytes[5]);
> > > +
> > > +	rte_eth_promiscuous_enable(port);
> > > +
> > > +	return 0;
> > > +}
> > > +
> > > +struct lcore_params {
> > > +	unsigned worker_id;
> > > +	struct rte_distributor *d;
> > > +	struct rte_ring *r;
> > > +};
> > > +
> > > +static __attribute__((noreturn)) void
> > > +lcore_rx(struct lcore_params *p)
> > > +{
> > > +	struct rte_distributor *d = p->d;
> > > +	struct rte_ring *r = p->r;
> > > +	const uint8_t nb_ports = rte_eth_dev_count();
> > > +	const int socket_id = rte_socket_id();
> > > +	uint8_t port;
> > > +
> > > +	for (port = 0; port < nb_ports; port++) {
> > > +		/* skip ports that are not enabled */
> > > +		if ((enabled_port_mask & (1 << port)) == 0)
> > > +			continue;
> > > +
> > > +		if (rte_eth_dev_socket_id(port) > 0 &&
> > > +				rte_eth_dev_socket_id(port) != socket_id)
> > > +			printf("WARNING, port %u is on remote NUMA node to "
> > > +					"RX thread.\n\tPerformance will not "
> > > +					"be optimal.\n", port);
> > > +	}
> > > +
> > > +	printf("\nCore %u doing packet RX.\n", rte_lcore_id());
> > > +	port = 0;
> > > +	for (;;) {
> > > +		/* skip ports that are not enabled */
> > > +		if ((enabled_port_mask & (1 << port)) == 0) {
> > > +			if (++port == nb_ports)
> > > +				port = 0;
> > > +			continue;
> > > +		}
> > > +		struct rte_mbuf *bufs[BURST_SIZE*2];
> > > +		const uint16_t nb_rx = rte_eth_rx_burst(port, 0, bufs,
> > > +				BURST_SIZE);
> > > +		app_stats.rx.rx_pkts += nb_rx;
> > > +
> > > +		rte_distributor_process(d, bufs, nb_rx);
> > > +		const uint16_t nb_ret = rte_distributor_returned_pkts(d,
> > > +				bufs, BURST_SIZE*2);
> > > +		app_stats.rx.returned_pkts += nb_ret;
> > > +		if (unlikely(nb_ret == 0))
> > > +			continue;
> > > +
> > > +		uint16_t sent = rte_ring_enqueue_burst(r, (void *)bufs, nb_ret);
> > > +		app_stats.rx.enqueued_pkts += sent;
> > > +		if (unlikely(sent < nb_ret)) {
> > > +			LOG_DEBUG(DISTRAPP, "%s:Packet loss due to full ring\n", __func__);
> > > +			while (sent < nb_ret)
> > > +				rte_pktmbuf_free(bufs[sent++]);
> > > +		}
> > > +		if (++port == nb_ports)
> > > +			port = 0;
> > > +	}
> > > +}
> > > +
> > > +static inline void
> > > +flush_one_port(struct output_buffer *outbuf, uint8_t outp)
> > > +{
> > > +	unsigned nb_tx = rte_eth_tx_burst(outp, 0, outbuf->mbufs,
> > > +			outbuf->count);
> > > +	app_stats.tx.tx_pkts += nb_tx;
> > > +
> > > +	if (unlikely(nb_tx < outbuf->count)) {
> > > +		LOG_DEBUG(DISTRAPP, "%s:Packet loss with tx_burst\n", __func__);
> > > +		do {
> > > +			rte_pktmbuf_free(outbuf->mbufs[nb_tx]);
> > > +		} while (++nb_tx < outbuf->count);
> > > +	}
> > > +	outbuf->count = 0;
> > > +}
> > > +
> > > +static inline void
> > > +flush_all_ports(struct output_buffer *tx_buffers, uint8_t nb_ports)
> > > +{
> > > +	uint8_t outp;
> > > +	for (outp = 0; outp < nb_ports; outp++) {
> > > +		/* skip ports that are not enabled */
> > > +		if ((enabled_port_mask & (1 << outp)) == 0)
> > > +			continue;
> > > +
> > > +		if (tx_buffers[outp].count == 0)
> > > +			continue;
> > > +
> > > +		flush_one_port(&tx_buffers[outp], outp);
> > > +	}
> > > +}
> > > +
> > > +static __attribute__((noreturn)) void
> > > +lcore_tx(struct rte_ring *in_r)
> > > +{
> > > +	static struct output_buffer tx_buffers[RTE_MAX_ETHPORTS];
> > > +	const uint8_t nb_ports = rte_eth_dev_count();
> > > +	const int socket_id = rte_socket_id();
> > > +	uint8_t port;
> > > +
> > > +	for (port = 0; port < nb_ports; port++) {
> > > +		/* skip ports that are not enabled */
> > > +		if ((enabled_port_mask & (1 << port)) == 0)
> > > +			continue;
> > > +
> > > +		if (rte_eth_dev_socket_id(port) > 0 &&
> > > +				rte_eth_dev_socket_id(port) != socket_id)
> > > +			printf("WARNING, port %u is on remote NUMA node to "
> > > +					"TX thread.\n\tPerformance will not "
> > > +					"be optimal.\n", port);
> > > +	}
> > > +
> > > +	printf("\nCore %u doing packet TX.\n", rte_lcore_id());
> > > +	for (;;) {
> > > +		for (port = 0; port < nb_ports; port++) {
> > > +			/* skip ports that are not enabled */
> > > +			if ((enabled_port_mask & (1 << port)) == 0)
> > > +				continue;
> > > +
> > > +			struct rte_mbuf *bufs[BURST_SIZE];
> > > +			const uint16_t nb_rx = rte_ring_dequeue_burst(in_r,
> > > +					(void *)bufs, BURST_SIZE);
> > > +			app_stats.tx.dequeue_pkts += nb_rx;
> > > +
> > > +			/* if we get no traffic, flush anything we have */
> > > +			if (unlikely(nb_rx == 0)) {
> > > +				flush_all_ports(tx_buffers, nb_ports);
> > > +				continue;
> > > +			}
> > > +
> > > +			/* for traffic we receive, queue it up for transmit */
> > > +			uint16_t i;
> > > +			_mm_prefetch(bufs[0], 0);
> > > +			_mm_prefetch(bufs[1], 0);
> > > +			_mm_prefetch(bufs[2], 0);
> > > +			for (i = 0; i < nb_rx; i++) {
> > > +				struct output_buffer *outbuf;
> > > +				uint8_t outp;
> > > +				_mm_prefetch(bufs[i + 3], 0);
> > > +				/* workers should update in_port to hold the
> > > +				 * output port value */
> > > +				outp = bufs[i]->port;
> > > +				/* skip ports that are not enabled */
> > > +				if ((enabled_port_mask & (1 << outp)) == 0)
> > > +					continue;
> > > +
> > > +				outbuf = &tx_buffers[outp];
> > > +				outbuf->mbufs[outbuf->count++] = bufs[i];
> > > +				if (outbuf->count == BURST_SIZE)
> > > +					flush_one_port(outbuf, outp);
> > > +			}
> > > +		}
> > > +	}
> > > +}
> > > +
> > > +
> > > +static __attribute__((noreturn)) void
> > > +lcore_worker(struct lcore_params *p)
> > > +{
> > > +	struct rte_distributor *d = p->d;
> > > +	const unsigned id = p->worker_id;
> > > +	/* for single port, xor_val will be zero so we won't modify the output
> > > +	 * port, otherwise we send traffic from 0 to 1, 2 to 3, and vice versa
> > > +	 */
> > > +	const unsigned xor_val = (rte_eth_dev_count() > 1);
> > > +	struct rte_mbuf *buf = NULL;
> > > +
> > > +	printf("\nCore %u acting as worker core.\n", rte_lcore_id());
> > > +	for (;;) {
> > > +		buf = rte_distributor_get_pkt(d, id, buf);
> > > +		buf->port ^= xor_val;
> > > +	}
> > > +}
> > > +
> > > +static void
> > > +int_handler(int sig_num)
> > > +{
> > > +	struct rte_eth_stats eth_stats;
> > > +	unsigned i;
> > > +
> > > +	printf("Exiting on signal %d\n", sig_num);
> > > +
> > > +	printf("\nRX thread stats:\n");
> > > +	printf(" - Received:    %"PRIu64"\n", app_stats.rx.rx_pkts);
> > > +	printf(" - Processed:   %"PRIu64"\n", app_stats.rx.returned_pkts);
> > > +	printf(" - Enqueued:    %"PRIu64"\n", app_stats.rx.enqueued_pkts);
> > > +
> > > +	printf("\nTX thread stats:\n");
> > > +	printf(" - Dequeued:    %"PRIu64"\n", app_stats.tx.dequeue_pkts);
> > > +	printf(" - Transmitted: %"PRIu64"\n", app_stats.tx.tx_pkts);
> > > +
> > > +	for (i = 0; i < rte_eth_dev_count(); i++) {
> > > +		rte_eth_stats_get(i, &eth_stats);
> > > +		printf("\nPort %u stats:\n", i);
> > > +		printf(" - Pkts in:   %"PRIu64"\n", eth_stats.ipackets);
> > > +		printf(" - Pkts out:  %"PRIu64"\n", eth_stats.opackets);
> > > +		printf(" - In Errs:   %"PRIu64"\n", eth_stats.ierrors);
> > > +		printf(" - Out Errs:  %"PRIu64"\n", eth_stats.oerrors);
> > > +		printf(" - Mbuf Errs: %"PRIu64"\n", eth_stats.rx_nombuf);
> > > +	}
> > > +	exit(0);
> > rte_exit here?  Also, this is a pretty ungraceful exit strategy as all the
> > threads you've created and memory you've allocated are just forgotten here.
> > Given that dpdk mempools are shared, this has the potential to leak lots of
> > memory if other apps are using the dpdk at the same time that you run this.  You
> > probably want to use the sigint handler to raise a flag to the tx/rx threads to
> > shutdown gracefully, and then free your allocated memory and mempool.
> > 
> > Neil
> >
> 
> Unless the different processes are explicitly cooperating as 
> primary/secondary, the mempools are not shared. I just don't see the need 
> for this app to do more cleanup on ctrl-c signal, as it's not intended to be 
> a multiprocess app, and there is little that any secondary process could do 
> to work with this app, except possibly some resource monitoring, which would 
> be completely unaffected by it exiting the way it does.
> 
Ah, ok, so we don't use a common shared pool between isolated processes then,
thats good.  Still though, this is a sample application, I think its lazy
programming practice to illustrate to application developers that its generally
ok to exit programs without freeing your resources.  Its about 20 lines of
additional code to change the sigint handler to flag an exit condition, and have
all the other threads join on it.

Neil

> /Bruce
>
  
Pattan, Reshma Oct. 1, 2014, 2:47 p.m. UTC | #4
> -----Original Message-----
> From: Neil Horman [mailto:nhorman@tuxdriver.com]
> Sent: Tuesday, September 30, 2014 2:40 PM
> To: Richardson, Bruce
> Cc: Pattan, Reshma; dev@dpdk.org
> Subject: Re: [dpdk-dev] [PATCH v3] distributor_app: new sample app
> 
> On Tue, Sep 30, 2014 at 01:18:28PM +0100, Bruce Richardson wrote:
> > On Tue, Sep 30, 2014 at 07:34:45AM -0400, Neil Horman wrote:
> > > On Tue, Sep 30, 2014 at 11:39:37AM +0100, reshmapa wrote:
> > > > From: Reshma Pattan <reshma.pattan@intel.com>
> > > >
> > > > A new sample app that shows the usage of the distributor library.
> > > > This app works as follows:
> > > >
> > > > * An RX thread runs which pulls packets from each ethernet port in turn
> > > >   and passes those packets to worker using a distributor component.
> > > > * The workers take the packets in turn, and determine the output port
> > > >   for those packets using basic l2forwarding doing an xor on the source
> > > >   port id.
> > > > * The RX thread takes the returned packets from the workers and enqueue
> > > >   those packets into an rte_ring structure.
> > > > * A TX thread pulls the packets off the rte_ring structure and then
> > > >   sends each packet out the output port specified previously by
> > > > the worker
> > > > * Command-line option support provided only for portmask.
> > > >
> > > > Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
> > > > Signed-off-by: Reshma Pattan    <reshma.pattan@intel.com>
> > > > ---
> > > >  examples/Makefile                 |    1 +
> > > >  examples/distributor_app/Makefile |   57 ++++
> > > >  examples/distributor_app/main.c   |  600
> +++++++++++++++++++++++++++++++++++++
> > > >  examples/distributor_app/main.h   |   46 +++
> > > >  4 files changed, 704 insertions(+), 0 deletions(-)  create mode
> > > > 100644 examples/distributor_app/Makefile  create mode 100644
> > > > examples/distributor_app/main.c  create mode 100644
> > > > examples/distributor_app/main.h
> > > >
> > > > diff --git a/examples/Makefile b/examples/Makefile index
> > > > 6245f83..2ba82b0 100644
> > > > --- a/examples/Makefile
> > > > +++ b/examples/Makefile
> > > > @@ -66,5 +66,6 @@ DIRS-y += vhost
> > > >  DIRS-$(CONFIG_RTE_LIBRTE_XEN_DOM0) += vhost_xen  DIRS-y += vmdq
> > > > DIRS-y += vmdq_dcb
> > > > +DIRS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) += distributor_app
> > > >
> > > >  include $(RTE_SDK)/mk/rte.extsubdir.mk diff --git
> > > > a/examples/distributor_app/Makefile
> > > > b/examples/distributor_app/Makefile
> > > > new file mode 100644
> > > > index 0000000..6a5bada
> > > > --- /dev/null
> > > > +++ b/examples/distributor_app/Makefile
> > > > @@ -0,0 +1,57 @@
> > > > +#   BSD LICENSE
> > > > +#
> > > > +#   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
> > > > +#   All rights reserved.
> > > > +#
> > > > +#   Redistribution and use in source and binary forms, with or without
> > > > +#   modification, are permitted provided that the following conditions
> > > > +#   are met:
> > > > +#
> > > > +#     * Redistributions of source code must retain the above copyright
> > > > +#       notice, this list of conditions and the following disclaimer.
> > > > +#     * Redistributions in binary form must reproduce the above copyright
> > > > +#       notice, this list of conditions and the following disclaimer in
> > > > +#       the documentation and/or other materials provided with the
> > > > +#       distribution.
> > > > +#     * Neither the name of Intel Corporation nor the names of its
> > > > +#       contributors may be used to endorse or promote products derived
> > > > +#       from this software without specific prior written permission.
> > > > +#
> > > > +#   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
> CONTRIBUTORS
> > > > +#   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT
> NOT
> > > > +#   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
> FITNESS FOR
> > > > +#   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
> COPYRIGHT
> > > > +#   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
> INCIDENTAL,
> > > > +#   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
> BUT NOT
> > > > +#   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
> LOSS OF USE,
> > > > +#   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
> AND ON ANY
> > > > +#   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
> TORT
> > > > +#   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
> OF THE USE
> > > > +#   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
> DAMAGE.
> > > > +
> > > > +ifeq ($(RTE_SDK),)
> > > > +$(error "Please define RTE_SDK environment variable") endif
> > > > +
> > > > +# Default target, can be overriden by command line or environment
> > > > +RTE_TARGET ?= x86_64-native-linuxapp-gcc
> > > > +
> > > > +include $(RTE_SDK)/mk/rte.vars.mk
> > > > +
> > > > +# binary name
> > > > +APP = distributor_app
> > > > +
> > > > +# all source are stored in SRCS-y SRCS-y := main.c
> > > > +
> > > > +CFLAGS += $(WERROR_FLAGS)
> > > > +
> > > > +# workaround for a gcc bug with noreturn attribute #
> > > > +http://gcc.gnu.org/bugzilla/show_bug.cgi?id=12603
> > > > +ifeq ($(CONFIG_RTE_TOOLCHAIN_GCC),y) CFLAGS_main.o +=
> > > > +-Wno-return-type endif
> > > > +
> > > > +EXTRA_CFLAGS += -O3 -Wfatal-errors
> > > > +
> > > > +include $(RTE_SDK)/mk/rte.extapp.mk
> > > > diff --git a/examples/distributor_app/main.c
> > > > b/examples/distributor_app/main.c new file mode 100644 index
> > > > 0000000..f555d93
> > > > --- /dev/null
> > > > +++ b/examples/distributor_app/main.c
> > > > @@ -0,0 +1,600 @@
> > > > +/*-
> > > > + *   BSD LICENSE
> > > > + *
> > > > + *   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
> > > > + *   All rights reserved.
> > > > + *
> > > > + *   Redistribution and use in source and binary forms, with or without
> > > > + *   modification, are permitted provided that the following conditions
> > > > + *   are met:
> > > > + *
> > > > + *     * Redistributions of source code must retain the above copyright
> > > > + *       notice, this list of conditions and the following disclaimer.
> > > > + *     * Redistributions in binary form must reproduce the above copyright
> > > > + *       notice, this list of conditions and the following disclaimer in
> > > > + *       the documentation and/or other materials provided with the
> > > > + *       distribution.
> > > > + *     * Neither the name of Intel Corporation nor the names of its
> > > > + *       contributors may be used to endorse or promote products derived
> > > > + *       from this software without specific prior written permission.
> > > > + *
> > > > + *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
> CONTRIBUTORS
> > > > + *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING,
> BUT NOT
> > > > + *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
> FITNESS FOR
> > > > + *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
> COPYRIGHT
> > > > + *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
> INCIDENTAL,
> > > > + *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
> BUT NOT
> > > > + *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
> LOSS OF USE,
> > > > + *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
> AND ON ANY
> > > > + *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
> TORT
> > > > + *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
> OF THE USE
> > > > + *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
> DAMAGE.
> > > > + */
> > > > +
> > > > +#include <stdint.h>
> > > > +#include <inttypes.h>
> > > > +#include <unistd.h>
> > > > +#include <signal.h>
> > > > +#include <getopt.h>
> > > > +
> > > > +#include <rte_eal.h>
> > > > +#include <rte_ethdev.h>
> > > > +#include <rte_cycles.h>
> > > > +#include <rte_malloc.h>
> > > > +#include <rte_debug.h>
> > > > +#include <rte_distributor.h>
> > > > +
> > > > +#include "main.h"
> > > > +
> > > > +#define RX_RING_SIZE 256
> > > > +#define RX_FREE_THRESH 32
> > > > +#define RX_PTHRESH 8
> > > > +#define RX_HTHRESH 8
> > > > +#define RX_WTHRESH 0
> > > > +
> > > > +#define TX_RING_SIZE 512
> > > > +#define TX_FREE_THRESH 32
> > > > +#define TX_PTHRESH 32
> > > > +#define TX_HTHRESH 0
> > > > +#define TX_WTHRESH 0
> > > > +#define TX_RSBIT_THRESH 32
> > > > +#define TX_Q_FLAGS (ETH_TXQ_FLAGS_NOMULTSEGS |
> ETH_TXQ_FLAGS_NOVLANOFFL |\
> > > > +	ETH_TXQ_FLAGS_NOXSUMSCTP | ETH_TXQ_FLAGS_NOXSUMUDP | \
> > > > +	ETH_TXQ_FLAGS_NOXSUMTCP)
> > > > +
> > > > +#define NUM_MBUFS ((64*1024)-1)
> > > > +#define MBUF_SIZE (2048 + sizeof(struct rte_mbuf) +
> > > > +RTE_PKTMBUF_HEADROOM) #define MBUF_CACHE_SIZE 250 #define
> > > > +BURST_SIZE 32 #define RTE_RING_SZ 1024
> > > > +
> > > > +/* uncommnet below line to enable debug logs */
> > > > +/* #define DEBUG */
> > > > +
> > > > +#ifdef DEBUG
> > > > +#define LOG_LEVEL RTE_LOG_DEBUG
> > > > +#define LOG_DEBUG(log_type, fmt, args...) do {	\
> > > > +	RTE_LOG(DEBUG, log_type, fmt, ##args)		\
> > > > +} while (0)
> > > > +#else
> > > > +#define LOG_LEVEL RTE_LOG_INFO
> > > > +#define LOG_DEBUG(log_type, fmt, args...) do {} while (0) #endif
> > > > +
> > > > +#define RTE_LOGTYPE_DISTRAPP RTE_LOGTYPE_USER1
> > > > +
> > > > +/* mask of enabled ports */
> > > > +static uint32_t enabled_port_mask = 0;
> > > > +
> > > > +static volatile struct app_stats {
> > > > +	struct {
> > > > +		uint64_t rx_pkts;
> > > > +		uint64_t returned_pkts;
> > > > +		uint64_t enqueued_pkts;
> > > > +	} rx __rte_cache_aligned;
> > > > +
> > > > +	struct {
> > > > +		uint64_t dequeue_pkts;
> > > > +		uint64_t tx_pkts;
> > > > +	} tx __rte_cache_aligned;
> > > > +} app_stats;
> > > > +
> > > > +static const struct rte_eth_conf port_conf_default = {
> > > > +	.rxmode = {
> > > > +		.mq_mode = ETH_MQ_RX_RSS,
> > > > +		.max_rx_pkt_len = ETHER_MAX_LEN,
> > > > +		.split_hdr_size = 0,
> > > > +		.header_split   = 0, /**< Header Split disabled */
> > > > +		.hw_ip_checksum = 0, /**< IP checksum offload enabled */
> > > > +		.hw_vlan_filter = 0, /**< VLAN filtering disabled */
> > > > +		.jumbo_frame    = 0, /**< Jumbo Frame Support disabled */
> > > > +		.hw_strip_crc   = 0, /**< CRC stripped by hardware */
> > > > +	},
> > > > +	.txmode = {
> > > > +		.mq_mode = ETH_MQ_TX_NONE,
> > > > +	},
> > > > +	.lpbk_mode = 0,
> > > > +	.rx_adv_conf = {
> > > > +			.rss_conf = {
> > > > +				.rss_hf = ETH_RSS_IPV4 | ETH_RSS_IPV6 |
> > > > +					ETH_RSS_IPV4_TCP |
> ETH_RSS_IPV4_UDP |
> > > > +					ETH_RSS_IPV6_TCP |
> ETH_RSS_IPV6_UDP,
> > > > +			}
> > > > +	},
> > > > +};
> > > > +
> > > > +static const struct rte_eth_rxconf rx_conf_default = {
> > > > +	.rx_thresh = {
> > > > +		.pthresh = RX_PTHRESH,
> > > > +		.hthresh = RX_HTHRESH,
> > > > +		.wthresh = RX_WTHRESH,
> > > > +	},
> > > > +	.rx_free_thresh = RX_FREE_THRESH,
> > > > +	.rx_drop_en = 0,
> > > > +};
> > > > +
> > > > +static const struct rte_eth_txconf tx_conf_default = {
> > > > +	.tx_thresh = {
> > > > +		.pthresh = TX_PTHRESH,
> > > > +		.hthresh = TX_HTHRESH,
> > > > +		.wthresh = TX_WTHRESH,
> > > > +	},
> > > > +	.tx_free_thresh = TX_FREE_THRESH,
> > > > +	.tx_rs_thresh = TX_RSBIT_THRESH,
> > > > +	.txq_flags = TX_Q_FLAGS
> > > > +
> > > > +};
> > > > +
> > > > +struct output_buffer {
> > > > +	unsigned count;
> > > > +	struct rte_mbuf *mbufs[BURST_SIZE]; };
> > > > +
> > > > +/*
> > > > + * Initialises a given port using global settings and with the rx
> > > > +buffers
> > > > + * coming from the mbuf_pool passed as parameter  */ static
> > > > +inline int port_init(uint8_t port, struct rte_mempool *mbuf_pool)
> > > > +{
> > > > +	struct rte_eth_conf port_conf = port_conf_default;
> > > > +	const uint16_t rxRings = 1, txRings = rte_lcore_count() - 1;
> > > > +	int retval;
> > > > +	uint16_t q;
> > > > +
> > > > +	if (port >= rte_eth_dev_count())
> > > > +		return -1;
> > > > +
> > > > +	retval = rte_eth_dev_configure(port, rxRings, txRings, &port_conf);
> > > > +	if (retval != 0)
> > > > +		return retval;
> > > > +
> > > > +	for (q = 0; q < rxRings; q++) {
> > > > +		retval = rte_eth_rx_queue_setup(port, q, RX_RING_SIZE,
> > > > +						rte_eth_dev_socket_id(port),
> > > > +						&rx_conf_default, mbuf_pool);
> > > > +		if (retval < 0)
> > > > +			return retval;
> > > > +	}
> > > > +
> > > > +	for (q = 0; q < txRings; q++) {
> > > > +		retval = rte_eth_tx_queue_setup(port, q, TX_RING_SIZE,
> > > > +						rte_eth_dev_socket_id(port),
> > > > +						&tx_conf_default);
> > > > +		if (retval < 0)
> > > > +			return retval;
> > > > +	}
> > > > +
> > > > +	retval  = rte_eth_dev_start(port);
> > > > +	if (retval < 0)
> > > > +		return retval;
> > > > +
> > > > +	struct rte_eth_link link;
> > > > +	rte_eth_link_get_nowait(port, &link);
> > > > +	if (!link.link_status) {
> > > > +		sleep(1);
> > > > +		rte_eth_link_get_nowait(port, &link);
> > > > +	}
> > > > +
> > > > +	if (!link.link_status) {
> > > > +		printf("Link down on port %"PRIu8"\n", port);
> > > > +		return 0;
> > > > +	}
> > > > +
> > > > +	struct ether_addr addr;
> > > > +	rte_eth_macaddr_get(port, &addr);
> > > > +	printf("Port %u MAC: %02"PRIx8" %02"PRIx8" %02"PRIx8
> > > > +			" %02"PRIx8" %02"PRIx8" %02"PRIx8"\n",
> > > > +			(unsigned)port,
> > > > +			addr.addr_bytes[0], addr.addr_bytes[1],
> > > > +			addr.addr_bytes[2], addr.addr_bytes[3],
> > > > +			addr.addr_bytes[4], addr.addr_bytes[5]);
> > > > +
> > > > +	rte_eth_promiscuous_enable(port);
> > > > +
> > > > +	return 0;
> > > > +}
> > > > +
> > > > +struct lcore_params {
> > > > +	unsigned worker_id;
> > > > +	struct rte_distributor *d;
> > > > +	struct rte_ring *r;
> > > > +};
> > > > +
> > > > +static __attribute__((noreturn)) void lcore_rx(struct
> > > > +lcore_params *p) {
> > > > +	struct rte_distributor *d = p->d;
> > > > +	struct rte_ring *r = p->r;
> > > > +	const uint8_t nb_ports = rte_eth_dev_count();
> > > > +	const int socket_id = rte_socket_id();
> > > > +	uint8_t port;
> > > > +
> > > > +	for (port = 0; port < nb_ports; port++) {
> > > > +		/* skip ports that are not enabled */
> > > > +		if ((enabled_port_mask & (1 << port)) == 0)
> > > > +			continue;
> > > > +
> > > > +		if (rte_eth_dev_socket_id(port) > 0 &&
> > > > +				rte_eth_dev_socket_id(port) != socket_id)
> > > > +			printf("WARNING, port %u is on remote NUMA node to
> "
> > > > +					"RX thread.\n\tPerformance will not "
> > > > +					"be optimal.\n", port);
> > > > +	}
> > > > +
> > > > +	printf("\nCore %u doing packet RX.\n", rte_lcore_id());
> > > > +	port = 0;
> > > > +	for (;;) {
> > > > +		/* skip ports that are not enabled */
> > > > +		if ((enabled_port_mask & (1 << port)) == 0) {
> > > > +			if (++port == nb_ports)
> > > > +				port = 0;
> > > > +			continue;
> > > > +		}
> > > > +		struct rte_mbuf *bufs[BURST_SIZE*2];
> > > > +		const uint16_t nb_rx = rte_eth_rx_burst(port, 0, bufs,
> > > > +				BURST_SIZE);
> > > > +		app_stats.rx.rx_pkts += nb_rx;
> > > > +
> > > > +		rte_distributor_process(d, bufs, nb_rx);
> > > > +		const uint16_t nb_ret = rte_distributor_returned_pkts(d,
> > > > +				bufs, BURST_SIZE*2);
> > > > +		app_stats.rx.returned_pkts += nb_ret;
> > > > +		if (unlikely(nb_ret == 0))
> > > > +			continue;
> > > > +
> > > > +		uint16_t sent = rte_ring_enqueue_burst(r, (void *)bufs, nb_ret);
> > > > +		app_stats.rx.enqueued_pkts += sent;
> > > > +		if (unlikely(sent < nb_ret)) {
> > > > +			LOG_DEBUG(DISTRAPP, "%s:Packet loss due to full
> ring\n", __func__);
> > > > +			while (sent < nb_ret)
> > > > +				rte_pktmbuf_free(bufs[sent++]);
> > > > +		}
> > > > +		if (++port == nb_ports)
> > > > +			port = 0;
> > > > +	}
> > > > +}
> > > > +
> > > > +static inline void
> > > > +flush_one_port(struct output_buffer *outbuf, uint8_t outp) {
> > > > +	unsigned nb_tx = rte_eth_tx_burst(outp, 0, outbuf->mbufs,
> > > > +			outbuf->count);
> > > > +	app_stats.tx.tx_pkts += nb_tx;
> > > > +
> > > > +	if (unlikely(nb_tx < outbuf->count)) {
> > > > +		LOG_DEBUG(DISTRAPP, "%s:Packet loss with tx_burst\n",
> __func__);
> > > > +		do {
> > > > +			rte_pktmbuf_free(outbuf->mbufs[nb_tx]);
> > > > +		} while (++nb_tx < outbuf->count);
> > > > +	}
> > > > +	outbuf->count = 0;
> > > > +}
> > > > +
> > > > +static inline void
> > > > +flush_all_ports(struct output_buffer *tx_buffers, uint8_t
> > > > +nb_ports) {
> > > > +	uint8_t outp;
> > > > +	for (outp = 0; outp < nb_ports; outp++) {
> > > > +		/* skip ports that are not enabled */
> > > > +		if ((enabled_port_mask & (1 << outp)) == 0)
> > > > +			continue;
> > > > +
> > > > +		if (tx_buffers[outp].count == 0)
> > > > +			continue;
> > > > +
> > > > +		flush_one_port(&tx_buffers[outp], outp);
> > > > +	}
> > > > +}
> > > > +
> > > > +static __attribute__((noreturn)) void lcore_tx(struct rte_ring
> > > > +*in_r) {
> > > > +	static struct output_buffer tx_buffers[RTE_MAX_ETHPORTS];
> > > > +	const uint8_t nb_ports = rte_eth_dev_count();
> > > > +	const int socket_id = rte_socket_id();
> > > > +	uint8_t port;
> > > > +
> > > > +	for (port = 0; port < nb_ports; port++) {
> > > > +		/* skip ports that are not enabled */
> > > > +		if ((enabled_port_mask & (1 << port)) == 0)
> > > > +			continue;
> > > > +
> > > > +		if (rte_eth_dev_socket_id(port) > 0 &&
> > > > +				rte_eth_dev_socket_id(port) != socket_id)
> > > > +			printf("WARNING, port %u is on remote NUMA node to
> "
> > > > +					"TX thread.\n\tPerformance will not "
> > > > +					"be optimal.\n", port);
> > > > +	}
> > > > +
> > > > +	printf("\nCore %u doing packet TX.\n", rte_lcore_id());
> > > > +	for (;;) {
> > > > +		for (port = 0; port < nb_ports; port++) {
> > > > +			/* skip ports that are not enabled */
> > > > +			if ((enabled_port_mask & (1 << port)) == 0)
> > > > +				continue;
> > > > +
> > > > +			struct rte_mbuf *bufs[BURST_SIZE];
> > > > +			const uint16_t nb_rx = rte_ring_dequeue_burst(in_r,
> > > > +					(void *)bufs, BURST_SIZE);
> > > > +			app_stats.tx.dequeue_pkts += nb_rx;
> > > > +
> > > > +			/* if we get no traffic, flush anything we have */
> > > > +			if (unlikely(nb_rx == 0)) {
> > > > +				flush_all_ports(tx_buffers, nb_ports);
> > > > +				continue;
> > > > +			}
> > > > +
> > > > +			/* for traffic we receive, queue it up for transmit */
> > > > +			uint16_t i;
> > > > +			_mm_prefetch(bufs[0], 0);
> > > > +			_mm_prefetch(bufs[1], 0);
> > > > +			_mm_prefetch(bufs[2], 0);
> > > > +			for (i = 0; i < nb_rx; i++) {
> > > > +				struct output_buffer *outbuf;
> > > > +				uint8_t outp;
> > > > +				_mm_prefetch(bufs[i + 3], 0);
> > > > +				/* workers should update in_port to hold the
> > > > +				 * output port value */
> > > > +				outp = bufs[i]->port;
> > > > +				/* skip ports that are not enabled */
> > > > +				if ((enabled_port_mask & (1 << outp)) == 0)
> > > > +					continue;
> > > > +
> > > > +				outbuf = &tx_buffers[outp];
> > > > +				outbuf->mbufs[outbuf->count++] = bufs[i];
> > > > +				if (outbuf->count == BURST_SIZE)
> > > > +					flush_one_port(outbuf, outp);
> > > > +			}
> > > > +		}
> > > > +	}
> > > > +}
> > > > +
> > > > +
> > > > +static __attribute__((noreturn)) void lcore_worker(struct
> > > > +lcore_params *p) {
> > > > +	struct rte_distributor *d = p->d;
> > > > +	const unsigned id = p->worker_id;
> > > > +	/* for single port, xor_val will be zero so we won't modify the output
> > > > +	 * port, otherwise we send traffic from 0 to 1, 2 to 3, and vice versa
> > > > +	 */
> > > > +	const unsigned xor_val = (rte_eth_dev_count() > 1);
> > > > +	struct rte_mbuf *buf = NULL;
> > > > +
> > > > +	printf("\nCore %u acting as worker core.\n", rte_lcore_id());
> > > > +	for (;;) {
> > > > +		buf = rte_distributor_get_pkt(d, id, buf);
> > > > +		buf->port ^= xor_val;
> > > > +	}
> > > > +}
> > > > +
> > > > +static void
> > > > +int_handler(int sig_num)
> > > > +{
> > > > +	struct rte_eth_stats eth_stats;
> > > > +	unsigned i;
> > > > +
> > > > +	printf("Exiting on signal %d\n", sig_num);
> > > > +
> > > > +	printf("\nRX thread stats:\n");
> > > > +	printf(" - Received:    %"PRIu64"\n", app_stats.rx.rx_pkts);
> > > > +	printf(" - Processed:   %"PRIu64"\n", app_stats.rx.returned_pkts);
> > > > +	printf(" - Enqueued:    %"PRIu64"\n", app_stats.rx.enqueued_pkts);
> > > > +
> > > > +	printf("\nTX thread stats:\n");
> > > > +	printf(" - Dequeued:    %"PRIu64"\n", app_stats.tx.dequeue_pkts);
> > > > +	printf(" - Transmitted: %"PRIu64"\n", app_stats.tx.tx_pkts);
> > > > +
> > > > +	for (i = 0; i < rte_eth_dev_count(); i++) {
> > > > +		rte_eth_stats_get(i, &eth_stats);
> > > > +		printf("\nPort %u stats:\n", i);
> > > > +		printf(" - Pkts in:   %"PRIu64"\n", eth_stats.ipackets);
> > > > +		printf(" - Pkts out:  %"PRIu64"\n", eth_stats.opackets);
> > > > +		printf(" - In Errs:   %"PRIu64"\n", eth_stats.ierrors);
> > > > +		printf(" - Out Errs:  %"PRIu64"\n", eth_stats.oerrors);
> > > > +		printf(" - Mbuf Errs: %"PRIu64"\n", eth_stats.rx_nombuf);
> > > > +	}
> > > > +	exit(0);
> > > rte_exit here?  Also, this is a pretty ungraceful exit strategy as
> > > all the threads you've created and memory you've allocated are just
> forgotten here.
> > > Given that dpdk mempools are shared, this has the potential to leak
> > > lots of memory if other apps are using the dpdk at the same time
> > > that you run this.  You probably want to use the sigint handler to
> > > raise a flag to the tx/rx threads to shutdown gracefully, and then free your
> allocated memory and mempool.
> > >
> > > Neil
> > >
> >
> > Unless the different processes are explicitly cooperating as
> > primary/secondary, the mempools are not shared. I just don't see the
> > need for this app to do more cleanup on ctrl-c signal, as it's not
> > intended to be a multiprocess app, and there is little that any
> > secondary process could do to work with this app, except possibly some
> > resource monitoring, which would be completely unaffected by it exiting the
> way it does.
> >
> Ah, ok, so we don't use a common shared pool between isolated processes
> then, thats good.  Still though, this is a sample application, I think its lazy
> programming practice to illustrate to application developers that its generally ok
> to exit programs without freeing your resources.  Its about 20 lines of additional
> code to change the sigint handler to flag an exit condition, and have all the
> other threads join on it.
> 
> Neil

1)I had sent v5 patch which handles graceful shutdown of rx and tx threads upon SIGINT
2)Worker thread graceful shutdown was not handled as of now as it needs some change in lcore_worker logic , which will be done in future enhancements.
3)Freeing of mempool is also not handled , as the framework support is not available.
4)Cleaning of rx/tx queues not done, as it needs some extensive logic which we haven't planned as of now. Will check the possibility of doing it in future  enhancements    i.e in next version of sample application.

/Reshma

> 
> > /Bruce
> >


--------------------------------------------------------------
Intel Shannon Limited
Registered in Ireland
Registered Office: Collinstown Industrial Park, Leixlip, County Kildare
Registered Number: 308263
Business address: Dromore House, East Park, Shannon, Co. Clare

This e-mail and any attachments may contain confidential material for the sole use of the intended recipient(s). Any review or distribution by others is strictly prohibited. If you are not the intended recipient, please contact the sender and delete all copies.
  
Neil Horman Oct. 1, 2014, 2:56 p.m. UTC | #5
On Wed, Oct 01, 2014 at 02:47:00PM +0000, Pattan, Reshma wrote:
> 
> 
> > -----Original Message-----
> > From: Neil Horman [mailto:nhorman@tuxdriver.com]
> > Sent: Tuesday, September 30, 2014 2:40 PM
> > To: Richardson, Bruce
> > Cc: Pattan, Reshma; dev@dpdk.org
> > Subject: Re: [dpdk-dev] [PATCH v3] distributor_app: new sample app
> > 
> > On Tue, Sep 30, 2014 at 01:18:28PM +0100, Bruce Richardson wrote:
> > > On Tue, Sep 30, 2014 at 07:34:45AM -0400, Neil Horman wrote:
> > > > On Tue, Sep 30, 2014 at 11:39:37AM +0100, reshmapa wrote:
> > > > > From: Reshma Pattan <reshma.pattan@intel.com>
> > > > >
> > > > > A new sample app that shows the usage of the distributor library.
> > > > > This app works as follows:
> > > > >
> > > > > * An RX thread runs which pulls packets from each ethernet port in turn
> > > > >   and passes those packets to worker using a distributor component.
> > > > > * The workers take the packets in turn, and determine the output port
> > > > >   for those packets using basic l2forwarding doing an xor on the source
> > > > >   port id.
> > > > > * The RX thread takes the returned packets from the workers and enqueue
> > > > >   those packets into an rte_ring structure.
> > > > > * A TX thread pulls the packets off the rte_ring structure and then
> > > > >   sends each packet out the output port specified previously by
> > > > > the worker
> > > > > * Command-line option support provided only for portmask.
> > > > >
> > > > > Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
> > > > > Signed-off-by: Reshma Pattan    <reshma.pattan@intel.com>
> > > > > ---
> > > > >  examples/Makefile                 |    1 +
> > > > >  examples/distributor_app/Makefile |   57 ++++
> > > > >  examples/distributor_app/main.c   |  600
> > +++++++++++++++++++++++++++++++++++++
> > > > >  examples/distributor_app/main.h   |   46 +++
> > > > >  4 files changed, 704 insertions(+), 0 deletions(-)  create mode
> > > > > 100644 examples/distributor_app/Makefile  create mode 100644
> > > > > examples/distributor_app/main.c  create mode 100644
> > > > > examples/distributor_app/main.h
> > > > >
> > > > > diff --git a/examples/Makefile b/examples/Makefile index
> > > > > 6245f83..2ba82b0 100644
> > > > > --- a/examples/Makefile
> > > > > +++ b/examples/Makefile
> > > > > @@ -66,5 +66,6 @@ DIRS-y += vhost
> > > > >  DIRS-$(CONFIG_RTE_LIBRTE_XEN_DOM0) += vhost_xen  DIRS-y += vmdq
> > > > > DIRS-y += vmdq_dcb
> > > > > +DIRS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) += distributor_app
> > > > >
> > > > >  include $(RTE_SDK)/mk/rte.extsubdir.mk diff --git
> > > > > a/examples/distributor_app/Makefile
> > > > > b/examples/distributor_app/Makefile
> > > > > new file mode 100644
> > > > > index 0000000..6a5bada
> > > > > --- /dev/null
> > > > > +++ b/examples/distributor_app/Makefile
> > > > > @@ -0,0 +1,57 @@
> > > > > +#   BSD LICENSE
> > > > > +#
> > > > > +#   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
> > > > > +#   All rights reserved.
> > > > > +#
> > > > > +#   Redistribution and use in source and binary forms, with or without
> > > > > +#   modification, are permitted provided that the following conditions
> > > > > +#   are met:
> > > > > +#
> > > > > +#     * Redistributions of source code must retain the above copyright
> > > > > +#       notice, this list of conditions and the following disclaimer.
> > > > > +#     * Redistributions in binary form must reproduce the above copyright
> > > > > +#       notice, this list of conditions and the following disclaimer in
> > > > > +#       the documentation and/or other materials provided with the
> > > > > +#       distribution.
> > > > > +#     * Neither the name of Intel Corporation nor the names of its
> > > > > +#       contributors may be used to endorse or promote products derived
> > > > > +#       from this software without specific prior written permission.
> > > > > +#
> > > > > +#   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
> > CONTRIBUTORS
> > > > > +#   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT
> > NOT
> > > > > +#   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
> > FITNESS FOR
> > > > > +#   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
> > COPYRIGHT
> > > > > +#   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
> > INCIDENTAL,
> > > > > +#   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
> > BUT NOT
> > > > > +#   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
> > LOSS OF USE,
> > > > > +#   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
> > AND ON ANY
> > > > > +#   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
> > TORT
> > > > > +#   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
> > OF THE USE
> > > > > +#   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
> > DAMAGE.
> > > > > +
> > > > > +ifeq ($(RTE_SDK),)
> > > > > +$(error "Please define RTE_SDK environment variable") endif
> > > > > +
> > > > > +# Default target, can be overriden by command line or environment
> > > > > +RTE_TARGET ?= x86_64-native-linuxapp-gcc
> > > > > +
> > > > > +include $(RTE_SDK)/mk/rte.vars.mk
> > > > > +
> > > > > +# binary name
> > > > > +APP = distributor_app
> > > > > +
> > > > > +# all source are stored in SRCS-y SRCS-y := main.c
> > > > > +
> > > > > +CFLAGS += $(WERROR_FLAGS)
> > > > > +
> > > > > +# workaround for a gcc bug with noreturn attribute #
> > > > > +http://gcc.gnu.org/bugzilla/show_bug.cgi?id=12603
> > > > > +ifeq ($(CONFIG_RTE_TOOLCHAIN_GCC),y) CFLAGS_main.o +=
> > > > > +-Wno-return-type endif
> > > > > +
> > > > > +EXTRA_CFLAGS += -O3 -Wfatal-errors
> > > > > +
> > > > > +include $(RTE_SDK)/mk/rte.extapp.mk
> > > > > diff --git a/examples/distributor_app/main.c
> > > > > b/examples/distributor_app/main.c new file mode 100644 index
> > > > > 0000000..f555d93
> > > > > --- /dev/null
> > > > > +++ b/examples/distributor_app/main.c
> > > > > @@ -0,0 +1,600 @@
> > > > > +/*-
> > > > > + *   BSD LICENSE
> > > > > + *
> > > > > + *   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
> > > > > + *   All rights reserved.
> > > > > + *
> > > > > + *   Redistribution and use in source and binary forms, with or without
> > > > > + *   modification, are permitted provided that the following conditions
> > > > > + *   are met:
> > > > > + *
> > > > > + *     * Redistributions of source code must retain the above copyright
> > > > > + *       notice, this list of conditions and the following disclaimer.
> > > > > + *     * Redistributions in binary form must reproduce the above copyright
> > > > > + *       notice, this list of conditions and the following disclaimer in
> > > > > + *       the documentation and/or other materials provided with the
> > > > > + *       distribution.
> > > > > + *     * Neither the name of Intel Corporation nor the names of its
> > > > > + *       contributors may be used to endorse or promote products derived
> > > > > + *       from this software without specific prior written permission.
> > > > > + *
> > > > > + *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
> > CONTRIBUTORS
> > > > > + *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING,
> > BUT NOT
> > > > > + *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
> > FITNESS FOR
> > > > > + *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
> > COPYRIGHT
> > > > > + *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
> > INCIDENTAL,
> > > > > + *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
> > BUT NOT
> > > > > + *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
> > LOSS OF USE,
> > > > > + *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
> > AND ON ANY
> > > > > + *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
> > TORT
> > > > > + *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
> > OF THE USE
> > > > > + *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
> > DAMAGE.
> > > > > + */
> > > > > +
> > > > > +#include <stdint.h>
> > > > > +#include <inttypes.h>
> > > > > +#include <unistd.h>
> > > > > +#include <signal.h>
> > > > > +#include <getopt.h>
> > > > > +
> > > > > +#include <rte_eal.h>
> > > > > +#include <rte_ethdev.h>
> > > > > +#include <rte_cycles.h>
> > > > > +#include <rte_malloc.h>
> > > > > +#include <rte_debug.h>
> > > > > +#include <rte_distributor.h>
> > > > > +
> > > > > +#include "main.h"
> > > > > +
> > > > > +#define RX_RING_SIZE 256
> > > > > +#define RX_FREE_THRESH 32
> > > > > +#define RX_PTHRESH 8
> > > > > +#define RX_HTHRESH 8
> > > > > +#define RX_WTHRESH 0
> > > > > +
> > > > > +#define TX_RING_SIZE 512
> > > > > +#define TX_FREE_THRESH 32
> > > > > +#define TX_PTHRESH 32
> > > > > +#define TX_HTHRESH 0
> > > > > +#define TX_WTHRESH 0
> > > > > +#define TX_RSBIT_THRESH 32
> > > > > +#define TX_Q_FLAGS (ETH_TXQ_FLAGS_NOMULTSEGS |
> > ETH_TXQ_FLAGS_NOVLANOFFL |\
> > > > > +	ETH_TXQ_FLAGS_NOXSUMSCTP | ETH_TXQ_FLAGS_NOXSUMUDP | \
> > > > > +	ETH_TXQ_FLAGS_NOXSUMTCP)
> > > > > +
> > > > > +#define NUM_MBUFS ((64*1024)-1)
> > > > > +#define MBUF_SIZE (2048 + sizeof(struct rte_mbuf) +
> > > > > +RTE_PKTMBUF_HEADROOM) #define MBUF_CACHE_SIZE 250 #define
> > > > > +BURST_SIZE 32 #define RTE_RING_SZ 1024
> > > > > +
> > > > > +/* uncommnet below line to enable debug logs */
> > > > > +/* #define DEBUG */
> > > > > +
> > > > > +#ifdef DEBUG
> > > > > +#define LOG_LEVEL RTE_LOG_DEBUG
> > > > > +#define LOG_DEBUG(log_type, fmt, args...) do {	\
> > > > > +	RTE_LOG(DEBUG, log_type, fmt, ##args)		\
> > > > > +} while (0)
> > > > > +#else
> > > > > +#define LOG_LEVEL RTE_LOG_INFO
> > > > > +#define LOG_DEBUG(log_type, fmt, args...) do {} while (0) #endif
> > > > > +
> > > > > +#define RTE_LOGTYPE_DISTRAPP RTE_LOGTYPE_USER1
> > > > > +
> > > > > +/* mask of enabled ports */
> > > > > +static uint32_t enabled_port_mask = 0;
> > > > > +
> > > > > +static volatile struct app_stats {
> > > > > +	struct {
> > > > > +		uint64_t rx_pkts;
> > > > > +		uint64_t returned_pkts;
> > > > > +		uint64_t enqueued_pkts;
> > > > > +	} rx __rte_cache_aligned;
> > > > > +
> > > > > +	struct {
> > > > > +		uint64_t dequeue_pkts;
> > > > > +		uint64_t tx_pkts;
> > > > > +	} tx __rte_cache_aligned;
> > > > > +} app_stats;
> > > > > +
> > > > > +static const struct rte_eth_conf port_conf_default = {
> > > > > +	.rxmode = {
> > > > > +		.mq_mode = ETH_MQ_RX_RSS,
> > > > > +		.max_rx_pkt_len = ETHER_MAX_LEN,
> > > > > +		.split_hdr_size = 0,
> > > > > +		.header_split   = 0, /**< Header Split disabled */
> > > > > +		.hw_ip_checksum = 0, /**< IP checksum offload enabled */
> > > > > +		.hw_vlan_filter = 0, /**< VLAN filtering disabled */
> > > > > +		.jumbo_frame    = 0, /**< Jumbo Frame Support disabled */
> > > > > +		.hw_strip_crc   = 0, /**< CRC stripped by hardware */
> > > > > +	},
> > > > > +	.txmode = {
> > > > > +		.mq_mode = ETH_MQ_TX_NONE,
> > > > > +	},
> > > > > +	.lpbk_mode = 0,
> > > > > +	.rx_adv_conf = {
> > > > > +			.rss_conf = {
> > > > > +				.rss_hf = ETH_RSS_IPV4 | ETH_RSS_IPV6 |
> > > > > +					ETH_RSS_IPV4_TCP |
> > ETH_RSS_IPV4_UDP |
> > > > > +					ETH_RSS_IPV6_TCP |
> > ETH_RSS_IPV6_UDP,
> > > > > +			}
> > > > > +	},
> > > > > +};
> > > > > +
> > > > > +static const struct rte_eth_rxconf rx_conf_default = {
> > > > > +	.rx_thresh = {
> > > > > +		.pthresh = RX_PTHRESH,
> > > > > +		.hthresh = RX_HTHRESH,
> > > > > +		.wthresh = RX_WTHRESH,
> > > > > +	},
> > > > > +	.rx_free_thresh = RX_FREE_THRESH,
> > > > > +	.rx_drop_en = 0,
> > > > > +};
> > > > > +
> > > > > +static const struct rte_eth_txconf tx_conf_default = {
> > > > > +	.tx_thresh = {
> > > > > +		.pthresh = TX_PTHRESH,
> > > > > +		.hthresh = TX_HTHRESH,
> > > > > +		.wthresh = TX_WTHRESH,
> > > > > +	},
> > > > > +	.tx_free_thresh = TX_FREE_THRESH,
> > > > > +	.tx_rs_thresh = TX_RSBIT_THRESH,
> > > > > +	.txq_flags = TX_Q_FLAGS
> > > > > +
> > > > > +};
> > > > > +
> > > > > +struct output_buffer {
> > > > > +	unsigned count;
> > > > > +	struct rte_mbuf *mbufs[BURST_SIZE]; };
> > > > > +
> > > > > +/*
> > > > > + * Initialises a given port using global settings and with the rx
> > > > > +buffers
> > > > > + * coming from the mbuf_pool passed as parameter  */ static
> > > > > +inline int port_init(uint8_t port, struct rte_mempool *mbuf_pool)
> > > > > +{
> > > > > +	struct rte_eth_conf port_conf = port_conf_default;
> > > > > +	const uint16_t rxRings = 1, txRings = rte_lcore_count() - 1;
> > > > > +	int retval;
> > > > > +	uint16_t q;
> > > > > +
> > > > > +	if (port >= rte_eth_dev_count())
> > > > > +		return -1;
> > > > > +
> > > > > +	retval = rte_eth_dev_configure(port, rxRings, txRings, &port_conf);
> > > > > +	if (retval != 0)
> > > > > +		return retval;
> > > > > +
> > > > > +	for (q = 0; q < rxRings; q++) {
> > > > > +		retval = rte_eth_rx_queue_setup(port, q, RX_RING_SIZE,
> > > > > +						rte_eth_dev_socket_id(port),
> > > > > +						&rx_conf_default, mbuf_pool);
> > > > > +		if (retval < 0)
> > > > > +			return retval;
> > > > > +	}
> > > > > +
> > > > > +	for (q = 0; q < txRings; q++) {
> > > > > +		retval = rte_eth_tx_queue_setup(port, q, TX_RING_SIZE,
> > > > > +						rte_eth_dev_socket_id(port),
> > > > > +						&tx_conf_default);
> > > > > +		if (retval < 0)
> > > > > +			return retval;
> > > > > +	}
> > > > > +
> > > > > +	retval  = rte_eth_dev_start(port);
> > > > > +	if (retval < 0)
> > > > > +		return retval;
> > > > > +
> > > > > +	struct rte_eth_link link;
> > > > > +	rte_eth_link_get_nowait(port, &link);
> > > > > +	if (!link.link_status) {
> > > > > +		sleep(1);
> > > > > +		rte_eth_link_get_nowait(port, &link);
> > > > > +	}
> > > > > +
> > > > > +	if (!link.link_status) {
> > > > > +		printf("Link down on port %"PRIu8"\n", port);
> > > > > +		return 0;
> > > > > +	}
> > > > > +
> > > > > +	struct ether_addr addr;
> > > > > +	rte_eth_macaddr_get(port, &addr);
> > > > > +	printf("Port %u MAC: %02"PRIx8" %02"PRIx8" %02"PRIx8
> > > > > +			" %02"PRIx8" %02"PRIx8" %02"PRIx8"\n",
> > > > > +			(unsigned)port,
> > > > > +			addr.addr_bytes[0], addr.addr_bytes[1],
> > > > > +			addr.addr_bytes[2], addr.addr_bytes[3],
> > > > > +			addr.addr_bytes[4], addr.addr_bytes[5]);
> > > > > +
> > > > > +	rte_eth_promiscuous_enable(port);
> > > > > +
> > > > > +	return 0;
> > > > > +}
> > > > > +
> > > > > +struct lcore_params {
> > > > > +	unsigned worker_id;
> > > > > +	struct rte_distributor *d;
> > > > > +	struct rte_ring *r;
> > > > > +};
> > > > > +
> > > > > +static __attribute__((noreturn)) void lcore_rx(struct
> > > > > +lcore_params *p) {
> > > > > +	struct rte_distributor *d = p->d;
> > > > > +	struct rte_ring *r = p->r;
> > > > > +	const uint8_t nb_ports = rte_eth_dev_count();
> > > > > +	const int socket_id = rte_socket_id();
> > > > > +	uint8_t port;
> > > > > +
> > > > > +	for (port = 0; port < nb_ports; port++) {
> > > > > +		/* skip ports that are not enabled */
> > > > > +		if ((enabled_port_mask & (1 << port)) == 0)
> > > > > +			continue;
> > > > > +
> > > > > +		if (rte_eth_dev_socket_id(port) > 0 &&
> > > > > +				rte_eth_dev_socket_id(port) != socket_id)
> > > > > +			printf("WARNING, port %u is on remote NUMA node to
> > "
> > > > > +					"RX thread.\n\tPerformance will not "
> > > > > +					"be optimal.\n", port);
> > > > > +	}
> > > > > +
> > > > > +	printf("\nCore %u doing packet RX.\n", rte_lcore_id());
> > > > > +	port = 0;
> > > > > +	for (;;) {
> > > > > +		/* skip ports that are not enabled */
> > > > > +		if ((enabled_port_mask & (1 << port)) == 0) {
> > > > > +			if (++port == nb_ports)
> > > > > +				port = 0;
> > > > > +			continue;
> > > > > +		}
> > > > > +		struct rte_mbuf *bufs[BURST_SIZE*2];
> > > > > +		const uint16_t nb_rx = rte_eth_rx_burst(port, 0, bufs,
> > > > > +				BURST_SIZE);
> > > > > +		app_stats.rx.rx_pkts += nb_rx;
> > > > > +
> > > > > +		rte_distributor_process(d, bufs, nb_rx);
> > > > > +		const uint16_t nb_ret = rte_distributor_returned_pkts(d,
> > > > > +				bufs, BURST_SIZE*2);
> > > > > +		app_stats.rx.returned_pkts += nb_ret;
> > > > > +		if (unlikely(nb_ret == 0))
> > > > > +			continue;
> > > > > +
> > > > > +		uint16_t sent = rte_ring_enqueue_burst(r, (void *)bufs, nb_ret);
> > > > > +		app_stats.rx.enqueued_pkts += sent;
> > > > > +		if (unlikely(sent < nb_ret)) {
> > > > > +			LOG_DEBUG(DISTRAPP, "%s:Packet loss due to full
> > ring\n", __func__);
> > > > > +			while (sent < nb_ret)
> > > > > +				rte_pktmbuf_free(bufs[sent++]);
> > > > > +		}
> > > > > +		if (++port == nb_ports)
> > > > > +			port = 0;
> > > > > +	}
> > > > > +}
> > > > > +
> > > > > +static inline void
> > > > > +flush_one_port(struct output_buffer *outbuf, uint8_t outp) {
> > > > > +	unsigned nb_tx = rte_eth_tx_burst(outp, 0, outbuf->mbufs,
> > > > > +			outbuf->count);
> > > > > +	app_stats.tx.tx_pkts += nb_tx;
> > > > > +
> > > > > +	if (unlikely(nb_tx < outbuf->count)) {
> > > > > +		LOG_DEBUG(DISTRAPP, "%s:Packet loss with tx_burst\n",
> > __func__);
> > > > > +		do {
> > > > > +			rte_pktmbuf_free(outbuf->mbufs[nb_tx]);
> > > > > +		} while (++nb_tx < outbuf->count);
> > > > > +	}
> > > > > +	outbuf->count = 0;
> > > > > +}
> > > > > +
> > > > > +static inline void
> > > > > +flush_all_ports(struct output_buffer *tx_buffers, uint8_t
> > > > > +nb_ports) {
> > > > > +	uint8_t outp;
> > > > > +	for (outp = 0; outp < nb_ports; outp++) {
> > > > > +		/* skip ports that are not enabled */
> > > > > +		if ((enabled_port_mask & (1 << outp)) == 0)
> > > > > +			continue;
> > > > > +
> > > > > +		if (tx_buffers[outp].count == 0)
> > > > > +			continue;
> > > > > +
> > > > > +		flush_one_port(&tx_buffers[outp], outp);
> > > > > +	}
> > > > > +}
> > > > > +
> > > > > +static __attribute__((noreturn)) void lcore_tx(struct rte_ring
> > > > > +*in_r) {
> > > > > +	static struct output_buffer tx_buffers[RTE_MAX_ETHPORTS];
> > > > > +	const uint8_t nb_ports = rte_eth_dev_count();
> > > > > +	const int socket_id = rte_socket_id();
> > > > > +	uint8_t port;
> > > > > +
> > > > > +	for (port = 0; port < nb_ports; port++) {
> > > > > +		/* skip ports that are not enabled */
> > > > > +		if ((enabled_port_mask & (1 << port)) == 0)
> > > > > +			continue;
> > > > > +
> > > > > +		if (rte_eth_dev_socket_id(port) > 0 &&
> > > > > +				rte_eth_dev_socket_id(port) != socket_id)
> > > > > +			printf("WARNING, port %u is on remote NUMA node to
> > "
> > > > > +					"TX thread.\n\tPerformance will not "
> > > > > +					"be optimal.\n", port);
> > > > > +	}
> > > > > +
> > > > > +	printf("\nCore %u doing packet TX.\n", rte_lcore_id());
> > > > > +	for (;;) {
> > > > > +		for (port = 0; port < nb_ports; port++) {
> > > > > +			/* skip ports that are not enabled */
> > > > > +			if ((enabled_port_mask & (1 << port)) == 0)
> > > > > +				continue;
> > > > > +
> > > > > +			struct rte_mbuf *bufs[BURST_SIZE];
> > > > > +			const uint16_t nb_rx = rte_ring_dequeue_burst(in_r,
> > > > > +					(void *)bufs, BURST_SIZE);
> > > > > +			app_stats.tx.dequeue_pkts += nb_rx;
> > > > > +
> > > > > +			/* if we get no traffic, flush anything we have */
> > > > > +			if (unlikely(nb_rx == 0)) {
> > > > > +				flush_all_ports(tx_buffers, nb_ports);
> > > > > +				continue;
> > > > > +			}
> > > > > +
> > > > > +			/* for traffic we receive, queue it up for transmit */
> > > > > +			uint16_t i;
> > > > > +			_mm_prefetch(bufs[0], 0);
> > > > > +			_mm_prefetch(bufs[1], 0);
> > > > > +			_mm_prefetch(bufs[2], 0);
> > > > > +			for (i = 0; i < nb_rx; i++) {
> > > > > +				struct output_buffer *outbuf;
> > > > > +				uint8_t outp;
> > > > > +				_mm_prefetch(bufs[i + 3], 0);
> > > > > +				/* workers should update in_port to hold the
> > > > > +				 * output port value */
> > > > > +				outp = bufs[i]->port;
> > > > > +				/* skip ports that are not enabled */
> > > > > +				if ((enabled_port_mask & (1 << outp)) == 0)
> > > > > +					continue;
> > > > > +
> > > > > +				outbuf = &tx_buffers[outp];
> > > > > +				outbuf->mbufs[outbuf->count++] = bufs[i];
> > > > > +				if (outbuf->count == BURST_SIZE)
> > > > > +					flush_one_port(outbuf, outp);
> > > > > +			}
> > > > > +		}
> > > > > +	}
> > > > > +}
> > > > > +
> > > > > +
> > > > > +static __attribute__((noreturn)) void lcore_worker(struct
> > > > > +lcore_params *p) {
> > > > > +	struct rte_distributor *d = p->d;
> > > > > +	const unsigned id = p->worker_id;
> > > > > +	/* for single port, xor_val will be zero so we won't modify the output
> > > > > +	 * port, otherwise we send traffic from 0 to 1, 2 to 3, and vice versa
> > > > > +	 */
> > > > > +	const unsigned xor_val = (rte_eth_dev_count() > 1);
> > > > > +	struct rte_mbuf *buf = NULL;
> > > > > +
> > > > > +	printf("\nCore %u acting as worker core.\n", rte_lcore_id());
> > > > > +	for (;;) {
> > > > > +		buf = rte_distributor_get_pkt(d, id, buf);
> > > > > +		buf->port ^= xor_val;
> > > > > +	}
> > > > > +}
> > > > > +
> > > > > +static void
> > > > > +int_handler(int sig_num)
> > > > > +{
> > > > > +	struct rte_eth_stats eth_stats;
> > > > > +	unsigned i;
> > > > > +
> > > > > +	printf("Exiting on signal %d\n", sig_num);
> > > > > +
> > > > > +	printf("\nRX thread stats:\n");
> > > > > +	printf(" - Received:    %"PRIu64"\n", app_stats.rx.rx_pkts);
> > > > > +	printf(" - Processed:   %"PRIu64"\n", app_stats.rx.returned_pkts);
> > > > > +	printf(" - Enqueued:    %"PRIu64"\n", app_stats.rx.enqueued_pkts);
> > > > > +
> > > > > +	printf("\nTX thread stats:\n");
> > > > > +	printf(" - Dequeued:    %"PRIu64"\n", app_stats.tx.dequeue_pkts);
> > > > > +	printf(" - Transmitted: %"PRIu64"\n", app_stats.tx.tx_pkts);
> > > > > +
> > > > > +	for (i = 0; i < rte_eth_dev_count(); i++) {
> > > > > +		rte_eth_stats_get(i, &eth_stats);
> > > > > +		printf("\nPort %u stats:\n", i);
> > > > > +		printf(" - Pkts in:   %"PRIu64"\n", eth_stats.ipackets);
> > > > > +		printf(" - Pkts out:  %"PRIu64"\n", eth_stats.opackets);
> > > > > +		printf(" - In Errs:   %"PRIu64"\n", eth_stats.ierrors);
> > > > > +		printf(" - Out Errs:  %"PRIu64"\n", eth_stats.oerrors);
> > > > > +		printf(" - Mbuf Errs: %"PRIu64"\n", eth_stats.rx_nombuf);
> > > > > +	}
> > > > > +	exit(0);
> > > > rte_exit here?  Also, this is a pretty ungraceful exit strategy as
> > > > all the threads you've created and memory you've allocated are just
> > forgotten here.
> > > > Given that dpdk mempools are shared, this has the potential to leak
> > > > lots of memory if other apps are using the dpdk at the same time
> > > > that you run this.  You probably want to use the sigint handler to
> > > > raise a flag to the tx/rx threads to shutdown gracefully, and then free your
> > allocated memory and mempool.
> > > >
> > > > Neil
> > > >
> > >
> > > Unless the different processes are explicitly cooperating as
> > > primary/secondary, the mempools are not shared. I just don't see the
> > > need for this app to do more cleanup on ctrl-c signal, as it's not
> > > intended to be a multiprocess app, and there is little that any
> > > secondary process could do to work with this app, except possibly some
> > > resource monitoring, which would be completely unaffected by it exiting the
> > way it does.
> > >
> > Ah, ok, so we don't use a common shared pool between isolated processes
> > then, thats good.  Still though, this is a sample application, I think its lazy
> > programming practice to illustrate to application developers that its generally ok
> > to exit programs without freeing your resources.  Its about 20 lines of additional
> > code to change the sigint handler to flag an exit condition, and have all the
> > other threads join on it.
> > 
> > Neil
> 
> 1)I had sent v5 patch which handles graceful shutdown of rx and tx threads upon SIGINT
I see it and will take a look shortly, thanks.

> 2)Worker thread graceful shutdown was not handled as of now as it needs some change in lcore_worker logic , which will be done in future enhancements.
Not sure I understand what you mean here.  Can you elaborate?

> 3)Freeing of mempool is also not handled , as the framework support is not available.
Ew, I hadn't noticed that, freeing of mempools seems like something we should
implement.

> 4)Cleaning of rx/tx queues not done, as it needs some extensive logic which we haven't planned as of now. Will check the possibility of doing it in future  enhancements    i.e in next version of sample application.
We can't just flush the queues after we shutdown the workers?  I presume a queue
flush operation exists, yes?
Neil

> 
> /Reshma
> 
> > 
> > > /Bruce
> > >
> 
> 
> --------------------------------------------------------------
> Intel Shannon Limited
> Registered in Ireland
> Registered Office: Collinstown Industrial Park, Leixlip, County Kildare
> Registered Number: 308263
> Business address: Dromore House, East Park, Shannon, Co. Clare
> 
> This e-mail and any attachments may contain confidential material for the sole use of the intended recipient(s). Any review or distribution by others is strictly prohibited. If you are not the intended recipient, please contact the sender and delete all copies.
> 
> 
>
  
Bruce Richardson Oct. 1, 2014, 3:37 p.m. UTC | #6
On Wed, Oct 01, 2014 at 10:56:20AM -0400, Neil Horman wrote:
> On Wed, Oct 01, 2014 at 02:47:00PM +0000, Pattan, Reshma wrote:
> > 
> > 
> > > -----Original Message-----
> > > From: Neil Horman [mailto:nhorman@tuxdriver.com]
> > > Sent: Tuesday, September 30, 2014 2:40 PM
> > > To: Richardson, Bruce
> > > Cc: Pattan, Reshma; dev@dpdk.org
> > > Subject: Re: [dpdk-dev] [PATCH v3] distributor_app: new sample app
> > > 
> > > On Tue, Sep 30, 2014 at 01:18:28PM +0100, Bruce Richardson wrote:
> > > > On Tue, Sep 30, 2014 at 07:34:45AM -0400, Neil Horman wrote:
> > > > > On Tue, Sep 30, 2014 at 11:39:37AM +0100, reshmapa wrote:
> > > > > > From: Reshma Pattan <reshma.pattan@intel.com>
> > > > > >
> > > > > > A new sample app that shows the usage of the distributor library.
> > > > > > This app works as follows:
> > > > > >
> > > > > > * An RX thread runs which pulls packets from each ethernet port in turn
> > > > > >   and passes those packets to worker using a distributor component.
> > > > > > * The workers take the packets in turn, and determine the output port
> > > > > >   for those packets using basic l2forwarding doing an xor on the source
> > > > > >   port id.
> > > > > > * The RX thread takes the returned packets from the workers and enqueue
> > > > > >   those packets into an rte_ring structure.
> > > > > > * A TX thread pulls the packets off the rte_ring structure and then
> > > > > >   sends each packet out the output port specified previously by
> > > > > > the worker
> > > > > > * Command-line option support provided only for portmask.
> > > > > >
> > > > > > Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
> > > > > > Signed-off-by: Reshma Pattan    <reshma.pattan@intel.com>
> > > > > > ---
> > > > > >  examples/Makefile                 |    1 +
> > > > > >  examples/distributor_app/Makefile |   57 ++++
> > > > > >  examples/distributor_app/main.c   |  600
> > > +++++++++++++++++++++++++++++++++++++
> > > > > >  examples/distributor_app/main.h   |   46 +++
> > > > > >  4 files changed, 704 insertions(+), 0 deletions(-)  create mode
> > > > > > 100644 examples/distributor_app/Makefile  create mode 100644
> > > > > > examples/distributor_app/main.c  create mode 100644
> > > > > > examples/distributor_app/main.h
> > > > > >
> > > > > > diff --git a/examples/Makefile b/examples/Makefile index
> > > > > > 6245f83..2ba82b0 100644
> > > > > > --- a/examples/Makefile
> > > > > > +++ b/examples/Makefile
> > > > > > @@ -66,5 +66,6 @@ DIRS-y += vhost
> > > > > >  DIRS-$(CONFIG_RTE_LIBRTE_XEN_DOM0) += vhost_xen  DIRS-y += vmdq
> > > > > > DIRS-y += vmdq_dcb
> > > > > > +DIRS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) += distributor_app
> > > > > >
> > > > > >  include $(RTE_SDK)/mk/rte.extsubdir.mk diff --git
> > > > > > a/examples/distributor_app/Makefile
> > > > > > b/examples/distributor_app/Makefile
> > > > > > new file mode 100644
> > > > > > index 0000000..6a5bada
> > > > > > --- /dev/null
> > > > > > +++ b/examples/distributor_app/Makefile
> > > > > > @@ -0,0 +1,57 @@
> > > > > > +#   BSD LICENSE
> > > > > > +#
> > > > > > +#   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
> > > > > > +#   All rights reserved.
> > > > > > +#
> > > > > > +#   Redistribution and use in source and binary forms, with or without
> > > > > > +#   modification, are permitted provided that the following conditions
> > > > > > +#   are met:
> > > > > > +#
> > > > > > +#     * Redistributions of source code must retain the above copyright
> > > > > > +#       notice, this list of conditions and the following disclaimer.
> > > > > > +#     * Redistributions in binary form must reproduce the above copyright
> > > > > > +#       notice, this list of conditions and the following disclaimer in
> > > > > > +#       the documentation and/or other materials provided with the
> > > > > > +#       distribution.
> > > > > > +#     * Neither the name of Intel Corporation nor the names of its
> > > > > > +#       contributors may be used to endorse or promote products derived
> > > > > > +#       from this software without specific prior written permission.
> > > > > > +#
> > > > > > +#   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
> > > CONTRIBUTORS
> > > > > > +#   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT
> > > NOT
> > > > > > +#   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
> > > FITNESS FOR
> > > > > > +#   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
> > > COPYRIGHT
> > > > > > +#   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
> > > INCIDENTAL,
> > > > > > +#   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
> > > BUT NOT
> > > > > > +#   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
> > > LOSS OF USE,
> > > > > > +#   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
> > > AND ON ANY
> > > > > > +#   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
> > > TORT
> > > > > > +#   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
> > > OF THE USE
> > > > > > +#   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
> > > DAMAGE.
> > > > > > +
> > > > > > +ifeq ($(RTE_SDK),)
> > > > > > +$(error "Please define RTE_SDK environment variable") endif
> > > > > > +
> > > > > > +# Default target, can be overriden by command line or environment
> > > > > > +RTE_TARGET ?= x86_64-native-linuxapp-gcc
> > > > > > +
> > > > > > +include $(RTE_SDK)/mk/rte.vars.mk
> > > > > > +
> > > > > > +# binary name
> > > > > > +APP = distributor_app
> > > > > > +
> > > > > > +# all source are stored in SRCS-y SRCS-y := main.c
> > > > > > +
> > > > > > +CFLAGS += $(WERROR_FLAGS)
> > > > > > +
> > > > > > +# workaround for a gcc bug with noreturn attribute #
> > > > > > +http://gcc.gnu.org/bugzilla/show_bug.cgi?id=12603
> > > > > > +ifeq ($(CONFIG_RTE_TOOLCHAIN_GCC),y) CFLAGS_main.o +=
> > > > > > +-Wno-return-type endif
> > > > > > +
> > > > > > +EXTRA_CFLAGS += -O3 -Wfatal-errors
> > > > > > +
> > > > > > +include $(RTE_SDK)/mk/rte.extapp.mk
> > > > > > diff --git a/examples/distributor_app/main.c
> > > > > > b/examples/distributor_app/main.c new file mode 100644 index
> > > > > > 0000000..f555d93
> > > > > > --- /dev/null
> > > > > > +++ b/examples/distributor_app/main.c
> > > > > > @@ -0,0 +1,600 @@
> > > > > > +/*-
> > > > > > + *   BSD LICENSE
> > > > > > + *
> > > > > > + *   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
> > > > > > + *   All rights reserved.
> > > > > > + *
> > > > > > + *   Redistribution and use in source and binary forms, with or without
> > > > > > + *   modification, are permitted provided that the following conditions
> > > > > > + *   are met:
> > > > > > + *
> > > > > > + *     * Redistributions of source code must retain the above copyright
> > > > > > + *       notice, this list of conditions and the following disclaimer.
> > > > > > + *     * Redistributions in binary form must reproduce the above copyright
> > > > > > + *       notice, this list of conditions and the following disclaimer in
> > > > > > + *       the documentation and/or other materials provided with the
> > > > > > + *       distribution.
> > > > > > + *     * Neither the name of Intel Corporation nor the names of its
> > > > > > + *       contributors may be used to endorse or promote products derived
> > > > > > + *       from this software without specific prior written permission.
> > > > > > + *
> > > > > > + *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
> > > CONTRIBUTORS
> > > > > > + *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING,
> > > BUT NOT
> > > > > > + *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
> > > FITNESS FOR
> > > > > > + *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
> > > COPYRIGHT
> > > > > > + *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
> > > INCIDENTAL,
> > > > > > + *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
> > > BUT NOT
> > > > > > + *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
> > > LOSS OF USE,
> > > > > > + *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
> > > AND ON ANY
> > > > > > + *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
> > > TORT
> > > > > > + *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
> > > OF THE USE
> > > > > > + *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
> > > DAMAGE.
> > > > > > + */
> > > > > > +
> > > > > > +#include <stdint.h>
> > > > > > +#include <inttypes.h>
> > > > > > +#include <unistd.h>
> > > > > > +#include <signal.h>
> > > > > > +#include <getopt.h>
> > > > > > +
> > > > > > +#include <rte_eal.h>
> > > > > > +#include <rte_ethdev.h>
> > > > > > +#include <rte_cycles.h>
> > > > > > +#include <rte_malloc.h>
> > > > > > +#include <rte_debug.h>
> > > > > > +#include <rte_distributor.h>
> > > > > > +
> > > > > > +#include "main.h"
> > > > > > +
> > > > > > +#define RX_RING_SIZE 256
> > > > > > +#define RX_FREE_THRESH 32
> > > > > > +#define RX_PTHRESH 8
> > > > > > +#define RX_HTHRESH 8
> > > > > > +#define RX_WTHRESH 0
> > > > > > +
> > > > > > +#define TX_RING_SIZE 512
> > > > > > +#define TX_FREE_THRESH 32
> > > > > > +#define TX_PTHRESH 32
> > > > > > +#define TX_HTHRESH 0
> > > > > > +#define TX_WTHRESH 0
> > > > > > +#define TX_RSBIT_THRESH 32
> > > > > > +#define TX_Q_FLAGS (ETH_TXQ_FLAGS_NOMULTSEGS |
> > > ETH_TXQ_FLAGS_NOVLANOFFL |\
> > > > > > +	ETH_TXQ_FLAGS_NOXSUMSCTP | ETH_TXQ_FLAGS_NOXSUMUDP | \
> > > > > > +	ETH_TXQ_FLAGS_NOXSUMTCP)
> > > > > > +
> > > > > > +#define NUM_MBUFS ((64*1024)-1)
> > > > > > +#define MBUF_SIZE (2048 + sizeof(struct rte_mbuf) +
> > > > > > +RTE_PKTMBUF_HEADROOM) #define MBUF_CACHE_SIZE 250 #define
> > > > > > +BURST_SIZE 32 #define RTE_RING_SZ 1024
> > > > > > +
> > > > > > +/* uncommnet below line to enable debug logs */
> > > > > > +/* #define DEBUG */
> > > > > > +
> > > > > > +#ifdef DEBUG
> > > > > > +#define LOG_LEVEL RTE_LOG_DEBUG
> > > > > > +#define LOG_DEBUG(log_type, fmt, args...) do {	\
> > > > > > +	RTE_LOG(DEBUG, log_type, fmt, ##args)		\
> > > > > > +} while (0)
> > > > > > +#else
> > > > > > +#define LOG_LEVEL RTE_LOG_INFO
> > > > > > +#define LOG_DEBUG(log_type, fmt, args...) do {} while (0) #endif
> > > > > > +
> > > > > > +#define RTE_LOGTYPE_DISTRAPP RTE_LOGTYPE_USER1
> > > > > > +
> > > > > > +/* mask of enabled ports */
> > > > > > +static uint32_t enabled_port_mask = 0;
> > > > > > +
> > > > > > +static volatile struct app_stats {
> > > > > > +	struct {
> > > > > > +		uint64_t rx_pkts;
> > > > > > +		uint64_t returned_pkts;
> > > > > > +		uint64_t enqueued_pkts;
> > > > > > +	} rx __rte_cache_aligned;
> > > > > > +
> > > > > > +	struct {
> > > > > > +		uint64_t dequeue_pkts;
> > > > > > +		uint64_t tx_pkts;
> > > > > > +	} tx __rte_cache_aligned;
> > > > > > +} app_stats;
> > > > > > +
> > > > > > +static const struct rte_eth_conf port_conf_default = {
> > > > > > +	.rxmode = {
> > > > > > +		.mq_mode = ETH_MQ_RX_RSS,
> > > > > > +		.max_rx_pkt_len = ETHER_MAX_LEN,
> > > > > > +		.split_hdr_size = 0,
> > > > > > +		.header_split   = 0, /**< Header Split disabled */
> > > > > > +		.hw_ip_checksum = 0, /**< IP checksum offload enabled */
> > > > > > +		.hw_vlan_filter = 0, /**< VLAN filtering disabled */
> > > > > > +		.jumbo_frame    = 0, /**< Jumbo Frame Support disabled */
> > > > > > +		.hw_strip_crc   = 0, /**< CRC stripped by hardware */
> > > > > > +	},
> > > > > > +	.txmode = {
> > > > > > +		.mq_mode = ETH_MQ_TX_NONE,
> > > > > > +	},
> > > > > > +	.lpbk_mode = 0,
> > > > > > +	.rx_adv_conf = {
> > > > > > +			.rss_conf = {
> > > > > > +				.rss_hf = ETH_RSS_IPV4 | ETH_RSS_IPV6 |
> > > > > > +					ETH_RSS_IPV4_TCP |
> > > ETH_RSS_IPV4_UDP |
> > > > > > +					ETH_RSS_IPV6_TCP |
> > > ETH_RSS_IPV6_UDP,
> > > > > > +			}
> > > > > > +	},
> > > > > > +};
> > > > > > +
> > > > > > +static const struct rte_eth_rxconf rx_conf_default = {
> > > > > > +	.rx_thresh = {
> > > > > > +		.pthresh = RX_PTHRESH,
> > > > > > +		.hthresh = RX_HTHRESH,
> > > > > > +		.wthresh = RX_WTHRESH,
> > > > > > +	},
> > > > > > +	.rx_free_thresh = RX_FREE_THRESH,
> > > > > > +	.rx_drop_en = 0,
> > > > > > +};
> > > > > > +
> > > > > > +static const struct rte_eth_txconf tx_conf_default = {
> > > > > > +	.tx_thresh = {
> > > > > > +		.pthresh = TX_PTHRESH,
> > > > > > +		.hthresh = TX_HTHRESH,
> > > > > > +		.wthresh = TX_WTHRESH,
> > > > > > +	},
> > > > > > +	.tx_free_thresh = TX_FREE_THRESH,
> > > > > > +	.tx_rs_thresh = TX_RSBIT_THRESH,
> > > > > > +	.txq_flags = TX_Q_FLAGS
> > > > > > +
> > > > > > +};
> > > > > > +
> > > > > > +struct output_buffer {
> > > > > > +	unsigned count;
> > > > > > +	struct rte_mbuf *mbufs[BURST_SIZE]; };
> > > > > > +
> > > > > > +/*
> > > > > > + * Initialises a given port using global settings and with the rx
> > > > > > +buffers
> > > > > > + * coming from the mbuf_pool passed as parameter  */ static
> > > > > > +inline int port_init(uint8_t port, struct rte_mempool *mbuf_pool)
> > > > > > +{
> > > > > > +	struct rte_eth_conf port_conf = port_conf_default;
> > > > > > +	const uint16_t rxRings = 1, txRings = rte_lcore_count() - 1;
> > > > > > +	int retval;
> > > > > > +	uint16_t q;
> > > > > > +
> > > > > > +	if (port >= rte_eth_dev_count())
> > > > > > +		return -1;
> > > > > > +
> > > > > > +	retval = rte_eth_dev_configure(port, rxRings, txRings, &port_conf);
> > > > > > +	if (retval != 0)
> > > > > > +		return retval;
> > > > > > +
> > > > > > +	for (q = 0; q < rxRings; q++) {
> > > > > > +		retval = rte_eth_rx_queue_setup(port, q, RX_RING_SIZE,
> > > > > > +						rte_eth_dev_socket_id(port),
> > > > > > +						&rx_conf_default, mbuf_pool);
> > > > > > +		if (retval < 0)
> > > > > > +			return retval;
> > > > > > +	}
> > > > > > +
> > > > > > +	for (q = 0; q < txRings; q++) {
> > > > > > +		retval = rte_eth_tx_queue_setup(port, q, TX_RING_SIZE,
> > > > > > +						rte_eth_dev_socket_id(port),
> > > > > > +						&tx_conf_default);
> > > > > > +		if (retval < 0)
> > > > > > +			return retval;
> > > > > > +	}
> > > > > > +
> > > > > > +	retval  = rte_eth_dev_start(port);
> > > > > > +	if (retval < 0)
> > > > > > +		return retval;
> > > > > > +
> > > > > > +	struct rte_eth_link link;
> > > > > > +	rte_eth_link_get_nowait(port, &link);
> > > > > > +	if (!link.link_status) {
> > > > > > +		sleep(1);
> > > > > > +		rte_eth_link_get_nowait(port, &link);
> > > > > > +	}
> > > > > > +
> > > > > > +	if (!link.link_status) {
> > > > > > +		printf("Link down on port %"PRIu8"\n", port);
> > > > > > +		return 0;
> > > > > > +	}
> > > > > > +
> > > > > > +	struct ether_addr addr;
> > > > > > +	rte_eth_macaddr_get(port, &addr);
> > > > > > +	printf("Port %u MAC: %02"PRIx8" %02"PRIx8" %02"PRIx8
> > > > > > +			" %02"PRIx8" %02"PRIx8" %02"PRIx8"\n",
> > > > > > +			(unsigned)port,
> > > > > > +			addr.addr_bytes[0], addr.addr_bytes[1],
> > > > > > +			addr.addr_bytes[2], addr.addr_bytes[3],
> > > > > > +			addr.addr_bytes[4], addr.addr_bytes[5]);
> > > > > > +
> > > > > > +	rte_eth_promiscuous_enable(port);
> > > > > > +
> > > > > > +	return 0;
> > > > > > +}
> > > > > > +
> > > > > > +struct lcore_params {
> > > > > > +	unsigned worker_id;
> > > > > > +	struct rte_distributor *d;
> > > > > > +	struct rte_ring *r;
> > > > > > +};
> > > > > > +
> > > > > > +static __attribute__((noreturn)) void lcore_rx(struct
> > > > > > +lcore_params *p) {
> > > > > > +	struct rte_distributor *d = p->d;
> > > > > > +	struct rte_ring *r = p->r;
> > > > > > +	const uint8_t nb_ports = rte_eth_dev_count();
> > > > > > +	const int socket_id = rte_socket_id();
> > > > > > +	uint8_t port;
> > > > > > +
> > > > > > +	for (port = 0; port < nb_ports; port++) {
> > > > > > +		/* skip ports that are not enabled */
> > > > > > +		if ((enabled_port_mask & (1 << port)) == 0)
> > > > > > +			continue;
> > > > > > +
> > > > > > +		if (rte_eth_dev_socket_id(port) > 0 &&
> > > > > > +				rte_eth_dev_socket_id(port) != socket_id)
> > > > > > +			printf("WARNING, port %u is on remote NUMA node to
> > > "
> > > > > > +					"RX thread.\n\tPerformance will not "
> > > > > > +					"be optimal.\n", port);
> > > > > > +	}
> > > > > > +
> > > > > > +	printf("\nCore %u doing packet RX.\n", rte_lcore_id());
> > > > > > +	port = 0;
> > > > > > +	for (;;) {
> > > > > > +		/* skip ports that are not enabled */
> > > > > > +		if ((enabled_port_mask & (1 << port)) == 0) {
> > > > > > +			if (++port == nb_ports)
> > > > > > +				port = 0;
> > > > > > +			continue;
> > > > > > +		}
> > > > > > +		struct rte_mbuf *bufs[BURST_SIZE*2];
> > > > > > +		const uint16_t nb_rx = rte_eth_rx_burst(port, 0, bufs,
> > > > > > +				BURST_SIZE);
> > > > > > +		app_stats.rx.rx_pkts += nb_rx;
> > > > > > +
> > > > > > +		rte_distributor_process(d, bufs, nb_rx);
> > > > > > +		const uint16_t nb_ret = rte_distributor_returned_pkts(d,
> > > > > > +				bufs, BURST_SIZE*2);
> > > > > > +		app_stats.rx.returned_pkts += nb_ret;
> > > > > > +		if (unlikely(nb_ret == 0))
> > > > > > +			continue;
> > > > > > +
> > > > > > +		uint16_t sent = rte_ring_enqueue_burst(r, (void *)bufs, nb_ret);
> > > > > > +		app_stats.rx.enqueued_pkts += sent;
> > > > > > +		if (unlikely(sent < nb_ret)) {
> > > > > > +			LOG_DEBUG(DISTRAPP, "%s:Packet loss due to full
> > > ring\n", __func__);
> > > > > > +			while (sent < nb_ret)
> > > > > > +				rte_pktmbuf_free(bufs[sent++]);
> > > > > > +		}
> > > > > > +		if (++port == nb_ports)
> > > > > > +			port = 0;
> > > > > > +	}
> > > > > > +}
> > > > > > +
> > > > > > +static inline void
> > > > > > +flush_one_port(struct output_buffer *outbuf, uint8_t outp) {
> > > > > > +	unsigned nb_tx = rte_eth_tx_burst(outp, 0, outbuf->mbufs,
> > > > > > +			outbuf->count);
> > > > > > +	app_stats.tx.tx_pkts += nb_tx;
> > > > > > +
> > > > > > +	if (unlikely(nb_tx < outbuf->count)) {
> > > > > > +		LOG_DEBUG(DISTRAPP, "%s:Packet loss with tx_burst\n",
> > > __func__);
> > > > > > +		do {
> > > > > > +			rte_pktmbuf_free(outbuf->mbufs[nb_tx]);
> > > > > > +		} while (++nb_tx < outbuf->count);
> > > > > > +	}
> > > > > > +	outbuf->count = 0;
> > > > > > +}
> > > > > > +
> > > > > > +static inline void
> > > > > > +flush_all_ports(struct output_buffer *tx_buffers, uint8_t
> > > > > > +nb_ports) {
> > > > > > +	uint8_t outp;
> > > > > > +	for (outp = 0; outp < nb_ports; outp++) {
> > > > > > +		/* skip ports that are not enabled */
> > > > > > +		if ((enabled_port_mask & (1 << outp)) == 0)
> > > > > > +			continue;
> > > > > > +
> > > > > > +		if (tx_buffers[outp].count == 0)
> > > > > > +			continue;
> > > > > > +
> > > > > > +		flush_one_port(&tx_buffers[outp], outp);
> > > > > > +	}
> > > > > > +}
> > > > > > +
> > > > > > +static __attribute__((noreturn)) void lcore_tx(struct rte_ring
> > > > > > +*in_r) {
> > > > > > +	static struct output_buffer tx_buffers[RTE_MAX_ETHPORTS];
> > > > > > +	const uint8_t nb_ports = rte_eth_dev_count();
> > > > > > +	const int socket_id = rte_socket_id();
> > > > > > +	uint8_t port;
> > > > > > +
> > > > > > +	for (port = 0; port < nb_ports; port++) {
> > > > > > +		/* skip ports that are not enabled */
> > > > > > +		if ((enabled_port_mask & (1 << port)) == 0)
> > > > > > +			continue;
> > > > > > +
> > > > > > +		if (rte_eth_dev_socket_id(port) > 0 &&
> > > > > > +				rte_eth_dev_socket_id(port) != socket_id)
> > > > > > +			printf("WARNING, port %u is on remote NUMA node to
> > > "
> > > > > > +					"TX thread.\n\tPerformance will not "
> > > > > > +					"be optimal.\n", port);
> > > > > > +	}
> > > > > > +
> > > > > > +	printf("\nCore %u doing packet TX.\n", rte_lcore_id());
> > > > > > +	for (;;) {
> > > > > > +		for (port = 0; port < nb_ports; port++) {
> > > > > > +			/* skip ports that are not enabled */
> > > > > > +			if ((enabled_port_mask & (1 << port)) == 0)
> > > > > > +				continue;
> > > > > > +
> > > > > > +			struct rte_mbuf *bufs[BURST_SIZE];
> > > > > > +			const uint16_t nb_rx = rte_ring_dequeue_burst(in_r,
> > > > > > +					(void *)bufs, BURST_SIZE);
> > > > > > +			app_stats.tx.dequeue_pkts += nb_rx;
> > > > > > +
> > > > > > +			/* if we get no traffic, flush anything we have */
> > > > > > +			if (unlikely(nb_rx == 0)) {
> > > > > > +				flush_all_ports(tx_buffers, nb_ports);
> > > > > > +				continue;
> > > > > > +			}
> > > > > > +
> > > > > > +			/* for traffic we receive, queue it up for transmit */
> > > > > > +			uint16_t i;
> > > > > > +			_mm_prefetch(bufs[0], 0);
> > > > > > +			_mm_prefetch(bufs[1], 0);
> > > > > > +			_mm_prefetch(bufs[2], 0);
> > > > > > +			for (i = 0; i < nb_rx; i++) {
> > > > > > +				struct output_buffer *outbuf;
> > > > > > +				uint8_t outp;
> > > > > > +				_mm_prefetch(bufs[i + 3], 0);
> > > > > > +				/* workers should update in_port to hold the
> > > > > > +				 * output port value */
> > > > > > +				outp = bufs[i]->port;
> > > > > > +				/* skip ports that are not enabled */
> > > > > > +				if ((enabled_port_mask & (1 << outp)) == 0)
> > > > > > +					continue;
> > > > > > +
> > > > > > +				outbuf = &tx_buffers[outp];
> > > > > > +				outbuf->mbufs[outbuf->count++] = bufs[i];
> > > > > > +				if (outbuf->count == BURST_SIZE)
> > > > > > +					flush_one_port(outbuf, outp);
> > > > > > +			}
> > > > > > +		}
> > > > > > +	}
> > > > > > +}
> > > > > > +
> > > > > > +
> > > > > > +static __attribute__((noreturn)) void lcore_worker(struct
> > > > > > +lcore_params *p) {
> > > > > > +	struct rte_distributor *d = p->d;
> > > > > > +	const unsigned id = p->worker_id;
> > > > > > +	/* for single port, xor_val will be zero so we won't modify the output
> > > > > > +	 * port, otherwise we send traffic from 0 to 1, 2 to 3, and vice versa
> > > > > > +	 */
> > > > > > +	const unsigned xor_val = (rte_eth_dev_count() > 1);
> > > > > > +	struct rte_mbuf *buf = NULL;
> > > > > > +
> > > > > > +	printf("\nCore %u acting as worker core.\n", rte_lcore_id());
> > > > > > +	for (;;) {
> > > > > > +		buf = rte_distributor_get_pkt(d, id, buf);
> > > > > > +		buf->port ^= xor_val;
> > > > > > +	}
> > > > > > +}
> > > > > > +
> > > > > > +static void
> > > > > > +int_handler(int sig_num)
> > > > > > +{
> > > > > > +	struct rte_eth_stats eth_stats;
> > > > > > +	unsigned i;
> > > > > > +
> > > > > > +	printf("Exiting on signal %d\n", sig_num);
> > > > > > +
> > > > > > +	printf("\nRX thread stats:\n");
> > > > > > +	printf(" - Received:    %"PRIu64"\n", app_stats.rx.rx_pkts);
> > > > > > +	printf(" - Processed:   %"PRIu64"\n", app_stats.rx.returned_pkts);
> > > > > > +	printf(" - Enqueued:    %"PRIu64"\n", app_stats.rx.enqueued_pkts);
> > > > > > +
> > > > > > +	printf("\nTX thread stats:\n");
> > > > > > +	printf(" - Dequeued:    %"PRIu64"\n", app_stats.tx.dequeue_pkts);
> > > > > > +	printf(" - Transmitted: %"PRIu64"\n", app_stats.tx.tx_pkts);
> > > > > > +
> > > > > > +	for (i = 0; i < rte_eth_dev_count(); i++) {
> > > > > > +		rte_eth_stats_get(i, &eth_stats);
> > > > > > +		printf("\nPort %u stats:\n", i);
> > > > > > +		printf(" - Pkts in:   %"PRIu64"\n", eth_stats.ipackets);
> > > > > > +		printf(" - Pkts out:  %"PRIu64"\n", eth_stats.opackets);
> > > > > > +		printf(" - In Errs:   %"PRIu64"\n", eth_stats.ierrors);
> > > > > > +		printf(" - Out Errs:  %"PRIu64"\n", eth_stats.oerrors);
> > > > > > +		printf(" - Mbuf Errs: %"PRIu64"\n", eth_stats.rx_nombuf);
> > > > > > +	}
> > > > > > +	exit(0);
> > > > > rte_exit here?  Also, this is a pretty ungraceful exit strategy as
> > > > > all the threads you've created and memory you've allocated are just
> > > forgotten here.
> > > > > Given that dpdk mempools are shared, this has the potential to leak
> > > > > lots of memory if other apps are using the dpdk at the same time
> > > > > that you run this.  You probably want to use the sigint handler to
> > > > > raise a flag to the tx/rx threads to shutdown gracefully, and then free your
> > > allocated memory and mempool.
> > > > >
> > > > > Neil
> > > > >
> > > >
> > > > Unless the different processes are explicitly cooperating as
> > > > primary/secondary, the mempools are not shared. I just don't see the
> > > > need for this app to do more cleanup on ctrl-c signal, as it's not
> > > > intended to be a multiprocess app, and there is little that any
> > > > secondary process could do to work with this app, except possibly some
> > > > resource monitoring, which would be completely unaffected by it exiting the
> > > way it does.
> > > >
> > > Ah, ok, so we don't use a common shared pool between isolated processes
> > > then, thats good.  Still though, this is a sample application, I think its lazy
> > > programming practice to illustrate to application developers that its generally ok
> > > to exit programs without freeing your resources.  Its about 20 lines of additional
> > > code to change the sigint handler to flag an exit condition, and have all the
> > > other threads join on it.
> > > 
> > > Neil
> > 
> > 1)I had sent v5 patch which handles graceful shutdown of rx and tx threads upon SIGINT
> I see it and will take a look shortly, thanks.
> 
> > 2)Worker thread graceful shutdown was not handled as of now as it needs some change in lcore_worker logic , which will be done in future enhancements.
> Not sure I understand what you mean here.  Can you elaborate?
> 
> > 3)Freeing of mempool is also not handled , as the framework support is not available.
> Ew, I hadn't noticed that, freeing of mempools seems like something we should
> implement.
> 
> > 4)Cleaning of rx/tx queues not done, as it needs some extensive logic which we haven't planned as of now. Will check the possibility of doing it in future  enhancements    i.e in next version of sample application.
> We can't just flush the queues after we shutdown the workers?  I presume a queue
> flush operation exists, yes?
> Neil

Other than code hygiene, which does have some value in itself, I can't 
really see what the practical point of such cleanup would be. 

If traffic is going through the system, and the process is killed packets 
will be dropped, whatever we do, as packet reception will stop. If traffic 
is not going through the system, then there are no packets in flight and 
therefore no relevant cleanup to be done. [And if the traffic is stopped 
just before shutting down the app, at a throughput rate of a couple of 
million packets per second, the app should be flushed of packets within tiny 
fractions of a second].

So overall, I just don't see complicated jumping through hoops for flushing 
and cleaning up things being worth the effort. These apps are designed to 
run in a forever loop, and, in the exception case, when killed the cleanup 
done by kernel is sufficient.

regards,
/Bruce
  
Neil Horman Oct. 1, 2014, 4:07 p.m. UTC | #7
> > > 
> > > 1)I had sent v5 patch which handles graceful shutdown of rx and tx threads upon SIGINT
> > I see it and will take a look shortly, thanks.
> > 
> > > 2)Worker thread graceful shutdown was not handled as of now as it needs some change in lcore_worker logic , which will be done in future enhancements.
> > Not sure I understand what you mean here.  Can you elaborate?
> > 
> > > 3)Freeing of mempool is also not handled , as the framework support is not available.
> > Ew, I hadn't noticed that, freeing of mempools seems like something we should
> > implement.
> > 
> > > 4)Cleaning of rx/tx queues not done, as it needs some extensive logic which we haven't planned as of now. Will check the possibility of doing it in future  enhancements    i.e in next version of sample application.
> > We can't just flush the queues after we shutdown the workers?  I presume a queue
> > flush operation exists, yes?
> > Neil
> 
> Other than code hygiene, which does have some value in itself, I can't 
> really see what the practical point of such cleanup would be. 
> 
This is really the only assertion I'm trying to make.  I understand this
application won't suffer from exiting uncleanly, and that makes the need for
preforming cleanup little more than overhead.

But that said, hygine is exactly the point I'm driving at here.  These are
example applications, that presumably people look at when writing their own
apps.  If you don't do things properly, people looking at your code are less
likely to do them as well.  Even if it doesn't hurt for you to exit uncleanly,
it will hurt someone, and if they look to these examples as a source of best
practices, it seems to me that it would be in everyones interest, if best
practices were demonstrated.

Neil
  
Ananyev, Konstantin Oct. 2, 2014, 9:04 a.m. UTC | #8
> -----Original Message-----
> From: dev [mailto:dev-bounces@dpdk.org] On Behalf Of Bruce Richardson
> Sent: Wednesday, October 01, 2014 4:38 PM
> To: Neil Horman
> Cc: dev@dpdk.org
> Subject: Re: [dpdk-dev] [PATCH v3] distributor_app: new sample app
> 
> On Wed, Oct 01, 2014 at 10:56:20AM -0400, Neil Horman wrote:
> > On Wed, Oct 01, 2014 at 02:47:00PM +0000, Pattan, Reshma wrote:
> > >
> > >
> > > > -----Original Message-----
> > > > From: Neil Horman [mailto:nhorman@tuxdriver.com]
> > > > Sent: Tuesday, September 30, 2014 2:40 PM
> > > > To: Richardson, Bruce
> > > > Cc: Pattan, Reshma; dev@dpdk.org
> > > > Subject: Re: [dpdk-dev] [PATCH v3] distributor_app: new sample app
> > > >
> > > > On Tue, Sep 30, 2014 at 01:18:28PM +0100, Bruce Richardson wrote:
> > > > > On Tue, Sep 30, 2014 at 07:34:45AM -0400, Neil Horman wrote:
> > > > > > On Tue, Sep 30, 2014 at 11:39:37AM +0100, reshmapa wrote:
> > > > > > > From: Reshma Pattan <reshma.pattan@intel.com>
> > > > > > >
> > > > > > > A new sample app that shows the usage of the distributor library.
> > > > > > > This app works as follows:
> > > > > > >
> > > > > > > * An RX thread runs which pulls packets from each ethernet port in turn
> > > > > > >   and passes those packets to worker using a distributor component.
> > > > > > > * The workers take the packets in turn, and determine the output port
> > > > > > >   for those packets using basic l2forwarding doing an xor on the source
> > > > > > >   port id.
> > > > > > > * The RX thread takes the returned packets from the workers and enqueue
> > > > > > >   those packets into an rte_ring structure.
> > > > > > > * A TX thread pulls the packets off the rte_ring structure and then
> > > > > > >   sends each packet out the output port specified previously by
> > > > > > > the worker
> > > > > > > * Command-line option support provided only for portmask.
> > > > > > >
> > > > > > > Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
> > > > > > > Signed-off-by: Reshma Pattan    <reshma.pattan@intel.com>
> > > > > > > ---
> > > > > > >  examples/Makefile                 |    1 +
> > > > > > >  examples/distributor_app/Makefile |   57 ++++
> > > > > > >  examples/distributor_app/main.c   |  600
> > > > +++++++++++++++++++++++++++++++++++++
> > > > > > >  examples/distributor_app/main.h   |   46 +++
> > > > > > >  4 files changed, 704 insertions(+), 0 deletions(-)  create mode
> > > > > > > 100644 examples/distributor_app/Makefile  create mode 100644
> > > > > > > examples/distributor_app/main.c  create mode 100644
> > > > > > > examples/distributor_app/main.h
> > > > > > >
> > > > > > > diff --git a/examples/Makefile b/examples/Makefile index
> > > > > > > 6245f83..2ba82b0 100644
> > > > > > > --- a/examples/Makefile
> > > > > > > +++ b/examples/Makefile
> > > > > > > @@ -66,5 +66,6 @@ DIRS-y += vhost
> > > > > > >  DIRS-$(CONFIG_RTE_LIBRTE_XEN_DOM0) += vhost_xen  DIRS-y += vmdq
> > > > > > > DIRS-y += vmdq_dcb
> > > > > > > +DIRS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) += distributor_app
> > > > > > >
> > > > > > >  include $(RTE_SDK)/mk/rte.extsubdir.mk diff --git
> > > > > > > a/examples/distributor_app/Makefile
> > > > > > > b/examples/distributor_app/Makefile
> > > > > > > new file mode 100644
> > > > > > > index 0000000..6a5bada
> > > > > > > --- /dev/null
> > > > > > > +++ b/examples/distributor_app/Makefile
> > > > > > > @@ -0,0 +1,57 @@
> > > > > > > +#   BSD LICENSE
> > > > > > > +#
> > > > > > > +#   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
> > > > > > > +#   All rights reserved.
> > > > > > > +#
> > > > > > > +#   Redistribution and use in source and binary forms, with or without
> > > > > > > +#   modification, are permitted provided that the following conditions
> > > > > > > +#   are met:
> > > > > > > +#
> > > > > > > +#     * Redistributions of source code must retain the above copyright
> > > > > > > +#       notice, this list of conditions and the following disclaimer.
> > > > > > > +#     * Redistributions in binary form must reproduce the above copyright
> > > > > > > +#       notice, this list of conditions and the following disclaimer in
> > > > > > > +#       the documentation and/or other materials provided with the
> > > > > > > +#       distribution.
> > > > > > > +#     * Neither the name of Intel Corporation nor the names of its
> > > > > > > +#       contributors may be used to endorse or promote products derived
> > > > > > > +#       from this software without specific prior written permission.
> > > > > > > +#
> > > > > > > +#   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
> > > > CONTRIBUTORS
> > > > > > > +#   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT
> > > > NOT
> > > > > > > +#   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
> > > > FITNESS FOR
> > > > > > > +#   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
> > > > COPYRIGHT
> > > > > > > +#   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
> > > > INCIDENTAL,
> > > > > > > +#   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
> > > > BUT NOT
> > > > > > > +#   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
> > > > LOSS OF USE,
> > > > > > > +#   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
> > > > AND ON ANY
> > > > > > > +#   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
> > > > TORT
> > > > > > > +#   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
> > > > OF THE USE
> > > > > > > +#   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
> > > > DAMAGE.
> > > > > > > +
> > > > > > > +ifeq ($(RTE_SDK),)
> > > > > > > +$(error "Please define RTE_SDK environment variable") endif
> > > > > > > +
> > > > > > > +# Default target, can be overriden by command line or environment
> > > > > > > +RTE_TARGET ?= x86_64-native-linuxapp-gcc
> > > > > > > +
> > > > > > > +include $(RTE_SDK)/mk/rte.vars.mk
> > > > > > > +
> > > > > > > +# binary name
> > > > > > > +APP = distributor_app
> > > > > > > +
> > > > > > > +# all source are stored in SRCS-y SRCS-y := main.c
> > > > > > > +
> > > > > > > +CFLAGS += $(WERROR_FLAGS)
> > > > > > > +
> > > > > > > +# workaround for a gcc bug with noreturn attribute #
> > > > > > > +http://gcc.gnu.org/bugzilla/show_bug.cgi?id=12603
> > > > > > > +ifeq ($(CONFIG_RTE_TOOLCHAIN_GCC),y) CFLAGS_main.o +=
> > > > > > > +-Wno-return-type endif
> > > > > > > +
> > > > > > > +EXTRA_CFLAGS += -O3 -Wfatal-errors
> > > > > > > +
> > > > > > > +include $(RTE_SDK)/mk/rte.extapp.mk
> > > > > > > diff --git a/examples/distributor_app/main.c
> > > > > > > b/examples/distributor_app/main.c new file mode 100644 index
> > > > > > > 0000000..f555d93
> > > > > > > --- /dev/null
> > > > > > > +++ b/examples/distributor_app/main.c
> > > > > > > @@ -0,0 +1,600 @@
> > > > > > > +/*-
> > > > > > > + *   BSD LICENSE
> > > > > > > + *
> > > > > > > + *   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
> > > > > > > + *   All rights reserved.
> > > > > > > + *
> > > > > > > + *   Redistribution and use in source and binary forms, with or without
> > > > > > > + *   modification, are permitted provided that the following conditions
> > > > > > > + *   are met:
> > > > > > > + *
> > > > > > > + *     * Redistributions of source code must retain the above copyright
> > > > > > > + *       notice, this list of conditions and the following disclaimer.
> > > > > > > + *     * Redistributions in binary form must reproduce the above copyright
> > > > > > > + *       notice, this list of conditions and the following disclaimer in
> > > > > > > + *       the documentation and/or other materials provided with the
> > > > > > > + *       distribution.
> > > > > > > + *     * Neither the name of Intel Corporation nor the names of its
> > > > > > > + *       contributors may be used to endorse or promote products derived
> > > > > > > + *       from this software without specific prior written permission.
> > > > > > > + *
> > > > > > > + *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
> > > > CONTRIBUTORS
> > > > > > > + *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING,
> > > > BUT NOT
> > > > > > > + *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
> > > > FITNESS FOR
> > > > > > > + *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
> > > > COPYRIGHT
> > > > > > > + *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
> > > > INCIDENTAL,
> > > > > > > + *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
> > > > BUT NOT
> > > > > > > + *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
> > > > LOSS OF USE,
> > > > > > > + *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
> > > > AND ON ANY
> > > > > > > + *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
> > > > TORT
> > > > > > > + *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
> > > > OF THE USE
> > > > > > > + *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
> > > > DAMAGE.
> > > > > > > + */
> > > > > > > +
> > > > > > > +#include <stdint.h>
> > > > > > > +#include <inttypes.h>
> > > > > > > +#include <unistd.h>
> > > > > > > +#include <signal.h>
> > > > > > > +#include <getopt.h>
> > > > > > > +
> > > > > > > +#include <rte_eal.h>
> > > > > > > +#include <rte_ethdev.h>
> > > > > > > +#include <rte_cycles.h>
> > > > > > > +#include <rte_malloc.h>
> > > > > > > +#include <rte_debug.h>
> > > > > > > +#include <rte_distributor.h>
> > > > > > > +
> > > > > > > +#include "main.h"
> > > > > > > +
> > > > > > > +#define RX_RING_SIZE 256
> > > > > > > +#define RX_FREE_THRESH 32
> > > > > > > +#define RX_PTHRESH 8
> > > > > > > +#define RX_HTHRESH 8
> > > > > > > +#define RX_WTHRESH 0
> > > > > > > +
> > > > > > > +#define TX_RING_SIZE 512
> > > > > > > +#define TX_FREE_THRESH 32
> > > > > > > +#define TX_PTHRESH 32
> > > > > > > +#define TX_HTHRESH 0
> > > > > > > +#define TX_WTHRESH 0
> > > > > > > +#define TX_RSBIT_THRESH 32
> > > > > > > +#define TX_Q_FLAGS (ETH_TXQ_FLAGS_NOMULTSEGS |
> > > > ETH_TXQ_FLAGS_NOVLANOFFL |\
> > > > > > > +	ETH_TXQ_FLAGS_NOXSUMSCTP | ETH_TXQ_FLAGS_NOXSUMUDP | \
> > > > > > > +	ETH_TXQ_FLAGS_NOXSUMTCP)
> > > > > > > +
> > > > > > > +#define NUM_MBUFS ((64*1024)-1)
> > > > > > > +#define MBUF_SIZE (2048 + sizeof(struct rte_mbuf) +
> > > > > > > +RTE_PKTMBUF_HEADROOM) #define MBUF_CACHE_SIZE 250 #define
> > > > > > > +BURST_SIZE 32 #define RTE_RING_SZ 1024
> > > > > > > +
> > > > > > > +/* uncommnet below line to enable debug logs */
> > > > > > > +/* #define DEBUG */
> > > > > > > +
> > > > > > > +#ifdef DEBUG
> > > > > > > +#define LOG_LEVEL RTE_LOG_DEBUG
> > > > > > > +#define LOG_DEBUG(log_type, fmt, args...) do {	\
> > > > > > > +	RTE_LOG(DEBUG, log_type, fmt, ##args)		\
> > > > > > > +} while (0)
> > > > > > > +#else
> > > > > > > +#define LOG_LEVEL RTE_LOG_INFO
> > > > > > > +#define LOG_DEBUG(log_type, fmt, args...) do {} while (0) #endif
> > > > > > > +
> > > > > > > +#define RTE_LOGTYPE_DISTRAPP RTE_LOGTYPE_USER1
> > > > > > > +
> > > > > > > +/* mask of enabled ports */
> > > > > > > +static uint32_t enabled_port_mask = 0;
> > > > > > > +
> > > > > > > +static volatile struct app_stats {
> > > > > > > +	struct {
> > > > > > > +		uint64_t rx_pkts;
> > > > > > > +		uint64_t returned_pkts;
> > > > > > > +		uint64_t enqueued_pkts;
> > > > > > > +	} rx __rte_cache_aligned;
> > > > > > > +
> > > > > > > +	struct {
> > > > > > > +		uint64_t dequeue_pkts;
> > > > > > > +		uint64_t tx_pkts;
> > > > > > > +	} tx __rte_cache_aligned;
> > > > > > > +} app_stats;
> > > > > > > +
> > > > > > > +static const struct rte_eth_conf port_conf_default = {
> > > > > > > +	.rxmode = {
> > > > > > > +		.mq_mode = ETH_MQ_RX_RSS,
> > > > > > > +		.max_rx_pkt_len = ETHER_MAX_LEN,
> > > > > > > +		.split_hdr_size = 0,
> > > > > > > +		.header_split   = 0, /**< Header Split disabled */
> > > > > > > +		.hw_ip_checksum = 0, /**< IP checksum offload enabled */
> > > > > > > +		.hw_vlan_filter = 0, /**< VLAN filtering disabled */
> > > > > > > +		.jumbo_frame    = 0, /**< Jumbo Frame Support disabled */
> > > > > > > +		.hw_strip_crc   = 0, /**< CRC stripped by hardware */
> > > > > > > +	},
> > > > > > > +	.txmode = {
> > > > > > > +		.mq_mode = ETH_MQ_TX_NONE,
> > > > > > > +	},
> > > > > > > +	.lpbk_mode = 0,
> > > > > > > +	.rx_adv_conf = {
> > > > > > > +			.rss_conf = {
> > > > > > > +				.rss_hf = ETH_RSS_IPV4 | ETH_RSS_IPV6 |
> > > > > > > +					ETH_RSS_IPV4_TCP |
> > > > ETH_RSS_IPV4_UDP |
> > > > > > > +					ETH_RSS_IPV6_TCP |
> > > > ETH_RSS_IPV6_UDP,
> > > > > > > +			}
> > > > > > > +	},
> > > > > > > +};
> > > > > > > +
> > > > > > > +static const struct rte_eth_rxconf rx_conf_default = {
> > > > > > > +	.rx_thresh = {
> > > > > > > +		.pthresh = RX_PTHRESH,
> > > > > > > +		.hthresh = RX_HTHRESH,
> > > > > > > +		.wthresh = RX_WTHRESH,
> > > > > > > +	},
> > > > > > > +	.rx_free_thresh = RX_FREE_THRESH,
> > > > > > > +	.rx_drop_en = 0,
> > > > > > > +};
> > > > > > > +
> > > > > > > +static const struct rte_eth_txconf tx_conf_default = {
> > > > > > > +	.tx_thresh = {
> > > > > > > +		.pthresh = TX_PTHRESH,
> > > > > > > +		.hthresh = TX_HTHRESH,
> > > > > > > +		.wthresh = TX_WTHRESH,
> > > > > > > +	},
> > > > > > > +	.tx_free_thresh = TX_FREE_THRESH,
> > > > > > > +	.tx_rs_thresh = TX_RSBIT_THRESH,
> > > > > > > +	.txq_flags = TX_Q_FLAGS
> > > > > > > +
> > > > > > > +};
> > > > > > > +
> > > > > > > +struct output_buffer {
> > > > > > > +	unsigned count;
> > > > > > > +	struct rte_mbuf *mbufs[BURST_SIZE]; };
> > > > > > > +
> > > > > > > +/*
> > > > > > > + * Initialises a given port using global settings and with the rx
> > > > > > > +buffers
> > > > > > > + * coming from the mbuf_pool passed as parameter  */ static
> > > > > > > +inline int port_init(uint8_t port, struct rte_mempool *mbuf_pool)
> > > > > > > +{
> > > > > > > +	struct rte_eth_conf port_conf = port_conf_default;
> > > > > > > +	const uint16_t rxRings = 1, txRings = rte_lcore_count() - 1;
> > > > > > > +	int retval;
> > > > > > > +	uint16_t q;
> > > > > > > +
> > > > > > > +	if (port >= rte_eth_dev_count())
> > > > > > > +		return -1;
> > > > > > > +
> > > > > > > +	retval = rte_eth_dev_configure(port, rxRings, txRings, &port_conf);
> > > > > > > +	if (retval != 0)
> > > > > > > +		return retval;
> > > > > > > +
> > > > > > > +	for (q = 0; q < rxRings; q++) {
> > > > > > > +		retval = rte_eth_rx_queue_setup(port, q, RX_RING_SIZE,
> > > > > > > +						rte_eth_dev_socket_id(port),
> > > > > > > +						&rx_conf_default, mbuf_pool);
> > > > > > > +		if (retval < 0)
> > > > > > > +			return retval;
> > > > > > > +	}
> > > > > > > +
> > > > > > > +	for (q = 0; q < txRings; q++) {
> > > > > > > +		retval = rte_eth_tx_queue_setup(port, q, TX_RING_SIZE,
> > > > > > > +						rte_eth_dev_socket_id(port),
> > > > > > > +						&tx_conf_default);
> > > > > > > +		if (retval < 0)
> > > > > > > +			return retval;
> > > > > > > +	}
> > > > > > > +
> > > > > > > +	retval  = rte_eth_dev_start(port);
> > > > > > > +	if (retval < 0)
> > > > > > > +		return retval;
> > > > > > > +
> > > > > > > +	struct rte_eth_link link;
> > > > > > > +	rte_eth_link_get_nowait(port, &link);
> > > > > > > +	if (!link.link_status) {
> > > > > > > +		sleep(1);
> > > > > > > +		rte_eth_link_get_nowait(port, &link);
> > > > > > > +	}
> > > > > > > +
> > > > > > > +	if (!link.link_status) {
> > > > > > > +		printf("Link down on port %"PRIu8"\n", port);
> > > > > > > +		return 0;
> > > > > > > +	}
> > > > > > > +
> > > > > > > +	struct ether_addr addr;
> > > > > > > +	rte_eth_macaddr_get(port, &addr);
> > > > > > > +	printf("Port %u MAC: %02"PRIx8" %02"PRIx8" %02"PRIx8
> > > > > > > +			" %02"PRIx8" %02"PRIx8" %02"PRIx8"\n",
> > > > > > > +			(unsigned)port,
> > > > > > > +			addr.addr_bytes[0], addr.addr_bytes[1],
> > > > > > > +			addr.addr_bytes[2], addr.addr_bytes[3],
> > > > > > > +			addr.addr_bytes[4], addr.addr_bytes[5]);
> > > > > > > +
> > > > > > > +	rte_eth_promiscuous_enable(port);
> > > > > > > +
> > > > > > > +	return 0;
> > > > > > > +}
> > > > > > > +
> > > > > > > +struct lcore_params {
> > > > > > > +	unsigned worker_id;
> > > > > > > +	struct rte_distributor *d;
> > > > > > > +	struct rte_ring *r;
> > > > > > > +};
> > > > > > > +
> > > > > > > +static __attribute__((noreturn)) void lcore_rx(struct
> > > > > > > +lcore_params *p) {
> > > > > > > +	struct rte_distributor *d = p->d;
> > > > > > > +	struct rte_ring *r = p->r;
> > > > > > > +	const uint8_t nb_ports = rte_eth_dev_count();
> > > > > > > +	const int socket_id = rte_socket_id();
> > > > > > > +	uint8_t port;
> > > > > > > +
> > > > > > > +	for (port = 0; port < nb_ports; port++) {
> > > > > > > +		/* skip ports that are not enabled */
> > > > > > > +		if ((enabled_port_mask & (1 << port)) == 0)
> > > > > > > +			continue;
> > > > > > > +
> > > > > > > +		if (rte_eth_dev_socket_id(port) > 0 &&
> > > > > > > +				rte_eth_dev_socket_id(port) != socket_id)
> > > > > > > +			printf("WARNING, port %u is on remote NUMA node to
> > > > "
> > > > > > > +					"RX thread.\n\tPerformance will not "
> > > > > > > +					"be optimal.\n", port);
> > > > > > > +	}
> > > > > > > +
> > > > > > > +	printf("\nCore %u doing packet RX.\n", rte_lcore_id());
> > > > > > > +	port = 0;
> > > > > > > +	for (;;) {
> > > > > > > +		/* skip ports that are not enabled */
> > > > > > > +		if ((enabled_port_mask & (1 << port)) == 0) {
> > > > > > > +			if (++port == nb_ports)
> > > > > > > +				port = 0;
> > > > > > > +			continue;
> > > > > > > +		}
> > > > > > > +		struct rte_mbuf *bufs[BURST_SIZE*2];
> > > > > > > +		const uint16_t nb_rx = rte_eth_rx_burst(port, 0, bufs,
> > > > > > > +				BURST_SIZE);
> > > > > > > +		app_stats.rx.rx_pkts += nb_rx;
> > > > > > > +
> > > > > > > +		rte_distributor_process(d, bufs, nb_rx);
> > > > > > > +		const uint16_t nb_ret = rte_distributor_returned_pkts(d,
> > > > > > > +				bufs, BURST_SIZE*2);
> > > > > > > +		app_stats.rx.returned_pkts += nb_ret;
> > > > > > > +		if (unlikely(nb_ret == 0))
> > > > > > > +			continue;
> > > > > > > +
> > > > > > > +		uint16_t sent = rte_ring_enqueue_burst(r, (void *)bufs, nb_ret);
> > > > > > > +		app_stats.rx.enqueued_pkts += sent;
> > > > > > > +		if (unlikely(sent < nb_ret)) {
> > > > > > > +			LOG_DEBUG(DISTRAPP, "%s:Packet loss due to full
> > > > ring\n", __func__);
> > > > > > > +			while (sent < nb_ret)
> > > > > > > +				rte_pktmbuf_free(bufs[sent++]);
> > > > > > > +		}
> > > > > > > +		if (++port == nb_ports)
> > > > > > > +			port = 0;
> > > > > > > +	}
> > > > > > > +}
> > > > > > > +
> > > > > > > +static inline void
> > > > > > > +flush_one_port(struct output_buffer *outbuf, uint8_t outp) {
> > > > > > > +	unsigned nb_tx = rte_eth_tx_burst(outp, 0, outbuf->mbufs,
> > > > > > > +			outbuf->count);
> > > > > > > +	app_stats.tx.tx_pkts += nb_tx;
> > > > > > > +
> > > > > > > +	if (unlikely(nb_tx < outbuf->count)) {
> > > > > > > +		LOG_DEBUG(DISTRAPP, "%s:Packet loss with tx_burst\n",
> > > > __func__);
> > > > > > > +		do {
> > > > > > > +			rte_pktmbuf_free(outbuf->mbufs[nb_tx]);
> > > > > > > +		} while (++nb_tx < outbuf->count);
> > > > > > > +	}
> > > > > > > +	outbuf->count = 0;
> > > > > > > +}
> > > > > > > +
> > > > > > > +static inline void
> > > > > > > +flush_all_ports(struct output_buffer *tx_buffers, uint8_t
> > > > > > > +nb_ports) {
> > > > > > > +	uint8_t outp;
> > > > > > > +	for (outp = 0; outp < nb_ports; outp++) {
> > > > > > > +		/* skip ports that are not enabled */
> > > > > > > +		if ((enabled_port_mask & (1 << outp)) == 0)
> > > > > > > +			continue;
> > > > > > > +
> > > > > > > +		if (tx_buffers[outp].count == 0)
> > > > > > > +			continue;
> > > > > > > +
> > > > > > > +		flush_one_port(&tx_buffers[outp], outp);
> > > > > > > +	}
> > > > > > > +}
> > > > > > > +
> > > > > > > +static __attribute__((noreturn)) void lcore_tx(struct rte_ring
> > > > > > > +*in_r) {
> > > > > > > +	static struct output_buffer tx_buffers[RTE_MAX_ETHPORTS];
> > > > > > > +	const uint8_t nb_ports = rte_eth_dev_count();
> > > > > > > +	const int socket_id = rte_socket_id();
> > > > > > > +	uint8_t port;
> > > > > > > +
> > > > > > > +	for (port = 0; port < nb_ports; port++) {
> > > > > > > +		/* skip ports that are not enabled */
> > > > > > > +		if ((enabled_port_mask & (1 << port)) == 0)
> > > > > > > +			continue;
> > > > > > > +
> > > > > > > +		if (rte_eth_dev_socket_id(port) > 0 &&
> > > > > > > +				rte_eth_dev_socket_id(port) != socket_id)
> > > > > > > +			printf("WARNING, port %u is on remote NUMA node to
> > > > "
> > > > > > > +					"TX thread.\n\tPerformance will not "
> > > > > > > +					"be optimal.\n", port);
> > > > > > > +	}
> > > > > > > +
> > > > > > > +	printf("\nCore %u doing packet TX.\n", rte_lcore_id());
> > > > > > > +	for (;;) {
> > > > > > > +		for (port = 0; port < nb_ports; port++) {
> > > > > > > +			/* skip ports that are not enabled */
> > > > > > > +			if ((enabled_port_mask & (1 << port)) == 0)
> > > > > > > +				continue;
> > > > > > > +
> > > > > > > +			struct rte_mbuf *bufs[BURST_SIZE];
> > > > > > > +			const uint16_t nb_rx = rte_ring_dequeue_burst(in_r,
> > > > > > > +					(void *)bufs, BURST_SIZE);
> > > > > > > +			app_stats.tx.dequeue_pkts += nb_rx;
> > > > > > > +
> > > > > > > +			/* if we get no traffic, flush anything we have */
> > > > > > > +			if (unlikely(nb_rx == 0)) {
> > > > > > > +				flush_all_ports(tx_buffers, nb_ports);
> > > > > > > +				continue;
> > > > > > > +			}
> > > > > > > +
> > > > > > > +			/* for traffic we receive, queue it up for transmit */
> > > > > > > +			uint16_t i;
> > > > > > > +			_mm_prefetch(bufs[0], 0);
> > > > > > > +			_mm_prefetch(bufs[1], 0);
> > > > > > > +			_mm_prefetch(bufs[2], 0);
> > > > > > > +			for (i = 0; i < nb_rx; i++) {
> > > > > > > +				struct output_buffer *outbuf;
> > > > > > > +				uint8_t outp;
> > > > > > > +				_mm_prefetch(bufs[i + 3], 0);
> > > > > > > +				/* workers should update in_port to hold the
> > > > > > > +				 * output port value */
> > > > > > > +				outp = bufs[i]->port;
> > > > > > > +				/* skip ports that are not enabled */
> > > > > > > +				if ((enabled_port_mask & (1 << outp)) == 0)
> > > > > > > +					continue;
> > > > > > > +
> > > > > > > +				outbuf = &tx_buffers[outp];
> > > > > > > +				outbuf->mbufs[outbuf->count++] = bufs[i];
> > > > > > > +				if (outbuf->count == BURST_SIZE)
> > > > > > > +					flush_one_port(outbuf, outp);
> > > > > > > +			}
> > > > > > > +		}
> > > > > > > +	}
> > > > > > > +}
> > > > > > > +
> > > > > > > +
> > > > > > > +static __attribute__((noreturn)) void lcore_worker(struct
> > > > > > > +lcore_params *p) {
> > > > > > > +	struct rte_distributor *d = p->d;
> > > > > > > +	const unsigned id = p->worker_id;
> > > > > > > +	/* for single port, xor_val will be zero so we won't modify the output
> > > > > > > +	 * port, otherwise we send traffic from 0 to 1, 2 to 3, and vice versa
> > > > > > > +	 */
> > > > > > > +	const unsigned xor_val = (rte_eth_dev_count() > 1);
> > > > > > > +	struct rte_mbuf *buf = NULL;
> > > > > > > +
> > > > > > > +	printf("\nCore %u acting as worker core.\n", rte_lcore_id());
> > > > > > > +	for (;;) {
> > > > > > > +		buf = rte_distributor_get_pkt(d, id, buf);
> > > > > > > +		buf->port ^= xor_val;
> > > > > > > +	}
> > > > > > > +}
> > > > > > > +
> > > > > > > +static void
> > > > > > > +int_handler(int sig_num)
> > > > > > > +{
> > > > > > > +	struct rte_eth_stats eth_stats;
> > > > > > > +	unsigned i;
> > > > > > > +
> > > > > > > +	printf("Exiting on signal %d\n", sig_num);
> > > > > > > +
> > > > > > > +	printf("\nRX thread stats:\n");
> > > > > > > +	printf(" - Received:    %"PRIu64"\n", app_stats.rx.rx_pkts);
> > > > > > > +	printf(" - Processed:   %"PRIu64"\n", app_stats.rx.returned_pkts);
> > > > > > > +	printf(" - Enqueued:    %"PRIu64"\n", app_stats.rx.enqueued_pkts);
> > > > > > > +
> > > > > > > +	printf("\nTX thread stats:\n");
> > > > > > > +	printf(" - Dequeued:    %"PRIu64"\n", app_stats.tx.dequeue_pkts);
> > > > > > > +	printf(" - Transmitted: %"PRIu64"\n", app_stats.tx.tx_pkts);
> > > > > > > +
> > > > > > > +	for (i = 0; i < rte_eth_dev_count(); i++) {
> > > > > > > +		rte_eth_stats_get(i, &eth_stats);
> > > > > > > +		printf("\nPort %u stats:\n", i);
> > > > > > > +		printf(" - Pkts in:   %"PRIu64"\n", eth_stats.ipackets);
> > > > > > > +		printf(" - Pkts out:  %"PRIu64"\n", eth_stats.opackets);
> > > > > > > +		printf(" - In Errs:   %"PRIu64"\n", eth_stats.ierrors);
> > > > > > > +		printf(" - Out Errs:  %"PRIu64"\n", eth_stats.oerrors);
> > > > > > > +		printf(" - Mbuf Errs: %"PRIu64"\n", eth_stats.rx_nombuf);
> > > > > > > +	}
> > > > > > > +	exit(0);
> > > > > > rte_exit here?  Also, this is a pretty ungraceful exit strategy as
> > > > > > all the threads you've created and memory you've allocated are just
> > > > forgotten here.
> > > > > > Given that dpdk mempools are shared, this has the potential to leak
> > > > > > lots of memory if other apps are using the dpdk at the same time
> > > > > > that you run this.  You probably want to use the sigint handler to
> > > > > > raise a flag to the tx/rx threads to shutdown gracefully, and then free your
> > > > allocated memory and mempool.
> > > > > >
> > > > > > Neil
> > > > > >
> > > > >
> > > > > Unless the different processes are explicitly cooperating as
> > > > > primary/secondary, the mempools are not shared. I just don't see the
> > > > > need for this app to do more cleanup on ctrl-c signal, as it's not
> > > > > intended to be a multiprocess app, and there is little that any
> > > > > secondary process could do to work with this app, except possibly some
> > > > > resource monitoring, which would be completely unaffected by it exiting the
> > > > way it does.
> > > > >
> > > > Ah, ok, so we don't use a common shared pool between isolated processes
> > > > then, thats good.  Still though, this is a sample application, I think its lazy
> > > > programming practice to illustrate to application developers that its generally ok
> > > > to exit programs without freeing your resources.  Its about 20 lines of additional
> > > > code to change the sigint handler to flag an exit condition, and have all the
> > > > other threads join on it.
> > > >
> > > > Neil
> > >
> > > 1)I had sent v5 patch which handles graceful shutdown of rx and tx threads upon SIGINT
> > I see it and will take a look shortly, thanks.
> >
> > > 2)Worker thread graceful shutdown was not handled as of now as it needs some change in lcore_worker logic , which will be done
> in future enhancements.
> > Not sure I understand what you mean here.  Can you elaborate?
> >
> > > 3)Freeing of mempool is also not handled , as the framework support is not available.
> > Ew, I hadn't noticed that, freeing of mempools seems like something we should
> > implement.
> >
> > > 4)Cleaning of rx/tx queues not done, as it needs some extensive logic which we haven't planned as of now. Will check the
> possibility of doing it in future  enhancements    i.e in next version of sample application.
> > We can't just flush the queues after we shutdown the workers?  I presume a queue
> > flush operation exists, yes?
> > Neil
> 
> Other than code hygiene, which does have some value in itself, I can't
> really see what the practical point of such cleanup would be.
> 
> If traffic is going through the system, and the process is killed packets
> will be dropped, whatever we do, as packet reception will stop. If traffic
> is not going through the system, then there are no packets in flight and
> therefore no relevant cleanup to be done. [And if the traffic is stopped
> just before shutting down the app, at a throughput rate of a couple of
> million packets per second, the app should be flushed of packets within tiny
> fractions of a second].

I think that in theory not resetting HW at process termination can cause a problem.
Something like that:
- DPDK app has HW RX/TX queues active with armed RXDs, but the link is idle (no packets are flying). 
- DPDK app terminates abnormally.
- user deletes DPDK hugepages files.
- Hugepage memory that was used by DPDK for RXDs/data buffers are given to other app (or kernel).
- Packet arrives -a s HW is still active - it will do a write to the RXD and data-buffer. 
- Silent memory corruption.

Saying that, I don't think we can completely eliminate that problem from user-space code -
as not process signals can be handled. 
>From other side - our whole concept is to move away from custom kernel modules...
Though we probably can make it less possible to happen - create a termination handler,
that  would try to reset all active HW.
And make sure it is called atexit and all catchable signals that cause process termination.
But I don' t think it is a good idea to duplicate such code in each and every sample app.
I think it should be in the librte_eal, and yes - it is a subject of a separate patch/discussion :)

Konstantin
 
> So overall, I just don't see complicated jumping through hoops for flushing
> and cleaning up things being worth the effort. These apps are designed to
> run in a forever loop, and, in the exception case, when killed the cleanup
> done by kernel is sufficient.
> 
> regards,
> /Bruce
  
Pattan, Reshma Oct. 6, 2014, 2:16 p.m. UTC | #9
> -----Original Message-----
> From: Neil Horman [mailto:nhorman@tuxdriver.com]
> Sent: Wednesday, October 1, 2014 5:08 PM
> To: Richardson, Bruce
> Cc: Pattan, Reshma; dev@dpdk.org
> Subject: Re: [dpdk-dev] [PATCH v3] distributor_app: new sample app
> 
> > > >
> > > > 1)I had sent v5 patch which handles graceful shutdown of rx and tx
> > > > threads upon SIGINT
> > > I see it and will take a look shortly, thanks.
> > >
> > > > 2)Worker thread graceful shutdown was not handled as of now as it needs
> some change in lcore_worker logic , which will be done in future enhancements.
> > > Not sure I understand what you mean here.  Can you elaborate?
> > >
rte_distributor_process which runs as part of rx thread will process incoming packets and checks for any requests for the packets from worker threads .
If request is seen, it adds the packet/work  to particular workers back log and proceed with processing of  next packet.
 If no request seen the packet index will not be incremented and the while loop which is conditionally based on packet indexing  runs in a continuous loop without breaking and rx thread will not proceed with next statement execution until unless rte_distributor_process  comes out of while loop. 
This issue happens only when we enable graceful shutdown logic for both rx/worker threads, as workers threads gets killed and no request seen by rx thread and it stucks. 
Hence as of now graceful shutdown logic is provided only for rx thread. For worker threads will check what can be done in next enhancements. 

Thanks,
Reshma

> > > > 3)Freeing of mempool is also not handled , as the framework support is not
> available.
> > > Ew, I hadn't noticed that, freeing of mempools seems like something
> > > we should implement.
> > >
> > > > 4)Cleaning of rx/tx queues not done, as it needs some extensive logic
> which we haven't planned as of now. Will check the possibility of doing it in
> future  enhancements    i.e in next version of sample application.
> > > We can't just flush the queues after we shutdown the workers?  I
> > > presume a queue flush operation exists, yes?
> > > Neil
> >
> > Other than code hygiene, which does have some value in itself, I can't
> > really see what the practical point of such cleanup would be.
> >
> This is really the only assertion I'm trying to make.  I understand this application
> won't suffer from exiting uncleanly, and that makes the need for preforming
> cleanup little more than overhead.
> 
> But that said, hygine is exactly the point I'm driving at here.  These are example
> applications, that presumably people look at when writing their own apps.  If
> you don't do things properly, people looking at your code are less likely to do
> them as well.  Even if it doesn't hurt for you to exit uncleanly, it will hurt
> someone, and if they look to these examples as a source of best practices, it
> seems to me that it would be in everyones interest, if best practices were
> demonstrated.
> 
> Neil

--------------------------------------------------------------
Intel Shannon Limited
Registered in Ireland
Registered Office: Collinstown Industrial Park, Leixlip, County Kildare
Registered Number: 308263
Business address: Dromore House, East Park, Shannon, Co. Clare

This e-mail and any attachments may contain confidential material for the sole use of the intended recipient(s). Any review or distribution by others is strictly prohibited. If you are not the intended recipient, please contact the sender and delete all copies.
  
Neil Horman Oct. 6, 2014, 2:44 p.m. UTC | #10
On Mon, Oct 06, 2014 at 02:16:22PM +0000, Pattan, Reshma wrote:
> 
> 
> > -----Original Message-----
> > From: Neil Horman [mailto:nhorman@tuxdriver.com]
> > Sent: Wednesday, October 1, 2014 5:08 PM
> > To: Richardson, Bruce
> > Cc: Pattan, Reshma; dev@dpdk.org
> > Subject: Re: [dpdk-dev] [PATCH v3] distributor_app: new sample app
> > 
> > > > >
> > > > > 1)I had sent v5 patch which handles graceful shutdown of rx and tx
> > > > > threads upon SIGINT
> > > > I see it and will take a look shortly, thanks.
> > > >
> > > > > 2)Worker thread graceful shutdown was not handled as of now as it needs
> > some change in lcore_worker logic , which will be done in future enhancements.
> > > > Not sure I understand what you mean here.  Can you elaborate?
> > > >
> rte_distributor_process which runs as part of rx thread will process incoming packets and checks for any requests for the packets from worker threads .
> If request is seen, it adds the packet/work  to particular workers back log and proceed with processing of  next packet.
>  If no request seen the packet index will not be incremented and the while loop which is conditionally based on packet indexing  runs in a continuous loop without breaking and rx thread will not proceed with next statement execution until unless rte_distributor_process  comes out of while loop. 
> This issue happens only when we enable graceful shutdown logic for both rx/worker threads, as workers threads gets killed and no request seen by rx thread and it stucks. 
> Hence as of now graceful shutdown logic is provided only for rx thread. For worker threads will check what can be done in next enhancements. 
> 
> Thanks,
> Reshma
> 

I see what you're saying, Once you make a call to rte_distributor_get_pkt, you
have no way to gracefully shut down use of the rte_distributor_library. Not just
this application, but any application.  Thats just not sane, and suggests that
we integrated the rte_distributor library too soon.  I would suggest that you
prefix this patch with an update to the rte distributor library to allow
rte_distributor_get_pkt and friends to return NULL if the queue is emtpy.
Applications should be checking the return value for NULL anyway, and can
preform the rte_pause operation.  Then update this patch to do a clean exit.  To
say that "we will check what can be done in the next enhancements" is to say
that this won't be addressed again until a paying custmoer gripes about it.
Neil

> > > > > 3)Freeing of mempool is also not handled , as the framework support is not
> > available.
> > > > Ew, I hadn't noticed that, freeing of mempools seems like something
> > > > we should implement.
> > > >
> > > > > 4)Cleaning of rx/tx queues not done, as it needs some extensive logic
> > which we haven't planned as of now. Will check the possibility of doing it in
> > future  enhancements    i.e in next version of sample application.
> > > > We can't just flush the queues after we shutdown the workers?  I
> > > > presume a queue flush operation exists, yes?
> > > > Neil
> > >
> > > Other than code hygiene, which does have some value in itself, I can't
> > > really see what the practical point of such cleanup would be.
> > >
> > This is really the only assertion I'm trying to make.  I understand this application
> > won't suffer from exiting uncleanly, and that makes the need for preforming
> > cleanup little more than overhead.
> > 
> > But that said, hygine is exactly the point I'm driving at here.  These are example
> > applications, that presumably people look at when writing their own apps.  If
> > you don't do things properly, people looking at your code are less likely to do
> > them as well.  Even if it doesn't hurt for you to exit uncleanly, it will hurt
> > someone, and if they look to these examples as a source of best practices, it
> > seems to me that it would be in everyones interest, if best practices were
> > demonstrated.
> > 
> > Neil
> 
> --------------------------------------------------------------
> Intel Shannon Limited
> Registered in Ireland
> Registered Office: Collinstown Industrial Park, Leixlip, County Kildare
> Registered Number: 308263
> Business address: Dromore House, East Park, Shannon, Co. Clare
> 
> This e-mail and any attachments may contain confidential material for the sole use of the intended recipient(s). Any review or distribution by others is strictly prohibited. If you are not the intended recipient, please contact the sender and delete all copies.
> 
> 
>
  
Pattan, Reshma Oct. 6, 2014, 5:34 p.m. UTC | #11
> -----Original Message-----
> From: Neil Horman [mailto:nhorman@tuxdriver.com]
> Sent: Monday, October 6, 2014 3:45 PM
> To: Pattan, Reshma
> Cc: dev@dpdk.org; Richardson, Bruce
> Subject: Re: [dpdk-dev] [PATCH v3] distributor_app: new sample app
> 
> On Mon, Oct 06, 2014 at 02:16:22PM +0000, Pattan, Reshma wrote:
> >
> >
> > > -----Original Message-----
> > > From: Neil Horman [mailto:nhorman@tuxdriver.com]
> > > Sent: Wednesday, October 1, 2014 5:08 PM
> > > To: Richardson, Bruce
> > > Cc: Pattan, Reshma; dev@dpdk.org
> > > Subject: Re: [dpdk-dev] [PATCH v3] distributor_app: new sample app
> > >
> > > > > >
> > > > > > 1)I had sent v5 patch which handles graceful shutdown of rx
> > > > > > and tx threads upon SIGINT
> > > > > I see it and will take a look shortly, thanks.
> > > > >
> > > > > > 2)Worker thread graceful shutdown was not handled as of now as
> > > > > > it needs
> > > some change in lcore_worker logic , which will be done in future
> enhancements.
> > > > > Not sure I understand what you mean here.  Can you elaborate?
> > > > >
> > rte_distributor_process which runs as part of rx thread will process incoming
> packets and checks for any requests for the packets from worker threads .
> > If request is seen, it adds the packet/work  to particular workers back log and
> proceed with processing of  next packet.
> >  If no request seen the packet index will not be incremented and the while loop
> which is conditionally based on packet indexing  runs in a continuous loop
> without breaking and rx thread will not proceed with next statement execution
> until unless rte_distributor_process  comes out of while loop.
> > This issue happens only when we enable graceful shutdown logic for both
> rx/worker threads, as workers threads gets killed and no request seen by rx
> thread and it stucks.
> > Hence as of now graceful shutdown logic is provided only for rx thread. For
> worker threads will check what can be done in next enhancements.
> >
> > Thanks,
> > Reshma
> >
> 
> I see what you're saying, Once you make a call to rte_distributor_get_pkt, you
> have no way to gracefully shut down use of the rte_distributor_library. Not just
> this application, but any application.  Thats just not sane, and suggests that we
> integrated the rte_distributor library too soon.  I would suggest that you prefix
> this patch with an update to the rte distributor library to allow
> rte_distributor_get_pkt and friends to return NULL if the queue is emtpy.
> Applications should be checking the return value for NULL anyway, and can
> preform the rte_pause operation.  Then update this patch to do a clean exit.  To
> say that "we will check what can be done in the next enhancements" is to say
> that this won't be addressed again until a paying custmoer gripes about it.
> Neil
> 

Hi Neil,

We have rte_distributor_request_pkt and rte_distributor_poll_pkt() in dpdk.org,  which can be used together(in place of  rte_distributor_get_pkt)
 to check empty queue condition I believe. So no separate changes needed.
Though we replace to rte_distributor_get_pkt with above two, the issue will not be solved as the thread that gets blocked is rx thread but not worker thread.
Rx thread gets blocked in rte_distributore_process due to non-availability of requests from worker. 
 How about sending dummy request_pkts upon SIGINT and allow rte_distributore_process to get to completion? 

Thanks,
Reshma

> > > > > > 3)Freeing of mempool is also not handled , as the framework
> > > > > > support is not
> > > available.
> > > > > Ew, I hadn't noticed that, freeing of mempools seems like
> > > > > something we should implement.
> > > > >
> > > > > > 4)Cleaning of rx/tx queues not done, as it needs some
> > > > > > extensive logic
> > > which we haven't planned as of now. Will check the possibility of doing it in
> > > future  enhancements    i.e in next version of sample application.
> > > > > We can't just flush the queues after we shutdown the workers?  I
> > > > > presume a queue flush operation exists, yes?
> > > > > Neil
> > > >
> > > > Other than code hygiene, which does have some value in itself, I
> > > > can't really see what the practical point of such cleanup would be.
> > > >
> > > This is really the only assertion I'm trying to make.  I understand
> > > this application won't suffer from exiting uncleanly, and that makes
> > > the need for preforming cleanup little more than overhead.
> > >
> > > But that said, hygine is exactly the point I'm driving at here.
> > > These are example applications, that presumably people look at when
> > > writing their own apps.  If you don't do things properly, people
> > > looking at your code are less likely to do them as well.  Even if it
> > > doesn't hurt for you to exit uncleanly, it will hurt someone, and if
> > > they look to these examples as a source of best practices, it seems
> > > to me that it would be in everyones interest, if best practices were
> demonstrated.
> > >
> > > Neil
> >
> > --------------------------------------------------------------
> > Intel Shannon Limited
> > Registered in Ireland
> > Registered Office: Collinstown Industrial Park, Leixlip, County
> > Kildare Registered Number: 308263 Business address: Dromore House,
> > East Park, Shannon, Co. Clare
> >
> > This e-mail and any attachments may contain confidential material for the sole
> use of the intended recipient(s). Any review or distribution by others is strictly
> prohibited. If you are not the intended recipient, please contact the sender and
> delete all copies.
> >
> >
> >
--------------------------------------------------------------
Intel Shannon Limited
Registered in Ireland
Registered Office: Collinstown Industrial Park, Leixlip, County Kildare
Registered Number: 308263
Business address: Dromore House, East Park, Shannon, Co. Clare

This e-mail and any attachments may contain confidential material for the sole use of the intended recipient(s). Any review or distribution by others is strictly prohibited. If you are not the intended recipient, please contact the sender and delete all copies.
  
Neil Horman Oct. 6, 2014, 7:02 p.m. UTC | #12
On Mon, Oct 06, 2014 at 05:34:13PM +0000, Pattan, Reshma wrote:
> 
> 
> > -----Original Message-----
> > From: Neil Horman [mailto:nhorman@tuxdriver.com]
> > Sent: Monday, October 6, 2014 3:45 PM
> > To: Pattan, Reshma
> > Cc: dev@dpdk.org; Richardson, Bruce
> > Subject: Re: [dpdk-dev] [PATCH v3] distributor_app: new sample app
> > 
> > On Mon, Oct 06, 2014 at 02:16:22PM +0000, Pattan, Reshma wrote:
> > >
> > >
> > > > -----Original Message-----
> > > > From: Neil Horman [mailto:nhorman@tuxdriver.com]
> > > > Sent: Wednesday, October 1, 2014 5:08 PM
> > > > To: Richardson, Bruce
> > > > Cc: Pattan, Reshma; dev@dpdk.org
> > > > Subject: Re: [dpdk-dev] [PATCH v3] distributor_app: new sample app
> > > >
> > > > > > >
> > > > > > > 1)I had sent v5 patch which handles graceful shutdown of rx
> > > > > > > and tx threads upon SIGINT
> > > > > > I see it and will take a look shortly, thanks.
> > > > > >
> > > > > > > 2)Worker thread graceful shutdown was not handled as of now as
> > > > > > > it needs
> > > > some change in lcore_worker logic , which will be done in future
> > enhancements.
> > > > > > Not sure I understand what you mean here.  Can you elaborate?
> > > > > >
> > > rte_distributor_process which runs as part of rx thread will process incoming
> > packets and checks for any requests for the packets from worker threads .
> > > If request is seen, it adds the packet/work  to particular workers back log and
> > proceed with processing of  next packet.
> > >  If no request seen the packet index will not be incremented and the while loop
> > which is conditionally based on packet indexing  runs in a continuous loop
> > without breaking and rx thread will not proceed with next statement execution
> > until unless rte_distributor_process  comes out of while loop.
> > > This issue happens only when we enable graceful shutdown logic for both
> > rx/worker threads, as workers threads gets killed and no request seen by rx
> > thread and it stucks.
> > > Hence as of now graceful shutdown logic is provided only for rx thread. For
> > worker threads will check what can be done in next enhancements.
> > >
> > > Thanks,
> > > Reshma
> > >
> > 
> > I see what you're saying, Once you make a call to rte_distributor_get_pkt, you
> > have no way to gracefully shut down use of the rte_distributor_library. Not just
> > this application, but any application.  Thats just not sane, and suggests that we
> > integrated the rte_distributor library too soon.  I would suggest that you prefix
> > this patch with an update to the rte distributor library to allow
> > rte_distributor_get_pkt and friends to return NULL if the queue is emtpy.
> > Applications should be checking the return value for NULL anyway, and can
> > preform the rte_pause operation.  Then update this patch to do a clean exit.  To
> > say that "we will check what can be done in the next enhancements" is to say
> > that this won't be addressed again until a paying custmoer gripes about it.
> > Neil
> > 
> 
> Hi Neil,
> 
> We have rte_distributor_request_pkt and rte_distributor_poll_pkt() in dpdk.org,  which can be used together(in place of  rte_distributor_get_pkt)
>  to check empty queue condition I believe. So no separate changes needed.
> Though we replace to rte_distributor_get_pkt with above two, the issue will not be solved as the thread that gets blocked is rx thread but not worker thread.
> Rx thread gets blocked in rte_distributore_process due to non-availability of requests from worker. 
>  How about sending dummy request_pkts upon SIGINT and allow rte_distributore_process to get to completion? 
> 
I suppose creating a flagged dummy packet to ensure that the worker threads can
exit their spin loops works fine, yes.

Neil

> Thanks,
> Reshma
> 
> > > > > > > 3)Freeing of mempool is also not handled , as the framework
> > > > > > > support is not
> > > > available.
> > > > > > Ew, I hadn't noticed that, freeing of mempools seems like
> > > > > > something we should implement.
> > > > > >
> > > > > > > 4)Cleaning of rx/tx queues not done, as it needs some
> > > > > > > extensive logic
> > > > which we haven't planned as of now. Will check the possibility of doing it in
> > > > future  enhancements    i.e in next version of sample application.
> > > > > > We can't just flush the queues after we shutdown the workers?  I
> > > > > > presume a queue flush operation exists, yes?
> > > > > > Neil
> > > > >
> > > > > Other than code hygiene, which does have some value in itself, I
> > > > > can't really see what the practical point of such cleanup would be.
> > > > >
> > > > This is really the only assertion I'm trying to make.  I understand
> > > > this application won't suffer from exiting uncleanly, and that makes
> > > > the need for preforming cleanup little more than overhead.
> > > >
> > > > But that said, hygine is exactly the point I'm driving at here.
> > > > These are example applications, that presumably people look at when
> > > > writing their own apps.  If you don't do things properly, people
> > > > looking at your code are less likely to do them as well.  Even if it
> > > > doesn't hurt for you to exit uncleanly, it will hurt someone, and if
> > > > they look to these examples as a source of best practices, it seems
> > > > to me that it would be in everyones interest, if best practices were
> > demonstrated.
> > > >
> > > > Neil
> > >
> > > --------------------------------------------------------------
> > > Intel Shannon Limited
> > > Registered in Ireland
> > > Registered Office: Collinstown Industrial Park, Leixlip, County
> > > Kildare Registered Number: 308263 Business address: Dromore House,
> > > East Park, Shannon, Co. Clare
> > >
> > > This e-mail and any attachments may contain confidential material for the sole
> > use of the intended recipient(s). Any review or distribution by others is strictly
> > prohibited. If you are not the intended recipient, please contact the sender and
> > delete all copies.
> > >
> > >
> > >
> --------------------------------------------------------------
> Intel Shannon Limited
> Registered in Ireland
> Registered Office: Collinstown Industrial Park, Leixlip, County Kildare
> Registered Number: 308263
> Business address: Dromore House, East Park, Shannon, Co. Clare
> 
> This e-mail and any attachments may contain confidential material for the sole use of the intended recipient(s). Any review or distribution by others is strictly prohibited. If you are not the intended recipient, please contact the sender and delete all copies.
> 
> 
>
  

Patch

diff --git a/examples/Makefile b/examples/Makefile
index 6245f83..2ba82b0 100644
--- a/examples/Makefile
+++ b/examples/Makefile
@@ -66,5 +66,6 @@  DIRS-y += vhost
 DIRS-$(CONFIG_RTE_LIBRTE_XEN_DOM0) += vhost_xen
 DIRS-y += vmdq
 DIRS-y += vmdq_dcb
+DIRS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) += distributor_app
 
 include $(RTE_SDK)/mk/rte.extsubdir.mk
diff --git a/examples/distributor_app/Makefile b/examples/distributor_app/Makefile
new file mode 100644
index 0000000..6a5bada
--- /dev/null
+++ b/examples/distributor_app/Makefile
@@ -0,0 +1,57 @@ 
+#   BSD LICENSE
+#
+#   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
+#   All rights reserved.
+#
+#   Redistribution and use in source and binary forms, with or without
+#   modification, are permitted provided that the following conditions
+#   are met:
+#
+#     * Redistributions of source code must retain the above copyright
+#       notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above copyright
+#       notice, this list of conditions and the following disclaimer in
+#       the documentation and/or other materials provided with the
+#       distribution.
+#     * Neither the name of Intel Corporation nor the names of its
+#       contributors may be used to endorse or promote products derived
+#       from this software without specific prior written permission.
+#
+#   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+#   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+#   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+#   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+#   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+#   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+#   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+#   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+#   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+#   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+#   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+ifeq ($(RTE_SDK),)
+$(error "Please define RTE_SDK environment variable")
+endif
+
+# Default target, can be overriden by command line or environment
+RTE_TARGET ?= x86_64-native-linuxapp-gcc
+
+include $(RTE_SDK)/mk/rte.vars.mk
+
+# binary name
+APP = distributor_app
+
+# all source are stored in SRCS-y
+SRCS-y := main.c
+
+CFLAGS += $(WERROR_FLAGS)
+
+# workaround for a gcc bug with noreturn attribute
+# http://gcc.gnu.org/bugzilla/show_bug.cgi?id=12603
+ifeq ($(CONFIG_RTE_TOOLCHAIN_GCC),y)
+CFLAGS_main.o += -Wno-return-type
+endif
+
+EXTRA_CFLAGS += -O3 -Wfatal-errors
+
+include $(RTE_SDK)/mk/rte.extapp.mk
diff --git a/examples/distributor_app/main.c b/examples/distributor_app/main.c
new file mode 100644
index 0000000..f555d93
--- /dev/null
+++ b/examples/distributor_app/main.c
@@ -0,0 +1,600 @@ 
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
+ *   All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of Intel Corporation nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <stdint.h>
+#include <inttypes.h>
+#include <unistd.h>
+#include <signal.h>
+#include <getopt.h>
+
+#include <rte_eal.h>
+#include <rte_ethdev.h>
+#include <rte_cycles.h>
+#include <rte_malloc.h>
+#include <rte_debug.h>
+#include <rte_distributor.h>
+
+#include "main.h"
+
+#define RX_RING_SIZE 256
+#define RX_FREE_THRESH 32
+#define RX_PTHRESH 8
+#define RX_HTHRESH 8
+#define RX_WTHRESH 0
+
+#define TX_RING_SIZE 512
+#define TX_FREE_THRESH 32
+#define TX_PTHRESH 32
+#define TX_HTHRESH 0
+#define TX_WTHRESH 0
+#define TX_RSBIT_THRESH 32
+#define TX_Q_FLAGS (ETH_TXQ_FLAGS_NOMULTSEGS | ETH_TXQ_FLAGS_NOVLANOFFL |\
+	ETH_TXQ_FLAGS_NOXSUMSCTP | ETH_TXQ_FLAGS_NOXSUMUDP | \
+	ETH_TXQ_FLAGS_NOXSUMTCP)
+
+#define NUM_MBUFS ((64*1024)-1)
+#define MBUF_SIZE (2048 + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM)
+#define MBUF_CACHE_SIZE 250
+#define BURST_SIZE 32
+#define RTE_RING_SZ 1024
+
+/* uncommnet below line to enable debug logs */
+/* #define DEBUG */
+
+#ifdef DEBUG
+#define LOG_LEVEL RTE_LOG_DEBUG
+#define LOG_DEBUG(log_type, fmt, args...) do {	\
+	RTE_LOG(DEBUG, log_type, fmt, ##args)		\
+} while (0)
+#else
+#define LOG_LEVEL RTE_LOG_INFO
+#define LOG_DEBUG(log_type, fmt, args...) do {} while (0)
+#endif
+
+#define RTE_LOGTYPE_DISTRAPP RTE_LOGTYPE_USER1
+
+/* mask of enabled ports */
+static uint32_t enabled_port_mask = 0;
+
+static volatile struct app_stats {
+	struct {
+		uint64_t rx_pkts;
+		uint64_t returned_pkts;
+		uint64_t enqueued_pkts;
+	} rx __rte_cache_aligned;
+
+	struct {
+		uint64_t dequeue_pkts;
+		uint64_t tx_pkts;
+	} tx __rte_cache_aligned;
+} app_stats;
+
+static const struct rte_eth_conf port_conf_default = {
+	.rxmode = {
+		.mq_mode = ETH_MQ_RX_RSS,
+		.max_rx_pkt_len = ETHER_MAX_LEN,
+		.split_hdr_size = 0,
+		.header_split   = 0, /**< Header Split disabled */
+		.hw_ip_checksum = 0, /**< IP checksum offload enabled */
+		.hw_vlan_filter = 0, /**< VLAN filtering disabled */
+		.jumbo_frame    = 0, /**< Jumbo Frame Support disabled */
+		.hw_strip_crc   = 0, /**< CRC stripped by hardware */
+	},
+	.txmode = {
+		.mq_mode = ETH_MQ_TX_NONE,
+	},
+	.lpbk_mode = 0,
+	.rx_adv_conf = {
+			.rss_conf = {
+				.rss_hf = ETH_RSS_IPV4 | ETH_RSS_IPV6 |
+					ETH_RSS_IPV4_TCP | ETH_RSS_IPV4_UDP |
+					ETH_RSS_IPV6_TCP | ETH_RSS_IPV6_UDP,
+			}
+	},
+};
+
+static const struct rte_eth_rxconf rx_conf_default = {
+	.rx_thresh = {
+		.pthresh = RX_PTHRESH,
+		.hthresh = RX_HTHRESH,
+		.wthresh = RX_WTHRESH,
+	},
+	.rx_free_thresh = RX_FREE_THRESH,
+	.rx_drop_en = 0,
+};
+
+static const struct rte_eth_txconf tx_conf_default = {
+	.tx_thresh = {
+		.pthresh = TX_PTHRESH,
+		.hthresh = TX_HTHRESH,
+		.wthresh = TX_WTHRESH,
+	},
+	.tx_free_thresh = TX_FREE_THRESH,
+	.tx_rs_thresh = TX_RSBIT_THRESH,
+	.txq_flags = TX_Q_FLAGS
+
+};
+
+struct output_buffer {
+	unsigned count;
+	struct rte_mbuf *mbufs[BURST_SIZE];
+};
+
+/*
+ * Initialises a given port using global settings and with the rx buffers
+ * coming from the mbuf_pool passed as parameter
+ */
+static inline int
+port_init(uint8_t port, struct rte_mempool *mbuf_pool)
+{
+	struct rte_eth_conf port_conf = port_conf_default;
+	const uint16_t rxRings = 1, txRings = rte_lcore_count() - 1;
+	int retval;
+	uint16_t q;
+
+	if (port >= rte_eth_dev_count())
+		return -1;
+
+	retval = rte_eth_dev_configure(port, rxRings, txRings, &port_conf);
+	if (retval != 0)
+		return retval;
+
+	for (q = 0; q < rxRings; q++) {
+		retval = rte_eth_rx_queue_setup(port, q, RX_RING_SIZE,
+						rte_eth_dev_socket_id(port),
+						&rx_conf_default, mbuf_pool);
+		if (retval < 0)
+			return retval;
+	}
+
+	for (q = 0; q < txRings; q++) {
+		retval = rte_eth_tx_queue_setup(port, q, TX_RING_SIZE,
+						rte_eth_dev_socket_id(port),
+						&tx_conf_default);
+		if (retval < 0)
+			return retval;
+	}
+
+	retval  = rte_eth_dev_start(port);
+	if (retval < 0)
+		return retval;
+
+	struct rte_eth_link link;
+	rte_eth_link_get_nowait(port, &link);
+	if (!link.link_status) {
+		sleep(1);
+		rte_eth_link_get_nowait(port, &link);
+	}
+
+	if (!link.link_status) {
+		printf("Link down on port %"PRIu8"\n", port);
+		return 0;
+	}
+
+	struct ether_addr addr;
+	rte_eth_macaddr_get(port, &addr);
+	printf("Port %u MAC: %02"PRIx8" %02"PRIx8" %02"PRIx8
+			" %02"PRIx8" %02"PRIx8" %02"PRIx8"\n",
+			(unsigned)port,
+			addr.addr_bytes[0], addr.addr_bytes[1],
+			addr.addr_bytes[2], addr.addr_bytes[3],
+			addr.addr_bytes[4], addr.addr_bytes[5]);
+
+	rte_eth_promiscuous_enable(port);
+
+	return 0;
+}
+
+struct lcore_params {
+	unsigned worker_id;
+	struct rte_distributor *d;
+	struct rte_ring *r;
+};
+
+static __attribute__((noreturn)) void
+lcore_rx(struct lcore_params *p)
+{
+	struct rte_distributor *d = p->d;
+	struct rte_ring *r = p->r;
+	const uint8_t nb_ports = rte_eth_dev_count();
+	const int socket_id = rte_socket_id();
+	uint8_t port;
+
+	for (port = 0; port < nb_ports; port++) {
+		/* skip ports that are not enabled */
+		if ((enabled_port_mask & (1 << port)) == 0)
+			continue;
+
+		if (rte_eth_dev_socket_id(port) > 0 &&
+				rte_eth_dev_socket_id(port) != socket_id)
+			printf("WARNING, port %u is on remote NUMA node to "
+					"RX thread.\n\tPerformance will not "
+					"be optimal.\n", port);
+	}
+
+	printf("\nCore %u doing packet RX.\n", rte_lcore_id());
+	port = 0;
+	for (;;) {
+		/* skip ports that are not enabled */
+		if ((enabled_port_mask & (1 << port)) == 0) {
+			if (++port == nb_ports)
+				port = 0;
+			continue;
+		}
+		struct rte_mbuf *bufs[BURST_SIZE*2];
+		const uint16_t nb_rx = rte_eth_rx_burst(port, 0, bufs,
+				BURST_SIZE);
+		app_stats.rx.rx_pkts += nb_rx;
+
+		rte_distributor_process(d, bufs, nb_rx);
+		const uint16_t nb_ret = rte_distributor_returned_pkts(d,
+				bufs, BURST_SIZE*2);
+		app_stats.rx.returned_pkts += nb_ret;
+		if (unlikely(nb_ret == 0))
+			continue;
+
+		uint16_t sent = rte_ring_enqueue_burst(r, (void *)bufs, nb_ret);
+		app_stats.rx.enqueued_pkts += sent;
+		if (unlikely(sent < nb_ret)) {
+			LOG_DEBUG(DISTRAPP, "%s:Packet loss due to full ring\n", __func__);
+			while (sent < nb_ret)
+				rte_pktmbuf_free(bufs[sent++]);
+		}
+		if (++port == nb_ports)
+			port = 0;
+	}
+}
+
+static inline void
+flush_one_port(struct output_buffer *outbuf, uint8_t outp)
+{
+	unsigned nb_tx = rte_eth_tx_burst(outp, 0, outbuf->mbufs,
+			outbuf->count);
+	app_stats.tx.tx_pkts += nb_tx;
+
+	if (unlikely(nb_tx < outbuf->count)) {
+		LOG_DEBUG(DISTRAPP, "%s:Packet loss with tx_burst\n", __func__);
+		do {
+			rte_pktmbuf_free(outbuf->mbufs[nb_tx]);
+		} while (++nb_tx < outbuf->count);
+	}
+	outbuf->count = 0;
+}
+
+static inline void
+flush_all_ports(struct output_buffer *tx_buffers, uint8_t nb_ports)
+{
+	uint8_t outp;
+	for (outp = 0; outp < nb_ports; outp++) {
+		/* skip ports that are not enabled */
+		if ((enabled_port_mask & (1 << outp)) == 0)
+			continue;
+
+		if (tx_buffers[outp].count == 0)
+			continue;
+
+		flush_one_port(&tx_buffers[outp], outp);
+	}
+}
+
+static __attribute__((noreturn)) void
+lcore_tx(struct rte_ring *in_r)
+{
+	static struct output_buffer tx_buffers[RTE_MAX_ETHPORTS];
+	const uint8_t nb_ports = rte_eth_dev_count();
+	const int socket_id = rte_socket_id();
+	uint8_t port;
+
+	for (port = 0; port < nb_ports; port++) {
+		/* skip ports that are not enabled */
+		if ((enabled_port_mask & (1 << port)) == 0)
+			continue;
+
+		if (rte_eth_dev_socket_id(port) > 0 &&
+				rte_eth_dev_socket_id(port) != socket_id)
+			printf("WARNING, port %u is on remote NUMA node to "
+					"TX thread.\n\tPerformance will not "
+					"be optimal.\n", port);
+	}
+
+	printf("\nCore %u doing packet TX.\n", rte_lcore_id());
+	for (;;) {
+		for (port = 0; port < nb_ports; port++) {
+			/* skip ports that are not enabled */
+			if ((enabled_port_mask & (1 << port)) == 0)
+				continue;
+
+			struct rte_mbuf *bufs[BURST_SIZE];
+			const uint16_t nb_rx = rte_ring_dequeue_burst(in_r,
+					(void *)bufs, BURST_SIZE);
+			app_stats.tx.dequeue_pkts += nb_rx;
+
+			/* if we get no traffic, flush anything we have */
+			if (unlikely(nb_rx == 0)) {
+				flush_all_ports(tx_buffers, nb_ports);
+				continue;
+			}
+
+			/* for traffic we receive, queue it up for transmit */
+			uint16_t i;
+			_mm_prefetch(bufs[0], 0);
+			_mm_prefetch(bufs[1], 0);
+			_mm_prefetch(bufs[2], 0);
+			for (i = 0; i < nb_rx; i++) {
+				struct output_buffer *outbuf;
+				uint8_t outp;
+				_mm_prefetch(bufs[i + 3], 0);
+				/* workers should update in_port to hold the
+				 * output port value */
+				outp = bufs[i]->port;
+				/* skip ports that are not enabled */
+				if ((enabled_port_mask & (1 << outp)) == 0)
+					continue;
+
+				outbuf = &tx_buffers[outp];
+				outbuf->mbufs[outbuf->count++] = bufs[i];
+				if (outbuf->count == BURST_SIZE)
+					flush_one_port(outbuf, outp);
+			}
+		}
+	}
+}
+
+
+static __attribute__((noreturn)) void
+lcore_worker(struct lcore_params *p)
+{
+	struct rte_distributor *d = p->d;
+	const unsigned id = p->worker_id;
+	/* for single port, xor_val will be zero so we won't modify the output
+	 * port, otherwise we send traffic from 0 to 1, 2 to 3, and vice versa
+	 */
+	const unsigned xor_val = (rte_eth_dev_count() > 1);
+	struct rte_mbuf *buf = NULL;
+
+	printf("\nCore %u acting as worker core.\n", rte_lcore_id());
+	for (;;) {
+		buf = rte_distributor_get_pkt(d, id, buf);
+		buf->port ^= xor_val;
+	}
+}
+
+static void
+int_handler(int sig_num)
+{
+	struct rte_eth_stats eth_stats;
+	unsigned i;
+
+	printf("Exiting on signal %d\n", sig_num);
+
+	printf("\nRX thread stats:\n");
+	printf(" - Received:    %"PRIu64"\n", app_stats.rx.rx_pkts);
+	printf(" - Processed:   %"PRIu64"\n", app_stats.rx.returned_pkts);
+	printf(" - Enqueued:    %"PRIu64"\n", app_stats.rx.enqueued_pkts);
+
+	printf("\nTX thread stats:\n");
+	printf(" - Dequeued:    %"PRIu64"\n", app_stats.tx.dequeue_pkts);
+	printf(" - Transmitted: %"PRIu64"\n", app_stats.tx.tx_pkts);
+
+	for (i = 0; i < rte_eth_dev_count(); i++) {
+		rte_eth_stats_get(i, &eth_stats);
+		printf("\nPort %u stats:\n", i);
+		printf(" - Pkts in:   %"PRIu64"\n", eth_stats.ipackets);
+		printf(" - Pkts out:  %"PRIu64"\n", eth_stats.opackets);
+		printf(" - In Errs:   %"PRIu64"\n", eth_stats.ierrors);
+		printf(" - Out Errs:  %"PRIu64"\n", eth_stats.oerrors);
+		printf(" - Mbuf Errs: %"PRIu64"\n", eth_stats.rx_nombuf);
+	}
+	exit(0);
+}
+
+/* display usage */
+static void
+print_usage(const char *prgname)
+{
+	printf("%s [EAL options] -- -p PORTMASK\n"
+			"  -p PORTMASK: hexadecimal bitmask of ports to configure\n",
+			prgname);
+}
+
+static int
+parse_portmask(const char *portmask)
+{
+	char *end = NULL;
+	unsigned long pm;
+
+	/* parse hexadecimal string */
+	pm = strtoul(portmask, &end, 16);
+	if ((portmask[0] == '\0') || (end == NULL) || (*end != '\0'))
+		return -1;
+
+	if (pm == 0)
+		return -1;
+
+	return pm;
+}
+
+/* Parse the argument given in the command line of the application */
+static int
+parse_args(int argc, char **argv)
+{
+	int opt;
+	char **argvopt;
+	int option_index;
+	char *prgname = argv[0];
+	static struct option lgopts[] = {
+		{NULL, 0, 0, 0}
+	};
+
+	argvopt = argv;
+
+	while ((opt = getopt_long(argc, argvopt, "p:",
+			lgopts, &option_index)) != EOF) {
+
+		switch (opt) {
+			/* portmask */
+			case 'p':
+				enabled_port_mask = parse_portmask(optarg);
+				if (enabled_port_mask == 0) {
+					printf("invalid portmask\n");
+					print_usage(prgname);
+					return -1;
+				}
+				break;
+
+			default:
+				print_usage(prgname);
+				return -1;
+		}
+	}
+
+if (optind <= 1) {
+	print_usage(prgname);
+	return -1;
+}
+
+	argv[optind-1] = prgname;
+
+	optind = 0; /* reset getopt lib */
+	return 0;
+}
+
+/* Main function, does initialization and calls the per-lcore functions */
+int
+MAIN(int argc, char *argv[])
+{
+	struct rte_mempool *mbuf_pool;
+	struct rte_distributor *d;
+	struct rte_ring *output_ring;
+	unsigned lcore_id, worker_id = 0;
+	unsigned nb_ports;
+	uint8_t portid;
+	uint8_t nb_ports_available;
+
+	/* catch ctrl-c so we can print on exit */
+	signal(SIGINT, int_handler);
+
+	/* init EAL */
+	int ret = rte_eal_init(argc, argv);
+	if (ret < 0)
+		rte_exit(EXIT_FAILURE, "Error with EAL initialization\n");
+	argc -= ret;
+	argv += ret;
+
+	/* parse application arguments (after the EAL ones) */
+	ret = parse_args(argc, argv);
+	if (ret < 0)
+		rte_exit(EXIT_FAILURE, "Invalid distributor parameters\n");
+
+	if (rte_lcore_count() < 3)
+		rte_exit(EXIT_FAILURE, "Error, This application needs at "
+				"least 3 logical cores to run:\n"
+				"1 lcore for packet RX and distribution\n"
+				"1 lcore for packet TX\n"
+				"and at least 1 lcore for worker threads\n");
+
+	if (rte_eal_pci_probe() != 0)
+		rte_exit(EXIT_FAILURE, "Error with PCI probing\n");
+
+	nb_ports = rte_eth_dev_count();
+	if (nb_ports == 0)
+		rte_exit(EXIT_FAILURE, "Error: no ethernet ports detected\n");
+	if (nb_ports != 1 && (nb_ports & 1))
+		rte_exit(EXIT_FAILURE, "Error: number of ports must be even, except "
+				"when using a single port\n");
+
+	mbuf_pool = rte_mempool_create("MBUF_POOL", NUM_MBUFS * nb_ports,
+			MBUF_SIZE, MBUF_CACHE_SIZE,
+			sizeof(struct rte_pktmbuf_pool_private),
+			rte_pktmbuf_pool_init, NULL,
+			rte_pktmbuf_init, NULL,
+			rte_socket_id(), 0);
+	if (mbuf_pool == NULL)
+		rte_exit(EXIT_FAILURE, "Cannot create mbuf pool\n");
+
+	nb_ports_available = nb_ports;
+
+	/* initialize all ports */
+	for (portid = 0; portid < nb_ports; portid++) {
+		/* skip ports that are not enabled */
+		if ((enabled_port_mask & (1 << portid)) == 0) {
+			printf("\nSkipping disabled port %d\n", portid);
+			nb_ports_available--;
+			continue;
+		}
+		/* init port */
+		printf("Initializing port %u... done\n", (unsigned) portid);
+
+		if (port_init(portid, mbuf_pool) != 0)
+			rte_exit(EXIT_FAILURE, "Cannot initialize port %"PRIu8"\n",
+					portid);
+	}
+
+	if (!nb_ports_available) {
+		rte_exit(EXIT_FAILURE,
+				"All available ports are disabled. Please set portmask.\n");
+	}
+
+	d = rte_distributor_create("PKT_DIST", rte_socket_id(),
+			rte_lcore_count() - 2);
+	if (d == NULL)
+		rte_exit(EXIT_FAILURE, "Cannot create distributor\n");
+
+	/* scheduler ring is read only by the transmitter core, but written to
+	 * by multiple threads
+	 */
+	output_ring = rte_ring_create("Output_ring", RTE_RING_SZ,
+			rte_socket_id(), RING_F_SC_DEQ);
+	if (output_ring == NULL)
+		rte_exit(EXIT_FAILURE, "Cannot create output ring\n");
+
+	RTE_LCORE_FOREACH_SLAVE(lcore_id) {
+		if (worker_id == rte_lcore_count() - 2)
+			rte_eal_remote_launch((lcore_function_t *)lcore_tx,
+					output_ring, lcore_id);
+		else {
+			struct lcore_params *p =
+					rte_malloc(NULL, sizeof(*p), 0);
+			if (!p)
+				rte_panic("malloc failure\n");
+			*p = (struct lcore_params){worker_id, d, output_ring};
+			rte_eal_remote_launch((lcore_function_t *)lcore_worker,
+					p, lcore_id);
+		}
+		worker_id++;
+	}
+	/* call lcore_main on master core only */
+	struct lcore_params p = { 0, d, output_ring };
+	lcore_rx(&p);
+	return 0;
+}
diff --git a/examples/distributor_app/main.h b/examples/distributor_app/main.h
new file mode 100644
index 0000000..2682d15
--- /dev/null
+++ b/examples/distributor_app/main.h
@@ -0,0 +1,46 @@ 
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
+ *   All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of Intel Corporation nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _MAIN_H_
+#define _MAIN_H_
+
+
+#ifdef RTE_EXEC_ENV_BAREMETAL
+#define MAIN _main
+#else
+#define MAIN main
+#endif
+
+int MAIN(int argc, char *argv[]);
+
+#endif /* ifndef _MAIN_H_ */