From patchwork Wed Nov 16 18:00:03 2016
X-Patchwork-Submitter: "Van Haaren, Harry"
X-Patchwork-Id: 17049
From: Harry van Haaren
To: dev@dpdk.org
Cc: Harry van Haaren, Gage Eads, Bruce Richardson
Date: Wed, 16 Nov 2016 18:00:03 +0000
Message-Id: <1479319207-130646-4-git-send-email-harry.van.haaren@intel.com>
X-Mailer: git-send-email 2.7.4
In-Reply-To: <1479319207-130646-1-git-send-email-harry.van.haaren@intel.com>
References: <1479319207-130646-1-git-send-email-harry.van.haaren@intel.com>
Subject: [dpdk-dev] [PATCH 3/7] event/sw: software eventdev implementation

This commit adds a software implementation of the eventdev API. The
implementation here is intended to enable the community to use the eventdev
API and to test whether the API serves the purpose it is designed for. It
should be noted that this is an RFC implementation, and hence there should be
no performance expectations. Note that the code added here is based on a
prototype implementation, and hence some cleanup is expected to be necessary.

The main components of the implementation are three files:
  - sw_evdev.c            Creation, configuration, etc
  - sw_evdev_worker.c     Worker cores' enqueue (etc) functions
  - sw_evdev_scheduler.c  Core pkt scheduling implementation

This commit only adds the implementation; no existing DPDK files are modified.

Signed-off-by: Gage Eads
Signed-off-by: Bruce Richardson
Signed-off-by: Harry van Haaren
---
 drivers/event/sw/Makefile                     |  59 +++
 drivers/event/sw/event_ring.h                 | 142 ++++++
 drivers/event/sw/iq_ring.h                    | 160 +++++++
 drivers/event/sw/rte_pmd_evdev_sw_version.map |   3 +
 drivers/event/sw/sw_evdev.c                   | 619 ++++++++++++++++++++++++
 drivers/event/sw/sw_evdev.h                   | 234 +++++++++
 drivers/event/sw/sw_evdev_scheduler.c         | 660 ++++++++++++++++++++++++++
 drivers/event/sw/sw_evdev_worker.c            | 218 +++++++++
 8 files changed, 2095 insertions(+)
 create mode 100644 drivers/event/sw/Makefile
 create mode 100644 drivers/event/sw/event_ring.h
 create mode 100644 drivers/event/sw/iq_ring.h
 create mode 100644 drivers/event/sw/rte_pmd_evdev_sw_version.map
 create mode 100644 drivers/event/sw/sw_evdev.c
 create mode 100644 drivers/event/sw/sw_evdev.h
 create mode 100644 drivers/event/sw/sw_evdev_scheduler.c
 create mode 100644 drivers/event/sw/sw_evdev_worker.c

diff --git a/drivers/event/sw/Makefile b/drivers/event/sw/Makefile
new file mode 100644
index 0000000..7fc4371
--- /dev/null
+++ b/drivers/event/sw/Makefile
@@ -0,0 +1,59 @@
+# BSD LICENSE
+#
+# Copyright(c) 2016 Intel Corporation.
All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in +# the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Intel Corporation nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +include $(RTE_SDK)/mk/rte.vars.mk + + +# library name +LIB = librte_pmd_evdev_sw.a + +# build flags +CFLAGS += -O3 +CFLAGS += $(WERROR_FLAGS) + +# library version +LIBABIVER := 1 + +# versioning export map +EXPORT_MAP := rte_pmd_evdev_sw_version.map + +# library source files +SRCS-$(CONFIG_RTE_LIBRTE_PMD_EVDEV_SW) += sw_evdev.c +SRCS-$(CONFIG_RTE_LIBRTE_PMD_EVDEV_SW) += sw_evdev_worker.c +SRCS-$(CONFIG_RTE_LIBRTE_PMD_EVDEV_SW) += sw_evdev_scheduler.c + +# export include files +SYMLINK-y-include += + +# library dependencies +DEPDIRS-$(CONFIG_RTE_LIBRTE_PMD_EVDEV_SW) += lib/librte_eal +DEPDIRS-$(CONFIG_RTE_LIBRTE_PMD_EVDEV_SW) += lib/librte_eventdev + +include $(RTE_SDK)/mk/rte.lib.mk diff --git a/drivers/event/sw/event_ring.h b/drivers/event/sw/event_ring.h new file mode 100644 index 0000000..531fb68 --- /dev/null +++ b/drivers/event/sw/event_ring.h @@ -0,0 +1,142 @@ +/*- + * BSD LICENSE + * + * Copyright(c) 2016 Intel Corporation. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Intel Corporation nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +#ifndef _EVENT_RING_ +#define _EVENT_RING_ + +#include +#include + +#include +#include +#include + +#define QE_RING_NAMESIZE 32 + +struct qe_ring { + char name[QE_RING_NAMESIZE] __rte_cache_aligned; + uint32_t ring_size; /* size of memory block allocated to the ring */ + uint32_t mask; /* mask for read/write values == ring_size -1 */ + uint32_t size; /* actual usable space in the ring */ + volatile uint32_t write_idx __rte_cache_aligned; + volatile uint32_t read_idx __rte_cache_aligned; + + struct rte_event ring[0] __rte_cache_aligned; +}; + +#ifndef force_inline +#define force_inline inline __attribute__((always_inline)) +#endif + +static inline struct qe_ring * __attribute__((cold)) +qe_ring_create(const char *name, unsigned int size, unsigned socket_id) +{ + struct qe_ring *retval; + const uint32_t ring_size = rte_align32pow2(size + 1); + size_t memsize = sizeof(*retval) + + (ring_size * sizeof(retval->ring[0])); + + retval = rte_zmalloc_socket(NULL, memsize, 0, socket_id); + if (retval == NULL) + goto end; + + snprintf(retval->name, sizeof(retval->name), "EVDEV_RG_%s", name); + retval->ring_size = ring_size; + retval->mask = ring_size - 1; + retval->size = size; +end: + return retval; +} + +static inline void +qe_ring_destroy(struct qe_ring *r) +{ + rte_free(r); +} + +static force_inline unsigned int +qe_ring_count(const struct qe_ring *r) +{ + return r->write_idx - r->read_idx; +} + +static force_inline unsigned int +qe_ring_free_count(const struct qe_ring *r) +{ + return r->size - qe_ring_count(r); +} + +static force_inline unsigned int +qe_ring_enqueue_burst(struct qe_ring *r, struct rte_event *qes, + unsigned int nb_qes, uint16_t *free_count) +{ + const uint32_t size = r->size; + const uint32_t mask = r->mask; + const uint32_t read = r->read_idx; + uint32_t write = r->write_idx; + const uint32_t space = read + size - write; + uint32_t i; + + if (space < nb_qes) + nb_qes = space; + + for (i = 0; i < nb_qes; i++, write++) + r->ring[write & mask] = qes[i]; + + r->write_idx = write; + + *free_count = space - nb_qes; + + return nb_qes; +} + +static force_inline unsigned int +qe_ring_dequeue_burst(struct qe_ring *r, struct rte_event *qes, + unsigned int nb_qes) +{ + const uint32_t mask = r->mask; + uint32_t read = r->read_idx; + const uint32_t write = r->write_idx; + const uint32_t items = write - read; + uint32_t i; + + if (items < nb_qes) + nb_qes = items; + + for (i = 0; i < nb_qes; i++, read++) + qes[i] = r->ring[read & mask]; + + r->read_idx += nb_qes; + + return nb_qes; +} + +#endif diff --git a/drivers/event/sw/iq_ring.h b/drivers/event/sw/iq_ring.h new file mode 100644 index 0000000..a870e59 --- /dev/null +++ b/drivers/event/sw/iq_ring.h @@ -0,0 +1,160 @@ +/*- + * BSD LICENSE + * + * Copyright(c) 2016 Intel Corporation. All rights reserved. 
+ * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Intel Corporation nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef _IQ_RING_ +#define _IQ_RING_ + +#include +#include + +#include +#include +#include +#include + +#define IQ_RING_NAMESIZE 12 +#define QID_IQ_DEPTH 128 +#define QID_IQ_MASK (uint16_t)(QID_IQ_DEPTH - 1) + +struct iq_ring { + char name[IQ_RING_NAMESIZE] __rte_cache_aligned; + uint16_t write_idx; + uint16_t read_idx; + + struct rte_event ring[QID_IQ_DEPTH]; +}; + +#ifndef force_inline +#define force_inline inline __attribute__((always_inline)) +#endif + +static inline struct iq_ring * __attribute__((cold)) +iq_ring_create(const char *name, unsigned socket_id) +{ + struct iq_ring *retval; + + retval = rte_malloc_socket(NULL, sizeof(*retval), 0, socket_id); + if (retval == NULL) + goto end; + + snprintf(retval->name, sizeof(retval->name), "%s", name); + retval->write_idx = retval->read_idx = 0; +end: + return retval; +} + +static inline void +iq_ring_destroy(struct iq_ring *r) +{ + rte_free(r); +} + +static force_inline uint16_t +iq_ring_count(const struct iq_ring *r) +{ + return r->write_idx - r->read_idx; +} + +static force_inline uint16_t +iq_ring_free_count(const struct iq_ring *r) +{ + return QID_IQ_MASK - iq_ring_count(r); +} + +static force_inline uint16_t +iq_ring_enqueue_burst(struct iq_ring *r, struct rte_event *qes, uint16_t nb_qes) +{ + const uint16_t read = r->read_idx; + uint16_t write = r->write_idx; + const uint16_t space = read + QID_IQ_MASK - write; + uint16_t i; + + if (space < nb_qes) + nb_qes = space; + + for (i = 0; i < nb_qes; i++, write++) + r->ring[write & QID_IQ_MASK] = qes[i]; + + r->write_idx = write; + + return nb_qes; +} + +static force_inline uint16_t +iq_ring_dequeue_burst(struct iq_ring *r, struct rte_event *qes, uint16_t nb_qes) +{ + uint16_t read = r->read_idx; + const uint16_t write = r->write_idx; + const uint16_t items = write - read; + uint16_t i; + + for (i = 0; i < nb_qes; i++, read++) + qes[i] = r->ring[read & QID_IQ_MASK]; + + if (items < nb_qes) + nb_qes = items; + + r->read_idx += nb_qes; + + return nb_qes; +} + +static force_inline const struct 
rte_event * +iq_ring_peek(const struct iq_ring *r) +{ + return &r->ring[r->read_idx & QID_IQ_MASK]; +} + +static force_inline void +iq_ring_pop(struct iq_ring *r) +{ + r->read_idx++; +} + +static force_inline int +iq_ring_enqueue(struct iq_ring *r, const struct rte_event *qe) +{ + const uint16_t read = r->read_idx; + const uint16_t write = r->write_idx; + const uint16_t space = read + QID_IQ_MASK - write; + + if (space == 0) + return -1; + + r->ring[write & QID_IQ_MASK] = *qe; + + r->write_idx = write + 1; + + return 0; +} + +#endif diff --git a/drivers/event/sw/rte_pmd_evdev_sw_version.map b/drivers/event/sw/rte_pmd_evdev_sw_version.map new file mode 100644 index 0000000..1f84b68 --- /dev/null +++ b/drivers/event/sw/rte_pmd_evdev_sw_version.map @@ -0,0 +1,3 @@ +DPDK_17.02 { + local: *; +}; diff --git a/drivers/event/sw/sw_evdev.c b/drivers/event/sw/sw_evdev.c new file mode 100644 index 0000000..4868122 --- /dev/null +++ b/drivers/event/sw/sw_evdev.c @@ -0,0 +1,619 @@ +/*- + * BSD LICENSE + * + * Copyright(c) 2016 Intel Corporation. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Intel Corporation nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+ */ + +#include + +#include +#include +#include +#include +#include + +#include "sw_evdev.h" +#include "iq_ring.h" + +#define NUMA_NODE_ARG "numa_node" + +static int +sw_dev_stats_get(const struct rte_event_dev *dev, + struct rte_event_dev_stats *stats) +{ + const struct sw_evdev *sw = (const void *)dev; + unsigned int i; + + if (dev == NULL || stats == NULL) + return -EINVAL; + + memset(stats, 0, sizeof(*stats)); + + stats->rx_pkts = sw->stats.rx_pkts; + stats->rx_dropped = sw->stats.rx_dropped; + stats->tx_pkts = sw->stats.tx_pkts; + + for (i = 0; i < sw->port_count; i++) { + stats->port_rx_pkts[i] = sw->ports[i].stats.rx_pkts; + stats->port_rx_dropped[i] = sw->ports[i].stats.rx_dropped; + stats->port_inflight[i] = sw->ports[i].inflights; + stats->port_tx_pkts[i] = sw->ports[i].stats.tx_pkts; + } + + for (i = 0; i < sw->qid_count; i++) { + stats->queue_rx_pkts[i] = sw->qids[i].stats.rx_pkts; + stats->queue_rx_dropped[i] = sw->qids[i].stats.rx_dropped; + stats->queue_tx_pkts[i] = sw->qids[i].stats.tx_pkts; + } + return 0; +} + +static int +sw_port_link(struct rte_event_dev *dev, uint8_t port_id, + struct rte_event_queue_link link[], int num) +{ + struct sw_evdev *sw = (void *)dev; + struct sw_port *p = &sw->ports[port_id]; + int i; + + if (link == NULL) { + /* TODO: map all queues */ + rte_errno = -EDQUOT; + return 0; + } + if (port_id > sw->port_count) { + rte_errno = -EINVAL; + return 0; + } + + for (i = 0; i < num; i++) { + struct sw_qid *q; + uint32_t qid = link[i].queue_id; + if (qid >= sw->qid_count) { + break; /* error - invalid QIDs */ + } + q = &sw->qids[qid]; + + /* check for qid map overflow */ + if (q->cq_num_mapped_cqs >= RTE_DIM(q->cq_map)) + break; + + if (p->is_directed && p->num_qids_mapped > 0) + break; + + if (q->type == RTE_SCHED_TYPE_DIRECT) { + /* check directed qids only map to one port */ + if (p->num_qids_mapped > 0) + break; + /* check port only takes a directed flow */ + if (num > 1) + break; + + p->is_directed = 1; + p->num_qids_mapped = 1; + } else if (q->type == RTE_SCHED_TYPE_ORDERED) { + p->num_ordered_qids++; + p->num_qids_mapped++; + } else if (q->type == RTE_SCHED_TYPE_ATOMIC) { + p->num_qids_mapped++; + } + + q->cq_map[q->cq_num_mapped_cqs++] = port_id; + } + return i; +} + +static void +sw_dump(FILE *f, const struct rte_event_dev *dev) +{ + static const char *q_type_strings[] = {"Ordered" , "Atomic", + "Parallel", "Directed" + }; + uint32_t i; + const struct sw_evdev *sw = (const void *)dev; + fprintf(f, "EventDev %s: ports %d, qids %d\n", sw->dev.name, + sw->port_count, sw->qid_count); + + fprintf(f, "\trx %"PRIu64"\n\tdrop %"PRIu64"\n\ttx %"PRIu64"\n", + sw->stats.rx_pkts, sw->stats.rx_dropped, sw->stats.tx_pkts); + fprintf(f, "\tsched calls: %"PRIu64"\n", sw->sched_called); + fprintf(f, "\tsched cq/qid call: %"PRIu64"\n", sw->sched_cq_qid_called); + fprintf(f, "\tsched no IQ enq: %"PRIu64"\n", sw->sched_no_iq_enqueues); + fprintf(f, "\tsched no CQ enq: %"PRIu64"\n", sw->sched_no_cq_enqueues); + fprintf(f, "\toverloads %"PRIu64"\t%s\n", sw->sched_overload_counter, + sw->overloaded ? " [OVERLOADED NOW]" : ""); + +#define COL_RED "\x1b[31m" +#define COL_RESET "\x1b[0m" + + for (i = 0; i < sw->port_count; i++) { + const struct sw_port *p = &sw->ports[i]; + fprintf(f, " Port %d %s %s\n", i, + p->is_directed ? " (SingleCons)" : "", + p->overloaded ? 
" ["COL_RED"OVERLOAD"COL_RESET"]" : ""); + fprintf(f, "\trx %"PRIu64"\n\tdrop %"PRIu64"\n\ttx %"PRIu64"\n" + "\tinf %d\n", sw->ports[i].stats.rx_pkts, + sw->ports[i].stats.rx_dropped, + sw->ports[i].stats.tx_pkts, sw->ports[i].inflights); + + uint64_t rx_used = qe_ring_count(p->rx_worker_ring); + uint64_t rx_free = qe_ring_free_count(p->rx_worker_ring); + const char *rxcol = (rx_free == 0) ? COL_RED : COL_RESET; + fprintf(f, "\t%srx ring used: %ld\tfree: %ld"COL_RESET"\n", + rxcol, rx_used, rx_free); + + uint64_t tx_used = qe_ring_count(p->cq_worker_ring); + uint64_t tx_free = qe_ring_free_count(p->cq_worker_ring); + const char *txcol = (tx_free == 0) ? COL_RED : COL_RESET; + fprintf(f, "\t%scq ring used: %ld\tfree: %ld"COL_RESET"\n", + txcol, tx_used, tx_free); + } + + for (i = 0; i < sw->qid_count; i++) { + fprintf(f, " Queue %d (%s)\n", i, q_type_strings[sw->qids[i].type]); + fprintf(f, "\trx %"PRIu64"\n\tdrop %"PRIu64"\n\ttx %"PRIu64"\n", + sw->qids[i].stats.rx_pkts, sw->qids[i].stats.rx_dropped, + sw->qids[i].stats.tx_pkts); + uint32_t iq; + for(iq = 0; iq < SW_IQS_MAX; iq++) { + uint32_t used = iq_ring_count(sw->qids[i].iq[iq]); + uint32_t free = iq_ring_free_count(sw->qids[i].iq[iq]); + const char *col = (free == 0) ? COL_RED : COL_RESET; + fprintf(f, "\t%siq %d: Used %d\tFree %d"COL_RESET"\n", + col, iq, used, free); + } + } +} + +static int +sw_port_setup(struct rte_event_dev *dev, uint8_t port_id, + const struct rte_event_port_conf *conf) +{ + struct sw_evdev *sw = (void *)dev; + struct sw_port *p = &sw->ports[port_id]; + char buf[QE_RING_NAMESIZE]; + unsigned i; + + if (conf->enqueue_queue_depth > + dev->info.max_event_port_enqueue_queue_depth || + conf->dequeue_queue_depth > + dev->info.max_event_port_dequeue_queue_depth){ + rte_errno = EINVAL; + return -1; + } + + *p = (struct sw_port){0}; /* zero entire structure */ + p->id = port_id; + + /* TODO: how do we work with an overload scheme here? + * For now, still use a huge buffer, with per-port thresholds. + * When it fills beyond the configured max size, we throttle. + */ + snprintf(buf, sizeof(buf), "%s_%s", dev->name, "rx_worker_ring"); + p->rx_worker_ring = qe_ring_create(buf, MAX_SW_PROD_Q_DEPTH, + dev->socket_id); + if (p->rx_worker_ring == NULL) + return -1; + + /* threshold is number of free spaces that are left in ring + * before overload should kick in. QE ring returns free_count, + * so storing this way makes more sense than actual depth + */ + uint32_t requested = MAX_SW_PROD_Q_DEPTH - conf->new_event_threshold; + p->overload_threshold = requested > 255 ? 
255 : requested; + + snprintf(buf, sizeof(buf), "%s_%s", dev->name, "cq_worker_ring"); + p->cq_worker_ring = qe_ring_create(buf, conf->dequeue_queue_depth, + dev->socket_id); + if (p->cq_worker_ring == NULL) { + qe_ring_destroy(p->rx_worker_ring); + return -1; + } + sw->cq_ring_space[port_id] = conf->dequeue_queue_depth; + + /* set hist list contents to empty */ + for (i = 0; i < SW_PORT_HIST_LIST; i++) { + p->hist_list[i].fid = -1; + p->hist_list[i].qid = -1; + } + + return 0; +} + +static int +sw_port_cleanup(struct sw_evdev *sw, uint8_t port_id) +{ + struct sw_port *p = &sw->ports[port_id]; + + qe_ring_destroy(p->rx_worker_ring); + qe_ring_destroy(p->cq_worker_ring); + memset(p, 0, sizeof(*p)); + + return 0; +} + +static uint8_t +sw_port_count(struct rte_event_dev *dev) +{ + struct sw_evdev *sw = (void *)dev; + return sw->port_count; +} + + +static uint16_t +sw_queue_count(struct rte_event_dev *dev) +{ + struct sw_evdev *sw = (void *)dev; + return sw->qid_count; +} + +static int32_t +qid_cleanup(struct sw_evdev *sw, uint32_t idx) +{ + struct sw_qid *qid = &sw->qids[idx]; + uint32_t i; + + for (i = 0; i < SW_IQS_MAX; i++) { + iq_ring_destroy(qid->iq[i]); + } + + if (qid->type == RTE_SCHED_TYPE_ORDERED) { + rte_free(qid->reorder_buffer); + rte_ring_free(qid->reorder_buffer_freelist); + } + memset(qid, 0, sizeof(*qid)); + + return 0; +} + +static int32_t +qid_init(struct sw_evdev *sw, unsigned idx, int type, + const struct rte_event_queue_conf *queue_conf) +{ + int i; + int socket_id = sw->dev.socket_id; + char buf[IQ_RING_NAMESIZE]; + struct sw_qid *qid = &sw->qids[idx]; + + for (i = 0; i < SW_IQS_MAX; i++) { + snprintf(buf, sizeof(buf), "q_%u_iq_%d", idx, i); + qid->iq[i] = iq_ring_create(buf, socket_id); + if (!qid->iq[i]) { + SW_LOG_DBG("ring create failed"); + goto cleanup; + } + } + + /* Initialize the iq packet mask to 1, as __builtin_clz() is undefined + * if the value passed in is zero. + */ + qid->iq_pkt_mask = 1; + + /* Initialize the FID structures to no pinning (-1), and zero packets */ + struct sw_fid_t fid = {.cq = -1, .count = 0}; + for (i = 0; i < SW_QID_NUM_FIDS; i++) + qid->fids[i] = fid; + + qid->id = idx; + qid->type = type; + qid->priority = queue_conf->priority; + + if (qid->type == RTE_SCHED_TYPE_ORDERED) { + uint32_t window_size; + + /* rte_ring and window_size_mask require require window_size to + * be a power-of-2. + */ + window_size = rte_align32pow2( + queue_conf->nb_atomic_order_sequences); + + qid->window_size = window_size - 1; + + if (!window_size) { + SW_LOG_DBG("invalid reorder_window_size for ordered queue\n"); + goto cleanup; + } + + snprintf(buf, sizeof(buf), "%s_iq_%d_rob", sw->dev.name, i); + qid->reorder_buffer = rte_zmalloc_socket(buf, + window_size * sizeof(qid->reorder_buffer[0]), + 0, socket_id); + if (!qid->reorder_buffer) { + SW_LOG_DBG("reorder_buffer malloc failed\n"); + goto cleanup; + } + + memset(&qid->reorder_buffer[0], + 0, + window_size * sizeof(qid->reorder_buffer[0])); + + snprintf(buf, sizeof(buf), "%s_iq_%d_freelist", sw->dev.name, i); + qid->reorder_buffer_freelist = rte_ring_create(buf, + window_size, + socket_id, + RING_F_SP_ENQ | RING_F_SC_DEQ); + if (!qid->reorder_buffer_freelist) { + SW_LOG_DBG("freelist ring create failed"); + goto cleanup; + } + + /* Populate the freelist with reorder buffer entries. Enqueue + * 'window_size - 1' entries because the rte_ring holds only + * that many. 
+ */ + for (i = 0; i < (int) window_size - 1; i++) { + if (rte_ring_sp_enqueue(qid->reorder_buffer_freelist, + &qid->reorder_buffer[i]) < 0) + goto cleanup; + } + + qid->reorder_buffer_index = 0; + qid->cq_next_tx = 0; + } + + return 0; + +cleanup: + for (i = 0; i < SW_IQS_MAX; i++) { + if (qid->iq[i]) + iq_ring_destroy(qid->iq[i]); + } + + if (qid->reorder_buffer) { + rte_free(qid->reorder_buffer); + qid->reorder_buffer = NULL; + } + + if (qid->reorder_buffer_freelist) { + rte_ring_free(qid->reorder_buffer_freelist); + qid->reorder_buffer_freelist = NULL; + } + + return -EINVAL; +} + +static int +sw_queue_setup(struct rte_event_dev *dev, + uint8_t queue_id, + const struct rte_event_queue_conf *conf) +{ + int type; + if (conf->nb_atomic_flows > 0 && + conf ->nb_atomic_order_sequences > 0) + return -1; + + if (conf->event_queue_cfg & RTE_EVENT_QUEUE_CFG_SINGLE_CONSUMER) + type = RTE_SCHED_TYPE_DIRECT; + else if (conf->nb_atomic_flows > 0) + type = RTE_SCHED_TYPE_ATOMIC; + else if (conf->nb_atomic_order_sequences > 0) + type = RTE_SCHED_TYPE_ORDERED; + else + type = RTE_SCHED_TYPE_PARALLEL; + + return qid_init((void *)dev, queue_id, type, conf); +} + +static int +sw_dev_configure(struct rte_event_dev *dev, + struct rte_event_dev_config *config) +{ + struct sw_evdev *se = (void *)dev; + + if (config->nb_event_queues > dev->info.max_event_queues || + config->nb_event_ports > dev->info.max_event_ports) + return -1; + + se->qid_count = config->nb_event_queues; + se->port_count = config->nb_event_ports; + return 0; +} + +static int +assign_numa_node(const char *key __rte_unused, const char *value, void *opaque) +{ + int *socket_id = opaque; + *socket_id = atoi(value); + if (*socket_id > RTE_MAX_NUMA_NODES) + return -1; + return 0; +} + +static inline void +swap_ptr(void *a, void *b) +{ + void *tmp = a; + a = b; + b= tmp; +} + +static int +sw_start(struct rte_event_dev *dev) +{ + unsigned int i, j; + struct sw_evdev *sw = (void *)dev; + /* check all ports are set up */ + for (i = 0; i < sw->port_count; i++) + if (sw->ports[i].rx_worker_ring == NULL) + return -1; + + /* check all queues are configured and mapped to ports*/ + for (i = 0; i < sw->qid_count; i++) + if (sw->qids[i].iq[0] == NULL || + sw->qids[i].cq_num_mapped_cqs == 0) + return -1; + + /* build up our prioritized array of qids */ + /* We don't use qsort here, as if all/multiple entries have the same + * priority, the result is non-deterministic. From "man 3 qsort": + * "If two members compare as equal, their order in the sorted + * array is undefined." 
+ */ + for (i = 0; i < sw->qid_count; i++) { + sw->qids_prioritized[i] = &sw->qids[i]; + for (j = i; j > 0; j--) + if (sw->qids_prioritized[j]->priority < + sw->qids_prioritized[j-1]->priority) + swap_ptr(sw->qids_prioritized[j], + sw->qids_prioritized[j-1]); + } + sw->started = 1; + return 0; +} + +static void +sw_stop(struct rte_event_dev *dev) +{ + struct sw_evdev *sw = (void *)dev; + sw->started = 0; +} +static int +sw_close(struct rte_event_dev *dev) +{ + struct sw_evdev *sw = (void *)dev; + uint32_t i; + + for(i = 0; i < sw->qid_count; i++) { + qid_cleanup(sw, i); + } + sw->qid_count = 0; + + for (i = 0; i < sw->port_count; i++) { + sw_port_cleanup(sw, i); + } + sw->port_count = 0; + + memset(&sw->stats, 0, sizeof(sw->stats)); + + return 0; +} + +static int +sw_probe(const char *name, const char *params) +{ + static const struct rte_event_dev_ops evdev_sw_ops = { + .configure = sw_dev_configure, + .queue_setup = sw_queue_setup, + .queue_count = sw_queue_count, + .port_setup = sw_port_setup, + .port_link = sw_port_link, + .port_count = sw_port_count, + .start = sw_start, + .stop = sw_stop, + .close = sw_close, + .stats_get = sw_dev_stats_get, + .dump = sw_dump, + + .enqueue = sw_event_enqueue, + .enqueue_burst = sw_event_enqueue_burst, + .dequeue = sw_event_dequeue, + .dequeue_burst = sw_event_dequeue_burst, + .release = sw_event_release, + .schedule = sw_event_schedule, + }; + static const char *args[] = { NUMA_NODE_ARG, NULL }; + const struct rte_memzone *mz; + struct sw_evdev *se; + struct rte_event_dev_info evdev_sw_info = { + .driver_name = PMD_NAME, + .max_event_queues = SW_QIDS_MAX, + .max_event_queue_flows = SW_QID_NUM_FIDS, + .max_event_queue_priority_levels = SW_Q_PRIORITY_MAX, + .max_event_priority_levels = SW_IQS_MAX, + .max_event_ports = SW_PORTS_MAX, + .max_event_port_dequeue_queue_depth = MAX_SW_CONS_Q_DEPTH, + .max_event_port_enqueue_queue_depth = MAX_SW_PROD_Q_DEPTH, + /* for event limits, there is no hard limit, but it + * depends on number of Queues configured and depth of + * producer/consumer queues + */ + .max_num_events = -1, + .event_dev_cap = (RTE_EVENT_DEV_CAP_QUEUE_QOS | + RTE_EVENT_DEV_CAP_EVENT_QOS), + }; + int socket_id = 0; + + if (params != NULL && params[0] != '\0') { + struct rte_kvargs *kvlist = rte_kvargs_parse(params, args); + + if (!kvlist) { + RTE_LOG(INFO, PMD, + "Ignoring unsupported parameters when creating device '%s'\n", + name); + } else { + int ret = rte_kvargs_process(kvlist, NUMA_NODE_ARG, + assign_numa_node, &socket_id); + rte_kvargs_free(kvlist); + + if (ret != 0) { + RTE_LOG(ERR, PMD, + "%s: Error parsing numa node parameter", + name); + return ret; + } + } + } + + RTE_LOG(INFO, PMD, "Creating eventdev sw device %s, on numa node %d\n", + name, socket_id); + + mz = rte_memzone_reserve(name, sizeof(*se), socket_id, 0); + if (mz == NULL) + return -1; /* memzone_reserve sets rte_errno on error */ + + se = mz->addr; + se->mz = mz; + snprintf(se->dev.name, sizeof(se->dev.name), "%s", name); + se->dev.configured = false; + se->dev.info = evdev_sw_info; + se->dev.ops = &evdev_sw_ops; + se->dev.socket_id = socket_id; + + return rte_event_dev_register(&se->dev); +} + +static int +sw_remove(const char *name) +{ + if (name == NULL) + return -EINVAL; + + RTE_LOG(INFO, PMD, "Closing eventdev sw device %s\n", name); + /* TODO unregister eventdev and release memzone */ + + return 0; +} + +static struct rte_vdev_driver evdev_sw_pmd_drv = { + .probe = sw_probe, + .remove = sw_remove +}; + +RTE_PMD_REGISTER_VDEV(evdev_sw, evdev_sw_pmd_drv); 
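+/* Illustrative note (an assumption, not part of the submitted patch): with
+ * the vdev registration above, an application could instantiate this PMD
+ * from the EAL command line and select the NUMA node for its allocations,
+ * e.g.:
+ *   ./app --vdev=evdev_sw,numa_node=1
+ * The numa_node value is parsed by assign_numa_node() above.
+ */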
+RTE_PMD_REGISTER_PARAM_STRING(evdev_sw,"numa_node="); diff --git a/drivers/event/sw/sw_evdev.h b/drivers/event/sw/sw_evdev.h new file mode 100644 index 0000000..534e078 --- /dev/null +++ b/drivers/event/sw/sw_evdev.h @@ -0,0 +1,234 @@ +/*- + * BSD LICENSE + * + * Copyright(c) 2016 Intel Corporation. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Intel Corporation nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef _SW_EVDEV_H_ +#define _SW_EVDEV_H_ + +#include +#include +#include "event_ring.h" + +#define PMD_NAME "evdev_sw" + +#define SW_QIDS_MAX 128 +#define SW_QID_NUM_FIDS 16384 +#define SW_IQS_MAX 4 +#define SW_Q_PRIORITY_MAX 255 +#define SW_PORTS_MAX 128 +#define MAX_SW_CONS_Q_DEPTH 255 + +/* allow for lots of over-provisioning */ +#define MAX_SW_PROD_Q_DEPTH 4096 + +#define SW_FRAGMENTS_MAX 16 +#define PORT_DEQUEUE_BURST_SIZE 16 +#define SW_PORT_HIST_LIST (MAX_SW_PROD_Q_DEPTH + (MAX_SW_CONS_Q_DEPTH*2)) + +#define SW_PORT_OVERLOAD_THRES (512) + +#define RTE_SCHED_TYPE_DIRECT (RTE_SCHED_TYPE_PARALLEL + 1) + +#ifdef RTE_LIBRTE_PMD_EVDEV_SW_DEBUG +#define SW_LOG_INFO(fmt, args...) \ + RTE_LOG(INFO, PMD, "[%s] %s() line %u: " fmt "\n", \ + PMD_NAME, \ + __func__, __LINE__, ## args) + +#define SW_LOG_DBG(fmt, args...) \ + RTE_LOG(DEBUG, PMD, "[%s] %s() line %u: " fmt "\n", \ + PMD_NAME, \ + __func__, __LINE__, ## args) +#else +#define SW_LOG_INFO(fmt, args...) +#define SW_LOG_DBG(fmt, args...) +#endif + +enum { + QE_FLAG_VALID_SHIFT = 0, + QE_FLAG_COMPLETE_SHIFT, + QE_FLAG_NOT_EOP_SHIFT, + _QE_FLAG_COUNT +}; + +#define QE_FLAG_VALID (1 << QE_FLAG_VALID_SHIFT) /* set for NEW, FWD, FRAG */ +#define QE_FLAG_COMPLETE (1 << QE_FLAG_COMPLETE_SHIFT) /* set for FWD, DROP */ +#define QE_FLAG_NOT_EOP (1 << QE_FLAG_NOT_EOP_SHIFT) /* set for FRAG only */ + +static const uint8_t sw_qe_flag_map[] = { + QE_FLAG_VALID /* RTE_QEENT_OP_NEW */, + QE_FLAG_VALID | QE_FLAG_COMPLETE /* RTE_QEENT_OP_FWD */, + QE_FLAG_COMPLETE /* RTE_QEENT_OP_DROP */, + QE_FLAG_VALID | QE_FLAG_COMPLETE | QE_FLAG_NOT_EOP, +}; + +/* Records basic event stats at a given point. 
Used in port and qid structs */ +struct sw_point_stats { + uint64_t rx_pkts; + uint64_t rx_dropped; + uint64_t tx_pkts; +}; + +struct reorder_buffer_entry { + uint16_t num_fragments; /**< Number of packet fragments */ + uint16_t fragment_index; /**< Points to the oldest valid frag */ + uint8_t ready; /**< Entry is ready to be reordered */ + struct rte_event fragments[SW_FRAGMENTS_MAX]; +}; + +struct sw_hist_list_entry { + int32_t qid; + int32_t fid; + struct reorder_buffer_entry *rob_entry; +}; + +struct sw_port { + /* A numeric ID for the port. This should be used to access the + * statistics as returned by *rte_event_dev_stats_get*, and in other + * places where the API requires accessing a port by integer. It is not + * valid to assume that ports will be allocated in a linear sequence. + */ + uint8_t id; + + /** Indicates if this port is overloaded, and we need to throttle input */ + uint8_t overloaded; + uint8_t overload_threshold; + + int16_t is_directed; /** Takes from a single directed QID */ + int16_t num_ordered_qids; /** For loadbalanced we can optimise pulling + packets from producers if there is no reordering + involved */ + + /* track packets in and out of this port */ + struct sw_point_stats stats; + + /** Ring and buffer for pulling events from workers for scheduling */ + struct qe_ring *rx_worker_ring __rte_cache_aligned; + uint32_t pp_buf_start; + uint32_t pp_buf_count; + struct rte_event pp_buf[PORT_DEQUEUE_BURST_SIZE]; + + + /** Ring and buffer for pushing packets to workers after scheduling */ + struct qe_ring *cq_worker_ring __rte_cache_aligned; + uint16_t cq_buf_count; + uint16_t outstanding_releases; /* num releases yet to be completed */ + struct rte_event cq_buf[MAX_SW_CONS_Q_DEPTH]; + + /* History list structs, containing info on pkts egressed to worker */ + uint16_t hist_head __rte_cache_aligned; + uint16_t hist_tail; + uint16_t inflights; + struct sw_hist_list_entry hist_list[SW_PORT_HIST_LIST]; + + uint8_t num_qids_mapped; +}; + +struct sw_fid_t { + /* which CQ this FID is currently pinned to */ + uint32_t cq; + /* number of packets gone to the CQ with this FID */ + uint32_t count; +}; + +struct sw_qid { + /* The type of this QID */ + int type; + /* Integer ID representing the queue. This is used in history lists, + * to identify the stage of processing. 
*/ + uint32_t id; + struct sw_point_stats stats; + + /* Internal priority rings for packets */ + struct iq_ring *iq[SW_IQS_MAX]; + uint32_t iq_pkt_mask; /* A mask to indicate packets in an IQ */ + uint64_t iq_pkt_count[SW_IQS_MAX]; + + /* Information on what CQs are polling this IQ */ + uint32_t cq_num_mapped_cqs; + uint32_t cq_next_tx; /* cq to write next (non-atomic) packet */ + uint32_t cq_map[SW_PORTS_MAX]; + + /* Track flow ids for atomic load balancing */ + struct sw_fid_t fids[SW_QID_NUM_FIDS]; + + /* Track packet order for reordering when needed */ + struct reorder_buffer_entry *reorder_buffer; /* packets awaiting reordering */ + struct rte_ring *reorder_buffer_freelist; /* available reorder slots */ + uint32_t reorder_buffer_index; /* oldest valid reorder buffer entry */ + uint32_t window_size; /* Used to wrap reorder_buffer_index */ + + uint8_t priority; +}; + +struct sw_evdev { + /* must be the first item in the private dev struct */ + struct rte_event_dev dev; + + const struct rte_memzone *mz; + + /* Contains all ports - load balanced and directed */ + struct sw_port ports[SW_PORTS_MAX]; + uint32_t port_count; + uint16_t cq_ring_space[SW_PORTS_MAX]; /* How many packets are in the cq */ + + /* All qids - allocated in one slab for vectorization */ + struct sw_qid qids[SW_QIDS_MAX]; + uint32_t qid_count; + + /* Array of pointers to load-balanced QIDs sorted by priority level */ + struct sw_qid *qids_prioritized[SW_QIDS_MAX]; + + /* Stats */ + struct sw_point_stats stats __rte_cache_aligned; + uint64_t sched_called; + uint64_t sched_no_iq_enqueues; + uint64_t sched_no_cq_enqueues; + uint64_t sched_cq_qid_called; + uint64_t sched_overload_counter; + + uint8_t started; + + uint32_t overloaded __rte_cache_aligned; +}; + +int sw_event_enqueue(struct rte_event_dev *dev, uint8_t port_id, + struct rte_event *ev, bool pin_event); +int sw_event_enqueue_burst(struct rte_event_dev *dev, uint8_t port_id, + struct rte_event ev[], int num, bool pin_event); +bool sw_event_dequeue(struct rte_event_dev *dev, uint8_t port_id, + struct rte_event *ev, uint64_t wait); +int sw_event_dequeue_burst(struct rte_event_dev *dev, uint8_t port_id, + struct rte_event *ev, int num, uint64_t wait); +void sw_event_release(struct rte_event_dev *dev, uint8_t port_id, uint8_t index); +int sw_event_schedule(struct rte_event_dev *dev); + +#endif /* _SW_EVDEV_H_ */ diff --git a/drivers/event/sw/sw_evdev_scheduler.c b/drivers/event/sw/sw_evdev_scheduler.c new file mode 100644 index 0000000..02831d2 --- /dev/null +++ b/drivers/event/sw/sw_evdev_scheduler.c @@ -0,0 +1,660 @@ +/*- + * BSD LICENSE + * + * Copyright(c) 2016 Intel Corporation. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Intel Corporation nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. 
+ * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include +#include "sw_evdev.h" +#include "iq_ring.h" + +#define SW_IQS_MASK (SW_IQS_MAX-1) + +/* Retrieve the highest priority IQ or -1 if no pkts available. Doing the + * CLZ twice is faster than caching the value due to data dependencies + */ +#define PKT_MASK_TO_IQ(pkts) \ + (__builtin_ctz(pkts | (1 << SW_IQS_MAX))) + +/* Clamp the highest priorities to the max value as allowed by + * the mask. Assums MASK is (powerOfTwo - 1). Priority 0 (highest) are shifted + * into leftmost IQ so that clz() reads it first on dequeue + */ +#define PRIO_TO_IQ(prio) (prio > SW_IQS_MASK ? SW_IQS_MASK : prio) + +static inline uint32_t +sw_schedule_atomic_to_cq(struct sw_evdev *sw, struct sw_qid * const qid, + uint32_t iq_num, unsigned int count) +{ + uint32_t i; + + if(count == 0) + return 0; + + /* This is the QID ID. The QID ID is static, hence it can be + * used to identify the stage of processing in history lists etc */ + uint32_t qid_id = qid->id; + + for (i = 0; i < count; i++) { + const struct rte_event *qe = iq_ring_peek(qid->iq[iq_num]); + struct sw_fid_t *fid = &qid->fids[qe->flow_id]; + int cq = fid->cq; + + /* If no CQ is assigned, pick one */ + if (cq < 0) { + /* select CQ based on least inflights, + * defaulting to the first mapped CQ + */ + uint32_t cq_idx = qid->cq_next_tx++; + if (qid->cq_next_tx == qid->cq_num_mapped_cqs) + qid->cq_next_tx = 0; + cq = qid->cq_map[cq_idx]; + int cq_free_cnt = sw->cq_ring_space[cq]; + + for (cq_idx = 0; cq_idx < qid->cq_num_mapped_cqs; cq_idx++) { + int test_cq = qid->cq_map[cq_idx]; + int test_cq_free = sw->cq_ring_space[test_cq]; + + if (test_cq_free > cq_free_cnt) + cq = test_cq, cq_free_cnt = test_cq_free; + } + } + + struct sw_port *p = &sw->ports[cq]; + + /* If the destination CQ or its history list is full, move on + * to the next queue. 
+ */ + if (sw->cq_ring_space[cq] == 0 || + p->inflights == SW_PORT_HIST_LIST) { + struct qe_ring *worker = sw->ports[cq].cq_worker_ring; + qe_ring_enqueue_burst(worker, sw->ports[cq].cq_buf, + sw->ports[cq].cq_buf_count, + &sw->cq_ring_space[cq]); + sw->ports[cq].cq_buf_count = 0; +#if 0 + printf("%s cq %d was 0, now %d\n", __func__, + cq, sw->cq_ring_space[cq]); +#endif + if(sw->cq_ring_space[cq] == 0) + break; + } + + sw->cq_ring_space[cq]--; + + /* store which CQ this FID is active on, + * for future pkts of the same flow + */ + fid->cq = cq; + fid->count++; + + qid->stats.tx_pkts++; + sw->ports[cq].inflights++; + + int head = (p->hist_head & (SW_PORT_HIST_LIST-1)); + + p->hist_list[head].fid = qe->flow_id; + p->hist_list[head].qid = qid_id; + + p->hist_head++; + p->stats.tx_pkts++; + sw->ports[cq].cq_buf[sw->ports[cq].cq_buf_count++] = *qe; + iq_ring_pop(qid->iq[iq_num]); + } + return i; +} + +static inline uint32_t +sw_schedule_parallel_to_cq(struct sw_evdev *sw, struct sw_qid * const qid, + uint32_t iq_num, unsigned int count, int keep_order) +{ + uint32_t i; + uint32_t cq_idx = qid->cq_next_tx; + + /* This is the QID ID. The QID ID is static, hence it can be + * used to identify the stage of processing in history lists etc */ + uint32_t qid_id = qid->id; + + + if (keep_order) + /* only schedule as many as we have reorder buffer entries */ + count = RTE_MIN(count, rte_ring_count(qid->reorder_buffer_freelist)); + + for (i = 0; i < count; i++) { + const struct rte_event *qe = iq_ring_peek(qid->iq[iq_num]); + uint32_t cq_check_count = 0; + uint32_t cq; + + /* + * for parallel, just send to next available CQ in round-robin + * fashion. So scan for an available CQ. If all CQs are full + * just return and move on to next QID + */ + do { + if (++cq_check_count > qid->cq_num_mapped_cqs) + goto exit; + cq = qid->cq_map[cq_idx]; + if (++cq_idx == qid->cq_num_mapped_cqs) + cq_idx = 0; + } while (qe_ring_free_count(sw->ports[cq].cq_worker_ring) == 0 || + sw->ports[cq].inflights == SW_PORT_HIST_LIST); + + struct sw_port *p = &sw->ports[cq]; + if (sw->cq_ring_space[cq] == 0 || + p->inflights == SW_PORT_HIST_LIST) + break; + + sw->cq_ring_space[cq]--; + + qid->stats.tx_pkts++; + + const int head = (p->hist_head & (SW_PORT_HIST_LIST-1)); + + p->hist_list[head].fid = qe->flow_id; + p->hist_list[head].qid = qid_id; + + if (keep_order) + rte_ring_sc_dequeue(qid->reorder_buffer_freelist, + (void *)&p->hist_list[head].rob_entry); + + sw->ports[cq].cq_buf[sw->ports[cq].cq_buf_count++] = *qe; + iq_ring_pop(qid->iq[iq_num]); + + rte_compiler_barrier(); + p->inflights++; + p->stats.tx_pkts++; + p->hist_head++; + } +exit: + qid->cq_next_tx = cq_idx; + return i; +} + +static uint32_t +sw_schedule_dir_to_cq(struct sw_evdev *sw, struct sw_qid * const qid, + uint32_t iq_num, unsigned int count) +{ + uint32_t cq_id = qid->cq_map[0]; + struct sw_port *port = &sw->ports[cq_id]; + + /* get max burst enq size for cq_ring */ + uint32_t count_free = sw->cq_ring_space[cq_id]; + if (count == 0 || count_free == 0) + return 0; + + /* burst dequeue from the QID IQ ring */ + struct iq_ring *ring = qid->iq[iq_num]; + uint32_t ret = iq_ring_dequeue_burst(ring, + &port->cq_buf[port->cq_buf_count], count_free); + port->cq_buf_count += ret; + + /* Update QID, Port and Total TX stats */ + qid->stats.tx_pkts += ret; + port->stats.tx_pkts += ret; + + /* Subtract credits from cached value */ + sw->cq_ring_space[cq_id] -= ret; + + return ret; +} + +static uint32_t +sw_schedule_qid_to_cq(struct sw_evdev *sw) +{ + uint32_t pkts = 0; + 
uint32_t qid_idx; + + sw->sched_cq_qid_called++; + + for (qid_idx = 0; qid_idx < sw->qid_count; qid_idx++) { + /* make the QID lookup here be based on priority of the QID */ + struct sw_qid *qid = sw->qids_prioritized[qid_idx]; + + int type = qid->type; + int iq_num = PKT_MASK_TO_IQ(qid->iq_pkt_mask); + + /* zero mapped CQs indicates directed */ + if (iq_num >= SW_IQS_MAX) + continue; + + unsigned int count = iq_ring_count(qid->iq[iq_num]); + uint32_t pkts_done = 0; + + if (type == RTE_SCHED_TYPE_DIRECT) + pkts_done += sw_schedule_dir_to_cq(sw, qid, + iq_num, count); + else if (type == RTE_SCHED_TYPE_ATOMIC) + pkts_done += sw_schedule_atomic_to_cq(sw, qid, + iq_num, count); + else + pkts_done += sw_schedule_parallel_to_cq(sw, qid, + iq_num, count, + (type == RTE_SCHED_TYPE_ORDERED)); + + /* Check if the IQ that was polled is now empty, and unset it + * in the IQ mask if its empty. + */ + int all_done = (pkts_done == count); + + qid->iq_pkt_mask &= ~(all_done << (iq_num)); + pkts += pkts_done; + } + + return pkts; +} + +/* This function will perform re-ordering of packets, and injecting into + * the appropriate QID IQ. As LB and DIR QIDs are in the same array, but *NOT* + * contiguous in that array, this function accepts a "range" of QIDs to scan. + */ +static uint16_t +sw_schedule_reorder(struct sw_evdev *sw, int qid_start, int qid_end) +{ + /* Perform egress reordering */ + struct rte_event *qe; + uint32_t pkts_iter = 0; + + for (; qid_start < qid_end; qid_start++) { + struct sw_qid *qid = &sw->qids[qid_start]; + int i, num_entries_in_use; + + if (qid->type != RTE_SCHED_TYPE_ORDERED) + continue; + + num_entries_in_use = rte_ring_free_count( + qid->reorder_buffer_freelist); + + for (i = 0; i < num_entries_in_use; i++) { + struct reorder_buffer_entry *entry; + int j; + + entry = &qid->reorder_buffer[qid->reorder_buffer_index]; + + if (!entry->ready) + break; + + for (j = 0; j < entry->num_fragments; j++) { + uint16_t dest_qid; + uint16_t dest_iq; + + qe = &entry->fragments[entry->fragment_index + j]; + + dest_qid = qe->flow_id; + dest_iq = PRIO_TO_IQ(qe->priority); + + if(dest_qid >= sw->qid_count) { + sw->stats.rx_dropped++; + continue; + } + + struct sw_qid *dest_qid_ptr = &sw->qids[dest_qid]; + const struct iq_ring *dest_iq_ptr = dest_qid_ptr->iq[dest_iq]; + if (iq_ring_free_count(dest_iq_ptr) == 0) + break; + + pkts_iter++; + + struct sw_qid *q = &sw->qids[dest_qid]; + struct iq_ring *r = q->iq[dest_iq]; + + /* we checked for space above, so enqueue must + * succeed + */ + iq_ring_enqueue(r, qe); + q->iq_pkt_mask |= (1 << (dest_iq)); + q->iq_pkt_count[dest_iq]++; + q->stats.rx_pkts++; + } + + entry->ready = (j != entry->num_fragments); + entry->num_fragments -= j; + entry->fragment_index += j; + + if (!entry->ready) { + entry->fragment_index = 0; + + rte_ring_sp_enqueue(qid->reorder_buffer_freelist, + entry); + + qid->reorder_buffer_index++; + qid->reorder_buffer_index %= qid->window_size; + } + } + } + return pkts_iter; +} + +static uint32_t +sw_schedule_pull_port_lb(struct sw_evdev *sw, uint32_t port_id) +{ + uint32_t pkts_iter = 0; + struct sw_port *port = &sw->ports[port_id]; + struct qe_ring *worker = port->rx_worker_ring; + + /* If shadow ring has 0 pkts, pull from worker ring */ + if(port->pp_buf_count == 0) { + port->pp_buf_start = 0; + port->pp_buf_count = qe_ring_dequeue_burst(worker, port->pp_buf, + RTE_DIM(port->pp_buf)); + + if (port->overloaded && + qe_ring_count(worker) < SW_PORT_OVERLOAD_THRES/2) { + port->overloaded = 0; + sw->sched_overload_counter++; + 
rte_atomic32_dec((void *)&sw->overloaded); + } + } + + while (port->pp_buf_count) { + const struct rte_event *qe = &port->pp_buf[port->pp_buf_start]; + struct sw_hist_list_entry *hist_entry = NULL; + uint8_t flags = qe->operation; + const uint16_t eop = !(flags & QE_FLAG_NOT_EOP); + int needs_reorder = 0; + + static const struct reorder_buffer_entry dummy_rob; + + /* + * if we don't have space for this packet in an IQ, + * then move on to next queue. Technically, for a + * packet that needs reordering, we don't need to check + * here, but it simplifies things not to special-case + */ + uint32_t iq_num = PRIO_TO_IQ(qe->priority); + struct sw_qid *qid = &sw->qids[qe->queue_id]; + struct iq_ring *iq_ring = qid->iq[iq_num]; + + if ((flags & QE_FLAG_VALID) && + iq_ring_free_count(iq_ring) == 0) + break; + + /* now process based on flags. Note that for directed + * queues, the enqueue_flush masks off all but the + * valid flag. This makes FWD and partial enqueues just + * NEW type, and makes DROPS no-op calls. + */ + if ((flags & QE_FLAG_COMPLETE) && port->inflights > 0) { + const uint32_t hist_tail = port->hist_tail & + (SW_PORT_HIST_LIST - 1); + + hist_entry = &port->hist_list[hist_tail]; + const uint32_t hist_qid = hist_entry->qid; + const uint32_t hist_fid = hist_entry->fid; + + struct sw_fid_t *fid = &sw->qids[hist_qid].fids[hist_fid]; + fid->count -= eop; + if (fid->count == 0) + fid->cq = -1; + + /* set reorder ready if an ordered QID */ + uintptr_t rob_ptr = (uintptr_t)hist_entry->rob_entry; + const uintptr_t valid = (rob_ptr != 0); + needs_reorder = valid; + rob_ptr |= ((valid - 1) & (uintptr_t)&dummy_rob); + ((struct reorder_buffer_entry*)rob_ptr)->ready = + eop * needs_reorder; + + port->inflights -= eop; + port->hist_tail += eop; + } + if (flags & QE_FLAG_VALID) { + port->stats.rx_pkts++; + + if (needs_reorder) { + struct reorder_buffer_entry *rob_entry = + hist_entry->rob_entry; + + //TODO: How do we alert the user that they've exceeded max frags? 
+ if (rob_entry->num_fragments == SW_FRAGMENTS_MAX) + sw->stats.rx_dropped++; + else + rob_entry->fragments[rob_entry->num_fragments++] = *qe; + goto end_qe; + } + + /* Use the iq_num from above to push the QE + * into the qid at the right priority + */ + + qid->iq_pkt_mask |= (1 << (iq_num)); + iq_ring_enqueue(iq_ring, qe); + qid->iq_pkt_count[iq_num]++; + qid->stats.rx_pkts++; + pkts_iter++; + } + + end_qe: + port->pp_buf_start++; + port->pp_buf_count--; + } /* while (avail_qes) */ + + return pkts_iter; +} + +static uint32_t +sw_schedule_pull_port_dir(struct sw_evdev *sw, uint32_t port_id) +{ + uint32_t pkts_iter = 0; + struct sw_port *port = &sw->ports[port_id]; + struct qe_ring *worker = port->rx_worker_ring; + + /* If shadow ring has 0 pkts, pull from worker ring */ + if (port->pp_buf_count == 0) { + port->pp_buf_start = 0; + port->pp_buf_count = qe_ring_dequeue_burst(worker, port->pp_buf, + RTE_DIM(port->pp_buf)); + + if (port->overloaded && + qe_ring_count(worker) < SW_PORT_OVERLOAD_THRES/2) { + port->overloaded = 0; + sw->sched_overload_counter++; + rte_atomic32_dec((void *)&sw->overloaded); + } + } + + while (port->pp_buf_count) { + const struct rte_event *qe = &port->pp_buf[port->pp_buf_start]; + uint8_t flags = qe->operation; + + if ((flags & QE_FLAG_VALID) == 0) + goto end_qe; + + uint32_t iq_num = PRIO_TO_IQ(qe->priority); + struct sw_qid *qid = &sw->qids[qe->queue_id]; + struct iq_ring *iq_ring = qid->iq[iq_num]; + + if (iq_ring_free_count(iq_ring) == 0) + break; /* move to next port */ + + port->stats.rx_pkts++; + + /* Use the iq_num from above to push the QE + * into the qid at the right priority + */ + qid->iq_pkt_mask |= (1 << (iq_num)); + iq_ring_enqueue(iq_ring, qe); + qid->iq_pkt_count[iq_num]++; + qid->stats.rx_pkts++; + pkts_iter++; + + end_qe: + port->pp_buf_start++; + port->pp_buf_count--; + } /* while port->pp_buf_count */ + + return pkts_iter; +} + +static uint32_t +sw_schedule_pull_port_no_reorder(struct sw_evdev *sw, uint32_t port_id) +{ + uint32_t pkts_iter = 0; + struct sw_port *port = &sw->ports[port_id]; + struct qe_ring *worker = port->rx_worker_ring; + + if (port->pp_buf_count == 0) { + port->pp_buf_start = 0; + port->pp_buf_count = qe_ring_dequeue_burst(worker, port->pp_buf, + RTE_DIM(port->pp_buf)); + + if (port->overloaded && + qe_ring_count(worker) < SW_PORT_OVERLOAD_THRES/2) { + port->overloaded = 0; + sw->sched_overload_counter++; + rte_atomic32_dec((void *)&sw->overloaded); + } + } + + while (port->pp_buf_count) { + const struct rte_event *ev = &port->pp_buf[port->pp_buf_start]; + struct sw_hist_list_entry *hist_entry = NULL; + uint8_t flags = ev->operation; + + /* for fragments, ignore completion + * NOTE: if not_eop flag is set, completion flag must + * also be set so we can use xor */ + flags ^= !(flags & QE_FLAG_NOT_EOP) >> + (QE_FLAG_NOT_EOP_SHIFT - QE_FLAG_COMPLETE_SHIFT); + + /* + * if we don't have space for this packet in an IQ, + * then move on to next queue. + */ + uint32_t iq_num = PRIO_TO_IQ(ev->priority); + struct sw_qid *qid = &sw->qids[ev->queue_id]; + struct iq_ring *iq_ring = qid->iq[iq_num]; + + if ((flags & QE_FLAG_VALID) && + iq_ring_free_count(iq_ring) == 0) + break; + + /* now process based on flags. Note that for directed + * queues, the enqueue_flush masks off all but the + * valid flag. This makes FWD and partial enqueues just + * NEW type, and makes DROPS no-op calls. 
+ */ + if ((flags & QE_FLAG_COMPLETE) && port->inflights > 0) { + const uint32_t hist_tail = port->hist_tail & + (SW_PORT_HIST_LIST - 1); + + hist_entry = &port->hist_list[hist_tail]; + const uint32_t hist_qid = hist_entry->qid; + const uint32_t hist_fid = hist_entry->fid; + + struct sw_fid_t *fid = &sw->qids[hist_qid].fids[hist_fid]; + fid->count--; + if (fid->count == 0) + fid->cq = -1; + + port->inflights --; + port->hist_tail ++; + } + if (flags & QE_FLAG_VALID) { + port->stats.rx_pkts++; + + /* Use the iq_num from above to push the QE + * into the qid at the right priority + */ + + qid->iq_pkt_mask |= (1 << (iq_num)); + iq_ring_enqueue(iq_ring, ev); + qid->iq_pkt_count[iq_num]++; + qid->stats.rx_pkts++; + pkts_iter++; + } + + port->pp_buf_start++; + port->pp_buf_count--; + } /* while (avail_qes) */ + + return pkts_iter; +} + +int +sw_event_schedule(struct rte_event_dev *dev) +{ + static const uint32_t num_pkts = 256; + struct sw_evdev *sw = (struct sw_evdev *)dev; + uint32_t in_pkts, out_pkts; + uint32_t out_pkts_total = 0, in_pkts_total = 0; + uint32_t i; + + sw->sched_called++; + if (!sw->started) + return -1; + + do { + uint32_t in_pkts_this_iteration = 0; + + /* Pull from rx_ring for ports */ + do { + in_pkts = 0; + for (i = 0; i < sw->port_count; i++) + /* TODO: use a function pointer in the port itself */ + if (sw->ports[i].is_directed) + in_pkts += sw_schedule_pull_port_dir(sw, i); + else if (sw->ports[i].num_ordered_qids > 0) + in_pkts += sw_schedule_pull_port_lb(sw, i); + else + in_pkts += sw_schedule_pull_port_no_reorder(sw, i); + + /* QID scan for re-ordered */ + in_pkts += sw_schedule_reorder(sw, 0, + sw->qid_count); + in_pkts_this_iteration += in_pkts; + } while (in_pkts > 0 && in_pkts_this_iteration < num_pkts); + + out_pkts = 0; + out_pkts += sw_schedule_qid_to_cq(sw); + out_pkts_total += out_pkts; + in_pkts_total += in_pkts_this_iteration; + + if (in_pkts == 0 && out_pkts == 0) + break; + } while (out_pkts_total < num_pkts); + + /* push all the internal buffered QEs in port->cq_ring to the + * worker cores: aka, do the ring transfers batched. + */ + for(i = 0; i < sw->port_count; i++) { + struct qe_ring *worker = sw->ports[i].cq_worker_ring; + qe_ring_enqueue_burst(worker, sw->ports[i].cq_buf, + sw->ports[i].cq_buf_count, + &sw->cq_ring_space[i]); + sw->ports[i].cq_buf_count = 0; + } + + sw->stats.tx_pkts += out_pkts_total; + sw->stats.rx_pkts += in_pkts_total; + + sw->sched_no_iq_enqueues += (in_pkts_total == 0); + sw->sched_no_cq_enqueues += (out_pkts_total == 0); + + return out_pkts_total; +} diff --git a/drivers/event/sw/sw_evdev_worker.c b/drivers/event/sw/sw_evdev_worker.c new file mode 100644 index 0000000..1b055cc --- /dev/null +++ b/drivers/event/sw/sw_evdev_worker.c @@ -0,0 +1,218 @@ +/*- + * BSD LICENSE + * + * Copyright(c) 2016 Intel Corporation. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. 
+ *   * Neither the name of Intel Corporation nor the names of its
+ *     contributors may be used to endorse or promote products derived
+ *     from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "sw_evdev.h"
+
+#include <rte_atomic.h>
+#include <rte_hash_crc.h>
+
+#define FLOWID_MASK (SW_QID_NUM_FIDS-1)
+
+static inline void
+sw_overload_check_and_set(struct sw_evdev *sw, struct sw_port *p,
+		uint16_t free_count)
+{
+	if (!p->overloaded &&
+			free_count < MAX_SW_PROD_Q_DEPTH - p->overload_threshold) {
+		p->overloaded = 1;
+		rte_atomic32_inc((void *)&sw->overloaded);
+	}
+}
+
+int
+sw_event_enqueue(struct rte_event_dev *dev, uint8_t port_id, struct rte_event *ev,
+		bool pin_event)
+{
+	RTE_SET_USED(pin_event);
+	uint16_t free_count;
+	struct sw_evdev *sw = (void *)dev;
+
+	if (port_id >= sw->port_count)
+		return -1;
+
+	struct sw_port *p = &sw->ports[port_id];
+	/* TODO: Consider optimization: keep the port overloaded state in a
+	 * flat array in the sw instance, do a lookup and use just one return
+	 * branch together with the port_id check above.
+	 */
+	if (sw->overloaded && ev->operation == RTE_EVENT_OP_NEW)
+		return -ENOSPC;
+
+	ev->operation = sw_qe_flag_map[ev->operation];
+	const uint8_t invalid_qid = (ev[0].queue_id >= sw->qid_count);
+	ev[0].operation &= ~(invalid_qid << QE_FLAG_VALID_SHIFT);
+	/* mask flowID to valid range after a CRC to jumble the bits */
+	ev[0].flow_id = FLOWID_MASK & rte_hash_crc_4byte(ev[0].flow_id, -1);
+
+	if (invalid_qid) {
+		p->stats.rx_dropped++;
+	}
+
+	unsigned int num_enq = qe_ring_enqueue_burst(p->rx_worker_ring,
+			ev, 1, &free_count);
+
+	sw_overload_check_and_set(sw, p, free_count);
+
+	/* TODO: Discuss on ML and fix this inconsistency in the API:
+	 * num_enq is the number of packets enqueued, so
+	 * 0 = no packets
+	 * 1 = got a packet
+	 * This differs from what is currently documented in the API.
+	 */
+	return num_enq;
+}
+
+int
+sw_event_enqueue_burst(struct rte_event_dev *dev, uint8_t port_id,
+		struct rte_event ev[], int num, bool pin_event)
+{
+	/* TODO: change enqueue API to uint32_t for num?
+	 */
+	int32_t i;
+	uint16_t free_count;
+	struct sw_evdev *sw = (void *)dev;
+
+	if (port_id >= sw->port_count)
+		return 0;
+
+	struct sw_port *p = &sw->ports[port_id];
+	RTE_SET_USED(pin_event);
+
+	for (i = 0; i < num; i++) {
+		/* optimize to two loops, with and without overload */
+		if (sw->overloaded && ev[i].operation == RTE_EVENT_OP_NEW)
+			return -ENOSPC;
+
+		ev[i].operation = sw_qe_flag_map[ev[i].operation];
+		const uint8_t invalid_qid = (ev[i].queue_id >= sw->qid_count);
+		ev[i].operation &= ~(invalid_qid << QE_FLAG_VALID_SHIFT);
+		ev[i].flow_id = FLOWID_MASK & rte_hash_crc_4byte(ev[i].flow_id, -1);
+
+		if (invalid_qid) {
+			p->stats.rx_dropped++;
+		}
+	}
+
+	/* returns number of events actually enqueued */
+	uint32_t deq = qe_ring_enqueue_burst(p->rx_worker_ring, ev, num,
+			&free_count);
+	sw_overload_check_and_set(sw, p, free_count);
+	return deq;
+}
+
+bool
+sw_event_dequeue(struct rte_event_dev *dev, uint8_t port_id,
+		struct rte_event *ev, uint64_t wait)
+{
+	RTE_SET_USED(wait);
+	struct sw_evdev *sw = (void *)dev;
+
+	if (port_id >= sw->port_count)
+		return 0;
+
+	struct sw_port *p = &sw->ports[port_id];
+	struct qe_ring *ring = p->cq_worker_ring;
+
+	/* check that all previous dequeues have been released */
+	uint16_t out_rels = p->outstanding_releases;
+	uint16_t i;
+	for (i = 0; i < out_rels; i++) {
+		sw_event_release(dev, port_id, i);
+	}
+
+	/* Intel modification: may not be in final API */
+	if (ev == NULL)
+		return 0;
+
+	/* returns number of events actually dequeued, after storing them */
+	uint32_t ndeq = qe_ring_dequeue_burst(ring, ev, 1);
+	p->outstanding_releases = ndeq;
+	return ndeq;
+}
+
+int
+sw_event_dequeue_burst(struct rte_event_dev *dev, uint8_t port_id,
+		struct rte_event *ev, int num, uint64_t wait)
+{
+	RTE_SET_USED(wait);
+	struct sw_evdev *sw = (void *)dev;
+
+	if (port_id >= sw->port_count)
+		return 0;
+
+	struct sw_port *p = &sw->ports[port_id];
+	struct qe_ring *ring = p->cq_worker_ring;
+
+	/* check that all previous dequeues have been released */
+	if (!p->is_directed) {
+		uint16_t out_rels = p->outstanding_releases;
+		uint16_t i;
+		for (i = 0; i < out_rels; i++) {
+			sw_event_release(dev, port_id, i);
+		}
+	}
+
+	/* Intel modification: may not be in final API */
+	if (ev == NULL)
+		return 0;
+
+	/* returns number of events actually dequeued */
+	uint32_t ndeq = qe_ring_dequeue_burst(ring, ev, num);
+	p->outstanding_releases = ndeq;
+	return ndeq;
+}
+
+void
+sw_event_release(struct rte_event_dev *dev, uint8_t port_id, uint8_t index)
+{
+	struct sw_evdev *sw = (void *)dev;
+	struct sw_port *p = &sw->ports[port_id];
+	RTE_SET_USED(p);
+	RTE_SET_USED(index);
+
+	/* This function "hints" the scheduler that packet *index* of the
+	 * previous burst:
+	 * (Atomic) has completed its critical section
+	 * (Ordered) is ready for egress
+	 *
+	 * It is not mandatory to implement this functionality, but it may
+	 * improve load-balancing / parallelism in the packet flows.
+	 */
+
+	/* create drop message */
+	struct rte_event ev = {
+		.operation = sw_qe_flag_map[RTE_EVENT_OP_DROP],
+	};
+
+	uint16_t free_count;
+	qe_ring_enqueue_burst(p->rx_worker_ring, &ev, 1, &free_count);
+
+	p->outstanding_releases--;
+}
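
For reviewers wanting to try the RFC out, the intended call pattern is roughly: one lcore runs sw_event_schedule() in a loop, while each worker lcore owns a port and drives it with the dequeue/enqueue calls above. The sketch below is illustrative only and is not part of the patch: the worker_arg struct and keep_running flag are invented for the example, and RTE_EVENT_OP_FORWARD is assumed to come from the eventdev API header added earlier in this series.

#include <stdbool.h>
#include "sw_evdev.h"

/* Hypothetical per-worker argument; not part of this patch. */
struct worker_arg {
	struct rte_event_dev *dev;
	uint8_t port_id;
};

static volatile bool keep_running = true;

/* One dedicated lcore drives the scheduler. */
static int
scheduler_loop(void *arg)
{
	struct rte_event_dev *dev = arg;

	while (keep_running)
		sw_event_schedule(dev);
	return 0;
}

/* Each worker lcore: dequeue a burst from its port, process the events,
 * then enqueue them again (forwarding assumed via RTE_EVENT_OP_FORWARD).
 */
static int
worker_loop(void *arg)
{
	struct worker_arg *w = arg;
	struct rte_event ev[32];
	int i, n;

	while (keep_running) {
		n = sw_event_dequeue_burst(w->dev, w->port_id, ev, 32, 0);
		for (i = 0; i < n; i++)
			ev[i].operation = RTE_EVENT_OP_FORWARD;
		if (n > 0)
			sw_event_enqueue_burst(w->dev, w->port_id, ev, n, false);
	}
	return 0;
}

Both functions match the lcore_function_t signature, so they could be launched with rte_eal_remote_launch(). Note also that sw_event_dequeue_burst() records the burst size in outstanding_releases, so the next dequeue on a load-balanced port implicitly releases the previous burst if the application never calls sw_event_release() itself.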