From patchwork Fri Nov 24 11:23:51 2017 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: "Liang, Ma" X-Patchwork-Id: 31639 X-Patchwork-Delegate: jerinj@marvell.com Return-Path: X-Original-To: patchwork@dpdk.org Delivered-To: patchwork@dpdk.org Received: from [92.243.14.124] (localhost [127.0.0.1]) by dpdk.org (Postfix) with ESMTP id A2BB2548B; Fri, 24 Nov 2017 12:24:11 +0100 (CET) Received: from mga06.intel.com (mga06.intel.com [134.134.136.31]) by dpdk.org (Postfix) with ESMTP id 2E97D2C3F for ; Fri, 24 Nov 2017 12:24:07 +0100 (CET) Received: from fmsmga005.fm.intel.com ([10.253.24.32]) by orsmga104.jf.intel.com with ESMTP/TLS/DHE-RSA-AES256-GCM-SHA384; 24 Nov 2017 03:24:05 -0800 X-ExtLoop1: 1 X-IronPort-AV: E=Sophos;i="5.44,447,1505804400"; d="scan'208";a="179627939" Received: from silpixa00398162.ir.intel.com (HELO silpixa00398162.ger.corp.intel.com) ([10.237.223.171]) by fmsmga005.fm.intel.com with ESMTP; 24 Nov 2017 03:24:05 -0800 From: liang.j.ma@intel.com To: jerin.jacob@caviumnetworks.com Cc: dev@dpdk.org, harry.van.haaren@intel.com, bruce.richardson@intel.com, deepak.k.jain@intel.com, john.geary@intel.com Date: Fri, 24 Nov 2017 11:23:51 +0000 Message-Id: <1511522632-139652-7-git-send-email-liang.j.ma@intel.com> X-Mailer: git-send-email 2.7.5 In-Reply-To: <1511522632-139652-1-git-send-email-liang.j.ma@intel.com> References: <1511522632-139652-1-git-send-email-liang.j.ma@intel.com> Subject: [dpdk-dev] [PATCH 6/7] examples/eventdev_pipeline_opdl: adding example X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: DPDK patches and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dev-bounces@dpdk.org Sender: "dev" From: Liang Ma This patch adds a sample app to the examples/ directory, which can be used as a reference application and for general testing. The application requires two ethdev ports and expects traffic to be flowing. The application must be run with the --vdev flags as follows to indicate to EAL that a virtual eventdev device called "evdev_opdl0" is available to be used: ./build/eventdev_pipeline_opdl_pmd --vdev=event_opdl0 The general flow of the traffic is as follows: Rx core -> Ordered Queue (4 worker) => Ordered Queue (4 worker) => TX core Signed-off-by: Liang Ma Signed-off-by: Peter, Mccarthy --- examples/eventdev_pipeline_opdl_pmd/Makefile | 49 ++ examples/eventdev_pipeline_opdl_pmd/main.c | 766 +++++++++++++++++++++++++++ 2 files changed, 815 insertions(+) create mode 100644 examples/eventdev_pipeline_opdl_pmd/Makefile create mode 100644 examples/eventdev_pipeline_opdl_pmd/main.c diff --git a/examples/eventdev_pipeline_opdl_pmd/Makefile b/examples/eventdev_pipeline_opdl_pmd/Makefile new file mode 100644 index 0000000..63aea65 --- /dev/null +++ b/examples/eventdev_pipeline_opdl_pmd/Makefile @@ -0,0 +1,49 @@ +# BSD LICENSE +# +# Copyright(c) 2016-2017 Intel Corporation. All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in +# the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Intel Corporation nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +ifeq ($(RTE_SDK),) +$(error "Please define RTE_SDK environment variable") +endif + +# Default target, can be overridden by command line or environment +RTE_TARGET ?= x86_64-native-linuxapp-gcc + +include $(RTE_SDK)/mk/rte.vars.mk + +# binary name +APP = eventdev_pipeline_opdl_pmd + +# all source are stored in SRCS-y +SRCS-y := main.c + +CFLAGS += -O3 +CFLAGS += $(WERROR_FLAGS) + +include $(RTE_SDK)/mk/rte.extapp.mk diff --git a/examples/eventdev_pipeline_opdl_pmd/main.c b/examples/eventdev_pipeline_opdl_pmd/main.c new file mode 100644 index 0000000..6b9e48a --- /dev/null +++ b/examples/eventdev_pipeline_opdl_pmd/main.c @@ -0,0 +1,766 @@ +/*- + * BSD LICENSE + * + * Copyright(c) 2016-2017 Intel Corporation. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Intel Corporation nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define BATCH_SIZE 32 + +static struct rte_event_dev_config config = { + .nb_event_queues = 3, + .nb_event_ports = 10, + .nb_events_limit = 4096, + .nb_event_queue_flows = 1024, + .nb_event_port_dequeue_depth = 128, + .nb_event_port_enqueue_depth = 128, +}; + +static struct rte_event_port_conf wkr_p_conf = { + .dequeue_depth = 128, + .enqueue_depth = 128, + .new_event_threshold = 1024, +}; + +static struct rte_event_queue_conf wkr_q_conf = { + .event_queue_cfg = 0, + .schedule_type = RTE_SCHED_TYPE_ORDERED, + .priority = RTE_EVENT_DEV_PRIORITY_NORMAL, + .nb_atomic_flows = 1024, + .nb_atomic_order_sequences = 1024, +}; + +static struct rte_event_port_conf tx_p_conf = { + .dequeue_depth = 32, + .enqueue_depth = 32, + .new_event_threshold = 32, +}; + +static const struct rte_event_queue_conf tx_q_conf = { + .event_queue_cfg = 0, + .schedule_type = RTE_SCHED_TYPE_ORDERED, + .nb_atomic_flows = 1024, + .nb_atomic_order_sequences = 1024, +}; + +static struct rte_event_port_conf rx_p_conf = { + .dequeue_depth = 32, + .enqueue_depth = 32, + .new_event_threshold = 1024, +}; + +struct prod_data { + uint8_t dev_id; + uint8_t port_id; + int32_t qid; + unsigned int num_nic_ports; +} __rte_cache_aligned; + +static struct prod_data producer_data; + +struct cons_data { + uint8_t dev_id; + uint8_t port_id; + struct rte_eth_dev_tx_buffer *tx_buf[RTE_MAX_ETHPORTS]; +} __rte_cache_aligned; + +static struct cons_data consumer_data; + +struct worker_data { + uint8_t dev_id; + uint8_t port_id; + uint8_t thread_id; +}; + +#define QUEUE_0_ID 0 +#define QUEUE_1_ID 1 +#define QUEUE_2_ID 2 + + +#define NUM_WORKERS 8 +static struct worker_data worker_data[NUM_WORKERS]; + +volatile int done; + +static uint8_t port_map_4[4] = { 1, 2, 3, 0 }; +static uint8_t port_map_3[3] = { 1, 2, 0 }; +static uint8_t port_map_2[2] = { 1, 0 }; +static uint8_t port_map_1[1] = { 0 }; +static uint8_t *port_map; + +__thread long long packet_num; + +static void +eth_tx_buffer_retry(struct rte_mbuf **pkts, uint16_t unsent, + void *userdata) +{ + int port_id = (uintptr_t) userdata; + unsigned int _sent = 0; + + do { + /* Note: hard-coded TX queue */ + _sent += rte_eth_tx_burst(port_id, 0, &pkts[_sent], + unsent - _sent); + } while (_sent != unsent); +} + +/* + * Initializes a given port using global settings and with the RX buffers + * coming from the mbuf_pool passed as a parameter. + */ +static inline int +port_init(uint8_t port, struct rte_mempool *mbuf_pool) +{ + static const struct rte_eth_conf port_conf_default = { + .rxmode = { + .mq_mode = ETH_MQ_RX_RSS, + .max_rx_pkt_len = ETHER_MAX_LEN + }, + .rx_adv_conf = { + .rss_conf = { + .rss_hf = ETH_RSS_IP | + ETH_RSS_TCP | + ETH_RSS_UDP, + } + } + }; + const uint16_t rx_rings = 1, tx_rings = 1; + const uint16_t rx_ring_size = 512, tx_ring_size = 512; + struct rte_eth_conf port_conf = port_conf_default; + int retval; + uint16_t q; + + if (port >= rte_eth_dev_count()) + return -1; + + /* Configure the Ethernet device. */ + retval = rte_eth_dev_configure(port, rx_rings, tx_rings, &port_conf); + if (retval != 0) + return retval; + + /* Allocate and set up 1 RX queue per Ethernet port. */ + for (q = 0; q < rx_rings; q++) { + retval = rte_eth_rx_queue_setup(port, q, rx_ring_size, + rte_eth_dev_socket_id(port), NULL, mbuf_pool); + if (retval < 0) + return retval; + } + + /* Allocate and set up 1 TX queue per Ethernet port. */ + for (q = 0; q < tx_rings; q++) { + retval = rte_eth_tx_queue_setup(port, q, tx_ring_size, + rte_eth_dev_socket_id(port), NULL); + if (retval < 0) + return retval; + } + + /* Start the Ethernet port. */ + retval = rte_eth_dev_start(port); + if (retval < 0) + return retval; + + /* Display the port MAC address. */ + struct ether_addr addr; + rte_eth_macaddr_get(port, &addr); + printf("Port %u MAC: %02" PRIx8 " %02" PRIx8 " %02" PRIx8 + " %02" PRIx8 " %02" PRIx8 " %02" PRIx8 "\n", + (unsigned int)port, + addr.addr_bytes[0], addr.addr_bytes[1], + addr.addr_bytes[2], addr.addr_bytes[3], + addr.addr_bytes[4], addr.addr_bytes[5]); + + /* Enable RX in promiscuous mode for the Ethernet device. */ + rte_eth_promiscuous_enable(port); + + return 0; +} + +static int +init_ports(unsigned int num_ports) +{ + uint8_t portid; + unsigned int i; + + struct rte_mempool *mp = rte_pktmbuf_pool_create("packet_pool", + /* mbufs */ 16384 * num_ports, + /* cache_size */ 512, + /* priv_size*/ 0, + /* data_room_size */ RTE_MBUF_DEFAULT_BUF_SIZE, + rte_socket_id()); + + for (portid = 0; portid < num_ports; portid++) + if (port_init(portid, mp) != 0) + rte_exit(EXIT_FAILURE, "Cannot init port %"PRIu8 "\n", + portid); + + for (i = 0; i < num_ports; i++) { + void *userdata = (void *)(uintptr_t) i; + consumer_data.tx_buf[i] = + rte_malloc(NULL, RTE_ETH_TX_BUFFER_SIZE(32), 0); + + rte_eth_promiscuous_enable(i); + if (consumer_data.tx_buf[i] == NULL) + rte_panic("Out of memory\n"); + rte_eth_tx_buffer_init(consumer_data.tx_buf[i], 32); + rte_eth_tx_buffer_set_err_callback(consumer_data.tx_buf[i], + eth_tx_buffer_retry, + userdata); + } + + return 0; +} + +static int +rx_thread(void *arg) +{ + uint16_t nb_rx; + + struct rte_mbuf *mbufs[BATCH_SIZE*3]; + struct rte_event ev[BATCH_SIZE*3] = {0}; + uint32_t i, j; + uint16_t nb_enqueue = 0; + + int32_t qid = producer_data.qid; + uint8_t dev_id = producer_data.dev_id; + uint8_t port_id = producer_data.port_id; + uint32_t prio_idx = 0; + uint32_t num_ports = rte_eth_dev_count(); + RTE_SET_USED(arg); + + printf("Rx thread port_id %d started\n", port_id); + + packet_num = 0; + + while (!done) { + + for (j = 0; j < num_ports; j++) { + + nb_rx = rte_eth_rx_burst(j, 0, mbufs, BATCH_SIZE); + + for (i = 0; i < nb_rx; i++) { + ev[i].flow_id = 1; + ev[i].op = RTE_EVENT_OP_NEW; + ev[i].sched_type = RTE_SCHED_TYPE_ORDERED; + ev[i].queue_id = qid; + ev[i].event_type = RTE_EVENT_TYPE_ETHDEV; + ev[i].sub_event_type = 0; + ev[i].priority = RTE_EVENT_DEV_PRIORITY_NORMAL; + ev[i].mbuf = mbufs[i]; + RTE_SET_USED(prio_idx); + } + + nb_enqueue = rte_event_enqueue_burst(dev_id, + port_id, ev, nb_rx); + + packet_num += nb_enqueue; + + if (nb_enqueue != nb_rx) { + for (i = nb_enqueue; i < nb_rx; i++) + rte_pktmbuf_free(mbufs[i]); + } + + + } + + } + + printf("Rx done, total packet num is %lld\n", packet_num); + + return 0; +} + +static inline void +work(struct rte_mbuf *m) +{ + m->port = port_map[m->port]; +} + +static int +worker_thread(void *arg) +{ + unsigned int i; + struct rte_event events[BATCH_SIZE] = {0}; + struct worker_data *w_data = (struct worker_data *)arg; + uint8_t dev_id = w_data->dev_id; + uint8_t port_id = w_data->port_id; + RTE_SET_USED(arg); + uint16_t nb_event_deq; + + printf("Worker thread %d port_id %d started\n", + w_data->thread_id, port_id); + + packet_num = 0; + + while (!done) { + nb_event_deq = rte_event_dequeue_burst(dev_id, + port_id, + events, + RTE_DIM(events), + 0); + + if (nb_event_deq == 0) { + rte_pause(); + continue; + } + + for (i = 0; i < nb_event_deq; i++) { + events[i].queue_id += 1; + events[i].op = RTE_EVENT_OP_FORWARD; + events[i].sched_type = RTE_SCHED_TYPE_ORDERED; + if (w_data->thread_id > 3) + work(events[i].mbuf); + } + + packet_num += nb_event_deq; + + rte_event_enqueue_burst(dev_id, port_id, + events, nb_event_deq); + } + + return 0; +} + +static int +tx_thread(void *arg) +{ + int j; + struct rte_event events[BATCH_SIZE]; + uint8_t dev_id = consumer_data.dev_id; + uint8_t eth_dev_count, port_id = consumer_data.port_id; + + RTE_SET_USED(arg); + + eth_dev_count = rte_eth_dev_count(); + + packet_num = 0; + + while (!done) { + + uint16_t n = rte_event_dequeue_burst(dev_id, port_id, + events, RTE_DIM(events), 0); + + packet_num += n; + + int i; + + if (n == 0) { + for (j = 0; j < eth_dev_count; j++) + rte_eth_tx_buffer_flush(j, + 0, + consumer_data.tx_buf[j]); + continue; + } + + if (events[0].queue_id != QUEUE_2_ID) + printf("ERROR TX expected q_id:[%u], " + "but received:[%u]\n", + QUEUE_2_ID, events[0].queue_id); + + for (i = 0; i < n; i++) { + uint8_t outport = events[i].mbuf->port; + rte_eth_tx_buffer(outport, + 0, + consumer_data.tx_buf[outport], + events[i].mbuf); + } + } + + return 0; +} + +static int +setup_eventdev(uint8_t dev_id) +{ + uint8_t i, num_wrk_port = 8; + uint8_t ev_port = 0, queue0 = QUEUE_0_ID; + uint8_t queue1 = QUEUE_1_ID, queue2 = QUEUE_2_ID; + + int ret, ndev = rte_event_dev_count(); + if (ndev < 1) { + printf("%d: No Eventdev Devices Found\n", __LINE__); + return -1; + } + + struct rte_event_dev_info dev_info; + ret = rte_event_dev_info_get(dev_id, &dev_info); + if (ret < 0) { + printf("%s%d: Error getting device info\n", __FILE__, __LINE__); + return -1; + } + + if (dev_info.max_event_port_dequeue_depth < + config.nb_event_port_dequeue_depth) + config.nb_event_port_dequeue_depth = + dev_info.max_event_port_dequeue_depth; + if (dev_info.max_event_port_enqueue_depth < + config.nb_event_port_enqueue_depth) + config.nb_event_port_enqueue_depth = + dev_info.max_event_port_enqueue_depth; + + ret = rte_event_dev_configure(dev_id, &config); + if (ret < 0) { + printf("%s%d: Error configuring device\n", __FILE__, __LINE__); + return -1; + } + + /* Create Queues */ + if (rte_event_queue_setup(dev_id, queue0, &wkr_q_conf) < 0) { + printf("%d: error creating qid %d\n", __LINE__, queue0); + return -1; + } + + if (rte_event_queue_setup(dev_id, queue1, &wkr_q_conf) < 0) { + printf("%d: error creating qid %d\n", __LINE__, queue1); + return -1; + } + + if (rte_event_queue_setup(dev_id, queue2, &tx_q_conf) < 0) { + printf("%d: error creating qid %d\n", __LINE__, queue2); + return -1; + } + + /* Check that port configs are valid for this device */ + + if (wkr_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth) + wkr_p_conf.dequeue_depth = config.nb_event_port_dequeue_depth; + if (wkr_p_conf.enqueue_depth > config.nb_event_port_enqueue_depth) + wkr_p_conf.enqueue_depth = config.nb_event_port_enqueue_depth; + + if (tx_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth) + tx_p_conf.dequeue_depth = config.nb_event_port_dequeue_depth; + if (tx_p_conf.enqueue_depth > config.nb_event_port_enqueue_depth) + tx_p_conf.enqueue_depth = config.nb_event_port_enqueue_depth; + + if (rx_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth) + rx_p_conf.dequeue_depth = config.nb_event_port_dequeue_depth; + if (rx_p_conf.enqueue_depth > config.nb_event_port_enqueue_depth) + rx_p_conf.enqueue_depth = config.nb_event_port_enqueue_depth; + + /* Create rx_port */ + if (rte_event_port_setup(dev_id, ev_port, &rx_p_conf) < 0) { + printf("Error setting up rx port\n"); + return -1; + } + ev_port++; + + /* Create worker ports */ + for (i = 0; i < num_wrk_port; i++) { + if (rte_event_port_setup(dev_id, ev_port, &wkr_p_conf) < 0) { + printf("Error setting up worker port %d\n", i); + return -1; + } + ev_port++; + } + + /* Create tx_port */ + if (rte_event_port_setup(dev_id, ev_port, &tx_p_conf) < 0) { + printf("Error setting up tx port\n"); + return -1; + } + ev_port++; + + /* Link ports 1 - 4 to queue 1*/ + for (i = 1; i < 5; i++) { + if (rte_event_port_link(dev_id, + i, + &queue0, + &wkr_q_conf.priority, + 1) + != 1) { + printf("%d: error creating link for port %d\n", + __LINE__, i); + return -1; + } + } + /* Link ports 5 - 8 to queue 2*/ + for (i = 5; i < 9; i++) { + if (rte_event_port_link(dev_id, + i, + &queue1, + &wkr_q_conf.priority, + 1) + != 1) { + printf("%d: error creating link for port %d\n", + __LINE__, i); + return -1; + } + } + /* Link tx port to queue 3*/ + if (rte_event_port_link(dev_id, + i, + &queue2, + &tx_q_conf.priority, + 1) != 1) { + printf("%d: error creating link for port %d\n", __LINE__, i); + return -1; + } + + if (rte_event_dev_start(dev_id) < 0) { + printf("Error starting eventdev\n"); + return -1; + } + + rte_event_dev_dump(dev_id, stdout); + + return 0; +} + +static void print_statistics(FILE *f) +{ + int num_ports = 10; /* Hard-coded for this app */ + + for (int i = 0; i < num_ports; i++) { + int num_stats, num_stats_returned; + + num_stats = rte_event_dev_xstats_names_get(0, + RTE_EVENT_DEV_XSTATS_PORT, + i, + NULL, + NULL, + 0); + if (num_stats > 0) { + + uint32_t ids[num_stats]; + struct rte_event_dev_xstats_name names[num_stats]; + uint64_t values[num_stats]; + + num_stats_returned = rte_event_dev_xstats_names_get(0, + RTE_EVENT_DEV_XSTATS_PORT, + i, + names, + ids, + num_stats); + if (num_stats == num_stats_returned) { + num_stats_returned = rte_event_dev_xstats_get(0, + RTE_EVENT_DEV_XSTATS_PORT, + i, + ids, + values, + num_stats); + if (num_stats == num_stats_returned) { + fprintf(f, + "Port : [%02u] Statistics\n", + i); + for (int j = 0; j < num_stats; j++) { + fprintf(f, + "\t%30s = %16lu\n", + names[j].name, + values[j]); + } + } + } + } + } +} + +static void +signal_handler(int signum) +{ + + if (signum == SIGINT || signum == SIGTERM) { + printf("\n\nSignal %d received, preparing to exit...\n", + signum); + done = 1; + } +} + +static int +scheduler_thread(void *arg) +{ + uint8_t dev_id = *(uint8_t *)arg; + uint32_t evdev_service_id; + + if (rte_event_dev_service_id_get(dev_id, &evdev_service_id)) + return -1; + + while (!done) { + + /* run one iteration of the service on the calling core */ + rte_service_run_iter_on_app_lcore(evdev_service_id, true); + } + return 0; +} + + +int +main(int argc, char **argv) +{ + unsigned int num_ports; + uint8_t dev_id = 0; + int err, lcore_id, this_lcore; + int i, worker_nb = 0; + int rx_th = 0, sch_th = 0, tx_th = 0; + + signal(SIGINT, signal_handler); + signal(SIGTERM, signal_handler); + signal(SIGTSTP, signal_handler); + + err = rte_eal_init(argc, argv); + if (err < 0) + rte_panic("Invalid EAL arguments\n"); + + num_ports = rte_eth_dev_count(); + if (num_ports == 4) { + port_map = port_map_4; + } else if (num_ports == 3) { + port_map = port_map_3; + } else if (num_ports == 2) { + port_map = port_map_2; + } else if (num_ports == 1) { + port_map = port_map_1; + } else { + rte_panic("Incorrect number of ethernet ports found:[%u]\n", + num_ports); + } + + const unsigned int ndevs = rte_event_dev_count(); + if (ndevs == 0) + rte_panic("No dev_id devs found. Pass in a --vdev eventdev.\n"); + if (ndevs > 1) + fprintf(stderr, "Warning: More than one eventdev, using idx 0"); + + err = setup_eventdev(dev_id); + + if (err < 0) { + fprintf(stderr, "Error: setup_eventdev failed\n"); + return -1; + } + + struct rte_event_dev_info dev_info; + err = rte_event_dev_info_get(dev_id, &dev_info); + + if (err < 0) { + printf("%s%d: Error getting device info\n", __FILE__, __LINE__); + return -1; + } + producer_data.dev_id = dev_id; + producer_data.num_nic_ports = num_ports; + producer_data.qid = QUEUE_0_ID; + producer_data.port_id = 0; + + for (i = 0; i < NUM_WORKERS; i++) { + worker_data[i].dev_id = dev_id; + worker_data[i].port_id = i+1; + worker_data[i].thread_id = i; + } + + consumer_data.dev_id = dev_id; + consumer_data.port_id = 9; + + init_ports(num_ports); + + this_lcore = rte_lcore_id(); + + RTE_LCORE_FOREACH_SLAVE(lcore_id) { + + if ((rx_th == 0) && (this_lcore != lcore_id)) { + err = rte_eal_remote_launch(rx_thread, NULL, lcore_id); + printf("Start rx thread\n"); + if (err) { + printf("Failed to launch rx on core %d\n", + lcore_id); + } else { + rx_th = 1; + continue; + } + } + + if ((worker_nb < NUM_WORKERS) && (this_lcore != lcore_id)) { + err = rte_eal_remote_launch(worker_thread, + &worker_data[worker_nb], + lcore_id); + if (err) { + printf("Failed to launch worker on core %d\n", + lcore_id); + } else { + worker_nb++; + continue; + } + } + + if ((tx_th == 0) && (this_lcore != lcore_id)) { + printf("Start tx thread\n"); + err = rte_eal_remote_launch(tx_thread, NULL, lcore_id); + if (err) { + printf("Failed to launch tx on core %d\n", + lcore_id); + } else { + + tx_th = 1; + continue; + } + } + + if (strcmp(dev_info.driver_name, "event_opdl")) { + if ((sch_th == 0) && (this_lcore != lcore_id)) { + printf("Start SW scheduling thread\n"); + err = rte_eal_remote_launch(scheduler_thread, + &dev_id, + lcore_id); + if (err) { + printf("Failed to launch scheduler on core %d\n", + lcore_id); + } else { + + sch_th = 1; + continue; + } + } + } + } + + rte_eal_mp_wait_lcore(); + + print_statistics(stdout); + + rte_event_dev_dump(0, stdout); + + rte_event_dev_stop(dev_id); + + rte_event_dev_close(dev_id); + + return 0; +}