get:
Show a patch.

patch:
Partially update a patch.

put:
Update a patch.
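
The patch record rendered below can also be retrieved programmatically. The following is a minimal sketch, assuming Python 3 with the third-party requests package; the endpoint URL and the field names used here ("name", "state", "submitter", "mbox") are taken from the JSON response shown afterwards, and read access to this public instance needs no credentials (updating via PUT or PATCH would normally require an API token, which is not shown). The local filename "17049.mbox" is chosen for this example only.

import requests

# Fetch the patch record shown below as JSON.
resp = requests.get("https://patches.dpdk.org/api/patches/17049/",
                    headers={"Accept": "application/json"})
resp.raise_for_status()
patch = resp.json()

# Inspect a few of the read-only fields present in the response.
print(patch["name"])                # "[dpdk-dev,3/7] event/sw: software eventdev implementation"
print(patch["state"])               # "superseded"
print(patch["submitter"]["email"])  # "harry.van.haaren@intel.com"

# The raw patch itself can be downloaded from the "mbox" URL in the record.
mbox = requests.get(patch["mbox"])
mbox.raise_for_status()
with open("17049.mbox", "wb") as f:
    f.write(mbox.content)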

GET /api/patches/17049/?format=api
HTTP 200 OK
Allow: GET, PUT, PATCH, HEAD, OPTIONS
Content-Type: application/json
Vary: Accept

{
    "id": 17049,
    "url": "https://patches.dpdk.org/api/patches/17049/?format=api",
    "web_url": "https://patches.dpdk.org/project/dpdk/patch/1479319207-130646-4-git-send-email-harry.van.haaren@intel.com/",
    "project": {
        "id": 1,
        "url": "https://patches.dpdk.org/api/projects/1/?format=api",
        "name": "DPDK",
        "link_name": "dpdk",
        "list_id": "dev.dpdk.org",
        "list_email": "dev@dpdk.org",
        "web_url": "http://core.dpdk.org",
        "scm_url": "git://dpdk.org/dpdk",
        "webscm_url": "http://git.dpdk.org/dpdk",
        "list_archive_url": "https://inbox.dpdk.org/dev",
        "list_archive_url_format": "https://inbox.dpdk.org/dev/{}",
        "commit_url_format": ""
    },
    "msgid": "<1479319207-130646-4-git-send-email-harry.van.haaren@intel.com>",
    "list_archive_url": "https://inbox.dpdk.org/dev/1479319207-130646-4-git-send-email-harry.van.haaren@intel.com",
    "date": "2016-11-16T18:00:03",
    "name": "[dpdk-dev,3/7] event/sw: software eventdev implementation",
    "commit_ref": null,
    "pull_url": null,
    "state": "superseded",
    "archived": true,
    "hash": "bad2ef10db8de2a2eb681415cf35898bb8fcfa15",
    "submitter": {
        "id": 317,
        "url": "https://patches.dpdk.org/api/people/317/?format=api",
        "name": "Van Haaren, Harry",
        "email": "harry.van.haaren@intel.com"
    },
    "delegate": null,
    "mbox": "https://patches.dpdk.org/project/dpdk/patch/1479319207-130646-4-git-send-email-harry.van.haaren@intel.com/mbox/",
    "series": [],
    "comments": "https://patches.dpdk.org/api/patches/17049/comments/",
    "check": "warning",
    "checks": "https://patches.dpdk.org/api/patches/17049/checks/",
    "tags": {},
    "related": [],
    "headers": {
        "Return-Path": "<dev-bounces@dpdk.org>",
        "X-Original-To": "patchwork@dpdk.org",
        "Delivered-To": "patchwork@dpdk.org",
        "Received": [
            "from [92.243.14.124] (localhost [IPv6:::1])\n\tby dpdk.org (Postfix) with ESMTP id EE7C958CD;\n\tWed, 16 Nov 2016 19:01:20 +0100 (CET)",
            "from mga02.intel.com (mga02.intel.com [134.134.136.20])\n\tby dpdk.org (Postfix) with ESMTP id 502C8567E\n\tfor <dev@dpdk.org>; Wed, 16 Nov 2016 19:00:24 +0100 (CET)",
            "from fmsmga001.fm.intel.com ([10.253.24.23])\n\tby orsmga101.jf.intel.com with ESMTP; 16 Nov 2016 10:00:23 -0800",
            "from sie-lab-212-222.ir.intel.com (HELO\n\tsilpixa00398672.ir.intel.com) ([10.237.212.222])\n\tby fmsmga001.fm.intel.com with ESMTP; 16 Nov 2016 10:00:21 -0800"
        ],
        "X-ExtLoop1": "1",
        "X-IronPort-AV": "E=Sophos; i=\"5.31,649,1473145200\"; d=\"scan'208\";\n\ta=\"1069396470\"",
        "From": "Harry van Haaren <harry.van.haaren@intel.com>",
        "To": "dev@dpdk.org",
        "Cc": "Harry van Haaren <harry.van.haaren@intel.com>,\n\tGage Eads <gage.eads@intel.com>,\n\tBruce Richardson <bruce.richardson@intel.com>",
        "Date": "Wed, 16 Nov 2016 18:00:03 +0000",
        "Message-Id": "<1479319207-130646-4-git-send-email-harry.van.haaren@intel.com>",
        "X-Mailer": "git-send-email 2.7.4",
        "In-Reply-To": "<1479319207-130646-1-git-send-email-harry.van.haaren@intel.com>",
        "References": "<1479319207-130646-1-git-send-email-harry.van.haaren@intel.com>",
        "Subject": "[dpdk-dev] [PATCH 3/7] event/sw: software eventdev implementation",
        "X-BeenThere": "dev@dpdk.org",
        "X-Mailman-Version": "2.1.15",
        "Precedence": "list",
        "List-Id": "patches and discussions about DPDK <dev.dpdk.org>",
        "List-Unsubscribe": "<http://dpdk.org/ml/options/dev>,\n\t<mailto:dev-request@dpdk.org?subject=unsubscribe>",
        "List-Archive": "<http://dpdk.org/ml/archives/dev/>",
        "List-Post": "<mailto:dev@dpdk.org>",
        "List-Help": "<mailto:dev-request@dpdk.org?subject=help>",
        "List-Subscribe": "<http://dpdk.org/ml/listinfo/dev>,\n\t<mailto:dev-request@dpdk.org?subject=subscribe>",
        "Errors-To": "dev-bounces@dpdk.org",
        "Sender": "\"dev\" <dev-bounces@dpdk.org>"
    },
    "content": "This commit adds a software implementation of the eventdev API. The\nimplementation here is intended to enable the community to use the eventdev\nAPI, and test if the API serves the purpose that it is designed to. It\nshould be noted this is an RFC implementation, and hence there should be no\nperformance expectations. Note that the code added here is based on a\nprototype implementation, and hence some cleanup is expected to be\nnecessary.\n\nThe main components of the implementation is three files:\n  - sw_evdev.c              Creation, configuration, etc\n  - sw_evdev_worker.c       Worker cores' enqueue (etc) functions\n  - sw_evdev_scheduler.c    Core pkt scheduling implementation\n\nThis commit only adds the implementation, no existing DPDK files are modified.\n\nSigned-off-by: Gage Eads <gage.eads@intel.com>\nSigned-off-by: Bruce Richardson <bruce.richardson@intel.com>\nSigned-off-by: Harry van Haaren <harry.van.haaren@intel.com>\n---\n drivers/event/sw/Makefile                     |  59 +++\n drivers/event/sw/event_ring.h                 | 142 ++++++\n drivers/event/sw/iq_ring.h                    | 160 +++++++\n drivers/event/sw/rte_pmd_evdev_sw_version.map |   3 +\n drivers/event/sw/sw_evdev.c                   | 619 ++++++++++++++++++++++++\n drivers/event/sw/sw_evdev.h                   | 234 +++++++++\n drivers/event/sw/sw_evdev_scheduler.c         | 660 ++++++++++++++++++++++++++\n drivers/event/sw/sw_evdev_worker.c            | 218 +++++++++\n 8 files changed, 2095 insertions(+)\n create mode 100644 drivers/event/sw/Makefile\n create mode 100644 drivers/event/sw/event_ring.h\n create mode 100644 drivers/event/sw/iq_ring.h\n create mode 100644 drivers/event/sw/rte_pmd_evdev_sw_version.map\n create mode 100644 drivers/event/sw/sw_evdev.c\n create mode 100644 drivers/event/sw/sw_evdev.h\n create mode 100644 drivers/event/sw/sw_evdev_scheduler.c\n create mode 100644 drivers/event/sw/sw_evdev_worker.c",
    "diff": "diff --git a/drivers/event/sw/Makefile b/drivers/event/sw/Makefile\nnew file mode 100644\nindex 0000000..7fc4371\n--- /dev/null\n+++ b/drivers/event/sw/Makefile\n@@ -0,0 +1,59 @@\n+#   BSD LICENSE\n+#\n+#   Copyright(c) 2016 Intel Corporation. All rights reserved.\n+#\n+#   Redistribution and use in source and binary forms, with or without\n+#   modification, are permitted provided that the following conditions\n+#   are met:\n+#\n+#     * Redistributions of source code must retain the above copyright\n+#       notice, this list of conditions and the following disclaimer.\n+#     * Redistributions in binary form must reproduce the above copyright\n+#       notice, this list of conditions and the following disclaimer in\n+#       the documentation and/or other materials provided with the\n+#       distribution.\n+#     * Neither the name of Intel Corporation nor the names of its\n+#       contributors may be used to endorse or promote products derived\n+#       from this software without specific prior written permission.\n+#\n+#   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS\n+#   \"AS IS\" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT\n+#   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR\n+#   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT\n+#   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,\n+#   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT\n+#   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,\n+#   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY\n+#   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT\n+#   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE\n+#   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n+\n+include $(RTE_SDK)/mk/rte.vars.mk\n+\n+\n+# library name\n+LIB = librte_pmd_evdev_sw.a\n+\n+# build flags\n+CFLAGS += -O3\n+CFLAGS += $(WERROR_FLAGS)\n+\n+# library version\n+LIBABIVER := 1\n+\n+# versioning export map\n+EXPORT_MAP := rte_pmd_evdev_sw_version.map\n+\n+# library source files\n+SRCS-$(CONFIG_RTE_LIBRTE_PMD_EVDEV_SW) += sw_evdev.c\n+SRCS-$(CONFIG_RTE_LIBRTE_PMD_EVDEV_SW) += sw_evdev_worker.c\n+SRCS-$(CONFIG_RTE_LIBRTE_PMD_EVDEV_SW) += sw_evdev_scheduler.c\n+\n+# export include files\n+SYMLINK-y-include +=\n+\n+# library dependencies\n+DEPDIRS-$(CONFIG_RTE_LIBRTE_PMD_EVDEV_SW) += lib/librte_eal\n+DEPDIRS-$(CONFIG_RTE_LIBRTE_PMD_EVDEV_SW) += lib/librte_eventdev\n+\n+include $(RTE_SDK)/mk/rte.lib.mk\ndiff --git a/drivers/event/sw/event_ring.h b/drivers/event/sw/event_ring.h\nnew file mode 100644\nindex 0000000..531fb68\n--- /dev/null\n+++ b/drivers/event/sw/event_ring.h\n@@ -0,0 +1,142 @@\n+/*-\n+ *   BSD LICENSE\n+ *\n+ *   Copyright(c) 2016 Intel Corporation. 
All rights reserved.\n+ *\n+ *   Redistribution and use in source and binary forms, with or without\n+ *   modification, are permitted provided that the following conditions\n+ *   are met:\n+ *\n+ *     * Redistributions of source code must retain the above copyright\n+ *       notice, this list of conditions and the following disclaimer.\n+ *     * Redistributions in binary form must reproduce the above copyright\n+ *       notice, this list of conditions and the following disclaimer in\n+ *       the documentation and/or other materials provided with the\n+ *       distribution.\n+ *     * Neither the name of Intel Corporation nor the names of its\n+ *       contributors may be used to endorse or promote products derived\n+ *       from this software without specific prior written permission.\n+ *\n+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS\n+ *   \"AS IS\" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT\n+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR\n+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT\n+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,\n+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT\n+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,\n+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY\n+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT\n+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE\n+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n+ */\n+#ifndef _EVENT_RING_\n+#define _EVENT_RING_\n+\n+#include <stdint.h>\n+#include <x86intrin.h>\n+\n+#include <rte_common.h>\n+#include <rte_memory.h>\n+#include <rte_malloc.h>\n+\n+#define QE_RING_NAMESIZE 32\n+\n+struct qe_ring {\n+\tchar name[QE_RING_NAMESIZE] __rte_cache_aligned;\n+\tuint32_t ring_size; /* size of memory block allocated to the ring */\n+\tuint32_t mask;      /* mask for read/write values == ring_size -1 */\n+\tuint32_t size;      /* actual usable space in the ring */\n+\tvolatile uint32_t write_idx __rte_cache_aligned;\n+\tvolatile uint32_t read_idx __rte_cache_aligned;\n+\n+\tstruct rte_event ring[0] __rte_cache_aligned;\n+};\n+\n+#ifndef force_inline\n+#define force_inline inline __attribute__((always_inline))\n+#endif\n+\n+static inline struct qe_ring * __attribute__((cold))\n+qe_ring_create(const char *name, unsigned int size, unsigned socket_id)\n+{\n+\tstruct qe_ring *retval;\n+\tconst uint32_t ring_size = rte_align32pow2(size + 1);\n+\tsize_t memsize = sizeof(*retval) +\n+\t\t\t(ring_size * sizeof(retval->ring[0]));\n+\n+\tretval = rte_zmalloc_socket(NULL, memsize, 0, socket_id);\n+\tif (retval == NULL)\n+\t\tgoto end;\n+\n+\tsnprintf(retval->name, sizeof(retval->name), \"EVDEV_RG_%s\", name);\n+\tretval->ring_size = ring_size;\n+\tretval->mask = ring_size - 1;\n+\tretval->size = size;\n+end:\n+\treturn retval;\n+}\n+\n+static inline void\n+qe_ring_destroy(struct qe_ring *r)\n+{\n+\trte_free(r);\n+}\n+\n+static force_inline unsigned int\n+qe_ring_count(const struct qe_ring *r)\n+{\n+\treturn r->write_idx - r->read_idx;\n+}\n+\n+static force_inline unsigned int\n+qe_ring_free_count(const struct qe_ring *r)\n+{\n+\treturn r->size - qe_ring_count(r);\n+}\n+\n+static force_inline unsigned int\n+qe_ring_enqueue_burst(struct qe_ring *r, struct rte_event *qes,\n+\t\tunsigned int nb_qes, uint16_t *free_count)\n+{\n+\tconst uint32_t size = 
r->size;\n+\tconst uint32_t mask = r->mask;\n+\tconst uint32_t read = r->read_idx;\n+\tuint32_t write = r->write_idx;\n+\tconst uint32_t space = read + size - write;\n+\tuint32_t i;\n+\n+\tif (space < nb_qes)\n+\t\tnb_qes = space;\n+\n+\tfor (i = 0; i < nb_qes; i++, write++)\n+\t\tr->ring[write & mask] = qes[i];\n+\n+\tr->write_idx = write;\n+\n+\t*free_count = space - nb_qes;\n+\n+\treturn nb_qes;\n+}\n+\n+static force_inline unsigned int\n+qe_ring_dequeue_burst(struct qe_ring *r, struct rte_event *qes,\n+\t\tunsigned int nb_qes)\n+{\n+\tconst uint32_t mask = r->mask;\n+\tuint32_t read = r->read_idx;\n+\tconst uint32_t write = r->write_idx;\n+\tconst uint32_t items = write - read;\n+\tuint32_t i;\n+\n+\tif (items < nb_qes)\n+\t\tnb_qes = items;\n+\n+\tfor (i = 0; i < nb_qes; i++, read++)\n+\t\tqes[i] = r->ring[read & mask];\n+\n+\tr->read_idx += nb_qes;\n+\n+\treturn nb_qes;\n+}\n+\n+#endif\ndiff --git a/drivers/event/sw/iq_ring.h b/drivers/event/sw/iq_ring.h\nnew file mode 100644\nindex 0000000..a870e59\n--- /dev/null\n+++ b/drivers/event/sw/iq_ring.h\n@@ -0,0 +1,160 @@\n+/*-\n+ *   BSD LICENSE\n+ *\n+ *   Copyright(c) 2016 Intel Corporation. All rights reserved.\n+ *\n+ *   Redistribution and use in source and binary forms, with or without\n+ *   modification, are permitted provided that the following conditions\n+ *   are met:\n+ *\n+ *     * Redistributions of source code must retain the above copyright\n+ *       notice, this list of conditions and the following disclaimer.\n+ *     * Redistributions in binary form must reproduce the above copyright\n+ *       notice, this list of conditions and the following disclaimer in\n+ *       the documentation and/or other materials provided with the\n+ *       distribution.\n+ *     * Neither the name of Intel Corporation nor the names of its\n+ *       contributors may be used to endorse or promote products derived\n+ *       from this software without specific prior written permission.\n+ *\n+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS\n+ *   \"AS IS\" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT\n+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR\n+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT\n+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,\n+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT\n+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,\n+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY\n+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT\n+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE\n+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n+ */\n+\n+#ifndef _IQ_RING_\n+#define _IQ_RING_\n+\n+#include <stdint.h>\n+#include <x86intrin.h>\n+\n+#include <rte_common.h>\n+#include <rte_memory.h>\n+#include <rte_malloc.h>\n+#include <rte_eventdev.h>\n+\n+#define IQ_RING_NAMESIZE 12\n+#define QID_IQ_DEPTH 128\n+#define QID_IQ_MASK (uint16_t)(QID_IQ_DEPTH - 1)\n+\n+struct iq_ring {\n+\tchar name[IQ_RING_NAMESIZE] __rte_cache_aligned;\n+\tuint16_t write_idx;\n+\tuint16_t read_idx;\n+\n+\tstruct rte_event ring[QID_IQ_DEPTH];\n+};\n+\n+#ifndef force_inline\n+#define force_inline inline __attribute__((always_inline))\n+#endif\n+\n+static inline struct iq_ring * __attribute__((cold))\n+iq_ring_create(const char *name, unsigned socket_id)\n+{\n+\tstruct iq_ring *retval;\n+\n+\tretval = rte_malloc_socket(NULL, sizeof(*retval), 0, socket_id);\n+\tif (retval == NULL)\n+\t\tgoto end;\n+\n+\tsnprintf(retval->name, sizeof(retval->name), \"%s\", name);\n+\tretval->write_idx = retval->read_idx = 0;\n+end:\n+\treturn retval;\n+}\n+\n+static inline void\n+iq_ring_destroy(struct iq_ring *r)\n+{\n+\trte_free(r);\n+}\n+\n+static force_inline uint16_t\n+iq_ring_count(const struct iq_ring *r)\n+{\n+\treturn r->write_idx - r->read_idx;\n+}\n+\n+static force_inline uint16_t\n+iq_ring_free_count(const struct iq_ring *r)\n+{\n+\treturn QID_IQ_MASK - iq_ring_count(r);\n+}\n+\n+static force_inline uint16_t\n+iq_ring_enqueue_burst(struct iq_ring *r, struct rte_event *qes, uint16_t nb_qes)\n+{\n+\tconst uint16_t read = r->read_idx;\n+\tuint16_t write = r->write_idx;\n+\tconst uint16_t space = read + QID_IQ_MASK - write;\n+\tuint16_t i;\n+\n+\tif (space < nb_qes)\n+\t\tnb_qes = space;\n+\n+\tfor (i = 0; i < nb_qes; i++, write++)\n+\t\tr->ring[write & QID_IQ_MASK] = qes[i];\n+\n+\tr->write_idx = write;\n+\n+\treturn nb_qes;\n+}\n+\n+static force_inline uint16_t\n+iq_ring_dequeue_burst(struct iq_ring *r, struct rte_event *qes, uint16_t nb_qes)\n+{\n+\tuint16_t read = r->read_idx;\n+\tconst uint16_t write = r->write_idx;\n+\tconst uint16_t items = write - read;\n+\tuint16_t i;\n+\n+\tfor (i = 0; i < nb_qes; i++, read++)\n+\t\tqes[i] = r->ring[read & QID_IQ_MASK];\n+\n+\tif (items < nb_qes)\n+\t\tnb_qes = items;\n+\n+\tr->read_idx += nb_qes;\n+\n+\treturn nb_qes;\n+}\n+\n+static force_inline const struct rte_event *\n+iq_ring_peek(const struct iq_ring *r)\n+{\n+\treturn &r->ring[r->read_idx & QID_IQ_MASK];\n+}\n+\n+static force_inline void\n+iq_ring_pop(struct iq_ring *r)\n+{\n+\tr->read_idx++;\n+}\n+\n+static force_inline int\n+iq_ring_enqueue(struct iq_ring *r, const struct rte_event *qe)\n+{\n+\tconst uint16_t read = r->read_idx;\n+\tconst uint16_t write = r->write_idx;\n+\tconst uint16_t space = read + QID_IQ_MASK - write;\n+\n+\tif (space == 0)\n+\t\treturn -1;\n+\n+\tr->ring[write & QID_IQ_MASK] = *qe;\n+\n+\tr->write_idx = write + 1;\n+\n+\treturn 0;\n+}\n+\n+#endif\ndiff --git a/drivers/event/sw/rte_pmd_evdev_sw_version.map b/drivers/event/sw/rte_pmd_evdev_sw_version.map\nnew file mode 100644\nindex 
0000000..1f84b68\n--- /dev/null\n+++ b/drivers/event/sw/rte_pmd_evdev_sw_version.map\n@@ -0,0 +1,3 @@\n+DPDK_17.02 {\n+\tlocal: *;\n+};\ndiff --git a/drivers/event/sw/sw_evdev.c b/drivers/event/sw/sw_evdev.c\nnew file mode 100644\nindex 0000000..4868122\n--- /dev/null\n+++ b/drivers/event/sw/sw_evdev.c\n@@ -0,0 +1,619 @@\n+/*-\n+ *   BSD LICENSE\n+ *\n+ *   Copyright(c) 2016 Intel Corporation. All rights reserved.\n+ *\n+ *   Redistribution and use in source and binary forms, with or without\n+ *   modification, are permitted provided that the following conditions\n+ *   are met:\n+ *\n+ *     * Redistributions of source code must retain the above copyright\n+ *       notice, this list of conditions and the following disclaimer.\n+ *     * Redistributions in binary form must reproduce the above copyright\n+ *       notice, this list of conditions and the following disclaimer in\n+ *       the documentation and/or other materials provided with the\n+ *       distribution.\n+ *     * Neither the name of Intel Corporation nor the names of its\n+ *       contributors may be used to endorse or promote products derived\n+ *       from this software without specific prior written permission.\n+ *\n+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS\n+ *   \"AS IS\" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT\n+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR\n+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT\n+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,\n+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT\n+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,\n+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY\n+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT\n+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE\n+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n+ */\n+\n+#include <string.h>\n+\n+#include <rte_vdev.h>\n+#include <rte_memzone.h>\n+#include <rte_kvargs.h>\n+#include <rte_ring.h>\n+#include <rte_eventdev_pmd.h>\n+\n+#include \"sw_evdev.h\"\n+#include \"iq_ring.h\"\n+\n+#define NUMA_NODE_ARG \"numa_node\"\n+\n+static int\n+sw_dev_stats_get(const struct rte_event_dev *dev,\n+\t\tstruct rte_event_dev_stats *stats)\n+{\n+\tconst struct sw_evdev *sw = (const void *)dev;\n+\tunsigned int i;\n+\n+\tif (dev == NULL || stats == NULL)\n+\t\treturn -EINVAL;\n+\n+\tmemset(stats, 0, sizeof(*stats));\n+\n+\tstats->rx_pkts = sw->stats.rx_pkts;\n+\tstats->rx_dropped = sw->stats.rx_dropped;\n+\tstats->tx_pkts = sw->stats.tx_pkts;\n+\n+\tfor (i = 0; i < sw->port_count; i++) {\n+\t\tstats->port_rx_pkts[i] = sw->ports[i].stats.rx_pkts;\n+\t\tstats->port_rx_dropped[i] = sw->ports[i].stats.rx_dropped;\n+\t\tstats->port_inflight[i] = sw->ports[i].inflights;\n+\t\tstats->port_tx_pkts[i] = sw->ports[i].stats.tx_pkts;\n+\t}\n+\n+\tfor (i = 0; i < sw->qid_count; i++) {\n+\t\tstats->queue_rx_pkts[i] = sw->qids[i].stats.rx_pkts;\n+\t\tstats->queue_rx_dropped[i] = sw->qids[i].stats.rx_dropped;\n+\t\tstats->queue_tx_pkts[i] = sw->qids[i].stats.tx_pkts;\n+\t}\n+\treturn 0;\n+}\n+\n+static int\n+sw_port_link(struct rte_event_dev *dev, uint8_t port_id,\n+\t\tstruct rte_event_queue_link link[], int num)\n+{\n+\tstruct sw_evdev *sw = (void *)dev;\n+\tstruct sw_port *p = &sw->ports[port_id];\n+\tint i;\n+\n+\tif (link == NULL) {\n+\t\t/* TODO: map all queues 
*/\n+\t\trte_errno = -EDQUOT;\n+\t\treturn 0;\n+\t}\n+\tif (port_id > sw->port_count) {\n+\t\trte_errno = -EINVAL;\n+\t\treturn 0;\n+\t}\n+\n+\tfor (i = 0; i < num; i++) {\n+\t\tstruct sw_qid *q;\n+\t\tuint32_t qid = link[i].queue_id;\n+\t\tif (qid >= sw->qid_count) {\n+\t\t\tbreak; /* error - invalid QIDs */\n+\t\t}\n+\t\tq = &sw->qids[qid];\n+\n+\t\t/* check for qid map overflow */\n+\t\tif (q->cq_num_mapped_cqs >= RTE_DIM(q->cq_map))\n+\t\t\tbreak;\n+\n+\t\tif (p->is_directed && p->num_qids_mapped > 0)\n+\t\t\tbreak;\n+\n+\t\tif (q->type == RTE_SCHED_TYPE_DIRECT) {\n+\t\t\t/* check directed qids only map to one port */\n+\t\t\tif (p->num_qids_mapped > 0)\n+\t\t\t\tbreak;\n+\t\t\t/* check port only takes a directed flow */\n+\t\t\tif (num > 1)\n+\t\t\t\tbreak;\n+\n+\t\t\tp->is_directed = 1;\n+\t\t\tp->num_qids_mapped = 1;\n+\t\t} else if (q->type == RTE_SCHED_TYPE_ORDERED) {\n+\t\t\tp->num_ordered_qids++;\n+\t\t\tp->num_qids_mapped++;\n+\t\t} else if (q->type == RTE_SCHED_TYPE_ATOMIC) {\n+\t\t\tp->num_qids_mapped++;\n+\t\t}\n+\n+\t\tq->cq_map[q->cq_num_mapped_cqs++] = port_id;\n+\t}\n+\treturn i;\n+}\n+\n+static void\n+sw_dump(FILE *f, const struct rte_event_dev *dev)\n+{\n+\tstatic const char *q_type_strings[] = {\"Ordered\" , \"Atomic\",\n+\t\t\t\"Parallel\", \"Directed\"\n+\t};\n+\tuint32_t i;\n+\tconst struct sw_evdev *sw = (const void *)dev;\n+\tfprintf(f, \"EventDev %s: ports %d, qids %d\\n\", sw->dev.name,\n+\t\t\tsw->port_count, sw->qid_count);\n+\n+\tfprintf(f, \"\\trx   %\"PRIu64\"\\n\\tdrop %\"PRIu64\"\\n\\ttx   %\"PRIu64\"\\n\",\n+\t\tsw->stats.rx_pkts, sw->stats.rx_dropped, sw->stats.tx_pkts);\n+\tfprintf(f, \"\\tsched calls: %\"PRIu64\"\\n\", sw->sched_called);\n+\tfprintf(f, \"\\tsched cq/qid call: %\"PRIu64\"\\n\", sw->sched_cq_qid_called);\n+\tfprintf(f, \"\\tsched no IQ enq: %\"PRIu64\"\\n\", sw->sched_no_iq_enqueues);\n+\tfprintf(f, \"\\tsched no CQ enq: %\"PRIu64\"\\n\", sw->sched_no_cq_enqueues);\n+\tfprintf(f, \"\\toverloads %\"PRIu64\"\\t%s\\n\", sw->sched_overload_counter,\n+\t\t\tsw->overloaded ? \" [OVERLOADED NOW]\" : \"\");\n+\n+#define COL_RED \"\\x1b[31m\"\n+#define COL_RESET \"\\x1b[0m\"\n+\n+\tfor (i = 0; i < sw->port_count; i++) {\n+\t\tconst struct sw_port *p = &sw->ports[i];\n+\t\tfprintf(f, \"  Port %d %s %s\\n\", i,\n+\t\t\t\tp->is_directed ? \" (SingleCons)\" : \"\",\n+\t\t\t\tp->overloaded ? \" [\"COL_RED\"OVERLOAD\"COL_RESET\"]\" : \"\");\n+\t\tfprintf(f, \"\\trx   %\"PRIu64\"\\n\\tdrop %\"PRIu64\"\\n\\ttx   %\"PRIu64\"\\n\"\n+\t\t\t\"\\tinf %d\\n\", sw->ports[i].stats.rx_pkts,\n+\t\t\tsw->ports[i].stats.rx_dropped,\n+\t\t\tsw->ports[i].stats.tx_pkts, sw->ports[i].inflights);\n+\n+\t\tuint64_t rx_used = qe_ring_count(p->rx_worker_ring);\n+\t\tuint64_t rx_free = qe_ring_free_count(p->rx_worker_ring);\n+\t\tconst char *rxcol = (rx_free == 0) ? COL_RED : COL_RESET;\n+\t\tfprintf(f, \"\\t%srx ring used: %ld\\tfree: %ld\"COL_RESET\"\\n\",\n+\t\t\t\trxcol, rx_used, rx_free);\n+\n+\t\tuint64_t tx_used = qe_ring_count(p->cq_worker_ring);\n+\t\tuint64_t tx_free = qe_ring_free_count(p->cq_worker_ring);\n+\t\tconst char *txcol = (tx_free == 0) ? 
COL_RED : COL_RESET;\n+\t\tfprintf(f, \"\\t%scq ring used: %ld\\tfree: %ld\"COL_RESET\"\\n\",\n+\t\t\t\ttxcol, tx_used, tx_free);\n+\t}\n+\n+\tfor (i = 0; i < sw->qid_count; i++) {\n+\t\tfprintf(f, \"  Queue %d (%s)\\n\", i, q_type_strings[sw->qids[i].type]);\n+\t\tfprintf(f, \"\\trx   %\"PRIu64\"\\n\\tdrop %\"PRIu64\"\\n\\ttx   %\"PRIu64\"\\n\",\n+\t\t\tsw->qids[i].stats.rx_pkts, sw->qids[i].stats.rx_dropped,\n+\t\t\tsw->qids[i].stats.tx_pkts);\n+\t\tuint32_t iq;\n+\t\tfor(iq = 0; iq < SW_IQS_MAX; iq++) {\n+\t\t\tuint32_t used = iq_ring_count(sw->qids[i].iq[iq]);\n+\t\t\tuint32_t free = iq_ring_free_count(sw->qids[i].iq[iq]);\n+\t\t\tconst char *col = (free == 0) ? COL_RED : COL_RESET;\n+\t\t\tfprintf(f, \"\\t%siq %d: Used %d\\tFree %d\"COL_RESET\"\\n\",\n+\t\t\t\t\tcol, iq, used, free);\n+\t\t}\n+\t}\n+}\n+\n+static int\n+sw_port_setup(struct rte_event_dev *dev, uint8_t port_id,\n+\t\tconst struct rte_event_port_conf *conf)\n+{\n+\tstruct sw_evdev *sw = (void *)dev;\n+\tstruct sw_port *p = &sw->ports[port_id];\n+\tchar buf[QE_RING_NAMESIZE];\n+\tunsigned i;\n+\n+\tif (conf->enqueue_queue_depth >\n+\t\t\t\tdev->info.max_event_port_enqueue_queue_depth ||\n+\t\t\tconf->dequeue_queue_depth >\n+\t\t\t\tdev->info.max_event_port_dequeue_queue_depth){\n+\t\trte_errno = EINVAL;\n+\t\treturn -1;\n+\t}\n+\n+\t*p = (struct sw_port){0}; /* zero entire structure */\n+\tp->id = port_id;\n+\n+\t/* TODO: how do we work with an overload scheme here?\n+\t * For now, still use a huge buffer, with per-port thresholds.\n+\t * When it fills beyond the configured max size, we throttle.\n+\t */\n+\tsnprintf(buf, sizeof(buf), \"%s_%s\", dev->name, \"rx_worker_ring\");\n+\tp->rx_worker_ring = qe_ring_create(buf, MAX_SW_PROD_Q_DEPTH,\n+\t\t\tdev->socket_id);\n+\tif (p->rx_worker_ring == NULL)\n+\t\treturn -1;\n+\n+\t/* threshold is number of free spaces that are left in ring\n+\t * before overload should kick in. QE ring returns free_count,\n+\t * so storing this way makes more sense than actual depth\n+\t */\n+\tuint32_t requested = MAX_SW_PROD_Q_DEPTH - conf->new_event_threshold;\n+\tp->overload_threshold = requested > 255 ? 
255 : requested;\n+\n+\tsnprintf(buf, sizeof(buf), \"%s_%s\", dev->name, \"cq_worker_ring\");\n+\tp->cq_worker_ring = qe_ring_create(buf, conf->dequeue_queue_depth,\n+\t\t\tdev->socket_id);\n+\tif (p->cq_worker_ring == NULL) {\n+\t\tqe_ring_destroy(p->rx_worker_ring);\n+\t\treturn -1;\n+\t}\n+\tsw->cq_ring_space[port_id] = conf->dequeue_queue_depth;\n+\n+\t/* set hist list contents to empty */\n+\tfor (i = 0; i < SW_PORT_HIST_LIST; i++) {\n+\t\tp->hist_list[i].fid = -1;\n+\t\tp->hist_list[i].qid = -1;\n+\t}\n+\n+\treturn 0;\n+}\n+\n+static int\n+sw_port_cleanup(struct sw_evdev *sw, uint8_t port_id)\n+{\n+\tstruct sw_port *p = &sw->ports[port_id];\n+\n+\tqe_ring_destroy(p->rx_worker_ring);\n+\tqe_ring_destroy(p->cq_worker_ring);\n+\tmemset(p, 0, sizeof(*p));\n+\n+\treturn 0;\n+}\n+\n+static uint8_t\n+sw_port_count(struct rte_event_dev *dev)\n+{\n+\tstruct sw_evdev *sw = (void *)dev;\n+\treturn sw->port_count;\n+}\n+\n+\n+static uint16_t\n+sw_queue_count(struct rte_event_dev *dev)\n+{\n+\tstruct sw_evdev *sw = (void *)dev;\n+\treturn sw->qid_count;\n+}\n+\n+static int32_t\n+qid_cleanup(struct sw_evdev *sw, uint32_t idx)\n+{\n+\tstruct sw_qid *qid = &sw->qids[idx];\n+\tuint32_t i;\n+\n+\tfor (i = 0; i < SW_IQS_MAX; i++) {\n+\t\tiq_ring_destroy(qid->iq[i]);\n+\t}\n+\n+\tif (qid->type == RTE_SCHED_TYPE_ORDERED) {\n+\t\trte_free(qid->reorder_buffer);\n+\t\trte_ring_free(qid->reorder_buffer_freelist);\n+\t}\n+\tmemset(qid, 0, sizeof(*qid));\n+\n+\treturn 0;\n+}\n+\n+static int32_t\n+qid_init(struct sw_evdev *sw, unsigned idx, int type,\n+\t\tconst struct rte_event_queue_conf *queue_conf)\n+{\n+\tint i;\n+\tint socket_id = sw->dev.socket_id;\n+\tchar buf[IQ_RING_NAMESIZE];\n+\tstruct sw_qid *qid = &sw->qids[idx];\n+\n+\tfor (i = 0; i < SW_IQS_MAX; i++) {\n+\t\tsnprintf(buf, sizeof(buf), \"q_%u_iq_%d\", idx, i);\n+\t\tqid->iq[i] = iq_ring_create(buf, socket_id);\n+\t\tif (!qid->iq[i]) {\n+\t\t\tSW_LOG_DBG(\"ring create failed\");\n+\t\t\tgoto cleanup;\n+\t\t}\n+\t}\n+\n+\t/* Initialize the iq packet mask to 1, as __builtin_clz() is undefined\n+\t * if the value passed in is zero.\n+\t */\n+\tqid->iq_pkt_mask = 1;\n+\n+\t/* Initialize the FID structures to no pinning (-1), and zero packets */\n+\tstruct sw_fid_t fid = {.cq = -1, .count = 0};\n+\tfor (i = 0; i < SW_QID_NUM_FIDS; i++)\n+\t\tqid->fids[i] = fid;\n+\n+\tqid->id = idx;\n+\tqid->type = type;\n+\tqid->priority = queue_conf->priority;\n+\n+\tif (qid->type == RTE_SCHED_TYPE_ORDERED) {\n+\t\tuint32_t window_size;\n+\n+\t\t/* rte_ring and window_size_mask require require window_size to\n+\t\t * be a power-of-2.\n+\t\t */\n+\t\twindow_size = rte_align32pow2(\n+\t\t\t\tqueue_conf->nb_atomic_order_sequences);\n+\n+\t\tqid->window_size = window_size - 1;\n+\n+\t\tif (!window_size) {\n+\t\t\tSW_LOG_DBG(\"invalid reorder_window_size for ordered queue\\n\");\n+\t\t\tgoto cleanup;\n+\t\t}\n+\n+\t\tsnprintf(buf, sizeof(buf), \"%s_iq_%d_rob\", sw->dev.name, i);\n+\t\tqid->reorder_buffer = rte_zmalloc_socket(buf,\n+\t\t\t\twindow_size * sizeof(qid->reorder_buffer[0]),\n+\t\t\t\t0, socket_id);\n+\t\tif (!qid->reorder_buffer) {\n+\t\t\tSW_LOG_DBG(\"reorder_buffer malloc failed\\n\");\n+\t\t\tgoto cleanup;\n+\t\t}\n+\n+\t\tmemset(&qid->reorder_buffer[0],\n+\t\t       0,\n+\t\t       window_size * sizeof(qid->reorder_buffer[0]));\n+\n+\t\tsnprintf(buf, sizeof(buf), \"%s_iq_%d_freelist\", sw->dev.name, i);\n+\t\tqid->reorder_buffer_freelist = rte_ring_create(buf,\n+\t\t\t\twindow_size,\n+\t\t\t\tsocket_id,\n+\t\t\t\tRING_F_SP_ENQ | RING_F_SC_DEQ);\n+\t\tif 
(!qid->reorder_buffer_freelist) {\n+\t\t\tSW_LOG_DBG(\"freelist ring create failed\");\n+\t\t\tgoto cleanup;\n+\t\t}\n+\n+\t\t/* Populate the freelist with reorder buffer entries. Enqueue\n+\t\t * 'window_size - 1' entries because the rte_ring holds only\n+\t\t * that many.\n+\t\t */\n+\t\tfor (i = 0; i < (int) window_size - 1; i++) {\n+\t\t\tif (rte_ring_sp_enqueue(qid->reorder_buffer_freelist,\n+\t\t\t\t\t\t&qid->reorder_buffer[i]) < 0)\n+\t\t\t\tgoto cleanup;\n+\t\t}\n+\n+\t\tqid->reorder_buffer_index = 0;\n+\t\tqid->cq_next_tx = 0;\n+\t}\n+\n+\treturn 0;\n+\n+cleanup:\n+\tfor (i = 0; i < SW_IQS_MAX; i++) {\n+\t\tif (qid->iq[i])\n+\t\t\tiq_ring_destroy(qid->iq[i]);\n+\t}\n+\n+\tif (qid->reorder_buffer) {\n+\t\trte_free(qid->reorder_buffer);\n+\t\tqid->reorder_buffer = NULL;\n+\t}\n+\n+\tif (qid->reorder_buffer_freelist) {\n+\t\trte_ring_free(qid->reorder_buffer_freelist);\n+\t\tqid->reorder_buffer_freelist = NULL;\n+\t}\n+\n+\treturn -EINVAL;\n+}\n+\n+static int\n+sw_queue_setup(struct rte_event_dev *dev,\n+\t\tuint8_t queue_id,\n+\t\tconst struct rte_event_queue_conf *conf)\n+{\n+\tint type;\n+\tif (conf->nb_atomic_flows > 0 &&\n+\t\t\tconf ->nb_atomic_order_sequences > 0)\n+\t\treturn -1;\n+\n+\tif (conf->event_queue_cfg & RTE_EVENT_QUEUE_CFG_SINGLE_CONSUMER)\n+\t\ttype = RTE_SCHED_TYPE_DIRECT;\n+\telse if (conf->nb_atomic_flows > 0)\n+\t\ttype = RTE_SCHED_TYPE_ATOMIC;\n+\telse if (conf->nb_atomic_order_sequences > 0)\n+\t\ttype = RTE_SCHED_TYPE_ORDERED;\n+\telse\n+\t\ttype = RTE_SCHED_TYPE_PARALLEL;\n+\n+\treturn qid_init((void *)dev, queue_id, type, conf);\n+}\n+\n+static int\n+sw_dev_configure(struct rte_event_dev *dev,\n+\t\t\tstruct rte_event_dev_config *config)\n+{\n+\tstruct sw_evdev *se = (void *)dev;\n+\n+\tif (config->nb_event_queues > dev->info.max_event_queues ||\n+\t\t\tconfig->nb_event_ports > dev->info.max_event_ports)\n+\t\treturn -1;\n+\n+\tse->qid_count = config->nb_event_queues;\n+\tse->port_count = config->nb_event_ports;\n+\treturn 0;\n+}\n+\n+static int\n+assign_numa_node(const char *key __rte_unused, const char *value, void *opaque)\n+{\n+\tint *socket_id = opaque;\n+\t*socket_id = atoi(value);\n+\tif (*socket_id > RTE_MAX_NUMA_NODES)\n+\t\treturn -1;\n+\treturn 0;\n+}\n+\n+static inline void\n+swap_ptr(void *a, void *b)\n+{\n+\tvoid *tmp = a;\n+\ta = b;\n+\tb= tmp;\n+}\n+\n+static int\n+sw_start(struct rte_event_dev *dev)\n+{\n+\tunsigned int i, j;\n+\tstruct sw_evdev *sw = (void *)dev;\n+\t/* check all ports are set up */\n+\tfor (i = 0; i < sw->port_count; i++)\n+\t\tif (sw->ports[i].rx_worker_ring == NULL)\n+\t\t\treturn -1;\n+\n+\t/* check all queues are configured and mapped to ports*/\n+\tfor (i = 0; i < sw->qid_count; i++)\n+\t\tif (sw->qids[i].iq[0] == NULL ||\n+\t\t\t\tsw->qids[i].cq_num_mapped_cqs == 0)\n+\t\t\treturn -1;\n+\n+\t/* build up our prioritized array of qids */\n+\t/* We don't use qsort here, as if all/multiple entries have the same\n+\t * priority, the result is non-deterministic. 
From \"man 3 qsort\":\n+\t * \"If two members compare as equal, their order in the sorted\n+\t * array is undefined.\"\n+\t */\n+\tfor (i = 0; i < sw->qid_count; i++) {\n+\t\tsw->qids_prioritized[i] = &sw->qids[i];\n+\t\tfor (j = i; j > 0; j--)\n+\t\t\tif (sw->qids_prioritized[j]->priority <\n+\t\t\t\t\tsw->qids_prioritized[j-1]->priority)\n+\t\t\t\tswap_ptr(sw->qids_prioritized[j],\n+\t\t\t\t\t\tsw->qids_prioritized[j-1]);\n+\t}\n+\tsw->started = 1;\n+\treturn 0;\n+}\n+\n+static void\n+sw_stop(struct rte_event_dev *dev)\n+{\n+\tstruct sw_evdev *sw = (void *)dev;\n+\tsw->started = 0;\n+}\n+static int\n+sw_close(struct rte_event_dev *dev)\n+{\n+\tstruct sw_evdev *sw = (void *)dev;\n+\tuint32_t i;\n+\n+\tfor(i = 0; i < sw->qid_count; i++) {\n+\t\tqid_cleanup(sw, i);\n+\t}\n+\tsw->qid_count = 0;\n+\n+\tfor (i = 0; i < sw->port_count; i++) {\n+\t\tsw_port_cleanup(sw, i);\n+\t}\n+\tsw->port_count = 0;\n+\n+\tmemset(&sw->stats, 0, sizeof(sw->stats));\n+\n+\treturn 0;\n+}\n+\n+static int\n+sw_probe(const char *name, const char *params)\n+{\n+\tstatic const struct rte_event_dev_ops evdev_sw_ops = {\n+\t\t\t.configure = sw_dev_configure,\n+\t\t\t.queue_setup = sw_queue_setup,\n+\t\t\t.queue_count = sw_queue_count,\n+\t\t\t.port_setup = sw_port_setup,\n+\t\t\t.port_link = sw_port_link,\n+\t\t\t.port_count = sw_port_count,\n+\t\t\t.start = sw_start,\n+\t\t\t.stop = sw_stop,\n+\t\t\t.close = sw_close,\n+\t\t\t.stats_get = sw_dev_stats_get,\n+\t\t\t.dump = sw_dump,\n+\n+\t\t\t.enqueue = sw_event_enqueue,\n+\t\t\t.enqueue_burst = sw_event_enqueue_burst,\n+\t\t\t.dequeue = sw_event_dequeue,\n+\t\t\t.dequeue_burst = sw_event_dequeue_burst,\n+\t\t\t.release = sw_event_release,\n+\t\t\t.schedule = sw_event_schedule,\n+\t};\n+\tstatic const char *args[] = { NUMA_NODE_ARG, NULL };\n+\tconst struct rte_memzone *mz;\n+\tstruct sw_evdev *se;\n+\tstruct rte_event_dev_info evdev_sw_info = {\n+\t\t\t.driver_name = PMD_NAME,\n+\t\t\t.max_event_queues = SW_QIDS_MAX,\n+\t\t\t.max_event_queue_flows = SW_QID_NUM_FIDS,\n+\t\t\t.max_event_queue_priority_levels = SW_Q_PRIORITY_MAX,\n+\t\t\t.max_event_priority_levels = SW_IQS_MAX,\n+\t\t\t.max_event_ports = SW_PORTS_MAX,\n+\t\t\t.max_event_port_dequeue_queue_depth = MAX_SW_CONS_Q_DEPTH,\n+\t\t\t.max_event_port_enqueue_queue_depth = MAX_SW_PROD_Q_DEPTH,\n+\t\t\t/* for event limits, there is no hard limit, but it\n+\t\t\t * depends on number of Queues configured and depth of\n+\t\t\t * producer/consumer queues\n+\t\t\t */\n+\t\t\t.max_num_events = -1,\n+\t\t\t.event_dev_cap = (RTE_EVENT_DEV_CAP_QUEUE_QOS |\n+\t\t\t\t\tRTE_EVENT_DEV_CAP_EVENT_QOS),\n+\t};\n+\tint socket_id = 0;\n+\n+\tif (params != NULL && params[0] != '\\0') {\n+\t\tstruct rte_kvargs *kvlist = rte_kvargs_parse(params, args);\n+\n+\t\tif (!kvlist) {\n+\t\t\tRTE_LOG(INFO, PMD,\n+\t\t\t\t\"Ignoring unsupported parameters when creating device '%s'\\n\",\n+\t\t\t\tname);\n+\t\t} else {\n+\t\t\tint ret = rte_kvargs_process(kvlist, NUMA_NODE_ARG,\n+\t\t\t\t\tassign_numa_node, &socket_id);\n+\t\t\trte_kvargs_free(kvlist);\n+\n+\t\t\tif (ret != 0) {\n+\t\t\t\tRTE_LOG(ERR, PMD,\n+\t\t\t\t\t\"%s: Error parsing numa node parameter\",\n+\t\t\t\t\tname);\n+\t\t\t\treturn ret;\n+\t\t\t}\n+\t\t}\n+\t}\n+\n+\tRTE_LOG(INFO, PMD, \"Creating eventdev sw device %s, on numa node %d\\n\",\n+\t\t\tname, socket_id);\n+\n+\tmz = rte_memzone_reserve(name, sizeof(*se), socket_id, 0);\n+\tif (mz == NULL)\n+\t\treturn -1; /* memzone_reserve sets rte_errno on error */\n+\n+\tse = mz->addr;\n+\tse->mz = mz;\n+\tsnprintf(se->dev.name, 
sizeof(se->dev.name), \"%s\", name);\n+\tse->dev.configured = false;\n+\tse->dev.info = evdev_sw_info;\n+\tse->dev.ops = &evdev_sw_ops;\n+\tse->dev.socket_id = socket_id;\n+\n+\treturn rte_event_dev_register(&se->dev);\n+}\n+\n+static int\n+sw_remove(const char *name)\n+{\n+\tif (name == NULL)\n+\t\treturn -EINVAL;\n+\n+\tRTE_LOG(INFO, PMD, \"Closing eventdev sw device %s\\n\", name);\n+\t/* TODO unregister eventdev and release memzone */\n+\n+\treturn 0;\n+}\n+\n+static struct rte_vdev_driver evdev_sw_pmd_drv = {\n+\t.probe = sw_probe,\n+\t.remove = sw_remove\n+};\n+\n+RTE_PMD_REGISTER_VDEV(evdev_sw, evdev_sw_pmd_drv);\n+RTE_PMD_REGISTER_PARAM_STRING(evdev_sw,\"numa_node=<int>\");\ndiff --git a/drivers/event/sw/sw_evdev.h b/drivers/event/sw/sw_evdev.h\nnew file mode 100644\nindex 0000000..534e078\n--- /dev/null\n+++ b/drivers/event/sw/sw_evdev.h\n@@ -0,0 +1,234 @@\n+/*-\n+ *   BSD LICENSE\n+ *\n+ *   Copyright(c) 2016 Intel Corporation. All rights reserved.\n+ *\n+ *   Redistribution and use in source and binary forms, with or without\n+ *   modification, are permitted provided that the following conditions\n+ *   are met:\n+ *\n+ *     * Redistributions of source code must retain the above copyright\n+ *       notice, this list of conditions and the following disclaimer.\n+ *     * Redistributions in binary form must reproduce the above copyright\n+ *       notice, this list of conditions and the following disclaimer in\n+ *       the documentation and/or other materials provided with the\n+ *       distribution.\n+ *     * Neither the name of Intel Corporation nor the names of its\n+ *       contributors may be used to endorse or promote products derived\n+ *       from this software without specific prior written permission.\n+ *\n+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS\n+ *   \"AS IS\" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT\n+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR\n+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT\n+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,\n+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT\n+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,\n+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY\n+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT\n+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE\n+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n+ */\n+\n+#ifndef _SW_EVDEV_H_\n+#define _SW_EVDEV_H_\n+\n+#include <rte_eventdev.h>\n+#include <rte_eventdev_pmd.h>\n+#include \"event_ring.h\"\n+\n+#define PMD_NAME \"evdev_sw\"\n+\n+#define SW_QIDS_MAX 128\n+#define SW_QID_NUM_FIDS 16384\n+#define SW_IQS_MAX 4\n+#define SW_Q_PRIORITY_MAX 255\n+#define SW_PORTS_MAX 128\n+#define MAX_SW_CONS_Q_DEPTH 255\n+\n+/* allow for lots of over-provisioning */\n+#define MAX_SW_PROD_Q_DEPTH 4096\n+\n+#define SW_FRAGMENTS_MAX 16\n+#define PORT_DEQUEUE_BURST_SIZE 16\n+#define SW_PORT_HIST_LIST (MAX_SW_PROD_Q_DEPTH + (MAX_SW_CONS_Q_DEPTH*2))\n+\n+#define SW_PORT_OVERLOAD_THRES (512)\n+\n+#define RTE_SCHED_TYPE_DIRECT (RTE_SCHED_TYPE_PARALLEL + 1)\n+\n+#ifdef RTE_LIBRTE_PMD_EVDEV_SW_DEBUG\n+#define SW_LOG_INFO(fmt, args...) \\\n+\tRTE_LOG(INFO, PMD, \"[%s] %s() line %u: \" fmt \"\\n\", \\\n+\t\t\tPMD_NAME, \\\n+\t\t\t__func__, __LINE__, ## args)\n+\n+#define SW_LOG_DBG(fmt, args...) 
\\\n+\tRTE_LOG(DEBUG, PMD, \"[%s] %s() line %u: \" fmt \"\\n\", \\\n+\t\t\tPMD_NAME, \\\n+\t\t\t__func__, __LINE__, ## args)\n+#else\n+#define SW_LOG_INFO(fmt, args...)\n+#define SW_LOG_DBG(fmt, args...)\n+#endif\n+\n+enum {\n+\tQE_FLAG_VALID_SHIFT = 0,\n+\tQE_FLAG_COMPLETE_SHIFT,\n+\tQE_FLAG_NOT_EOP_SHIFT,\n+\t_QE_FLAG_COUNT\n+};\n+\n+#define QE_FLAG_VALID    (1 << QE_FLAG_VALID_SHIFT)  /* set for NEW, FWD, FRAG */\n+#define QE_FLAG_COMPLETE (1 << QE_FLAG_COMPLETE_SHIFT)  /* set for FWD, DROP */\n+#define QE_FLAG_NOT_EOP  (1 << QE_FLAG_NOT_EOP_SHIFT)  /* set for FRAG only */\n+\n+static const uint8_t sw_qe_flag_map[] = {\n+\t\tQE_FLAG_VALID /* RTE_QEENT_OP_NEW */,\n+\t\tQE_FLAG_VALID | QE_FLAG_COMPLETE /* RTE_QEENT_OP_FWD */,\n+\t\tQE_FLAG_COMPLETE /* RTE_QEENT_OP_DROP */,\n+\t\tQE_FLAG_VALID | QE_FLAG_COMPLETE | QE_FLAG_NOT_EOP,\n+};\n+\n+/* Records basic event stats at a given point. Used in port and qid structs */\n+struct sw_point_stats {\n+\tuint64_t rx_pkts;\n+\tuint64_t rx_dropped;\n+\tuint64_t tx_pkts;\n+};\n+\n+struct reorder_buffer_entry {\n+\tuint16_t num_fragments;\t\t/**< Number of packet fragments */\n+\tuint16_t fragment_index;\t/**< Points to the oldest valid frag */\n+\tuint8_t ready;\t\t\t/**< Entry is ready to be reordered */\n+\tstruct rte_event fragments[SW_FRAGMENTS_MAX];\n+};\n+\n+struct sw_hist_list_entry {\n+\tint32_t qid;\n+\tint32_t fid;\n+\tstruct reorder_buffer_entry *rob_entry;\n+};\n+\n+struct sw_port {\n+\t/* A numeric ID for the port. This should be used to access the\n+\t * statistics as returned by *rte_event_dev_stats_get*, and in other\n+\t * places where the API requires accessing a port by integer. It is not\n+\t * valid to assume that ports will be allocated in a linear sequence.\n+\t */\n+\tuint8_t id;\n+\n+\t/** Indicates if this port is overloaded, and we need to throttle input */\n+\tuint8_t overloaded;\n+\tuint8_t overload_threshold;\n+\n+\tint16_t is_directed; /** Takes from a single directed QID */\n+\tint16_t num_ordered_qids; /** For loadbalanced we can optimise pulling\n+\t\t\t    packets from producers if there is no reordering\n+\t\t\t    involved */\n+\n+\t/* track packets in and out of this port */\n+\tstruct sw_point_stats stats;\n+\n+\t/** Ring and buffer for pulling events from workers for scheduling */\n+\tstruct qe_ring *rx_worker_ring __rte_cache_aligned;\n+\tuint32_t pp_buf_start;\n+\tuint32_t pp_buf_count;\n+\tstruct rte_event pp_buf[PORT_DEQUEUE_BURST_SIZE];\n+\n+\n+\t/** Ring and buffer for pushing packets to workers after scheduling */\n+\tstruct qe_ring *cq_worker_ring __rte_cache_aligned;\n+\tuint16_t cq_buf_count;\n+\tuint16_t outstanding_releases; /* num releases yet to be completed */\n+\tstruct rte_event cq_buf[MAX_SW_CONS_Q_DEPTH];\n+\n+\t/* History list structs, containing info on pkts egressed to worker */\n+\tuint16_t hist_head __rte_cache_aligned;\n+\tuint16_t hist_tail;\n+\tuint16_t inflights;\n+\tstruct sw_hist_list_entry hist_list[SW_PORT_HIST_LIST];\n+\n+\tuint8_t num_qids_mapped;\n+};\n+\n+struct sw_fid_t {\n+\t/* which CQ this FID is currently pinned to */\n+\tuint32_t cq;\n+\t/* number of packets gone to the CQ with this FID */\n+\tuint32_t count;\n+};\n+\n+struct sw_qid {\n+\t/* The type of this QID */\n+\tint type;\n+\t/* Integer ID representing the queue. This is used in history lists,\n+\t * to identify the stage of processing. 
*/\n+\tuint32_t id;\n+\tstruct sw_point_stats stats;\n+\n+\t/* Internal priority rings for packets */\n+\tstruct iq_ring *iq[SW_IQS_MAX];\n+\tuint32_t iq_pkt_mask; \t/* A mask to indicate packets in an IQ */\n+\tuint64_t iq_pkt_count[SW_IQS_MAX];\n+\n+\t/* Information on what CQs are polling this IQ */\n+\tuint32_t cq_num_mapped_cqs;\n+\tuint32_t cq_next_tx; /* cq to write next (non-atomic) packet */\n+\tuint32_t cq_map[SW_PORTS_MAX];\n+\n+\t/* Track flow ids for atomic load balancing */\n+\tstruct sw_fid_t fids[SW_QID_NUM_FIDS];\n+\n+\t/* Track packet order for reordering when needed */\n+\tstruct reorder_buffer_entry *reorder_buffer; /* packets awaiting reordering */\n+\tstruct rte_ring *reorder_buffer_freelist; /* available reorder slots */\n+\tuint32_t reorder_buffer_index; /* oldest valid reorder buffer entry */\n+\tuint32_t window_size;          /* Used to wrap reorder_buffer_index */\n+\n+\tuint8_t priority;\n+};\n+\n+struct sw_evdev {\n+\t/* must be the first item in the private dev struct */\n+\tstruct rte_event_dev dev;\n+\n+\tconst struct rte_memzone *mz;\n+\n+\t/* Contains all ports - load balanced and directed */\n+\tstruct sw_port ports[SW_PORTS_MAX];\n+\tuint32_t port_count;\n+\tuint16_t cq_ring_space[SW_PORTS_MAX]; /* How many packets are in the cq */\n+\n+\t/* All qids - allocated in one slab for vectorization */\n+\tstruct sw_qid qids[SW_QIDS_MAX];\n+\tuint32_t qid_count;\n+\n+\t/* Array of pointers to load-balanced QIDs sorted by priority level */\n+\tstruct sw_qid *qids_prioritized[SW_QIDS_MAX];\n+\n+\t/* Stats */\n+\tstruct sw_point_stats stats __rte_cache_aligned;\n+\tuint64_t sched_called;\n+\tuint64_t sched_no_iq_enqueues;\n+\tuint64_t sched_no_cq_enqueues;\n+\tuint64_t sched_cq_qid_called;\n+\tuint64_t sched_overload_counter;\n+\n+\tuint8_t started;\n+\n+\tuint32_t overloaded __rte_cache_aligned;\n+};\n+\n+int  sw_event_enqueue(struct rte_event_dev *dev, uint8_t port_id,\n+\t\t     struct rte_event *ev, bool pin_event);\n+int  sw_event_enqueue_burst(struct rte_event_dev *dev, uint8_t port_id,\n+\t\t\t   struct rte_event ev[], int num, bool pin_event);\n+bool sw_event_dequeue(struct rte_event_dev *dev, uint8_t port_id,\n+\t\t      struct rte_event *ev, uint64_t wait);\n+int  sw_event_dequeue_burst(struct rte_event_dev *dev, uint8_t port_id,\n+\t\t\t   struct rte_event *ev, int num, uint64_t wait);\n+void sw_event_release(struct rte_event_dev *dev, uint8_t port_id, uint8_t index);\n+int  sw_event_schedule(struct rte_event_dev *dev);\n+\n+#endif /* _SW_EVDEV_H_ */\ndiff --git a/drivers/event/sw/sw_evdev_scheduler.c b/drivers/event/sw/sw_evdev_scheduler.c\nnew file mode 100644\nindex 0000000..02831d2\n--- /dev/null\n+++ b/drivers/event/sw/sw_evdev_scheduler.c\n@@ -0,0 +1,660 @@\n+/*-\n+ *   BSD LICENSE\n+ *\n+ *   Copyright(c) 2016 Intel Corporation. 
All rights reserved.\n+ *\n+ *   Redistribution and use in source and binary forms, with or without\n+ *   modification, are permitted provided that the following conditions\n+ *   are met:\n+ *\n+ *     * Redistributions of source code must retain the above copyright\n+ *       notice, this list of conditions and the following disclaimer.\n+ *     * Redistributions in binary form must reproduce the above copyright\n+ *       notice, this list of conditions and the following disclaimer in\n+ *       the documentation and/or other materials provided with the\n+ *       distribution.\n+ *     * Neither the name of Intel Corporation nor the names of its\n+ *       contributors may be used to endorse or promote products derived\n+ *       from this software without specific prior written permission.\n+ *\n+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS\n+ *   \"AS IS\" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT\n+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR\n+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT\n+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,\n+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT\n+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,\n+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY\n+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT\n+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE\n+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n+ */\n+\n+#include <rte_ring.h>\n+#include \"sw_evdev.h\"\n+#include \"iq_ring.h\"\n+\n+#define SW_IQS_MASK (SW_IQS_MAX-1)\n+\n+/* Retrieve the highest priority IQ or -1 if no pkts available. Doing the\n+ * CLZ twice is faster than caching the value due to data dependencies\n+ */\n+#define PKT_MASK_TO_IQ(pkts) \\\n+\t(__builtin_ctz(pkts | (1 << SW_IQS_MAX)))\n+\n+/* Clamp the highest priorities to the max value as allowed by\n+ * the mask. Assums MASK is (powerOfTwo - 1). Priority 0 (highest) are shifted\n+ * into leftmost IQ so that clz() reads it first on dequeue\n+ */\n+#define PRIO_TO_IQ(prio) (prio > SW_IQS_MASK ? SW_IQS_MASK : prio)\n+\n+static inline uint32_t\n+sw_schedule_atomic_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,\n+\t\t    uint32_t iq_num, unsigned int count)\n+{\n+\tuint32_t i;\n+\n+\tif(count == 0)\n+\t\treturn 0;\n+\n+\t/* This is the QID ID. 
The QID ID is static, hence it can be\n+\t * used to identify the stage of processing in history lists etc */\n+\tuint32_t qid_id = qid->id;\n+\n+\tfor (i = 0; i < count; i++) {\n+\t\tconst struct rte_event *qe = iq_ring_peek(qid->iq[iq_num]);\n+\t\tstruct sw_fid_t *fid = &qid->fids[qe->flow_id];\n+\t\tint cq = fid->cq;\n+\n+\t\t/* If no CQ is assigned, pick one */\n+\t\tif (cq < 0) {\n+\t\t\t/* select CQ based on least inflights,\n+\t\t\t * defaulting to the first mapped CQ\n+\t\t\t */\n+\t\t\tuint32_t cq_idx = qid->cq_next_tx++;\n+\t\t\tif (qid->cq_next_tx == qid->cq_num_mapped_cqs)\n+\t\t\t\tqid->cq_next_tx = 0;\n+\t\t\tcq = qid->cq_map[cq_idx];\n+\t\t\tint cq_free_cnt = sw->cq_ring_space[cq];\n+\n+\t\t\tfor (cq_idx = 0; cq_idx < qid->cq_num_mapped_cqs; cq_idx++) {\n+\t\t\t\tint test_cq = qid->cq_map[cq_idx];\n+\t\t\t\tint test_cq_free = sw->cq_ring_space[test_cq];\n+\n+\t\t\t\tif (test_cq_free > cq_free_cnt)\n+\t\t\t\t\tcq = test_cq, cq_free_cnt = test_cq_free;\n+\t\t\t}\n+\t\t}\n+\n+\t\tstruct sw_port *p = &sw->ports[cq];\n+\n+\t\t/* If the destination CQ or its history list is full, move on\n+\t\t* to the next queue.\n+\t\t*/\n+\t\tif (sw->cq_ring_space[cq] == 0 ||\n+\t\t\t\tp->inflights == SW_PORT_HIST_LIST) {\n+\t\t\tstruct qe_ring *worker = sw->ports[cq].cq_worker_ring;\n+\t\t\tqe_ring_enqueue_burst(worker, sw->ports[cq].cq_buf,\n+\t\t\t\t\tsw->ports[cq].cq_buf_count,\n+\t\t\t\t\t&sw->cq_ring_space[cq]);\n+\t\t\tsw->ports[cq].cq_buf_count = 0;\n+#if 0\n+\t\t\tprintf(\"%s cq %d was 0, now %d\\n\", __func__,\n+\t\t\t\t\tcq, sw->cq_ring_space[cq]);\n+#endif\n+\t\t\tif(sw->cq_ring_space[cq] == 0)\n+\t\t\t\tbreak;\n+\t\t}\n+\n+\t\tsw->cq_ring_space[cq]--;\n+\n+\t\t/* store which CQ this FID is active on,\n+\t\t * for future pkts of the same flow\n+\t\t */\n+\t\tfid->cq = cq;\n+\t\tfid->count++;\n+\n+\t\tqid->stats.tx_pkts++;\n+\t\tsw->ports[cq].inflights++;\n+\n+\t\tint head = (p->hist_head & (SW_PORT_HIST_LIST-1));\n+\n+\t\tp->hist_list[head].fid = qe->flow_id;\n+\t\tp->hist_list[head].qid = qid_id;\n+\n+\t\tp->hist_head++;\n+\t\tp->stats.tx_pkts++;\n+\t\tsw->ports[cq].cq_buf[sw->ports[cq].cq_buf_count++] = *qe;\n+\t\tiq_ring_pop(qid->iq[iq_num]);\n+\t}\n+\treturn i;\n+}\n+\n+static inline uint32_t\n+sw_schedule_parallel_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,\n+\t\t      uint32_t iq_num, unsigned int count, int keep_order)\n+{\n+\tuint32_t i;\n+\tuint32_t cq_idx = qid->cq_next_tx;\n+\n+\t/* This is the QID ID. The QID ID is static, hence it can be\n+\t * used to identify the stage of processing in history lists etc */\n+\tuint32_t qid_id = qid->id;\n+\n+\n+\tif (keep_order)\n+\t\t/* only schedule as many as we have reorder buffer entries */\n+\t\tcount = RTE_MIN(count, rte_ring_count(qid->reorder_buffer_freelist));\n+\n+\tfor (i = 0; i < count; i++) {\n+\t\tconst struct rte_event *qe = iq_ring_peek(qid->iq[iq_num]);\n+\t\tuint32_t cq_check_count = 0;\n+\t\tuint32_t cq;\n+\n+\t\t/*\n+\t\t *  for parallel, just send to next available CQ in round-robin\n+\t\t * fashion. So scan for an available CQ. 
If all CQs are full\n+\t\t * just return and move on to next QID\n+\t\t */\n+\t\tdo {\n+\t\t\tif (++cq_check_count > qid->cq_num_mapped_cqs)\n+\t\t\t\tgoto exit;\n+\t\t\tcq = qid->cq_map[cq_idx];\n+\t\t\tif (++cq_idx == qid->cq_num_mapped_cqs)\n+\t\t\t\tcq_idx = 0;\n+\t\t} while (qe_ring_free_count(sw->ports[cq].cq_worker_ring) == 0 ||\n+\t\t\t\tsw->ports[cq].inflights == SW_PORT_HIST_LIST);\n+\n+\t\tstruct sw_port *p = &sw->ports[cq];\n+\t\tif (sw->cq_ring_space[cq] == 0 ||\n+\t\t\t\tp->inflights == SW_PORT_HIST_LIST)\n+\t\t\tbreak;\n+\n+\t\tsw->cq_ring_space[cq]--;\n+\n+\t\tqid->stats.tx_pkts++;\n+\n+\t\tconst int head = (p->hist_head & (SW_PORT_HIST_LIST-1));\n+\n+\t\tp->hist_list[head].fid = qe->flow_id;\n+\t\tp->hist_list[head].qid = qid_id;\n+\n+\t\tif (keep_order)\n+\t\t\trte_ring_sc_dequeue(qid->reorder_buffer_freelist,\n+\t\t\t\t\t(void *)&p->hist_list[head].rob_entry);\n+\n+\t\tsw->ports[cq].cq_buf[sw->ports[cq].cq_buf_count++] = *qe;\n+\t\tiq_ring_pop(qid->iq[iq_num]);\n+\n+\t\trte_compiler_barrier();\n+\t\tp->inflights++;\n+\t\tp->stats.tx_pkts++;\n+\t\tp->hist_head++;\n+\t}\n+exit:\n+\tqid->cq_next_tx = cq_idx;\n+\treturn i;\n+}\n+\n+static uint32_t\n+sw_schedule_dir_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,\n+\t\t    uint32_t iq_num, unsigned int count)\n+{\n+\tuint32_t cq_id = qid->cq_map[0];\n+\tstruct sw_port *port = &sw->ports[cq_id];\n+\n+\t/* get max burst enq size for cq_ring */\n+\tuint32_t count_free = sw->cq_ring_space[cq_id];\n+\tif (count == 0 || count_free == 0)\n+\t\treturn 0;\n+\n+\t/* burst dequeue from the QID IQ ring */\n+\tstruct iq_ring *ring = qid->iq[iq_num];\n+\tuint32_t ret = iq_ring_dequeue_burst(ring,\n+\t\t\t&port->cq_buf[port->cq_buf_count], count_free);\n+\tport->cq_buf_count += ret;\n+\n+\t/* Update QID, Port and Total TX stats */\n+\tqid->stats.tx_pkts += ret;\n+\tport->stats.tx_pkts += ret;\n+\n+\t/* Subtract credits from cached value */\n+\tsw->cq_ring_space[cq_id] -= ret;\n+\n+\treturn ret;\n+}\n+\n+static uint32_t\n+sw_schedule_qid_to_cq(struct sw_evdev *sw)\n+{\n+\tuint32_t pkts = 0;\n+\tuint32_t qid_idx;\n+\n+\tsw->sched_cq_qid_called++;\n+\n+\tfor (qid_idx = 0; qid_idx < sw->qid_count; qid_idx++) {\n+\t\t/* make the QID lookup here be based on priority of the QID */\n+\t\tstruct sw_qid *qid = sw->qids_prioritized[qid_idx];\n+\n+\t\tint type = qid->type;\n+\t\tint iq_num = PKT_MASK_TO_IQ(qid->iq_pkt_mask);\n+\n+\t\t/* zero mapped CQs indicates directed */\n+\t\tif (iq_num >= SW_IQS_MAX)\n+\t\t\tcontinue;\n+\n+\t\tunsigned int count = iq_ring_count(qid->iq[iq_num]);\n+\t\tuint32_t pkts_done = 0;\n+\n+\t\tif (type == RTE_SCHED_TYPE_DIRECT)\n+\t\t\tpkts_done += sw_schedule_dir_to_cq(sw, qid,\n+\t\t\t\t\tiq_num, count);\n+\t\telse if (type == RTE_SCHED_TYPE_ATOMIC)\n+\t\t\tpkts_done += sw_schedule_atomic_to_cq(sw, qid,\n+\t\t\t\t\tiq_num, count);\n+\t\telse\n+\t\t\tpkts_done += sw_schedule_parallel_to_cq(sw, qid,\n+\t\t\t\t\tiq_num, count,\n+\t\t\t\t\t(type == RTE_SCHED_TYPE_ORDERED));\n+\n+\t\t/* Check if the IQ that was polled is now empty, and unset it\n+\t\t * in the IQ mask if its empty.\n+\t\t */\n+\t\tint all_done = (pkts_done == count);\n+\n+\t\tqid->iq_pkt_mask &= ~(all_done << (iq_num));\n+\t\tpkts += pkts_done;\n+\t}\n+\n+\treturn pkts;\n+}\n+\n+/* This function will perform re-ordering of packets, and injecting into\n+ * the appropriate QID IQ. 
As LB and DIR QIDs are in the same array, but *NOT*\n+ * contiguous in that array, this function accepts a \"range\" of QIDs to scan.\n+ */\n+static uint16_t\n+sw_schedule_reorder(struct sw_evdev *sw, int qid_start, int qid_end)\n+{\n+\t/* Perform egress reordering */\n+\tstruct rte_event *qe;\n+\tuint32_t pkts_iter = 0;\n+\n+\tfor (; qid_start < qid_end; qid_start++) {\n+\t\tstruct sw_qid *qid = &sw->qids[qid_start];\n+\t\tint i, num_entries_in_use;\n+\n+\t\tif (qid->type != RTE_SCHED_TYPE_ORDERED)\n+\t\t\tcontinue;\n+\n+\t\tnum_entries_in_use = rte_ring_free_count(\n+\t\t\t\t\tqid->reorder_buffer_freelist);\n+\n+\t\tfor (i = 0; i < num_entries_in_use; i++) {\n+\t\t\tstruct reorder_buffer_entry *entry;\n+\t\t\tint j;\n+\n+\t\t\tentry = &qid->reorder_buffer[qid->reorder_buffer_index];\n+\n+\t\t\tif (!entry->ready)\n+\t\t\t\tbreak;\n+\n+\t\t\tfor (j = 0; j < entry->num_fragments; j++) {\n+\t\t\t\tuint16_t dest_qid;\n+\t\t\t\tuint16_t dest_iq;\n+\n+\t\t\t\tqe = &entry->fragments[entry->fragment_index + j];\n+\n+\t\t\t\tdest_qid = qe->flow_id;\n+\t\t\t\tdest_iq  = PRIO_TO_IQ(qe->priority);\n+\n+\t\t\t\tif(dest_qid >= sw->qid_count) {\n+\t\t\t\t\tsw->stats.rx_dropped++;\n+\t\t\t\t\tcontinue;\n+\t\t\t\t}\n+\n+\t\t\t\tstruct sw_qid *dest_qid_ptr = &sw->qids[dest_qid];\n+\t\t\t\tconst struct iq_ring *dest_iq_ptr = dest_qid_ptr->iq[dest_iq];\n+\t\t\t\tif (iq_ring_free_count(dest_iq_ptr) == 0)\n+\t\t\t\t\tbreak;\n+\n+\t\t\t\tpkts_iter++;\n+\n+\t\t\t\tstruct sw_qid *q = &sw->qids[dest_qid];\n+\t\t\t\tstruct iq_ring *r = q->iq[dest_iq];\n+\n+\t\t\t\t/* we checked for space above, so enqueue must\n+\t\t\t\t * succeed\n+\t\t\t\t */\n+\t\t\t\tiq_ring_enqueue(r, qe);\n+\t\t\t\tq->iq_pkt_mask |= (1 << (dest_iq));\n+\t\t\t\tq->iq_pkt_count[dest_iq]++;\n+\t\t\t\tq->stats.rx_pkts++;\n+\t\t\t}\n+\n+\t\t\tentry->ready = (j != entry->num_fragments);\n+\t\t\tentry->num_fragments -= j;\n+\t\t\tentry->fragment_index += j;\n+\n+\t\t\tif (!entry->ready) {\n+\t\t\t\tentry->fragment_index = 0;\n+\n+\t\t\t\trte_ring_sp_enqueue(qid->reorder_buffer_freelist,\n+\t\t\t\t\t\t    entry);\n+\n+\t\t\t\tqid->reorder_buffer_index++;\n+\t\t\t\tqid->reorder_buffer_index %= qid->window_size;\n+\t\t\t}\n+\t\t}\n+\t}\n+\treturn pkts_iter;\n+}\n+\n+static uint32_t\n+sw_schedule_pull_port_lb(struct sw_evdev *sw, uint32_t port_id)\n+{\n+\tuint32_t pkts_iter = 0;\n+\tstruct sw_port *port = &sw->ports[port_id];\n+\tstruct qe_ring *worker = port->rx_worker_ring;\n+\n+\t/* If shadow ring has 0 pkts, pull from worker ring */\n+\tif(port->pp_buf_count == 0) {\n+\t\tport->pp_buf_start = 0;\n+\t\tport->pp_buf_count = qe_ring_dequeue_burst(worker, port->pp_buf,\n+\t\t\t\tRTE_DIM(port->pp_buf));\n+\n+\t\tif (port->overloaded &&\n+\t\t\t\tqe_ring_count(worker) < SW_PORT_OVERLOAD_THRES/2) {\n+\t\t\tport->overloaded = 0;\n+\t\t\tsw->sched_overload_counter++;\n+\t\t\trte_atomic32_dec((void *)&sw->overloaded);\n+\t\t}\n+\t}\n+\n+\twhile (port->pp_buf_count) {\n+\t\tconst struct rte_event *qe = &port->pp_buf[port->pp_buf_start];\n+\t\tstruct sw_hist_list_entry *hist_entry = NULL;\n+\t\tuint8_t flags = qe->operation;\n+\t\tconst uint16_t eop = !(flags & QE_FLAG_NOT_EOP);\n+\t\tint needs_reorder = 0;\n+\n+\t\tstatic const struct reorder_buffer_entry dummy_rob;\n+\n+\t\t/*\n+\t\t * if we don't have space for this packet in an IQ,\n+\t\t * then move on to next queue. 
Technically, for a\n+\t\t * packet that needs reordering, we don't need to check\n+\t\t * here, but it simplifies things not to special-case\n+\t\t */\n+\t\tuint32_t iq_num = PRIO_TO_IQ(qe->priority);\n+\t\tstruct sw_qid *qid = &sw->qids[qe->queue_id];\n+\t\tstruct iq_ring *iq_ring = qid->iq[iq_num];\n+\n+\t\tif ((flags & QE_FLAG_VALID) &&\n+\t\t\t\tiq_ring_free_count(iq_ring) == 0)\n+\t\t\tbreak;\n+\n+\t\t/* now process based on flags. Note that for directed\n+\t\t * queues, the enqueue_flush masks off all but the\n+\t\t * valid flag. This makes FWD and partial enqueues just\n+\t\t * NEW type, and makes DROPS no-op calls.\n+\t\t */\n+\t\tif ((flags & QE_FLAG_COMPLETE) && port->inflights > 0) {\n+\t\t\tconst uint32_t hist_tail = port->hist_tail &\n+\t\t\t\t\t(SW_PORT_HIST_LIST - 1);\n+\n+\t\t\thist_entry = &port->hist_list[hist_tail];\n+\t\t\tconst uint32_t hist_qid = hist_entry->qid;\n+\t\t\tconst uint32_t hist_fid = hist_entry->fid;\n+\n+\t\t\tstruct sw_fid_t *fid = &sw->qids[hist_qid].fids[hist_fid];\n+\t\t\tfid->count -= eop;\n+\t\t\tif (fid->count == 0)\n+\t\t\t\tfid->cq = -1;\n+\n+\t\t\t/* set reorder ready if an ordered QID */\n+\t\t\tuintptr_t rob_ptr = (uintptr_t)hist_entry->rob_entry;\n+\t\t\tconst uintptr_t valid = (rob_ptr != 0);\n+\t\t\tneeds_reorder = valid;\n+\t\t\trob_ptr |= ((valid - 1) & (uintptr_t)&dummy_rob);\n+\t\t\t((struct reorder_buffer_entry*)rob_ptr)->ready =\n+\t\t\t\t\teop * needs_reorder;\n+\n+\t\t\tport->inflights -= eop;\n+\t\t\tport->hist_tail += eop;\n+\t\t}\n+\t\tif (flags & QE_FLAG_VALID) {\n+\t\t\tport->stats.rx_pkts++;\n+\n+\t\t\tif (needs_reorder) {\n+\t\t\t\tstruct reorder_buffer_entry *rob_entry =\n+\t\t\t\t\t\thist_entry->rob_entry;\n+\n+\t\t\t\t//TODO: How do we alert the user that they've exceeded max frags?\n+\t\t\t\tif (rob_entry->num_fragments == SW_FRAGMENTS_MAX)\n+\t\t\t\t\tsw->stats.rx_dropped++;\n+\t\t\t\telse\n+\t\t\t\t\trob_entry->fragments[rob_entry->num_fragments++] = *qe;\n+\t\t\t\tgoto end_qe;\n+\t\t\t}\n+\n+\t\t\t/* Use the iq_num from above to push the QE\n+\t\t\t * into the qid at the right priority\n+\t\t\t */\n+\n+\t\t\tqid->iq_pkt_mask |= (1 << (iq_num));\n+\t\t\tiq_ring_enqueue(iq_ring, qe);\n+\t\t\tqid->iq_pkt_count[iq_num]++;\n+\t\t\tqid->stats.rx_pkts++;\n+\t\t\tpkts_iter++;\n+\t\t}\n+\n+\t\tend_qe:\n+\t\tport->pp_buf_start++;\n+\t\tport->pp_buf_count--;\n+\t} /* while (avail_qes) */\n+\n+\treturn pkts_iter;\n+}\n+\n+static uint32_t\n+sw_schedule_pull_port_dir(struct sw_evdev *sw, uint32_t port_id)\n+{\n+\tuint32_t pkts_iter = 0;\n+\tstruct sw_port *port = &sw->ports[port_id];\n+\tstruct qe_ring *worker = port->rx_worker_ring;\n+\n+\t/* If shadow ring has 0 pkts, pull from worker ring */\n+\tif (port->pp_buf_count == 0) {\n+\t\tport->pp_buf_start = 0;\n+\t\tport->pp_buf_count = qe_ring_dequeue_burst(worker, port->pp_buf,\n+\t\t\t\tRTE_DIM(port->pp_buf));\n+\n+\t\tif (port->overloaded &&\n+\t\t\t\tqe_ring_count(worker) < SW_PORT_OVERLOAD_THRES/2) {\n+\t\t\tport->overloaded = 0;\n+\t\t\tsw->sched_overload_counter++;\n+\t\t\trte_atomic32_dec((void *)&sw->overloaded);\n+\t\t}\n+\t}\n+\n+\twhile (port->pp_buf_count) {\n+\t\tconst struct rte_event *qe = &port->pp_buf[port->pp_buf_start];\n+\t\tuint8_t flags = qe->operation;\n+\n+\t\tif ((flags & QE_FLAG_VALID) == 0)\n+\t\t\tgoto end_qe;\n+\n+\t\tuint32_t iq_num = PRIO_TO_IQ(qe->priority);\n+\t\tstruct sw_qid *qid = &sw->qids[qe->queue_id];\n+\t\tstruct iq_ring *iq_ring = qid->iq[iq_num];\n+\n+\t\tif (iq_ring_free_count(iq_ring) == 0)\n+\t\t\tbreak; /* move to next port 
*/\n+\n+\t\tport->stats.rx_pkts++;\n+\n+\t\t/* Use the iq_num from above to push the QE\n+\t\t * into the qid at the right priority\n+\t\t */\n+\t\tqid->iq_pkt_mask |= (1 << (iq_num));\n+\t\tiq_ring_enqueue(iq_ring, qe);\n+\t\tqid->iq_pkt_count[iq_num]++;\n+\t\tqid->stats.rx_pkts++;\n+\t\tpkts_iter++;\n+\n+\t\tend_qe:\n+\t\tport->pp_buf_start++;\n+\t\tport->pp_buf_count--;\n+\t} /* while port->pp_buf_count */\n+\n+\treturn pkts_iter;\n+}\n+\n+static uint32_t\n+sw_schedule_pull_port_no_reorder(struct sw_evdev *sw, uint32_t port_id)\n+{\n+\tuint32_t pkts_iter = 0;\n+\tstruct sw_port *port = &sw->ports[port_id];\n+\tstruct qe_ring *worker = port->rx_worker_ring;\n+\n+\tif (port->pp_buf_count == 0) {\n+\t\tport->pp_buf_start = 0;\n+\t\tport->pp_buf_count = qe_ring_dequeue_burst(worker, port->pp_buf,\n+\t\t\t\tRTE_DIM(port->pp_buf));\n+\n+\t\tif (port->overloaded &&\n+\t\t\t\tqe_ring_count(worker) < SW_PORT_OVERLOAD_THRES/2) {\n+\t\t\tport->overloaded = 0;\n+\t\t\tsw->sched_overload_counter++;\n+\t\t\trte_atomic32_dec((void *)&sw->overloaded);\n+\t\t}\n+\t}\n+\n+\twhile (port->pp_buf_count) {\n+\t\tconst struct rte_event *ev = &port->pp_buf[port->pp_buf_start];\n+\t\tstruct sw_hist_list_entry *hist_entry = NULL;\n+\t\tuint8_t flags = ev->operation;\n+\n+\t\t/* for fragments, ignore completion\n+\t\t * NOTE: if not_eop flag is set, completion flag must\n+\t\t * also be set so we can use xor */\n+\t\tflags ^= !(flags & QE_FLAG_NOT_EOP) >>\n+\t\t\t\t(QE_FLAG_NOT_EOP_SHIFT - QE_FLAG_COMPLETE_SHIFT);\n+\n+\t\t/*\n+\t\t * if we don't have space for this packet in an IQ,\n+\t\t * then move on to next queue.\n+\t\t */\n+\t\tuint32_t iq_num = PRIO_TO_IQ(ev->priority);\n+\t\tstruct sw_qid *qid = &sw->qids[ev->queue_id];\n+\t\tstruct iq_ring *iq_ring = qid->iq[iq_num];\n+\n+\t\tif ((flags & QE_FLAG_VALID) &&\n+\t\t\t\tiq_ring_free_count(iq_ring) == 0)\n+\t\t\tbreak;\n+\n+\t\t/* now process based on flags. Note that for directed\n+\t\t * queues, the enqueue_flush masks off all but the\n+\t\t * valid flag. 
This makes FWD and partial enqueues just\n+\t\t * NEW type, and makes DROPS no-op calls.\n+\t\t */\n+\t\tif ((flags & QE_FLAG_COMPLETE) && port->inflights > 0) {\n+\t\t\tconst uint32_t hist_tail = port->hist_tail &\n+\t\t\t\t\t(SW_PORT_HIST_LIST - 1);\n+\n+\t\t\thist_entry = &port->hist_list[hist_tail];\n+\t\t\tconst uint32_t hist_qid = hist_entry->qid;\n+\t\t\tconst uint32_t hist_fid = hist_entry->fid;\n+\n+\t\t\tstruct sw_fid_t *fid = &sw->qids[hist_qid].fids[hist_fid];\n+\t\t\tfid->count--;\n+\t\t\tif (fid->count == 0)\n+\t\t\t\tfid->cq = -1;\n+\n+\t\t\tport->inflights --;\n+\t\t\tport->hist_tail ++;\n+\t\t}\n+\t\tif (flags & QE_FLAG_VALID) {\n+\t\t\tport->stats.rx_pkts++;\n+\n+\t\t\t/* Use the iq_num from above to push the QE\n+\t\t\t * into the qid at the right priority\n+\t\t\t */\n+\n+\t\t\tqid->iq_pkt_mask |= (1 << (iq_num));\n+\t\t\tiq_ring_enqueue(iq_ring, ev);\n+\t\t\tqid->iq_pkt_count[iq_num]++;\n+\t\t\tqid->stats.rx_pkts++;\n+\t\t\tpkts_iter++;\n+\t\t}\n+\n+\t\tport->pp_buf_start++;\n+\t\tport->pp_buf_count--;\n+\t} /* while (avail_qes) */\n+\n+\treturn pkts_iter;\n+}\n+\n+int\n+sw_event_schedule(struct rte_event_dev *dev)\n+{\n+\tstatic const uint32_t num_pkts = 256;\n+\tstruct sw_evdev *sw = (struct sw_evdev *)dev;\n+\tuint32_t in_pkts, out_pkts;\n+\tuint32_t out_pkts_total = 0, in_pkts_total = 0;\n+\tuint32_t i;\n+\n+\tsw->sched_called++;\n+\tif (!sw->started)\n+\t\treturn -1;\n+\n+\tdo {\n+\t\tuint32_t in_pkts_this_iteration = 0;\n+\n+\t\t/* Pull from rx_ring for ports */\n+\t\tdo {\n+\t\t\tin_pkts = 0;\n+\t\t\tfor (i = 0; i < sw->port_count; i++)\n+\t\t\t\t/* TODO: use a function pointer in the port itself */\n+\t\t\t\tif (sw->ports[i].is_directed)\n+\t\t\t\t\tin_pkts += sw_schedule_pull_port_dir(sw, i);\n+\t\t\t\telse if (sw->ports[i].num_ordered_qids > 0)\n+\t\t\t\t\tin_pkts += sw_schedule_pull_port_lb(sw, i);\n+\t\t\t\telse\n+\t\t\t\t\tin_pkts += sw_schedule_pull_port_no_reorder(sw, i);\n+\n+\t\t\t/* QID scan for re-ordered */\n+\t\t\tin_pkts += sw_schedule_reorder(sw, 0,\n+\t\t\t\t\tsw->qid_count);\n+\t\t\tin_pkts_this_iteration += in_pkts;\n+\t\t} while (in_pkts > 0 && in_pkts_this_iteration < num_pkts);\n+\n+\t\tout_pkts = 0;\n+\t\tout_pkts += sw_schedule_qid_to_cq(sw);\n+\t\tout_pkts_total += out_pkts;\n+\t\tin_pkts_total += in_pkts_this_iteration;\n+\n+\t\tif (in_pkts == 0 && out_pkts == 0)\n+\t\t\tbreak;\n+\t} while (out_pkts_total < num_pkts);\n+\n+\t/* push all the internal buffered QEs in port->cq_ring to the\n+\t * worker cores: aka, do the ring transfers batched.\n+\t */\n+\tfor(i = 0; i < sw->port_count; i++) {\n+\t\tstruct qe_ring *worker = sw->ports[i].cq_worker_ring;\n+\t\tqe_ring_enqueue_burst(worker, sw->ports[i].cq_buf,\n+\t\t\t\tsw->ports[i].cq_buf_count,\n+\t\t\t\t&sw->cq_ring_space[i]);\n+\t\tsw->ports[i].cq_buf_count = 0;\n+\t}\n+\n+\tsw->stats.tx_pkts += out_pkts_total;\n+\tsw->stats.rx_pkts += in_pkts_total;\n+\n+\tsw->sched_no_iq_enqueues += (in_pkts_total == 0);\n+\tsw->sched_no_cq_enqueues += (out_pkts_total == 0);\n+\n+\treturn out_pkts_total;\n+}\ndiff --git a/drivers/event/sw/sw_evdev_worker.c b/drivers/event/sw/sw_evdev_worker.c\nnew file mode 100644\nindex 0000000..1b055cc\n--- /dev/null\n+++ b/drivers/event/sw/sw_evdev_worker.c\n@@ -0,0 +1,218 @@\n+/*-\n+ *   BSD LICENSE\n+ *\n+ *   Copyright(c) 2016 Intel Corporation. 
All rights reserved.\n+ *\n+ *   Redistribution and use in source and binary forms, with or without\n+ *   modification, are permitted provided that the following conditions\n+ *   are met:\n+ *\n+ *     * Redistributions of source code must retain the above copyright\n+ *       notice, this list of conditions and the following disclaimer.\n+ *     * Redistributions in binary form must reproduce the above copyright\n+ *       notice, this list of conditions and the following disclaimer in\n+ *       the documentation and/or other materials provided with the\n+ *       distribution.\n+ *     * Neither the name of Intel Corporation nor the names of its\n+ *       contributors may be used to endorse or promote products derived\n+ *       from this software without specific prior written permission.\n+ *\n+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS\n+ *   \"AS IS\" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT\n+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR\n+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT\n+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,\n+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT\n+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,\n+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY\n+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT\n+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE\n+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n+ */\n+\n+#include \"sw_evdev.h\"\n+\n+#include <rte_atomic.h>\n+#include <rte_hash_crc.h>\n+\n+#define FLOWID_MASK (SW_QID_NUM_FIDS-1)\n+\n+static inline void\n+sw_overload_check_and_set(struct sw_evdev *sw, struct sw_port *p,\n+\t\t\t  uint16_t free_count)\n+{\n+\tif (!p->overloaded &&\n+\t\t\tfree_count < MAX_SW_PROD_Q_DEPTH - p->overload_threshold) {\n+\t\tp->overloaded = 1;\n+\t\trte_atomic32_inc((void *)&sw->overloaded);\n+\t}\n+}\n+\n+int\n+sw_event_enqueue(struct rte_event_dev *dev, uint8_t port_id, struct rte_event *ev,\n+\t\t  bool pin_event)\n+{\n+\tRTE_SET_USED(pin_event);\n+\tuint16_t free_count;\n+\tstruct sw_evdev *sw = (void *)dev;\n+\n+\tif(port_id >= sw->port_count)\n+\t\treturn -1;\n+\n+\tstruct sw_port *p = &sw->ports[port_id];\n+\t/* TODO: Consider optimization: keep port overloaded in flat array in\n+\t * sw instance, do a lookup and just one return branch together with\n+\t * port_id check above */\n+\tif(sw->overloaded && ev->operation == RTE_EVENT_OP_NEW)\n+\t\treturn -ENOSPC;\n+\n+\tev->operation = sw_qe_flag_map[ev->operation];\n+\tconst uint8_t invalid_qid = (ev[0].queue_id >= sw->qid_count);\n+\tev[0].operation &= ~(invalid_qid << QE_FLAG_VALID_SHIFT);\n+\t/* mask flowID to valid range after a CRC to jumble bits */\n+\tev[0].flow_id = FLOWID_MASK & rte_hash_crc_4byte(ev[0].flow_id, -1);\n+\n+\tif(invalid_qid) {\n+\t\tp->stats.rx_dropped++;\n+\t}\n+\n+\tunsigned int num_enq = qe_ring_enqueue_burst(p->rx_worker_ring,\n+\t\t\t\t\t\t     ev, 1, &free_count);\n+\n+\tsw_overload_check_and_set(sw, p, free_count);\n+\n+\t/* TODO: Discuss on ML and fix this inconsistency in API:\n+\t * num_enq is the number of packets enqueued, so\n+\t * 0 = no packets\n+\t * 1 = got a packet\n+\t * This is different to how it is currently documented in the API.\n+\t */\n+\treturn num_enq;\n+}\n+\n+int\n+sw_event_enqueue_burst(struct rte_event_dev *dev, uint8_t 
port_id,\n+\t\t\tstruct rte_event ev[], int num, bool pin_event)\n+{\n+\t/* TODO: change enqueue API to uint32_t for num? */\n+\tint32_t i;\n+\tuint16_t free_count;\n+\tstruct sw_evdev *sw = (void *)dev;\n+\n+\tif(port_id >= sw->port_count)\n+\t\treturn 0;\n+\n+\tstruct sw_port *p = &sw->ports[port_id];\n+\tRTE_SET_USED(pin_event);\n+\n+\tfor (i = 0; i < num; i++) {\n+\t\t/* optimize to two loops, with and without overload */\n+\t\tif(sw->overloaded && ev[i].operation == RTE_EVENT_OP_NEW)\n+\t\t\treturn -ENOSPC;\n+\n+\t\tev[i].operation = sw_qe_flag_map[ev[i].operation];\n+\t\tconst uint8_t invalid_qid = (ev[i].queue_id >= sw->qid_count);\n+\t\tev[i].operation &= ~(invalid_qid << QE_FLAG_VALID_SHIFT);\n+\t\tev[i].flow_id = FLOWID_MASK & rte_hash_crc_4byte(ev[i].flow_id, -1);\n+\n+\t\tif(invalid_qid) {\n+\t\t\tp->stats.rx_dropped++;\n+\t\t}\n+\t}\n+\n+\t/* returns number of events actually enqueued */\n+\tuint32_t deq = qe_ring_enqueue_burst(p->rx_worker_ring, ev, num,\n+\t\t\t\t\t     &free_count);\n+\tsw_overload_check_and_set(sw, p, free_count);\n+\treturn deq;\n+}\n+\n+bool\n+sw_event_dequeue(struct rte_event_dev *dev, uint8_t port_id,\n+\t\t  struct rte_event *ev, uint64_t wait)\n+{\n+\tRTE_SET_USED(wait);\n+\tstruct sw_evdev *sw = (void *)dev;\n+\n+\tif(port_id >= sw->port_count)\n+\t\treturn 0;\n+\n+\tstruct sw_port *p = &sw->ports[port_id];\n+\tstruct qe_ring *ring = p->cq_worker_ring;\n+\n+\t/* check that all previous dequeues have been released */\n+\tuint16_t out_rels = p->outstanding_releases;\n+\tuint16_t i;\n+\tfor(i = 0; i < out_rels; i++) {\n+\t\tsw_event_release(dev, port_id, i);\n+\t}\n+\n+\t/* Intel modification: may not be in final API */\n+\tif(ev == 0)\n+\t\treturn 0;\n+\n+\t/* returns number of events actually dequeued, after storing */\n+\tuint32_t ndeq = qe_ring_dequeue_burst(ring, ev, 1);\n+\tp->outstanding_releases = ndeq;\n+\treturn ndeq;\n+}\n+\n+int\n+sw_event_dequeue_burst(struct rte_event_dev *dev, uint8_t port_id,\n+\t\t\tstruct rte_event *ev, int num, uint64_t wait)\n+{\n+\tRTE_SET_USED(wait);\n+\tstruct sw_evdev *sw = (void *)dev;\n+\n+\tif(port_id >= sw->port_count)\n+\t\treturn 0;\n+\n+\tstruct sw_port *p = &sw->ports[port_id];\n+\tstruct qe_ring *ring = p->cq_worker_ring;\n+\n+\t/* check that all previous dequeues have been released */\n+\tif (!p->is_directed) {\n+\t\tuint16_t out_rels = p->outstanding_releases;\n+\t\tuint16_t i;\n+\t\tfor(i = 0; i < out_rels; i++) {\n+\t\t\tsw_event_release(dev, port_id, i);\n+\t\t}\n+\t}\n+\n+\t/* Intel modification: may not be in final API */\n+\tif(ev == 0)\n+\t\treturn 0;\n+\n+\t/* returns number of events actually dequeued */\n+\tuint32_t ndeq = qe_ring_dequeue_burst(ring, ev, num);\n+\tp->outstanding_releases = ndeq;\n+\treturn ndeq;\n+}\n+\n+void\n+sw_event_release(struct rte_event_dev *dev, uint8_t port_id, uint8_t index)\n+{\n+\tstruct sw_evdev *sw = (void *)dev;\n+\tstruct sw_port *p = &sw->ports[port_id];\n+\tRTE_SET_USED(p);\n+\tRTE_SET_USED(index);\n+\n+\t/* This function \"hints\" the scheduler that packet *index* of the\n+\t * previous burst:\n+\t * (Atomic)  has completed its critical section\n+\t * (Ordered) is ready for egress\n+\t *\n+\t * It is not mandatory to implement this functionality, but it may\n+\t * improve load-balancing / parallelism in the packet flows.\n+\t */\n+\n+\t/* create drop message */\n+\tstruct rte_event ev = {\n+\t\t.operation = sw_qe_flag_map[RTE_EVENT_OP_DROP],\n+\t};\n+\n+\tuint16_t free_count;\n+\tqe_ring_enqueue_burst(p->rx_worker_ring, &ev, 1, 
&free_count);\n+\n+\tp->outstanding_releases--;\n+}\n",
    "prefixes": [
        "dpdk-dev",
        "3/7"
    ]
}