From patchwork Wed Nov 16 18:00:07 2016 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: "Van Haaren, Harry" X-Patchwork-Id: 17053 Return-Path: X-Original-To: patchwork@dpdk.org Delivered-To: patchwork@dpdk.org Received: from [92.243.14.124] (localhost [IPv6:::1]) by dpdk.org (Postfix) with ESMTP id 441726893; Wed, 16 Nov 2016 19:01:35 +0100 (CET) Received: from mga02.intel.com (mga02.intel.com [134.134.136.20]) by dpdk.org (Postfix) with ESMTP id 84D3D567E for ; Wed, 16 Nov 2016 19:00:29 +0100 (CET) Received: from fmsmga001.fm.intel.com ([10.253.24.23]) by orsmga101.jf.intel.com with ESMTP; 16 Nov 2016 10:00:28 -0800 X-ExtLoop1: 1 X-IronPort-AV: E=Sophos; i="5.31,649,1473145200"; d="scan'208"; a="1069396502" Received: from sie-lab-212-222.ir.intel.com (HELO silpixa00398672.ir.intel.com) ([10.237.212.222]) by fmsmga001.fm.intel.com with ESMTP; 16 Nov 2016 10:00:27 -0800 From: Harry van Haaren To: dev@dpdk.org Cc: Harry van Haaren , Gage Eads , Bruce Richardson Date: Wed, 16 Nov 2016 18:00:07 +0000 Message-Id: <1479319207-130646-8-git-send-email-harry.van.haaren@intel.com> X-Mailer: git-send-email 2.7.4 In-Reply-To: <1479319207-130646-1-git-send-email-harry.van.haaren@intel.com> References: <1479319207-130646-1-git-send-email-harry.van.haaren@intel.com> Subject: [dpdk-dev] [PATCH 7/7] examples/eventdev_pipeline: adding example X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: patches and discussions about DPDK List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dev-bounces@dpdk.org Sender: "dev" This patch adds a sample app to the examples/ directory, which can be used as a reference application and for general testing. The application requires two ethdev ports and expects traffic to be flowing. The application must be run with the --vdev flags as follows to indicate to EAL that a virtual eventdev device called "evdev_sw0" is available to be used: ./build/eventdev_pipeline --vdev evdev_sw0 The general flow of the traffic is as follows: Rx core -> Atomic Queue => 4 worker cores => TX core A scheduler core is required to do the packet scheduling, making this configuration require 7 cores (Rx, Tx, Scheduler, and 4 workers). Finally a master core brings the core count to 8 for this configuration. The application can be configured for various numbers of flows and worker cores. Run the application with -h for details. Signed-off-by: Gage Eads Signed-off-by: Bruce Richardson Signed-off-by: Harry van Haaren --- examples/eventdev_pipeline/Makefile | 49 +++ examples/eventdev_pipeline/main.c | 718 ++++++++++++++++++++++++++++++++++++ 2 files changed, 767 insertions(+) create mode 100644 examples/eventdev_pipeline/Makefile create mode 100644 examples/eventdev_pipeline/main.c diff --git a/examples/eventdev_pipeline/Makefile b/examples/eventdev_pipeline/Makefile new file mode 100644 index 0000000..bab8916 --- /dev/null +++ b/examples/eventdev_pipeline/Makefile @@ -0,0 +1,49 @@ +# BSD LICENSE +# +# Copyright(c) 2016 Intel Corporation. All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in +# the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Intel Corporation nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +ifeq ($(RTE_SDK),) +$(error "Please define RTE_SDK environment variable") +endif + +# Default target, can be overriden by command line or environment +RTE_TARGET ?= x86_64-native-linuxapp-gcc + +include $(RTE_SDK)/mk/rte.vars.mk + +# binary name +APP = eventdev_pipeline + +# all source are stored in SRCS-y +SRCS-y := main.c + +CFLAGS += -O3 +CFLAGS += $(WERROR_FLAGS) + +include $(RTE_SDK)/mk/rte.extapp.mk diff --git a/examples/eventdev_pipeline/main.c b/examples/eventdev_pipeline/main.c new file mode 100644 index 0000000..6a8052c --- /dev/null +++ b/examples/eventdev_pipeline/main.c @@ -0,0 +1,718 @@ +/*- + * BSD LICENSE + * + * Copyright(c) 2016 Intel Corporation. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Intel Corporation nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#define BATCH_SIZE 32 + +static unsigned int num_workers = 4; +static unsigned long num_packets = (1L << 25); /* do ~32M packets */ +static unsigned int num_fids = 16; +static unsigned int num_priorities = 1; +static int sched_type = RTE_SCHED_TYPE_ATOMIC; + +struct prod_data { + uint8_t event_dev_id; + uint8_t event_port_id; + int32_t qid; + unsigned num_ports; +}; + +struct cons_data { + uint8_t event_dev_id; + uint8_t event_port_id; +}; + +struct worker_data { + uint8_t event_dev_id; + int event_port_id; + int32_t qid; +}; + +static volatile int done = 0; +static int quiet = 0; +struct rte_mempool *mp; + +static int +worker(void *arg) +{ + struct rte_event rcv_events[BATCH_SIZE]; + + struct worker_data *data = (struct worker_data *)arg; + uint8_t event_dev_id = data->event_dev_id; + uint8_t event_port_id = data->event_port_id; + int32_t qid = data->qid; + size_t sent = 0, received = 0; + + while (!done) { + uint16_t i; + + uint16_t n = rte_event_dequeue_burst(event_dev_id, + event_port_id, + rcv_events, + RTE_DIM(rcv_events), + false); + if (n == 0){ + rte_pause(); + /* Flush any buffered events */ + rte_event_dequeue(event_dev_id, + event_port_id, + NULL, + false); + continue; + } + received += n; + + for (i = 0; i < n; i++) { + struct ether_hdr *eth; + struct ether_addr addr; + struct rte_event *ev = &rcv_events[i]; + + ev->queue_id = qid; + ev->flow_id = 0; + ev->priority = 0; + ev->sched_type = RTE_SCHED_TYPE_ATOMIC; + ev->operation = RTE_EVENT_OP_FORWARD; + + uint64_t now = rte_rdtsc(); + while(now + 750 > rte_rdtsc()) {} + + /* change mac addresses on packet */ + eth = rte_pktmbuf_mtod(ev->mbuf, struct ether_hdr *); + ether_addr_copy(ð->d_addr, &addr); + ether_addr_copy(ð->s_addr, ð->d_addr); + ether_addr_copy(&addr, ð->s_addr); + } + int ret = rte_event_enqueue_burst(event_dev_id, event_port_id, + rcv_events, n, false); + if (ret != n) + rte_panic("worker %u thread failed to enqueue event\n", + rte_lcore_id()); + } + + /* Flush the buffered events */ + rte_event_dequeue(event_dev_id, event_port_id, NULL, false); + + if (!quiet) + printf(" worker %u thread done. RX=%zu TX=%zu\n", + rte_lcore_id(), received, sent); + + return 0; +} + +static int +scheduler(void *arg) +{ + RTE_SET_USED(arg); + size_t loops = 0; + + while (!done) { + /* Assumes an event dev ID of 0 */ + rte_event_schedule(0); + loops++; + } + + printf(" scheduler thread done. loops=%zu\n", loops); + + return 0; +} + +static int +consumer(void *arg) +{ + struct rte_event events[BATCH_SIZE]; + + struct cons_data *data = (struct cons_data *)arg; + uint8_t event_dev_id = data->event_dev_id; + uint8_t event_port_id = data->event_port_id; + struct rte_eth_dev_tx_buffer *tx_buf[RTE_MAX_ETHPORTS]; + size_t npackets = num_packets; + size_t received = 0; + size_t received_printed = 0; /* tracks when we last printed receive count */ + uint64_t start_time = 0; + uint64_t freq_khz = rte_get_timer_hz() / 1000; + uint64_t dropped = 0; + unsigned i; + + for (i = 0; i < rte_eth_dev_count(); i++) { + tx_buf[i] = rte_malloc(NULL, RTE_ETH_TX_BUFFER_SIZE(32), 0); + if (tx_buf[i] == NULL) + rte_panic("Out of memory\n"); + rte_eth_tx_buffer_init(tx_buf[i], 32); + rte_eth_tx_buffer_set_err_callback(tx_buf[i], + rte_eth_tx_buffer_count_callback, &dropped); + } + + while (!done) { + uint16_t i; + uint16_t n = rte_event_dequeue_burst(event_dev_id, + event_port_id, + events, + RTE_DIM(events), + false); + + if (n == 0){ + rte_pause(); + continue; + } + if (start_time == 0) + start_time = rte_get_timer_cycles(); + + received += n; + for (i = 0; i < n; i++) { + uint8_t outport = events[i].mbuf->port; + rte_eth_tx_buffer(outport, 0, tx_buf[outport], events[i].mbuf); + } + + if (!quiet && received >= received_printed + (1<<22)) { + printf("# consumer RX=%zu, time %"PRIu64"ms\n", + received, + (rte_get_timer_cycles() - start_time) / freq_khz); + received_printed = received; + } + + if (num_packets > 0 && npackets > 0) { + npackets -= n; + if (npackets == 0) + done = 1; + } + } + + for (i = 0; i < rte_eth_dev_count(); i++) + rte_eth_tx_buffer_flush(i, 0, tx_buf[i]); + + printf(" consumer done! RX=%zu, time %"PRIu64"ms\n", + received, + (rte_get_timer_cycles() - start_time) / freq_khz); + + return 0; +} + +static int +producer(void *arg) +{ + + struct prod_data *data = (struct prod_data *)arg; + size_t npackets = num_packets; + unsigned i; + uint64_t mbuf_seqno = 0; + size_t sent = 0; + uint8_t eth_port = 0; + uint8_t event_dev_id = data->event_dev_id; + uint8_t event_port_id = data->event_port_id; + int fid_counter = 0; + + while (!done) { + int ret; + unsigned num_ports = data->num_ports; + int32_t qid = data->qid; + struct rte_event events[BATCH_SIZE]; + struct rte_mbuf *mbufs[BATCH_SIZE]; + + uint16_t nb_rx = rte_eth_rx_burst(eth_port, 0, mbufs, BATCH_SIZE); + if (++eth_port == num_ports) + eth_port = 0; + if (nb_rx == 0) { + rte_pause(); + /* Flush any buffered events */ + rte_event_dequeue(event_dev_id, + event_port_id, + NULL, + false); + continue; + } + + for (i = 0; i < nb_rx; i++) { + struct rte_mbuf *m = mbufs[i]; + struct rte_event *ev = &events[i]; + + ev->queue_id = qid; + ev->flow_id = fid_counter++ % 6; + ev->priority = 0; + m->udata64 = mbuf_seqno++; + ev->mbuf = m; + ev->sched_type = sched_type; + ev->operation = RTE_EVENT_OP_NEW; + } + + do { + ret = rte_event_enqueue_burst(event_dev_id, + event_port_id, + events, + nb_rx, + false); + } while (ret == -ENOSPC); + if (ret != nb_rx) + rte_panic("producer thread failed to enqueue *all* events\n"); + + sent += nb_rx; + + if (num_packets > 0 && npackets > 0) { + npackets -= nb_rx; + if (npackets == 0) + break; + } + } + + /* Flush any buffered events */ + while (!done) + rte_event_dequeue(event_dev_id, event_port_id, NULL, false); + + printf(" prod thread done! TX=%zu across %u flows\n", sent, num_fids); + + return 0; +} + +static struct option long_options[] = { + {"workers", required_argument, 0, 'w'}, + {"packets", required_argument, 0, 'n'}, + {"atomic-flows", required_argument, 0, 'f'}, + {"priority", required_argument, 0, 'p'}, + {"ordered", no_argument, 0, 'o'}, + {"quiet", no_argument, 0, 'q'}, + {0, 0, 0, 0} +}; + +static void +usage(void) +{ + const char *usage_str = + " Usage: eventdev_pipeline [options]\n" + " Options:\n" + " -w, --workers=N Use N workers (default 4)\n" + " -n, --packets=N Send N packets (default ~32M), 0 implies no limit\n" + " -f, --atomic-flows=N Use N random flows from 1 to N (default 16)\n" + " -p, --priority=N Use N number of priorities (default 1)\n" + " -o, --ordered Use ordered scheduling\n" + " -q, --quiet Minimize printed output\n" + "\n"; + + fprintf(stderr, "%s", usage_str); + exit(1); +} + +static void +parse_app_args(int argc, char** argv) +{ + /* Parse cli options*/ + int option_index; + int c; + opterr = 0; + + for (;;) { + c = getopt_long(argc, argv, "w:n:f:p:oq", long_options, + &option_index); + if (c == -1) + break; + + switch (c) { + case 'w': + num_workers = (unsigned int)atoi(optarg); + break; + case 'n': + num_packets = (unsigned long )atol(optarg); + break; + case 'f': + num_fids = (unsigned int)atoi(optarg); + break; + case 'p': + num_priorities = (unsigned int)atoi(optarg); + break; + case 'o': + sched_type = RTE_SCHED_TYPE_ORDERED; + break; + case 'q': + quiet = 1; + break; + default: + usage(); + } + } + if (num_workers == 0) + usage(); +} + +/* + * Initializes a given port using global settings and with the RX buffers + * coming from the mbuf_pool passed as a parameter. + */ +static inline int +port_init(uint8_t port, struct rte_mempool *mbuf_pool) +{ + static const struct rte_eth_conf port_conf_default = { + .rxmode = { .max_rx_pkt_len = ETHER_MAX_LEN } + }; + const uint16_t rx_rings = 1, tx_rings = 1; + const uint16_t rx_ring_size = 512, tx_ring_size = 512; + struct rte_eth_conf port_conf = port_conf_default; + int retval; + uint16_t q; + + if (port >= rte_eth_dev_count()) + return -1; + + /* Configure the Ethernet device. */ + retval = rte_eth_dev_configure(port, rx_rings, tx_rings, &port_conf); + if (retval != 0) + return retval; + + /* Allocate and set up 1 RX queue per Ethernet port. */ + for (q = 0; q < rx_rings; q++) { + retval = rte_eth_rx_queue_setup(port, q, rx_ring_size, + rte_eth_dev_socket_id(port), NULL, mbuf_pool); + if (retval < 0) + return retval; + } + + /* Allocate and set up 1 TX queue per Ethernet port. */ + for (q = 0; q < tx_rings; q++) { + retval = rte_eth_tx_queue_setup(port, q, tx_ring_size, + rte_eth_dev_socket_id(port), NULL); + if (retval < 0) + return retval; + } + + /* Start the Ethernet port. */ + retval = rte_eth_dev_start(port); + if (retval < 0) + return retval; + + /* Display the port MAC address. */ + struct ether_addr addr; + rte_eth_macaddr_get(port, &addr); + printf("Port %u MAC: %02" PRIx8 " %02" PRIx8 " %02" PRIx8 + " %02" PRIx8 " %02" PRIx8 " %02" PRIx8 "\n", + (unsigned)port, + addr.addr_bytes[0], addr.addr_bytes[1], + addr.addr_bytes[2], addr.addr_bytes[3], + addr.addr_bytes[4], addr.addr_bytes[5]); + + /* Enable RX in promiscuous mode for the Ethernet device. */ + rte_eth_promiscuous_enable(port); + + return 0; +} + +static int +init_ports(unsigned num_ports) +{ + uint8_t portid; + + mp = rte_pktmbuf_pool_create("packet_pool", + /* mbufs */ 16384 * num_ports, + /* cache_size */ 512, + /* priv_size*/ 0, + /* data_room_size */ RTE_MBUF_DEFAULT_BUF_SIZE, + rte_socket_id()); + + for (portid = 0; portid < num_ports; portid++) + if (port_init(portid, mp) != 0) + rte_exit(EXIT_FAILURE, "Cannot init port %"PRIu8 "\n", + portid); + return 0; +} + +static uint8_t +setup_event_dev(struct prod_data *prod_data, + struct cons_data *cons_data, + struct worker_data *worker_data) +{ + struct rte_event_dev_config config; + struct rte_event_queue_conf queue_config; + struct rte_event_port_conf port_config; + struct rte_event_queue_link link; + int prod_port; + int cons_port; + int qid0; + int cons_qid; + int prod_qid; + unsigned i; + int ret; + int8_t id; + + const char *dev_name = "evdev_sw0"; + id = rte_event_dev_get_dev_id(dev_name); + if (id < 0) + rte_panic("Failed to get %s device ID\n", dev_name); + + config.nb_event_queues = 3; + config.nb_event_ports = num_workers + 2; + config.nb_events_limit = 256; + config.dequeue_wait_ns = 0; + + ret = rte_event_dev_configure(id, &config); + if (ret) + rte_panic("Failed to configure the event dev\n"); + + /* Create queues */ + queue_config.event_queue_cfg = RTE_EVENT_QUEUE_CFG_ATOMIC_ONLY; + queue_config.priority = 0; + + qid0 = 0; + ret = rte_event_queue_setup(id, qid0, &queue_config); + if (ret < 0) + rte_panic("Failed to create the scheduled QID\n"); + + queue_config.event_queue_cfg = RTE_EVENT_QUEUE_CFG_SINGLE_CONSUMER; + queue_config.priority = 0; + + cons_qid = 1; + ret = rte_event_queue_setup(id, cons_qid, &queue_config); + if (ret < 0) + rte_panic("Failed to create the cons directed QID\n"); + + queue_config.event_queue_cfg = RTE_EVENT_QUEUE_CFG_SINGLE_CONSUMER; + queue_config.priority = 0; + + prod_qid = 2; + ret = rte_event_queue_setup(id, prod_qid, &queue_config); + if (ret < 0) + rte_panic("Failed to create the prod directed QID\n"); + + /* Create ports */ +#define LB_PORT_DEPTH 16 +#define DIR_PORT_DEPTH 32 + port_config.enqueue_queue_depth = LB_PORT_DEPTH; + port_config.dequeue_queue_depth = LB_PORT_DEPTH; + port_config.new_event_threshold = 255; + + prod_port = 0; + ret = rte_event_port_setup(id, prod_port, &port_config); + if (ret < 0) + rte_panic("Failed to create the producer port\n"); + + cons_port = 1; + port_config.enqueue_queue_depth = DIR_PORT_DEPTH; + port_config.dequeue_queue_depth = DIR_PORT_DEPTH; + ret = rte_event_port_setup(id, cons_port, &port_config); + if (ret < 0) + rte_panic("Failed to create the consumer port\n"); + + port_config.enqueue_queue_depth = LB_PORT_DEPTH; + port_config.dequeue_queue_depth = LB_PORT_DEPTH; + for (i = 0; i < num_workers; i++) { + worker_data[i].event_port_id = i + 2; + ret = rte_event_port_setup(id, worker_data[i].event_port_id, &port_config); + if (ret < 0) + rte_panic("Failed to create worker port #%d\n", i); + } + + /* Map ports/qids */ + for (i = 0; i < num_workers; i++) { + link.queue_id = qid0; + link.priority = 0; + + ret = rte_event_port_link(id, worker_data[i].event_port_id, &link, 1); + if (ret != 1) + rte_panic("Failed to map worker%d port to qid0\n", i); + } + + /* Link consumer port to its QID */ + link.queue_id = cons_qid; + link.priority = 0; + + ret = rte_event_port_link(id, cons_port, &link, 1); + if (ret != 1) + rte_panic("Failed to map consumer port to cons_qid\n"); + + /* Link producer port to its QID */ + link.queue_id = prod_qid; + link.priority = 0; + + ret = rte_event_port_link(id, prod_port, &link, 1); + if (ret != 1) + rte_panic("Failed to map producer port to prod_qid\n"); + + /* Dispatch to slaves */ + *prod_data = (struct prod_data){.event_dev_id = id, + .event_port_id = prod_port, + .qid = qid0}; + *cons_data = (struct cons_data){.event_dev_id = id, + .event_port_id = cons_port}; + + for (i = 0; i < num_workers; i++) { + struct worker_data *w = &worker_data[i]; + w->event_dev_id = id; + w->qid = cons_qid; + } + + if (rte_event_dev_start(id) < 0) { + printf("%d: Error with start call\n", __LINE__); + return -1; + } + + return (uint8_t) id; +} + +static void sighndlr(int sig) +{ + /* Ctlr-Z to dump stats */ + if(sig == SIGTSTP) { + rte_mempool_dump(stdout, mp); + rte_event_dev_dump(stdout, 0); + } + /* Ctlr-C to exit */ + if(sig == SIGINT) + rte_exit(0, "sigint arrived, quitting\n"); +} + +int +main(int argc, char **argv) +{ + signal(SIGINT , sighndlr); + signal(SIGTSTP, sighndlr); + + struct prod_data prod_data = {0}; + struct cons_data cons_data = {0}; + struct worker_data *worker_data; + unsigned nworkers = 0; + unsigned num_ports; + int lcore_id; + int err; + int has_prod = 0; + int has_cons = 0; + int has_scheduler = 0; + + err = rte_eal_init(argc, argv); + if (err < 0) + rte_panic("Invalid EAL arguments\n"); + + argc -= err; + argv += err; + + /* Parse cli options*/ + parse_app_args(argc, argv); + + num_ports = rte_eth_dev_count(); + if (num_ports == 0) + rte_panic("No ethernet ports found\n"); + + if (!quiet) { + printf(" Config:\n"); + printf("\tports: %u\n", num_ports); + printf("\tworkers: %u\n", num_workers); + printf("\tpackets: %lu\n", num_packets); + printf("\tflows: %u\n", num_fids); + printf("\tpriorities: %u\n", num_priorities); + if (sched_type == RTE_SCHED_TYPE_ORDERED) + printf("\tqid0 type: ordered\n"); + if (sched_type == RTE_SCHED_TYPE_ATOMIC) + printf("\tqid0 type: atomic\n"); + printf("\n"); + } + + const unsigned cores_needed = num_workers + + /*main*/1 + + /*sched*/1 + + /*TX*/1 + + /*RX*/1; + + if (!quiet) { + printf("Number of cores available: %u\n", rte_lcore_count()); + printf("Number of cores to be used: %u\n", cores_needed); + } + + if (rte_lcore_count() < cores_needed) + rte_panic("Too few cores\n"); + + const uint8_t ndevs = rte_event_dev_count(); + if (ndevs == 0) + rte_panic("No event devs found. Do you need to pass in a --vdev flag?\n"); + if (ndevs > 1) + fprintf(stderr, "Warning: More than one event dev, using idx 0"); + + worker_data = rte_calloc(0, num_workers, sizeof(worker_data[0]), 0); + if (worker_data == NULL) + rte_panic("rte_calloc failed\n"); + + uint8_t id = setup_event_dev(&prod_data, &cons_data, worker_data); + RTE_SET_USED(id); + + prod_data.num_ports = num_ports; + init_ports(num_ports); + + RTE_LCORE_FOREACH_SLAVE(lcore_id) { + if (has_prod && has_cons && has_scheduler && nworkers == num_workers) + break; + + if (!has_scheduler) { + err = rte_eal_remote_launch(scheduler, NULL, lcore_id); + if (err) + rte_panic("Failed to launch scheduler\n"); + + has_scheduler = 1; + continue; + } + + if (nworkers < num_workers) { + err = rte_eal_remote_launch(worker, &worker_data[nworkers], lcore_id); + if (err) + rte_panic("Failed to launch worker%d\n", nworkers); + nworkers++; + continue; + } + + if (!has_cons) { + err = rte_eal_remote_launch(consumer, &cons_data, lcore_id); + if (err) + rte_panic("Failed to launch consumer\n"); + has_cons = 1; + continue; + } + + if (!has_prod) { + err = rte_eal_remote_launch(producer, &prod_data, lcore_id); + if (err) + rte_panic("Failed to launch producer\n"); + has_prod = 1; + continue; + } + } + + rte_eal_mp_wait_lcore(); + + /* Cleanup done automatically by kernel on app exit */ + + return 0; +}