get:
Show a patch.

patch:
Update a patch (partial update: only the fields supplied in the request are changed).

put:
Update a patch (full update).
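
The endpoint can be exercised with any HTTP client. The sketch below uses Python's requests library; read access needs no authentication, while the PATCH call assumes a maintainer API token (placeholder value) and that "state" is a writable field, which this page does not itself confirm.

    import requests

    BASE_URL = "http://patches.dpdk.org/api"

    # GET: show a patch (read access needs no authentication)
    response = requests.get(f"{BASE_URL}/patches/43921/")
    response.raise_for_status()
    patch = response.json()
    print(patch["name"], patch["state"], patch["check"])

    # PATCH: partial update of selected fields. This assumes "state" is
    # writable and that the caller holds a valid API token; the token
    # below is a placeholder, not a real credential.
    # requests.patch(
    #     f"{BASE_URL}/patches/43921/",
    #     headers={"Authorization": "Token <api-token>"},
    #     json={"state": "superseded"},
    # )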

GET /api/patches/43921/?format=api
HTTP 200 OK
Allow: GET, PUT, PATCH, HEAD, OPTIONS
Content-Type: application/json
Vary: Accept

{
    "id": 43921,
    "url": "http://patches.dpdk.org/api/patches/43921/?format=api",
    "web_url": "http://patches.dpdk.org/project/dpdk/patch/20180828083607.25752-2-mattias.ronnblom@ericsson.com/",
    "project": {
        "id": 1,
        "url": "http://patches.dpdk.org/api/projects/1/?format=api",
        "name": "DPDK",
        "link_name": "dpdk",
        "list_id": "dev.dpdk.org",
        "list_email": "dev@dpdk.org",
        "web_url": "http://core.dpdk.org",
        "scm_url": "git://dpdk.org/dpdk",
        "webscm_url": "http://git.dpdk.org/dpdk",
        "list_archive_url": "https://inbox.dpdk.org/dev",
        "list_archive_url_format": "https://inbox.dpdk.org/dev/{}",
        "commit_url_format": ""
    },
    "msgid": "<20180828083607.25752-2-mattias.ronnblom@ericsson.com>",
    "list_archive_url": "https://inbox.dpdk.org/dev/20180828083607.25752-2-mattias.ronnblom@ericsson.com",
    "date": "2018-08-28T08:36:07",
    "name": "[RFC,v2,1/1] eventdev: add distributed software (DSW) event device",
    "commit_ref": null,
    "pull_url": null,
    "state": "superseded",
    "archived": true,
    "hash": "2aa569c0bd36dad0a2b11e14602645aaaee3ceff",
    "submitter": {
        "id": 1077,
        "url": "http://patches.dpdk.org/api/people/1077/?format=api",
        "name": "Mattias Rönnblom",
        "email": "mattias.ronnblom@ericsson.com"
    },
    "delegate": {
        "id": 310,
        "url": "http://patches.dpdk.org/api/users/310/?format=api",
        "username": "jerin",
        "first_name": "Jerin",
        "last_name": "Jacob",
        "email": "jerinj@marvell.com"
    },
    "mbox": "http://patches.dpdk.org/project/dpdk/patch/20180828083607.25752-2-mattias.ronnblom@ericsson.com/mbox/",
    "series": [
        {
            "id": 1076,
            "url": "http://patches.dpdk.org/api/series/1076/?format=api",
            "web_url": "http://patches.dpdk.org/project/dpdk/list/?series=1076",
            "date": "2018-08-28T08:36:06",
            "name": "A Distributed Software Event Device",
            "version": 2,
            "mbox": "http://patches.dpdk.org/series/1076/mbox/"
        }
    ],
    "comments": "http://patches.dpdk.org/api/patches/43921/comments/",
    "check": "success",
    "checks": "http://patches.dpdk.org/api/patches/43921/checks/",
    "tags": {},
    "related": [],
    "headers": {
        "Return-Path": "<dev-bounces@dpdk.org>",
        "X-Original-To": "patchwork@dpdk.org",
        "Delivered-To": "patchwork@dpdk.org",
        "Received": [
            "from [92.243.14.124] (localhost [127.0.0.1])\n\tby dpdk.org (Postfix) with ESMTP id 5607E4C99;\n\tTue, 28 Aug 2018 10:36:40 +0200 (CEST)",
            "from mail.lysator.liu.se (mail.lysator.liu.se [130.236.254.3])\n\tby dpdk.org (Postfix) with ESMTP id 89022324B\n\tfor <dev@dpdk.org>; Tue, 28 Aug 2018 10:36:38 +0200 (CEST)",
            "from mail.lysator.liu.se (localhost [127.0.0.1])\n\tby mail.lysator.liu.se (Postfix) with ESMTP id 402E940053\n\tfor <dev@dpdk.org>; Tue, 28 Aug 2018 10:36:38 +0200 (CEST)",
            "by mail.lysator.liu.se (Postfix, from userid 1004)\n\tid 2AC5D4003D; Tue, 28 Aug 2018 10:36:38 +0200 (CEST)",
            "from isengard.friendlyfire.se\n\t(host-90-232-97-171.mobileonline.telia.com [90.232.97.171])\n\t(using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256\n\tbits)) (No client certificate requested)\n\tby mail.lysator.liu.se (Postfix) with ESMTPSA id 894CF40034;\n\tTue, 28 Aug 2018 10:36:33 +0200 (CEST)"
        ],
        "X-Spam-Checker-Version": "SpamAssassin 3.4.1 (2015-04-28) on\n\tbernadotte.lysator.liu.se",
        "X-Spam-Level": "",
        "X-Spam-Status": "No, score=-0.6 required=5.0 tests=ALL_TRUSTED,AWL\n\tautolearn=disabled version=3.4.1",
        "X-Spam-Score": "-0.6",
        "From": "=?utf-8?q?Mattias_R=C3=B6nnblom?= <mattias.ronnblom@ericsson.com>",
        "To": "jerin.jacob@caviumnetworks.com",
        "Cc": "dev@dpdk.org, =?utf-8?q?Mattias_R=C3=B6nnblom?=\n\t<mattias.ronnblom@ericsson.com>",
        "Date": "Tue, 28 Aug 2018 10:36:07 +0200",
        "Message-Id": "<20180828083607.25752-2-mattias.ronnblom@ericsson.com>",
        "X-Mailer": "git-send-email 2.17.1",
        "In-Reply-To": "<20180828083607.25752-1-mattias.ronnblom@ericsson.com>",
        "References": "<20180828083607.25752-1-mattias.ronnblom@ericsson.com>",
        "MIME-Version": "1.0",
        "Content-Type": "text/plain; charset=UTF-8",
        "Content-Transfer-Encoding": "8bit",
        "X-Virus-Scanned": "ClamAV using ClamSMTP",
        "Subject": "[dpdk-dev] [RFC v2 1/1] eventdev: add distributed software (DSW)\n\tevent device",
        "X-BeenThere": "dev@dpdk.org",
        "X-Mailman-Version": "2.1.15",
        "Precedence": "list",
        "List-Id": "DPDK patches and discussions <dev.dpdk.org>",
        "List-Unsubscribe": "<https://mails.dpdk.org/options/dev>,\n\t<mailto:dev-request@dpdk.org?subject=unsubscribe>",
        "List-Archive": "<http://mails.dpdk.org/archives/dev/>",
        "List-Post": "<mailto:dev@dpdk.org>",
        "List-Help": "<mailto:dev-request@dpdk.org?subject=help>",
        "List-Subscribe": "<https://mails.dpdk.org/listinfo/dev>,\n\t<mailto:dev-request@dpdk.org?subject=subscribe>",
        "Errors-To": "dev-bounces@dpdk.org",
        "Sender": "\"dev\" <dev-bounces@dpdk.org>"
    },
    "content": "The distributed software eventdev is a parallel implementation of the\neventdev API, which distributes the task of scheduling events among\nall the eventdev ports and the lcore threads using them.\n\nSigned-off-by: Mattias Rönnblom <mattias.ronnblom@ericsson.com>\n---\n config/common_base                            |    5 +\n doc/guides/eventdevs/dsw.rst                  |   97 ++\n doc/guides/eventdevs/index.rst                |    1 +\n drivers/event/Makefile                        |    1 +\n drivers/event/dsw/Makefile                    |   28 +\n drivers/event/dsw/dsw_evdev.c                 |  432 ++++++\n drivers/event/dsw/dsw_evdev.h                 |  288 ++++\n drivers/event/dsw/dsw_event.c                 | 1261 +++++++++++++++++\n drivers/event/dsw/dsw_sort.h                  |   48 +\n drivers/event/dsw/dsw_xstats.c                |  285 ++++\n drivers/event/dsw/meson.build                 |    8 +\n .../event/dsw/rte_pmd_dsw_event_version.map   |    3 +\n drivers/event/meson.build                     |    2 +-\n mk/rte.app.mk                                 |    1 +\n 14 files changed, 2459 insertions(+), 1 deletion(-)\n create mode 100644 doc/guides/eventdevs/dsw.rst\n create mode 100644 drivers/event/dsw/Makefile\n create mode 100644 drivers/event/dsw/dsw_evdev.c\n create mode 100644 drivers/event/dsw/dsw_evdev.h\n create mode 100644 drivers/event/dsw/dsw_event.c\n create mode 100644 drivers/event/dsw/dsw_sort.h\n create mode 100644 drivers/event/dsw/dsw_xstats.c\n create mode 100644 drivers/event/dsw/meson.build\n create mode 100644 drivers/event/dsw/rte_pmd_dsw_event_version.map",
    "diff": "diff --git a/config/common_base b/config/common_base\nindex 4bcbaf923..c43f5139d 100644\n--- a/config/common_base\n+++ b/config/common_base\n@@ -614,6 +614,11 @@ CONFIG_RTE_LIBRTE_PMD_SKELETON_EVENTDEV_DEBUG=n\n #\n CONFIG_RTE_LIBRTE_PMD_SW_EVENTDEV=y\n \n+#\n+# Compile PMD for distributed software event device\n+#\n+CONFIG_RTE_LIBRTE_PMD_DSW_EVENTDEV=y\n+\n #\n # Compile PMD for octeontx sso event device\n #\ndiff --git a/doc/guides/eventdevs/dsw.rst b/doc/guides/eventdevs/dsw.rst\nnew file mode 100644\nindex 000000000..de41ae9d3\n--- /dev/null\n+++ b/doc/guides/eventdevs/dsw.rst\n@@ -0,0 +1,97 @@\n+..  SPDX-License-Identifier: BSD-3-Clause\n+    Copyright(c) 2017 Intel Corporation.\n+    Copyright(c) 2018 Ericsson AB\n+\n+Distributed Software Eventdev Poll Mode Driver\n+==============================================\n+\n+The distributed software eventdev is a parallel implementation of the\n+eventdev API, which distributes the task of scheduling events among\n+all the eventdev ports and the lcore threads using them.\n+\n+Features\n+--------\n+\n+Queues\n+ * Atomic\n+ * Parallel\n+ * Single-Link\n+\n+Ports\n+ * Load balanced (for Atomic, Ordered, Parallel queues)\n+ * Single Link (for single-link queues)\n+\n+Configuration and Options\n+-------------------------\n+\n+The distributed software eventdev is a vdev device, and as such can be\n+created from the application code, or from the EAL command line:\n+\n+* Call ``rte_vdev_init(\"event_dsw0\")`` from the application\n+\n+* Use ``--vdev=\"event_dsw0\"`` in the EAL options, which will call\n+  rte_vdev_init() internally\n+\n+Example:\n+\n+.. code-block:: console\n+\n+    ./your_eventdev_application --vdev=\"event_dsw0\"\n+\n+Limitations\n+-----------\n+\n+Unattended Ports\n+~~~~~~~~~~~~~~~~\n+\n+The distributed software eventdev uses an internal signaling schema\n+between the ports to achieve load balancing. 
In order for this to\n+work, the application must perform enqueue and/or dequeue operations\n+on all ports.\n+\n+Producer-only ports which currently have no events to enqueue should\n+periodically call rte_event_enqueue_burst() with a zero-sized burst.\n+\n+Ports left unattended for longer periods of time will prevent load\n+balancing, and also cause traffic interruptions on the flows which\n+are in the process of being migrated.\n+\n+Output Buffering\n+~~~~~~~~~~~~~~~~\n+\n+For efficiency reasons, the distributed software eventdev might not\n+send enqueued events immediately to the destination port, but instead\n+store them in an internal buffer in the source port.\n+\n+In case no more events are enqueued on a port with buffered events,\n+these events will be sent after the application has performed a number\n+of enqueue and/or dequeue operations.\n+\n+For explicit flushing, an application may call\n+rte_event_enqueue_burst() with a zero-sized burst.\n+\n+\n+Priorities\n+~~~~~~~~~~\n+\n+The distributed software eventdev does not support event priorities.\n+\n+Ordered Queues\n+~~~~~~~~~~~~~~\n+\n+The distributed software eventdev does not support the ordered queue type.\n+\n+\n+\"All Types\" Queues\n+~~~~~~~~~~~~~~~~~~\n+\n+The distributed software eventdev does not support queues of type\n+RTE_EVENT_QUEUE_CFG_ALL_TYPES, which allow both atomic, ordered, and\n+parallel events on the same queue.\n+\n+Dynamic Link/Unlink\n+~~~~~~~~~~~~~~~~~~~\n+\n+The distributed software eventdev does not support calls to\n+rte_event_port_link() or rte_event_port_unlink() after\n+rte_event_dev_start() has been called.\ndiff --git a/doc/guides/eventdevs/index.rst b/doc/guides/eventdevs/index.rst\nindex 18ec8e462..984eea5f4 100644\n--- a/doc/guides/eventdevs/index.rst\n+++ b/doc/guides/eventdevs/index.rst\n@@ -14,5 +14,6 @@ application trough the eventdev API.\n     dpaa\n     dpaa2\n     sw\n+    dsw\n     octeontx\n     opdl\ndiff --git a/drivers/event/Makefile b/drivers/event/Makefile\nindex f301d8dc2..03ad1b6cb 100644\n--- a/drivers/event/Makefile\n+++ b/drivers/event/Makefile\n@@ -6,6 +6,7 @@ include $(RTE_SDK)/mk/rte.vars.mk\n \n DIRS-$(CONFIG_RTE_LIBRTE_PMD_SKELETON_EVENTDEV) += skeleton\n DIRS-$(CONFIG_RTE_LIBRTE_PMD_SW_EVENTDEV) += sw\n+DIRS-$(CONFIG_RTE_LIBRTE_PMD_DSW_EVENTDEV) += dsw\n DIRS-$(CONFIG_RTE_LIBRTE_PMD_OCTEONTX_SSOVF) += octeontx\n ifeq ($(CONFIG_RTE_LIBRTE_DPAA_BUS),y)\n DIRS-$(CONFIG_RTE_LIBRTE_PMD_DPAA_EVENTDEV) += dpaa\ndiff --git a/drivers/event/dsw/Makefile b/drivers/event/dsw/Makefile\nnew file mode 100644\nindex 000000000..3565a37d3\n--- /dev/null\n+++ b/drivers/event/dsw/Makefile\n@@ -0,0 +1,28 @@\n+# SPDX-License-Identifier: BSD-3-Clause\n+# Copyright(c) 2018 Ericsson AB\n+\n+include $(RTE_SDK)/mk/rte.vars.mk\n+\n+LIB = librte_pmd_dsw_event.a\n+\n+CFLAGS += -DALLOW_EXPERIMENTAL_API\n+CFLAGS += -O3\n+CFLAGS += $(WERROR_FLAGS)\n+CFLAGS += -Wno-format-nonliteral\n+\n+LDLIBS += -lrte_eal\n+LDLIBS += -lrte_mbuf\n+LDLIBS += -lrte_mempool\n+LDLIBS += -lrte_ring\n+LDLIBS += -lrte_eventdev\n+LDLIBS += -lrte_bus_vdev\n+\n+LIBABIVER := 1\n+\n+EXPORT_MAP := rte_pmd_dsw_event_version.map\n+\n+SRCS-$(CONFIG_RTE_LIBRTE_PMD_DSW_EVENTDEV) += dsw_evdev.c\n+SRCS-$(CONFIG_RTE_LIBRTE_PMD_DSW_EVENTDEV) += dsw_event.c\n+SRCS-$(CONFIG_RTE_LIBRTE_PMD_DSW_EVENTDEV) += dsw_xstats.c\n+\n+include $(RTE_SDK)/mk/rte.lib.mk\ndiff --git a/drivers/event/dsw/dsw_evdev.c b/drivers/event/dsw/dsw_evdev.c\nnew file mode 100644\nindex 000000000..657c28006\n--- /dev/null\n+++ b/drivers/event/dsw/dsw_evdev.c\n@@ -0,0 
+1,432 @@\n+/* SPDX-License-Identifier: BSD-3-Clause\n+ * Copyright(c) 2018 Ericsson AB\n+ */\n+\n+#include <rte_cycles.h>\n+#include <rte_eventdev_pmd.h>\n+#include <rte_eventdev_pmd_vdev.h>\n+#include <rte_random.h>\n+\n+#include \"dsw_evdev.h\"\n+\n+#define EVENTDEV_NAME_DSW_PMD event_dsw\n+\n+static int\n+dsw_port_setup(struct rte_eventdev *dev, uint8_t port_id,\n+\t       const struct rte_event_port_conf *conf)\n+{\n+\tstruct dsw_evdev *dsw = dsw_pmd_priv(dev);\n+\tstruct dsw_port *port;\n+\tstruct rte_event_ring *in_ring;\n+\tstruct rte_ring *ctl_in_ring;\n+\tchar ring_name[RTE_RING_NAMESIZE];\n+\n+\tport = &dsw->ports[port_id];\n+\n+\t*port = (struct dsw_port) {\n+\t\t.id = port_id,\n+\t\t.dsw = dsw,\n+\t\t.dequeue_depth = conf->dequeue_depth,\n+\t\t.enqueue_depth = conf->enqueue_depth,\n+\t\t.new_event_threshold = conf->new_event_threshold\n+\t};\n+\n+\tsnprintf(ring_name, sizeof(ring_name), \"dsw%d_p%u\", dev->data->dev_id,\n+\t\t port_id);\n+\n+\tin_ring = rte_event_ring_create(ring_name, DSW_IN_RING_SIZE,\n+\t\t\t\t\tdev->data->socket_id,\n+\t\t\t\t\tRING_F_SC_DEQ|RING_F_EXACT_SZ);\n+\n+\tif (in_ring == NULL)\n+\t\treturn -ENOMEM;\n+\n+\tsnprintf(ring_name, sizeof(ring_name), \"dswctl%d_p%u\",\n+\t\t dev->data->dev_id, port_id);\n+\n+\tctl_in_ring = rte_ring_create(ring_name, DSW_CTL_IN_RING_SIZE,\n+\t\t\t\t      dev->data->socket_id,\n+\t\t\t\t      RING_F_SC_DEQ|RING_F_EXACT_SZ);\n+\n+\tif (in_ring == NULL) {\n+\t\trte_event_ring_free(in_ring);\n+\t\treturn -ENOMEM;\n+\t}\n+\n+\tport->in_ring = in_ring;\n+\tport->ctl_in_ring = ctl_in_ring;\n+\n+\trte_atomic16_init(&port->load);\n+\n+\tport->load_update_interval =\n+\t\t(DSW_LOAD_UPDATE_INTERVAL * rte_get_timer_hz()) / US_PER_S;\n+\n+\tport->migration_interval =\n+\t\t(DSW_MIGRATION_INTERVAL * rte_get_timer_hz()) / US_PER_S;\n+\n+\tdev->data->ports[port_id] = port;\n+\n+\treturn 0;\n+}\n+\n+static void\n+dsw_port_def_conf(struct rte_eventdev *dev __rte_unused,\n+\t\t  uint8_t port_id __rte_unused,\n+\t\t  struct rte_event_port_conf *port_conf)\n+{\n+\t*port_conf = (struct rte_event_port_conf) {\n+\t\t.new_event_threshold = 1024,\n+\t\t.dequeue_depth = DSW_MAX_PORT_DEQUEUE_DEPTH / 4,\n+\t\t.enqueue_depth = DSW_MAX_PORT_ENQUEUE_DEPTH / 4\n+\t};\n+}\n+\n+static void\n+dsw_port_release(void *p)\n+{\n+\tstruct dsw_port *port = p;\n+\n+\trte_event_ring_free(port->in_ring);\n+\trte_ring_free(port->ctl_in_ring);\n+}\n+\n+static int\n+dsw_queue_setup(struct rte_eventdev *dev, uint8_t queue_id,\n+\t\tconst struct rte_event_queue_conf *conf)\n+{\n+\tstruct dsw_evdev *dsw = dsw_pmd_priv(dev);\n+\tstruct dsw_queue *queue = &dsw->queues[queue_id];\n+\n+\tif (RTE_EVENT_QUEUE_CFG_ALL_TYPES & conf->event_queue_cfg)\n+\t\treturn -ENOTSUP;\n+\n+\tif (conf->schedule_type == RTE_SCHED_TYPE_ORDERED)\n+\t\treturn -ENOTSUP;\n+\n+\t/* SINGLE_LINK is better off treated as TYPE_ATOMIC, since it\n+\t * avoid the \"fake\" TYPE_PARALLEL flow_id assignment. 
Since\n+\t * the queue will only have a single serving port, no\n+\t * migration will ever happen, so the extra TYPE_ATOMIC\n+\t * migration overhead is avoided.\n+\t */\n+\tif (RTE_EVENT_QUEUE_CFG_SINGLE_LINK & conf->event_queue_cfg)\n+\t\tqueue->schedule_type = RTE_SCHED_TYPE_ATOMIC;\n+\telse /* atomic or parallel */\n+\t\tqueue->schedule_type = conf->schedule_type;\n+\n+\tqueue->num_serving_ports = 0;\n+\n+\treturn 0;\n+}\n+\n+static void\n+dsw_queue_def_conf(struct rte_eventdev *dev __rte_unused,\n+\t\t   uint8_t queue_id __rte_unused,\n+\t\t   struct rte_event_queue_conf *queue_conf)\n+{\n+\t*queue_conf = (struct rte_event_queue_conf) {\n+\t\t.nb_atomic_flows = 4096,\n+\t\t.schedule_type = RTE_SCHED_TYPE_ATOMIC,\n+\t\t.priority = RTE_EVENT_DEV_PRIORITY_NORMAL\n+\t};\n+}\n+\n+static void\n+dsw_queue_release(struct rte_eventdev *dev __rte_unused,\n+\t\t  uint8_t queue_id __rte_unused)\n+{\n+}\n+\n+static void\n+queue_add_port(struct dsw_queue *queue, uint16_t port_id)\n+{\n+\tqueue->serving_ports[queue->num_serving_ports] = port_id;\n+\tqueue->num_serving_ports++;\n+}\n+\n+static bool\n+queue_remove_port(struct dsw_queue *queue, uint16_t port_id)\n+{\n+\tuint16_t i;\n+\n+\tfor (i = 0; i < queue->num_serving_ports; i++)\n+\t\tif (queue->serving_ports[i] == port_id) {\n+\t\t\tuint16_t last_idx = queue->num_serving_ports - 1;\n+\t\t\tif (i != last_idx)\n+\t\t\t\tqueue->serving_ports[i] =\n+\t\t\t\t\tqueue->serving_ports[last_idx];\n+\t\t\tqueue->num_serving_ports--;\n+\t\t\treturn true;\n+\t\t}\n+\treturn false;\n+}\n+\n+static int\n+dsw_port_link_unlink(struct rte_eventdev *dev, void *port,\n+\t\t     const uint8_t queues[], uint16_t num, bool link)\n+{\n+\tstruct dsw_evdev *dsw = dsw_pmd_priv(dev);\n+\tstruct dsw_port *p = port;\n+\tuint16_t i;\n+\tuint16_t count = 0;\n+\n+\tfor (i = 0; i < num; i++) {\n+\t\tuint8_t qid = queues[i];\n+\t\tstruct dsw_queue *q = &dsw->queues[qid];\n+\t\tif (link) {\n+\t\t\tqueue_add_port(q, p->id);\n+\t\t\tcount++;\n+\t\t} else {\n+\t\t\tbool removed = queue_remove_port(q, p->id);\n+\t\t\tif (removed)\n+\t\t\t\tcount++;\n+\t\t}\n+\t}\n+\n+\treturn count;\n+}\n+\n+static int\n+dsw_port_link(struct rte_eventdev *dev, void *port, const uint8_t queues[],\n+\t      const uint8_t priorities[] __rte_unused, uint16_t num)\n+{\n+\treturn dsw_port_link_unlink(dev, port, queues, num, true);\n+}\n+\n+static int\n+dsw_port_unlink(struct rte_eventdev *dev, void *port, uint8_t queues[],\n+\t\tuint16_t num)\n+{\n+\treturn dsw_port_link_unlink(dev, port, queues, num, false);\n+}\n+\n+static void\n+dsw_info_get(struct rte_eventdev *dev __rte_unused,\n+\t     struct rte_event_dev_info *info)\n+{\n+\t*info = (struct rte_event_dev_info) {\n+\t\t.driver_name = DSW_PMD_NAME,\n+\t\t.max_event_queues = DSW_MAX_QUEUES,\n+\t\t.max_event_queue_flows = DSW_MAX_FLOWS,\n+\t\t.max_event_queue_priority_levels = 1,\n+\t\t.max_event_priority_levels = 1,\n+\t\t.max_event_ports = DSW_MAX_PORTS,\n+\t\t.max_event_port_dequeue_depth = DSW_MAX_PORT_DEQUEUE_DEPTH,\n+\t\t.max_event_port_enqueue_depth = DSW_MAX_PORT_ENQUEUE_DEPTH,\n+\t\t.max_num_events = DSW_MAX_EVENTS,\n+\t\t.event_dev_cap = RTE_EVENT_DEV_CAP_BURST_MODE|\n+\t\tRTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED\n+\t};\n+}\n+\n+static int\n+dsw_configure(const struct rte_eventdev *dev)\n+{\n+\tstruct dsw_evdev *dsw = dsw_pmd_priv(dev);\n+\tconst struct rte_event_dev_config *conf = &dev->data->dev_conf;\n+\tint32_t min_max_in_flight;\n+\n+\tdsw->num_queues = conf->nb_event_queues;\n+\tdsw->num_ports = conf->nb_event_ports;\n+\n+\t/* Avoid a 
situation where consumer ports are holding all the\n+\t * credits, without making use of them.\n+\t */\n+\tmin_max_in_flight = conf->nb_event_ports * DSW_PORT_MAX_CREDITS;\n+\n+\tdsw->max_inflight = RTE_MAX(conf->nb_events_limit, min_max_in_flight);\n+\n+\treturn 0;\n+}\n+\n+static void\n+initial_flow_to_port_assignment(struct dsw_evdev *dsw)\n+{\n+\tuint8_t queue_id;\n+\tfor (queue_id = 0; queue_id < dsw->num_queues; queue_id++) {\n+\t\tstruct dsw_queue *queue = &dsw->queues[queue_id];\n+\t\tuint16_t flow_hash;\n+\t\tfor (flow_hash = 0; flow_hash < DSW_MAX_FLOWS; flow_hash++) {\n+\t\t\tuint8_t port_idx =\n+\t\t\t\trte_rand() % queue->num_serving_ports;\n+\t\t\tuint8_t port_id =\n+\t\t\t\tqueue->serving_ports[port_idx];\n+\t\t\tdsw->queues[queue_id].flow_to_port_map[flow_hash] =\n+\t\t\t\tport_id;\n+\t\t}\n+\t}\n+}\n+\n+static int\n+dsw_start(struct rte_eventdev *dev)\n+{\n+\tstruct dsw_evdev *dsw = dsw_pmd_priv(dev);\n+\tuint16_t i;\n+\tuint64_t now;\n+\n+\trte_atomic32_init(&dsw->credits_on_loan);\n+\n+\tinitial_flow_to_port_assignment(dsw);\n+\n+\tnow = rte_get_timer_cycles();\n+\tfor (i = 0; i < dsw->num_ports; i++) {\n+\t\tdsw->ports[i].measurement_start = now;\n+\t\tdsw->ports[i].busy_start = now;\n+\t}\n+\n+\treturn 0;\n+}\n+\n+static void\n+dsw_port_drain_buf(uint8_t dev_id, struct rte_event *buf, uint16_t buf_len,\n+\t\t   eventdev_stop_flush_t flush, void *flush_arg)\n+{\n+\tuint16_t i;\n+\n+\tfor (i = 0; i < buf_len; i++)\n+\t\tflush(dev_id, buf[i], flush_arg);\n+}\n+\n+static void\n+dsw_port_drain_paused(uint8_t dev_id, struct dsw_port *port,\n+\t\t      eventdev_stop_flush_t flush, void *flush_arg)\n+{\n+\tdsw_port_drain_buf(dev_id, port->paused_events, port->paused_events_len,\n+\t\t\t   flush, flush_arg);\n+}\n+\n+static void\n+dsw_port_drain_out(uint8_t dev_id, struct dsw_evdev *dsw, struct dsw_port *port,\n+\t\t   eventdev_stop_flush_t flush, void *flush_arg)\n+{\n+\tuint16_t dport_id;\n+\n+\tfor (dport_id = 0; dport_id < dsw->num_ports; dport_id++)\n+\t\tif (dport_id != port->id)\n+\t\t\tdsw_port_drain_buf(dev_id, port->out_buffer[dport_id],\n+\t\t\t\t\t   port->out_buffer_len[dport_id],\n+\t\t\t\t\t   flush, flush_arg);\n+}\n+\n+static void\n+dsw_port_drain_in_ring(uint8_t dev_id, struct dsw_port *port,\n+\t\t       eventdev_stop_flush_t flush, void *flush_arg)\n+{\n+\tstruct rte_event ev;\n+\n+\twhile (rte_event_ring_dequeue_burst(port->in_ring, &ev, 1, NULL))\n+\t\tflush(dev_id, ev, flush_arg);\n+}\n+\n+static void\n+dsw_drain(uint8_t dev_id, struct dsw_evdev *dsw,\n+\t  eventdev_stop_flush_t flush, void *flush_arg)\n+{\n+\tuint16_t port_id;\n+\n+\tif (flush == NULL)\n+\t\treturn;\n+\n+\tfor (port_id = 0; port_id < dsw->num_ports; port_id++) {\n+\t\tstruct dsw_port *port = &dsw->ports[port_id];\n+\n+\t\tdsw_port_drain_paused(dev_id, port, flush, flush_arg);\n+\t\tdsw_port_drain_out(dev_id, dsw, port, flush, flush_arg);\n+\t\tdsw_port_drain_in_ring(dev_id, port, flush, flush_arg);\n+\t}\n+}\n+\n+static void\n+dsw_stop(struct rte_eventdev *dev)\n+{\n+\tstruct dsw_evdev *dsw = dsw_pmd_priv(dev);\n+\tuint8_t dev_id;\n+\teventdev_stop_flush_t flush;\n+\tvoid *flush_arg;\n+\n+\tdev_id = dev->data->dev_id;\n+\tflush = dev->dev_ops->dev_stop_flush;\n+\tflush_arg = dev->data->dev_stop_flush_arg;\n+\n+\tdsw_drain(dev_id, dsw, flush, flush_arg);\n+}\n+\n+static int\n+dsw_close(struct rte_eventdev *dev)\n+{\n+\tstruct dsw_evdev *dsw = dsw_pmd_priv(dev);\n+\n+\tdsw->num_ports = 0;\n+\tdsw->num_queues = 0;\n+\n+\treturn 0;\n+}\n+\n+static struct rte_eventdev_ops dsw_evdev_ops = 
{\n+\t.dev_infos_get = dsw_info_get,\n+\t.dev_configure = dsw_configure,\n+\t.dev_start = dsw_start,\n+\t.dev_stop = dsw_stop,\n+\t.dev_close = dsw_close,\n+\t.port_setup = dsw_port_setup,\n+\t.port_def_conf = dsw_port_def_conf,\n+\t.port_release = dsw_port_release,\n+\t.queue_setup = dsw_queue_setup,\n+\t.queue_def_conf = dsw_queue_def_conf,\n+\t.queue_release = dsw_queue_release,\n+\t.port_link = dsw_port_link,\n+\t.port_unlink = dsw_port_unlink,\n+\t.xstats_get = dsw_xstats_get,\n+\t.xstats_get_names = dsw_xstats_get_names,\n+\t.xstats_get_by_name = dsw_xstats_get_by_name\n+};\n+\n+static int\n+dsw_probe(struct rte_vdev_device *vdev)\n+{\n+\tconst char *name;\n+\tstruct rte_eventdev *dev;\n+\tstruct dsw_evdev *dsw;\n+\n+\tname = rte_vdev_device_name(vdev);\n+\n+\tdev = rte_event_pmd_vdev_init(name, sizeof(struct dsw_evdev),\n+\t\t\t\t      rte_socket_id());\n+\tif (dev == NULL)\n+\t\treturn -EFAULT;\n+\n+\tdev->dev_ops = &dsw_evdev_ops;\n+\tdev->enqueue = dsw_event_enqueue;\n+\tdev->enqueue_burst = dsw_event_enqueue_burst;\n+\tdev->enqueue_new_burst = dsw_event_enqueue_new_burst;\n+\tdev->enqueue_forward_burst = dsw_event_enqueue_forward_burst;\n+\tdev->dequeue = dsw_event_dequeue;\n+\tdev->dequeue_burst = dsw_event_dequeue_burst;\n+\n+\tif (rte_eal_process_type() != RTE_PROC_PRIMARY)\n+\t\treturn 0;\n+\n+\tdsw = dev->data->dev_private;\n+\tdsw->data = dev->data;\n+\n+\treturn 0;\n+}\n+\n+static int\n+dsw_remove(struct rte_vdev_device *vdev)\n+{\n+\tconst char *name;\n+\n+\tname = rte_vdev_device_name(vdev);\n+\tif (name == NULL)\n+\t\treturn -EINVAL;\n+\n+\treturn rte_event_pmd_vdev_uninit(name);\n+}\n+\n+static struct rte_vdev_driver evdev_dsw_pmd_drv = {\n+\t.probe = dsw_probe,\n+\t.remove = dsw_remove\n+};\n+\n+RTE_PMD_REGISTER_VDEV(EVENTDEV_NAME_DSW_PMD, evdev_dsw_pmd_drv);\ndiff --git a/drivers/event/dsw/dsw_evdev.h b/drivers/event/dsw/dsw_evdev.h\nnew file mode 100644\nindex 000000000..52c6110e6\n--- /dev/null\n+++ b/drivers/event/dsw/dsw_evdev.h\n@@ -0,0 +1,288 @@\n+/* SPDX-License-Identifier: BSD-3-Clause\n+ * Copyright(c) 2018 Ericsson AB\n+ */\n+\n+#ifndef _DSW_EVDEV_H_\n+#define _DSW_EVDEV_H_\n+\n+#include <inttypes.h>\n+\n+#include <stdbool.h>\n+\n+#include <rte_config.h>\n+#include <rte_event_ring.h>\n+#include <rte_event_ring.h>\n+#include <rte_eventdev.h>\n+#include <rte_ring.h>\n+\n+#define DSW_PMD_NAME RTE_STR(event_dsw)\n+\n+/* Code changes are required to allow more ports. */\n+#define DSW_MAX_PORTS (64)\n+#define DSW_MAX_PORT_OUT_BUFFER (32)\n+#define DSW_MAX_PORT_DEQUEUE_DEPTH (128)\n+#define DSW_MAX_PORT_ENQUEUE_DEPTH (DSW_MAX_PORT_OUT_BUFFER)\n+\n+#define DSW_MAX_QUEUES (16)\n+\n+#define DSW_MAX_EVENTS (16384)\n+\n+/* Code changes are required to allow more flows than 32k. */\n+#define DSW_MAX_FLOWS_BITS (15)\n+#define DSW_MAX_FLOWS (1<<(DSW_MAX_FLOWS_BITS))\n+#define DSW_MAX_FLOWS_MASK (DSW_MAX_FLOWS-1)\n+\n+/* Eventdev RTE_SCHED_TYPE_PARALLEL doesn't have a concept of flows,\n+ * but the 'dsw' scheduler (more or less) randomly assign flow id to\n+ * events on parallel queues, to be able to reuse some of the\n+ * migration mechanism and scheduling logic from\n+ * RTE_SCHED_TYPE_ATOMIC. 
By moving one of the parallel \"flows\" from a\n+ * particular port, the likely-hood of events being scheduled to this\n+ * port is reduced, and thus a kind of statistical load balancing is\n+ * achieved.\n+ */\n+#define DSW_PARALLEL_FLOWS (1024)\n+\n+/* 'Background tasks' are polling the control rings for *\n+ *  migration-related messages, or flush the output buffer (so\n+ *  buffered events doesn't linger too long). Shouldn't be too low,\n+ *  since the system won't benefit from the 'batching' effects from\n+ *  the output buffer, and shouldn't be too high, since it will make\n+ *  buffered events linger too long in case the port goes idle.\n+ */\n+#define DSW_MAX_PORT_OPS_PER_BG_TASK (128)\n+\n+/* Avoid making small 'loans' from the central in-flight event credit\n+ * pool, to improve efficiency.\n+ */\n+#define DSW_MIN_CREDIT_LOAN (64)\n+#define DSW_PORT_MAX_CREDITS (2*DSW_MIN_CREDIT_LOAN)\n+#define DSW_PORT_MIN_CREDITS (DSW_MIN_CREDIT_LOAN)\n+\n+/* The rings are dimensioned so that all in-flight events can reside\n+ * on only one of the port rings, to avoid the trouble of having to\n+ * care about the case of an event not fitting on the receiver port's\n+ * ring.\n+ */\n+#define DSW_IN_RING_SIZE (DSW_MAX_EVENTS)\n+\n+#define DSW_MAX_LOAD (INT16_MAX)\n+#define DSW_LOAD_FROM_PERCENT(x) ((int16_t)(((x)*DSW_MAX_LOAD)/100))\n+#define DSW_LOAD_TO_PERCENT(x) ((100*x)/DSW_MAX_LOAD)\n+\n+/* The thought behind keeping the load update interval shorter than\n+ * the migration interval is that the load from newly migrated flows\n+ * should 'show up' on the load measurement before new migrations are\n+ * considered. This is to avoid having too many flows, from too many\n+ * source ports, to be migrated too quickly to a lightly loaded port -\n+ * in particular since this might cause the system to oscillate.\n+ */\n+#define DSW_LOAD_UPDATE_INTERVAL (DSW_MIGRATION_INTERVAL/4)\n+#define DSW_OLD_LOAD_WEIGHT (1)\n+\n+/* The minimum time (in us) between two flow migrations. What puts an\n+ * upper limit on the actual migration rate is primarily the pace in\n+ * which the ports send and receive control messages, which in turn is\n+ * largely a function of how much cycles are spent the processing of\n+ * an event burst.\n+ */\n+#define DSW_MIGRATION_INTERVAL (1000)\n+#define DSW_MIN_SOURCE_LOAD_FOR_MIGRATION (DSW_LOAD_FROM_PERCENT(70))\n+#define DSW_MAX_TARGET_LOAD_FOR_MIGRATION (DSW_LOAD_FROM_PERCENT(95))\n+\n+#define DSW_MAX_EVENTS_RECORDED (128)\n+\n+/* Only one outstanding migration per port is allowed */\n+#define DSW_MAX_PAUSED_FLOWS (DSW_MAX_PORTS)\n+\n+/* Enough room for paus request/confirm and unpaus request/confirm for\n+ * all possible senders.\n+ */\n+#define DSW_CTL_IN_RING_SIZE ((DSW_MAX_PORTS-1)*4)\n+\n+/* With DSW_SORT_DEQUEUED enabled, the scheduler will, at the point of\n+ * dequeue(), arrange events so that events with the same flow id on\n+ * the same queue forms a back-to-back \"burst\", and also so that such\n+ * bursts of different flow ids, but on the same queue, also come\n+ * consecutively. 
All this in an attempt to improve data and\n+ * instruction cache usage for the application, at the cost of a\n+ * scheduler overhead increase.\n+ */\n+\n+/* #define DSW_SORT_DEQUEUED */\n+\n+struct dsw_queue_flow {\n+\tuint8_t queue_id;\n+\tuint16_t flow_hash;\n+};\n+\n+enum dsw_migration_state {\n+\tDSW_MIGRATION_STATE_IDLE,\n+\tDSW_MIGRATION_STATE_PAUSING,\n+\tDSW_MIGRATION_STATE_FORWARDING,\n+\tDSW_MIGRATION_STATE_UNPAUSING\n+};\n+\n+struct dsw_port {\n+\tuint16_t id;\n+\n+\t/* Keeping a pointer here to avoid container_of() calls, which\n+\t * are expensive since they are very frequent and will result\n+\t * in an integer multiplication (since the port id is an index\n+\t * into the dsw_evdev port array).\n+\t */\n+\tstruct dsw_evdev *dsw;\n+\n+\tuint16_t dequeue_depth;\n+\tuint16_t enqueue_depth;\n+\n+\tint32_t inflight_credits;\n+\n+\tint32_t new_event_threshold;\n+\n+\tuint16_t pending_releases;\n+\n+\tuint16_t next_parallel_flow_id;\n+\n+\tuint16_t ops_since_bg_task;\n+\n+\t/* For port load measurement. */\n+\tuint64_t next_load_update;\n+\tuint64_t load_update_interval;\n+\tuint64_t measurement_start;\n+\tuint64_t busy_start;\n+\tuint64_t busy_cycles;\n+\n+\t/* For the ctl interface and flow migration mechanism. */\n+\tuint64_t next_migration;\n+\tuint64_t migration_interval;\n+\tenum dsw_migration_state migration_state;\n+\n+\tuint64_t new_enqueued;\n+\tuint64_t forward_enqueued;\n+\tuint64_t release_enqueued;\n+\tuint64_t queue_enqueued[DSW_MAX_QUEUES];\n+\n+\tuint64_t dequeued;\n+\tuint64_t queue_dequeued[DSW_MAX_QUEUES];\n+\n+\tuint64_t migration_start;\n+\tuint64_t migrations;\n+\tuint64_t migration_latency;\n+\n+\tuint64_t total_busy_cycles;\n+\n+\tuint64_t last_bg;\n+\n+\tuint8_t migration_target_port_id;\n+\tstruct dsw_queue_flow migration_target_qf;\n+\tuint8_t cfm_cnt;\n+\n+\tuint16_t paused_flows_len;\n+\tstruct dsw_queue_flow paused_flows[DSW_MAX_PAUSED_FLOWS];\n+\n+\t/* In a very contrived worst case all inflight events can be\n+\t * laying around paused here.\n+\t */\n+\tuint16_t paused_events_len;\n+\tstruct rte_event paused_events[DSW_MAX_EVENTS];\n+\n+\tuint16_t seen_events_len;\n+\tuint16_t seen_events_idx;\n+\tstruct dsw_queue_flow seen_events[DSW_MAX_EVENTS_RECORDED];\n+\n+\tuint16_t out_buffer_len[DSW_MAX_PORTS];\n+\tstruct rte_event out_buffer[DSW_MAX_PORTS][DSW_MAX_PORT_OUT_BUFFER];\n+\n+\tuint16_t in_buffer_len;\n+\tuint16_t in_buffer_start;\n+\t/* This buffer may contain events that were read up from the\n+\t * in_ring during the flow migration process.\n+\t */\n+\tstruct rte_event in_buffer[DSW_MAX_EVENTS];\n+\n+\tstruct rte_event_ring *in_ring __rte_cache_aligned;\n+\n+\tstruct rte_ring *ctl_in_ring __rte_cache_aligned;\n+\n+\t/* Estimate of current port load. 
*/\n+\trte_atomic16_t load __rte_cache_aligned;\n+} __rte_cache_aligned;\n+\n+struct dsw_queue {\n+\tuint8_t schedule_type;\n+\tuint8_t serving_ports[DSW_MAX_PORTS];\n+\tuint16_t num_serving_ports;\n+\n+\tuint8_t flow_to_port_map[DSW_MAX_FLOWS] __rte_cache_aligned;\n+};\n+\n+struct dsw_evdev {\n+\tstruct rte_eventdev_data *data;\n+\n+\tstruct dsw_port ports[DSW_MAX_PORTS];\n+\tuint16_t num_ports;\n+\tstruct dsw_queue queues[DSW_MAX_QUEUES];\n+\tuint8_t num_queues;\n+\tint32_t max_inflight;\n+\n+\trte_atomic32_t credits_on_loan __rte_cache_aligned;\n+};\n+\n+#define DSW_CTL_PAUS_REQ (0)\n+#define DSW_CTL_UNPAUS_REQ (1)\n+#define DSW_CTL_CFM (2)\n+\n+/* sizeof(struct dsw_ctl_msg) must be equal or less than\n+ * sizeof(void *), to fit on the control ring.\n+ */\n+struct dsw_ctl_msg {\n+\tuint8_t type:2;\n+\tuint8_t originating_port_id:6;\n+\tuint8_t queue_id;\n+\tuint16_t flow_hash;\n+} __rte_packed;\n+\n+uint16_t dsw_event_enqueue(void *port, const struct rte_event *event);\n+uint16_t dsw_event_enqueue_burst(void *port,\n+\t\t\t\t const struct rte_event events[],\n+\t\t\t\t uint16_t events_len);\n+uint16_t dsw_event_enqueue_new_burst(void *port,\n+\t\t\t\t     const struct rte_event events[],\n+\t\t\t\t     uint16_t events_len);\n+uint16_t dsw_event_enqueue_forward_burst(void *port,\n+\t\t\t\t\t const struct rte_event events[],\n+\t\t\t\t\t uint16_t events_len);\n+\n+uint16_t dsw_event_dequeue(void *port, struct rte_event *ev, uint64_t wait);\n+uint16_t dsw_event_dequeue_burst(void *port, struct rte_event *events,\n+\t\t\t\t uint16_t num, uint64_t wait);\n+\n+void dsw_event_schedule(struct rte_eventdev *dev);\n+\n+int dsw_xstats_get_names(const struct rte_eventdev *dev,\n+\t\t\t enum rte_event_dev_xstats_mode mode,\n+\t\t\t uint8_t queue_port_id,\n+\t\t\t struct rte_event_dev_xstats_name *xstats_names,\n+\t\t\t unsigned int *ids, unsigned int size);\n+int dsw_xstats_get(const struct rte_eventdev *dev,\n+\t\t   enum rte_event_dev_xstats_mode mode, uint8_t queue_port_id,\n+\t\t   const unsigned int ids[], uint64_t values[], unsigned int n);\n+uint64_t dsw_xstats_get_by_name(const struct rte_eventdev *dev,\n+\t\t\t\tconst char *name, unsigned int *id);\n+\n+static inline struct dsw_evdev *\n+dsw_pmd_priv(const struct rte_eventdev *eventdev)\n+{\n+\treturn eventdev->data->dev_private;\n+}\n+\n+#define DSW_LOG_DP(level, fmt, args...)\t\t\t\t\t\\\n+\tRTE_LOG_DP(level, EVENTDEV, \"[%s] %s() line %u: \" fmt,\t\t\\\n+\t\t   DSW_PMD_NAME,\t\t\t\t\t\\\n+\t\t   __func__, __LINE__, ## args)\n+\n+#define DSW_LOG_DP_PORT(level, port_id, fmt, args...)\t\t\\\n+\tDSW_LOG_DP(level, \"<Port %d> \" fmt, port_id, ## args)\n+\n+#endif\ndiff --git a/drivers/event/dsw/dsw_event.c b/drivers/event/dsw/dsw_event.c\nnew file mode 100644\nindex 000000000..58054e44e\n--- /dev/null\n+++ b/drivers/event/dsw/dsw_event.c\n@@ -0,0 +1,1261 @@\n+/* SPDX-License-Identifier: BSD-3-Clause\n+ * Copyright(c) 2018 Ericsson AB\n+ */\n+\n+#ifdef DSW_SORT_DEQUEUED\n+#include \"dsw_sort.h\"\n+#endif\n+\n+#include \"dsw_evdev.h\"\n+\n+#include <stdbool.h>\n+#include <stdlib.h>\n+#include <string.h>\n+#include <unistd.h>\n+\n+#include <rte_atomic.h>\n+#include <rte_branch_prediction.h>\n+#include <rte_cycles.h>\n+#include <rte_memcpy.h>\n+#include <rte_random.h>\n+\n+static bool\n+dsw_port_acquire_credits(struct dsw_evdev *dsw, struct dsw_port *port,\n+\t\t\t int32_t credits)\n+{\n+\tint32_t inflight_credits = port->inflight_credits;\n+\tint32_t missing_credits = credits - inflight_credits;\n+\tint32_t total_on_loan;\n+\tint32_t 
available;\n+\tint32_t acquired_credits;\n+\tint32_t new_total_on_loan;\n+\n+\tif (likely(missing_credits <= 0)) {\n+\t\tport->inflight_credits -= credits;\n+\t\treturn true;\n+\t}\n+\n+\ttotal_on_loan = rte_atomic32_read(&dsw->credits_on_loan);\n+\tavailable = dsw->max_inflight - total_on_loan;\n+\tacquired_credits = RTE_MAX(missing_credits, DSW_PORT_MIN_CREDITS);\n+\n+\tif (available < acquired_credits)\n+\t\treturn false;\n+\n+\t/* This is a race, no locks are involved, and thus some other\n+\t * thread can allocate tokens in between the check and the\n+\t * allocation.\n+\t */\n+\tnew_total_on_loan = rte_atomic32_add_return(&dsw->credits_on_loan,\n+\t\t\t\t\t\t    acquired_credits);\n+\n+\tif (unlikely(new_total_on_loan > dsw->max_inflight)) {\n+\t\t/* Some other port took the last credits */\n+\t\trte_atomic32_sub(&dsw->credits_on_loan, acquired_credits);\n+\t\treturn false;\n+\t}\n+\n+\tDSW_LOG_DP_PORT(DEBUG, port->id, \"Acquired %d tokens from pool.\\n\",\n+\t\t\tacquired_credits);\n+\n+\tport->inflight_credits += acquired_credits;\n+\tport->inflight_credits -= credits;\n+\n+\treturn true;\n+}\n+\n+static void\n+dsw_port_return_credits(struct dsw_evdev *dsw, struct dsw_port *port,\n+\t\t\tint32_t credits)\n+{\n+\tport->inflight_credits += credits;\n+\n+\tif (unlikely(port->inflight_credits > DSW_PORT_MAX_CREDITS)) {\n+\t\tint32_t leave_credits = DSW_PORT_MIN_CREDITS;\n+\t\tint32_t return_credits =\n+\t\t\tport->inflight_credits - leave_credits;\n+\n+\t\tport->inflight_credits = leave_credits;\n+\n+\t\trte_atomic32_sub(&dsw->credits_on_loan, return_credits);\n+\n+\t\tDSW_LOG_DP_PORT(DEBUG, port->id,\n+\t\t\t\t\"Returned %d tokens to pool.\\n\",\n+\t\t\t\treturn_credits);\n+\t}\n+}\n+\n+static void\n+dsw_port_enqueue_stats(struct dsw_port *port, uint16_t num_new,\n+\t\t       uint16_t num_forward, uint16_t num_release)\n+{\n+\tport->new_enqueued += num_new;\n+\tport->forward_enqueued += num_forward;\n+\tport->release_enqueued += num_release;\n+}\n+\n+static void\n+dsw_port_queue_enqueue_stats(struct dsw_port *source_port, uint8_t queue_id)\n+{\n+\tsource_port->queue_enqueued[queue_id]++;\n+}\n+\n+static void\n+dsw_port_dequeue_stats(struct dsw_port *port, uint16_t num)\n+{\n+\tport->dequeued += num;\n+}\n+\n+static void\n+dsw_port_queue_dequeued_stats(struct dsw_port *source_port, uint8_t queue_id)\n+{\n+\tsource_port->queue_dequeued[queue_id]++;\n+}\n+\n+static void\n+dsw_port_load_record(struct dsw_port *port, unsigned int dequeued)\n+{\n+\tif (dequeued > 0 && port->busy_start == 0)\n+\t\t/* work period begins */\n+\t\tport->busy_start = rte_get_timer_cycles();\n+\telse if (dequeued == 0 && port->busy_start > 0) {\n+\t\t/* work period ends */\n+\t\tuint64_t work_period =\n+\t\t\trte_get_timer_cycles() - port->busy_start;\n+\t\tport->busy_cycles += work_period;\n+\t\tport->busy_start = 0;\n+\t}\n+}\n+\n+static int16_t\n+dsw_port_load_close_period(struct dsw_port *port, uint64_t now)\n+{\n+\tuint64_t passed = now - port->measurement_start;\n+\tuint64_t busy_cycles = port->busy_cycles;\n+\n+\tif (port->busy_start > 0) {\n+\t\tbusy_cycles += (now - port->busy_start);\n+\t\tport->busy_start = now;\n+\t}\n+\n+\tint16_t load = (DSW_MAX_LOAD * busy_cycles) / passed;\n+\n+\tport->measurement_start = now;\n+\tport->busy_cycles = 0;\n+\n+\tport->total_busy_cycles += busy_cycles;\n+\n+\treturn load;\n+}\n+\n+static void\n+dsw_port_load_update(struct dsw_port *port, uint64_t now)\n+{\n+\tint16_t old_load;\n+\tint16_t period_load;\n+\tint16_t new_load;\n+\n+\told_load = 
rte_atomic16_read(&port->load);\n+\n+\tperiod_load = dsw_port_load_close_period(port, now);\n+\n+\tnew_load = (period_load + old_load*DSW_OLD_LOAD_WEIGHT) /\n+\t\t(DSW_OLD_LOAD_WEIGHT+1);\n+\n+\trte_atomic16_set(&port->load, new_load);\n+}\n+\n+static void\n+dsw_port_consider_load_update(struct dsw_port *port, uint64_t now)\n+{\n+\tif (now < port->next_load_update)\n+\t\treturn;\n+\n+\tport->next_load_update = now + port->load_update_interval;\n+\n+\tdsw_port_load_update(port, now);\n+}\n+\n+static void\n+dsw_port_ctl_enqueue(struct dsw_port *port, struct dsw_ctl_msg *msg)\n+{\n+\tvoid *raw_msg;\n+\n+\tmemcpy(&raw_msg, msg, sizeof(*msg));\n+\n+\t/* there's always room on the ring */\n+\twhile (rte_ring_enqueue(port->ctl_in_ring, raw_msg) != 0)\n+\t\trte_pause();\n+}\n+\n+static int\n+dsw_port_ctl_dequeue(struct dsw_port *port, struct dsw_ctl_msg *msg)\n+{\n+\tvoid *raw_msg;\n+\tint rc;\n+\n+\trc = rte_ring_dequeue(port->ctl_in_ring, &raw_msg);\n+\n+\tif (rc == 0)\n+\t\tmemcpy(msg, &raw_msg, sizeof(*msg));\n+\n+\treturn rc;\n+}\n+\n+static void\n+dsw_port_ctl_broadcast(struct dsw_evdev *dsw, struct dsw_port *source_port,\n+\t\t       uint8_t type, uint8_t queue_id, uint16_t flow_hash)\n+{\n+\tuint16_t port_id;\n+\tstruct dsw_ctl_msg msg = {\n+\t\t.type = type,\n+\t\t.originating_port_id = source_port->id,\n+\t\t.queue_id = queue_id,\n+\t\t.flow_hash = flow_hash\n+\t};\n+\n+\tfor (port_id = 0; port_id < dsw->num_ports; port_id++)\n+\t\tif (port_id != source_port->id)\n+\t\t\tdsw_port_ctl_enqueue(&dsw->ports[port_id], &msg);\n+}\n+\n+static bool\n+dsw_port_is_flow_paused(struct dsw_port *port, uint8_t queue_id,\n+\t\t\tuint16_t flow_hash)\n+{\n+\tuint16_t i;\n+\n+\tfor (i = 0; i < port->paused_flows_len; i++) {\n+\t\tstruct dsw_queue_flow *qf = &port->paused_flows[i];\n+\t\tif (qf->queue_id == queue_id &&\n+\t\t    qf->flow_hash == flow_hash)\n+\t\t\treturn true;\n+\t}\n+\treturn false;\n+}\n+\n+static void\n+dsw_port_add_paused_flow(struct dsw_port *port, uint8_t queue_id,\n+\t\t\t uint16_t paused_flow_hash)\n+{\n+\tport->paused_flows[port->paused_flows_len] = (struct dsw_queue_flow) {\n+\t\t.queue_id = queue_id,\n+\t\t.flow_hash = paused_flow_hash\n+\t};\n+\tport->paused_flows_len++;\n+}\n+\n+static void\n+dsw_port_remove_paused_flow(struct dsw_port *port, uint8_t queue_id,\n+\t\t\t    uint16_t paused_flow_hash)\n+{\n+\tuint16_t i;\n+\n+\tfor (i = 0; i < port->paused_flows_len; i++) {\n+\t\tstruct dsw_queue_flow *qf = &port->paused_flows[i];\n+\n+\t\tif (qf->queue_id == queue_id &&\n+\t\t    qf->flow_hash == paused_flow_hash) {\n+\t\t\tuint16_t last_idx = port->paused_flows_len-1;\n+\t\t\tif (i != last_idx)\n+\t\t\t\tport->paused_flows[i] =\n+\t\t\t\t\tport->paused_flows[last_idx];\n+\t\t\tport->paused_flows_len--;\n+\t\t\tbreak;\n+\t\t}\n+\t}\n+}\n+\n+static void\n+dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port);\n+\n+static void\n+dsw_port_handle_pause_flow(struct dsw_evdev *dsw, struct dsw_port *port,\n+\t\t\t   uint8_t originating_port_id, uint8_t queue_id,\n+\t\t\t   uint16_t paused_flow_hash)\n+{\n+\tstruct dsw_ctl_msg cfm = {\n+\t\t.type = DSW_CTL_CFM,\n+\t\t.originating_port_id = port->id,\n+\t\t.queue_id = queue_id,\n+\t\t.flow_hash = paused_flow_hash\n+\t};\n+\n+\tDSW_LOG_DP_PORT(DEBUG, port->id, \"Pausing queue_id %d flow_hash %d.\\n\",\n+\t\t\tqueue_id, paused_flow_hash);\n+\n+\t/* There might be already-scheduled events belonging to the\n+\t * paused flow in the output buffers.\n+\t */\n+\tdsw_port_flush_out_buffers(dsw, 
port);\n+\n+\tdsw_port_add_paused_flow(port, queue_id, paused_flow_hash);\n+\n+\t/* Make sure any stores to the original port's in_ring is seen\n+\t * before the ctl message.\n+\t */\n+\trte_smp_wmb();\n+\n+\tdsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm);\n+}\n+\n+static void\n+dsw_find_lowest_load_port(uint8_t *port_ids, uint16_t num_port_ids,\n+\t\t\t  uint8_t exclude_port_id, int16_t *port_loads,\n+\t\t\t  uint8_t *target_port_id, int16_t *target_load)\n+{\n+\tint16_t candidate_port_id = -1;\n+\tint16_t candidate_load = DSW_MAX_LOAD;\n+\tuint16_t i;\n+\n+\tfor (i = 0; i < num_port_ids; i++) {\n+\t\tuint8_t port_id = port_ids[i];\n+\t\tif (port_id != exclude_port_id) {\n+\t\t\tint16_t load = port_loads[port_id];\n+\t\t\tif (candidate_port_id == -1 ||\n+\t\t\t    load < candidate_load) {\n+\t\t\t\tcandidate_port_id = port_id;\n+\t\t\t\tcandidate_load = load;\n+\t\t\t}\n+\t\t}\n+\t}\n+\t*target_port_id = candidate_port_id;\n+\t*target_load = candidate_load;\n+}\n+\n+struct dsw_queue_flow_burst {\n+\tstruct dsw_queue_flow queue_flow;\n+\tuint16_t count;\n+};\n+\n+static inline int\n+dsw_cmp_burst(const void *v_burst_a, const void *v_burst_b)\n+{\n+\tconst struct dsw_queue_flow_burst *burst_a = v_burst_a;\n+\tconst struct dsw_queue_flow_burst *burst_b = v_burst_b;\n+\n+\tint a_count = burst_a->count;\n+\tint b_count = burst_b->count;\n+\n+\treturn a_count - b_count;\n+}\n+\n+#define DSW_QF_TO_INT(_qf)\t\t\t\t\t\\\n+\t((int)((((_qf)->queue_id)<<16)|((_qf)->flow_hash)))\n+\n+static inline int\n+dsw_cmp_qf(const void *v_qf_a, const void *v_qf_b)\n+{\n+\tconst struct dsw_queue_flow *qf_a = v_qf_a;\n+\tconst struct dsw_queue_flow *qf_b = v_qf_b;\n+\n+\treturn DSW_QF_TO_INT(qf_a) - DSW_QF_TO_INT(qf_b);\n+}\n+\n+static uint16_t\n+dsw_sort_qfs_to_bursts(struct dsw_queue_flow *qfs, uint16_t qfs_len,\n+\t\t       struct dsw_queue_flow_burst *bursts)\n+{\n+\tuint16_t i;\n+\tstruct dsw_queue_flow_burst *current_burst = NULL;\n+\tuint16_t num_bursts = 0;\n+\n+\t/* We don't need the stable property, and the list is likely\n+\t * large enough for qsort() to outperform dsw_stable_sort(),\n+\t * so we use qsort() here.\n+\t */\n+\tqsort(qfs, qfs_len, sizeof(qfs[0]), dsw_cmp_qf);\n+\n+\t/* arrange the (now-consecutive) events into bursts */\n+\tfor (i = 0; i < qfs_len; i++) {\n+\t\tif (i == 0 ||\n+\t\t    dsw_cmp_qf(&qfs[i], &current_burst->queue_flow) != 0) {\n+\t\t\tcurrent_burst = &bursts[num_bursts];\n+\t\t\tcurrent_burst->queue_flow = qfs[i];\n+\t\t\tcurrent_burst->count = 0;\n+\t\t\tnum_bursts++;\n+\t\t}\n+\t\tcurrent_burst->count++;\n+\t}\n+\n+\tqsort(bursts, num_bursts, sizeof(bursts[0]), dsw_cmp_burst);\n+\n+\treturn num_bursts;\n+}\n+\n+static bool\n+dsw_retrieve_port_loads(struct dsw_evdev *dsw, int16_t *port_loads,\n+\t\t\tint16_t load_limit)\n+{\n+\tbool below_limit = false;\n+\tuint16_t i;\n+\n+\tfor (i = 0; i < dsw->num_ports; i++) {\n+\t\tint16_t load = rte_atomic16_read(&dsw->ports[i].load);\n+\t\tif (load < load_limit)\n+\t\t\tbelow_limit = true;\n+\t\tport_loads[i] = load;\n+\t}\n+\treturn below_limit;\n+}\n+\n+static bool\n+dsw_select_migration_target(struct dsw_evdev *dsw,\n+\t\t\t    struct dsw_port *source_port,\n+\t\t\t    struct dsw_queue_flow_burst *bursts,\n+\t\t\t    uint16_t num_bursts, int16_t *port_loads,\n+\t\t\t    int16_t max_load, struct dsw_queue_flow *target_qf,\n+\t\t\t    uint8_t *target_port_id)\n+{\n+\tuint16_t source_load = port_loads[source_port->id];\n+\tuint16_t i;\n+\n+\tfor (i = 0; i < num_bursts; i++) {\n+\t\tstruct dsw_queue_flow *qf = 
&bursts[i].queue_flow;\n+\n+\t\tif (dsw_port_is_flow_paused(source_port, qf->queue_id,\n+\t\t\t\t\t    qf->flow_hash))\n+\t\t\tcontinue;\n+\n+\t\tstruct dsw_queue *queue = &dsw->queues[qf->queue_id];\n+\t\tint16_t target_load;\n+\n+\t\tdsw_find_lowest_load_port(queue->serving_ports,\n+\t\t\t\t\t  queue->num_serving_ports,\n+\t\t\t\t\t  source_port->id, port_loads,\n+\t\t\t\t\t  target_port_id, &target_load);\n+\n+\t\tif (target_load < source_load &&\n+\t\t    target_load < max_load) {\n+\t\t\t*target_qf = *qf;\n+\t\t\treturn true;\n+\t\t}\n+\t}\n+\n+\tDSW_LOG_DP_PORT(DEBUG, source_port->id, \"For the %d flows considered, \"\n+\t\t\t\"no target port found with load less than %d.\\n\",\n+\t\t\tnum_bursts, DSW_LOAD_TO_PERCENT(max_load));\n+\n+\treturn false;\n+}\n+\n+static uint8_t\n+dsw_schedule(struct dsw_evdev *dsw, uint8_t queue_id, uint16_t flow_hash)\n+{\n+\tstruct dsw_queue *queue = &dsw->queues[queue_id];\n+\tuint8_t port_id;\n+\n+\tif (queue->num_serving_ports > 1)\n+\t\tport_id = queue->flow_to_port_map[flow_hash];\n+\telse\n+\t\t/* A single-link queue, or atomic/ordered/parallel but\n+\t\t * with just a single serving port.\n+\t\t */\n+\t\tport_id = queue->serving_ports[0];\n+\n+\tDSW_LOG_DP(DEBUG, \"Event with queue_id %d flow_hash %d is scheduled \"\n+\t\t   \"to port %d.\\n\", queue_id, flow_hash, port_id);\n+\n+\treturn port_id;\n+}\n+\n+static void\n+dsw_port_transmit_buffered(struct dsw_evdev *dsw, struct dsw_port *source_port,\n+\t\t\t   uint8_t dest_port_id)\n+{\n+\tstruct dsw_port *dest_port = &(dsw->ports[dest_port_id]);\n+\tuint16_t *buffer_len = &source_port->out_buffer_len[dest_port_id];\n+\tstruct rte_event *buffer = source_port->out_buffer[dest_port_id];\n+\tuint16_t enqueued = 0;\n+\n+\tif (*buffer_len == 0)\n+\t\treturn;\n+\n+\t/* The rings are dimensioned to fit all in-flight events (even\n+\t * on a single ring), so looping will work.\n+\t */\n+\tdo {\n+\t\tenqueued +=\n+\t\t\trte_event_ring_enqueue_burst(dest_port->in_ring,\n+\t\t\t\t\t\t     buffer+enqueued,\n+\t\t\t\t\t\t     *buffer_len-enqueued,\n+\t\t\t\t\t\t     NULL);\n+\t} while (unlikely(enqueued != *buffer_len));\n+\n+\t(*buffer_len) = 0;\n+}\n+\n+static uint16_t\n+dsw_port_get_parallel_flow_id(struct dsw_port *port)\n+{\n+\tuint16_t flow_id = port->next_parallel_flow_id;\n+\n+\tport->next_parallel_flow_id =\n+\t\t(port->next_parallel_flow_id + 1) % DSW_PARALLEL_FLOWS;\n+\n+\treturn flow_id;\n+}\n+\n+static void\n+dsw_port_buffer_paused(struct dsw_port *port,\n+\t\t       const struct rte_event *paused_event)\n+{\n+\tport->paused_events[port->paused_events_len] = *paused_event;\n+\tport->paused_events_len++;\n+}\n+\n+static void\n+dsw_port_buffer_non_paused(struct dsw_evdev *dsw, struct dsw_port *source_port,\n+\t\t\t   uint8_t dest_port_id, const struct rte_event *event)\n+{\n+\tstruct rte_event *buffer = source_port->out_buffer[dest_port_id];\n+\tuint16_t *buffer_len = &source_port->out_buffer_len[dest_port_id];\n+\n+\tif (*buffer_len == DSW_MAX_PORT_OUT_BUFFER)\n+\t\tdsw_port_transmit_buffered(dsw, source_port, dest_port_id);\n+\n+\tbuffer[*buffer_len] = *event;\n+\n+\t(*buffer_len)++;\n+}\n+\n+#define DSW_FLOW_ID_BITS (24)\n+static uint16_t\n+dsw_flow_id_hash(uint32_t flow_id)\n+{\n+\tuint16_t hash = 0;\n+\tuint16_t offset = 0;\n+\n+\tdo {\n+\t\thash ^= ((flow_id >> offset) & DSW_MAX_FLOWS_MASK);\n+\t\toffset += DSW_MAX_FLOWS_BITS;\n+\t} while (offset < DSW_FLOW_ID_BITS);\n+\n+\treturn hash;\n+}\n+\n+static void\n+dsw_port_buffer_parallel(struct dsw_evdev *dsw, struct dsw_port 
*source_port,\n+\t\t\t struct rte_event event)\n+{\n+\tuint8_t dest_port_id;\n+\n+\tevent.flow_id = dsw_port_get_parallel_flow_id(source_port);\n+\n+\tdest_port_id = dsw_schedule(dsw, event.queue_id,\n+\t\t\t\t    dsw_flow_id_hash(event.flow_id));\n+\n+\tdsw_port_buffer_non_paused(dsw, source_port, dest_port_id, &event);\n+}\n+\n+static void\n+dsw_port_buffer_event(struct dsw_evdev *dsw, struct dsw_port *source_port,\n+\t\t      const struct rte_event *event)\n+{\n+\tuint16_t flow_hash;\n+\tuint8_t dest_port_id;\n+\n+\tif (unlikely(dsw->queues[event->queue_id].schedule_type ==\n+\t\t     RTE_SCHED_TYPE_PARALLEL)) {\n+\t\tdsw_port_buffer_parallel(dsw, source_port, *event);\n+\t\treturn;\n+\t}\n+\n+\tflow_hash = dsw_flow_id_hash(event->flow_id);\n+\n+\tif (unlikely(dsw_port_is_flow_paused(source_port, event->queue_id,\n+\t\t\t\t\t     flow_hash))) {\n+\t\tdsw_port_buffer_paused(source_port, event);\n+\t\treturn;\n+\t}\n+\n+\tdest_port_id = dsw_schedule(dsw, event->queue_id, flow_hash);\n+\n+\tdsw_port_buffer_non_paused(dsw, source_port, dest_port_id, event);\n+}\n+\n+static void\n+dsw_port_flush_paused_events(struct dsw_evdev *dsw,\n+\t\t\t     struct dsw_port *source_port,\n+\t\t\t     uint8_t queue_id, uint16_t paused_flow_hash)\n+{\n+\tuint16_t paused_events_len = source_port->paused_events_len;\n+\tstruct rte_event paused_events[paused_events_len];\n+\tuint8_t dest_port_id;\n+\tuint16_t i;\n+\n+\tif (paused_events_len == 0)\n+\t\treturn;\n+\n+\tif (dsw_port_is_flow_paused(source_port, queue_id, paused_flow_hash))\n+\t\treturn;\n+\n+\trte_memcpy(paused_events, source_port->paused_events,\n+\t\t   paused_events_len * sizeof(struct rte_event));\n+\n+\tsource_port->paused_events_len = 0;\n+\n+\tdest_port_id = dsw_schedule(dsw, queue_id, paused_flow_hash);\n+\n+\tfor (i = 0; i < paused_events_len; i++) {\n+\t\tstruct rte_event *event = &paused_events[i];\n+\t\tuint16_t flow_hash;\n+\n+\t\tflow_hash = dsw_flow_id_hash(event->flow_id);\n+\n+\t\tif (event->queue_id == queue_id &&\n+\t\t    flow_hash == paused_flow_hash)\n+\t\t\tdsw_port_buffer_non_paused(dsw, source_port,\n+\t\t\t\t\t\t   dest_port_id, event);\n+\t\telse\n+\t\t\tdsw_port_buffer_paused(source_port, event);\n+\t}\n+}\n+\n+static void\n+dsw_port_migration_stats(struct dsw_port *port)\n+{\n+\tuint64_t migration_latency;\n+\n+\tmigration_latency = (rte_get_timer_cycles() - port->migration_start);\n+\tport->migration_latency += migration_latency;\n+\tport->migrations++;\n+}\n+\n+static void\n+dsw_port_end_migration(struct dsw_evdev *dsw, struct dsw_port *port)\n+{\n+\tuint8_t queue_id = port->migration_target_qf.queue_id;\n+\tuint16_t flow_hash = port->migration_target_qf.flow_hash;\n+\n+\tport->migration_state = DSW_MIGRATION_STATE_IDLE;\n+\tport->seen_events_len = 0;\n+\n+\tdsw_port_migration_stats(port);\n+\n+\tif (dsw->queues[queue_id].schedule_type != RTE_SCHED_TYPE_PARALLEL) {\n+\t\tdsw_port_remove_paused_flow(port, queue_id, flow_hash);\n+\t\tdsw_port_flush_paused_events(dsw, port, queue_id, flow_hash);\n+\t}\n+\n+\tDSW_LOG_DP_PORT(DEBUG, port->id, \"Migration completed for queue_id \"\n+\t\t\t\"%d flow_hash %d.\\n\", queue_id, flow_hash);\n+}\n+\n+static void\n+dsw_port_consider_migration(struct dsw_evdev *dsw,\n+\t\t\t    struct dsw_port *source_port,\n+\t\t\t    uint64_t now)\n+{\n+\tbool any_port_below_limit;\n+\tstruct dsw_queue_flow *seen_events = source_port->seen_events;\n+\tuint16_t seen_events_len = source_port->seen_events_len;\n+\tstruct dsw_queue_flow_burst bursts[DSW_MAX_EVENTS_RECORDED];\n+\tuint16_t 
num_bursts;\n+\tint16_t source_port_load;\n+\tint16_t port_loads[dsw->num_ports];\n+\n+\tif (now < source_port->next_migration)\n+\t\treturn;\n+\n+\tif (dsw->num_ports == 1)\n+\t\treturn;\n+\n+\tDSW_LOG_DP_PORT(DEBUG, source_port->id, \"Considering migration.\\n\");\n+\n+\t/* Randomize interval to avoid having all threads considering\n+\t * migration at the same in point in time, which might lead to\n+\t * all choosing the same target port.\n+\t */\n+\tsource_port->next_migration = now +\n+\t\tsource_port->migration_interval / 2 +\n+\t\trte_rand() % source_port->migration_interval;\n+\n+\tif (source_port->migration_state != DSW_MIGRATION_STATE_IDLE) {\n+\t\tDSW_LOG_DP_PORT(DEBUG, source_port->id,\n+\t\t\t\t\"Migration already in progress.\\n\");\n+\t\treturn;\n+\t}\n+\n+\t/* For simplicity, avoid migration in the unlikely case there\n+\t * is still events to consume in the in_buffer (from the last\n+\t * migration).\n+\t */\n+\tif (source_port->in_buffer_len > 0) {\n+\t\tDSW_LOG_DP_PORT(DEBUG, source_port->id, \"There are still \"\n+\t\t\t\t\"events in the input buffer.\\n\");\n+\t\treturn;\n+\t}\n+\n+\tsource_port_load = rte_atomic16_read(&source_port->load);\n+\tif (source_port_load < DSW_MIN_SOURCE_LOAD_FOR_MIGRATION) {\n+\t\tDSW_LOG_DP_PORT(DEBUG, source_port->id,\n+\t\t\t\t\"Load %d is below threshold level %d.\\n\",\n+\t\t\t\tDSW_LOAD_TO_PERCENT(source_port_load),\n+\t\t       DSW_LOAD_TO_PERCENT(DSW_MIN_SOURCE_LOAD_FOR_MIGRATION));\n+\t\treturn;\n+\t}\n+\n+\t/* Avoid starting any expensive operations (sorting etc), in\n+\t * case of a scenario with all ports above the load limit.\n+\t */\n+\tany_port_below_limit =\n+\t\tdsw_retrieve_port_loads(dsw, port_loads,\n+\t\t\t\t\tDSW_MAX_TARGET_LOAD_FOR_MIGRATION);\n+\tif (!any_port_below_limit) {\n+\t\tDSW_LOG_DP_PORT(DEBUG, source_port->id,\n+\t\t\t\t\"Candidate target ports are all too highly \"\n+\t\t\t\t\"loaded.\\n\");\n+\t\treturn;\n+\t}\n+\n+\t/* Sort flows into 'bursts' to allow attempting to migrating\n+\t * small (but still active) flows first - this it to avoid\n+\t * having large flows moving around the worker cores too much\n+\t * (to avoid cache misses, among other things). Of course, the\n+\t * number of recorded events (queue+flow ids) are limited, and\n+\t * provides only a snapshot, so only so many conclusions can\n+\t * be drawn from this data.\n+\t */\n+\tnum_bursts = dsw_sort_qfs_to_bursts(seen_events, seen_events_len,\n+\t\t\t\t\t    bursts);\n+\t/* For non-big-little systems, there's no point in moving the\n+\t * only (known) flow.\n+\t */\n+\tif (num_bursts < 2) {\n+\t\tDSW_LOG_DP_PORT(DEBUG, source_port->id, \"Only a single flow \"\n+\t\t\t\t\"queue_id %d flow_hash %d has been seen.\\n\",\n+\t\t\t\tbursts[0].queue_flow.queue_id,\n+\t\t\t\tbursts[0].queue_flow.flow_hash);\n+\t\treturn;\n+\t}\n+\n+\t/* The strategy is to first try to find a flow to move to a\n+\t * port with low load (below the migration-attempt\n+\t * threshold). 
If that fails, we try to find a port which is\n+\t * below the max threshold, and also less loaded than this\n+\t * port is.\n+\t */\n+\tif (!dsw_select_migration_target(dsw, source_port, bursts, num_bursts,\n+\t\t\t\t\t port_loads,\n+\t\t\t\t\t DSW_MIN_SOURCE_LOAD_FOR_MIGRATION,\n+\t\t\t\t\t &source_port->migration_target_qf,\n+\t\t\t\t\t &source_port->migration_target_port_id)\n+\t    &&\n+\t    !dsw_select_migration_target(dsw, source_port, bursts, num_bursts,\n+\t\t\t\t\t port_loads,\n+\t\t\t\t\t DSW_MAX_TARGET_LOAD_FOR_MIGRATION,\n+\t\t\t\t\t &source_port->migration_target_qf,\n+\t\t\t\t       &source_port->migration_target_port_id))\n+\t\treturn;\n+\n+\tDSW_LOG_DP_PORT(DEBUG, source_port->id, \"Migrating queue_id %d \"\n+\t\t\t\"flow_hash %d from port %d to port %d.\\n\",\n+\t\t\tsource_port->migration_target_qf.queue_id,\n+\t\t\tsource_port->migration_target_qf.flow_hash,\n+\t\t\tsource_port->id, source_port->migration_target_port_id);\n+\n+\t/* We have a winner. */\n+\n+\tsource_port->migration_state = DSW_MIGRATION_STATE_PAUSING;\n+\tsource_port->migration_start = rte_get_timer_cycles();\n+\n+\t/* No need to go through the whole pause procedure for\n+\t * parallel queues, since atomic/ordered semantics need not to\n+\t * be maintained.\n+\t */\n+\n+\tif (dsw->queues[source_port->migration_target_qf.queue_id].schedule_type\n+\t    == RTE_SCHED_TYPE_PARALLEL) {\n+\t\tuint8_t queue_id = source_port->migration_target_qf.queue_id;\n+\t\tuint16_t flow_hash = source_port->migration_target_qf.flow_hash;\n+\t\tuint8_t dest_port_id = source_port->migration_target_port_id;\n+\n+\t\t/* Single byte-sized stores are always atomic. */\n+\t\tdsw->queues[queue_id].flow_to_port_map[flow_hash] =\n+\t\t\tdest_port_id;\n+\t\trte_smp_wmb();\n+\n+\t\tdsw_port_end_migration(dsw, source_port);\n+\n+\t\treturn;\n+\t}\n+\n+\t/* There might be 'loopback' events already scheduled in the\n+\t * output buffers.\n+\t */\n+\tdsw_port_flush_out_buffers(dsw, source_port);\n+\n+\tdsw_port_add_paused_flow(source_port,\n+\t\t\t\t source_port->migration_target_qf.queue_id,\n+\t\t\t\t source_port->migration_target_qf.flow_hash);\n+\n+\tdsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_PAUS_REQ,\n+\t\t\t       source_port->migration_target_qf.queue_id,\n+\t\t\t       source_port->migration_target_qf.flow_hash);\n+\tsource_port->cfm_cnt = 0;\n+}\n+\n+static void\n+dsw_port_flush_paused_events(struct dsw_evdev *dsw,\n+\t\t\t     struct dsw_port *source_port,\n+\t\t\t     uint8_t queue_id, uint16_t paused_flow_hash);\n+\n+static void\n+dsw_port_handle_unpause_flow(struct dsw_evdev *dsw, struct dsw_port *port,\n+\t\t\t     uint8_t originating_port_id, uint8_t queue_id,\n+\t\t\t     uint16_t paused_flow_hash)\n+{\n+\tstruct dsw_ctl_msg cfm = {\n+\t\t.type = DSW_CTL_CFM,\n+\t\t.originating_port_id = port->id,\n+\t\t.queue_id = queue_id,\n+\t\t.flow_hash = paused_flow_hash\n+\t};\n+\n+\tDSW_LOG_DP_PORT(DEBUG, port->id, \"Un-pausing queue_id %d flow_hash %d.\\n\",\n+\t\t\tqueue_id, paused_flow_hash);\n+\n+\tdsw_port_remove_paused_flow(port, queue_id, paused_flow_hash);\n+\n+\trte_smp_rmb();\n+\n+\tdsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm);\n+\n+\tdsw_port_flush_paused_events(dsw, port, queue_id, paused_flow_hash);\n+}\n+\n+#define FORWARD_BURST_SIZE (32)\n+\n+static void\n+dsw_port_forward_migrated_flow(struct dsw_port *source_port,\n+\t\t\t       struct rte_event_ring *dest_ring,\n+\t\t\t       uint8_t queue_id,\n+\t\t\t       uint16_t flow_hash)\n+{\n+\tuint16_t events_left;\n+\n+\t/* Control ring message 
should be seen before the ring count\n+\t * is read on the port's in_ring.\n+\t */\n+\trte_smp_rmb();\n+\n+\tevents_left = rte_event_ring_count(source_port->in_ring);\n+\n+\twhile (events_left > 0) {\n+\t\tuint16_t in_burst_size =\n+\t\t\tRTE_MIN(FORWARD_BURST_SIZE, events_left);\n+\t\tstruct rte_event in_burst[in_burst_size];\n+\t\tuint16_t in_len;\n+\t\tuint16_t i;\n+\n+\t\tin_len = rte_event_ring_dequeue_burst(source_port->in_ring,\n+\t\t\t\t\t\t      in_burst,\n+\t\t\t\t\t\t      in_burst_size, NULL);\n+\t\t/* No need to care about bursting forwarded events (to\n+\t\t * the destination port's in_ring), since migration\n+\t\t * doesn't happen very often, and also the majority of\n+\t\t * the dequeued events will likely *not* be forwarded.\n+\t\t */\n+\t\tfor (i = 0; i < in_len; i++) {\n+\t\t\tstruct rte_event *e = &in_burst[i];\n+\t\t\tif (e->queue_id == queue_id &&\n+\t\t\t    dsw_flow_id_hash(e->flow_id) == flow_hash) {\n+\t\t\t\twhile (rte_event_ring_enqueue_burst(dest_ring,\n+\t\t\t\t\t\t\t\t    e, 1,\n+\t\t\t\t\t\t\t\t    NULL) != 1)\n+\t\t\t\t\trte_pause();\n+\t\t\t} else {\n+\t\t\t\tuint16_t last_idx = source_port->in_buffer_len;\n+\t\t\t\tsource_port->in_buffer[last_idx] = *e;\n+\t\t\t\tsource_port->in_buffer_len++;\n+\t\t\t}\n+\t\t}\n+\n+\t\tevents_left -= in_len;\n+\t}\n+}\n+\n+static void\n+dsw_port_move_migrating_flow(struct dsw_evdev *dsw,\n+\t\t\t     struct dsw_port *source_port)\n+{\n+\tuint8_t queue_id = source_port->migration_target_qf.queue_id;\n+\tuint16_t flow_hash = source_port->migration_target_qf.flow_hash;\n+\tuint8_t dest_port_id = source_port->migration_target_port_id;\n+\tstruct dsw_port *dest_port = &dsw->ports[dest_port_id];\n+\n+\tdsw_port_flush_out_buffers(dsw, source_port);\n+\n+\trte_smp_wmb();\n+\n+\tdsw->queues[queue_id].flow_to_port_map[flow_hash] =\n+\t\tdest_port_id;\n+\n+\tdsw_port_forward_migrated_flow(source_port, dest_port->in_ring,\n+\t\t\t\t       queue_id, flow_hash);\n+\n+\t/* Flow table update and migration destination port's enqueues\n+\t * must be seen before the control message.\n+\t */\n+\trte_smp_wmb();\n+\n+\tdsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_UNPAUS_REQ, queue_id,\n+\t\t\t       flow_hash);\n+\tsource_port->cfm_cnt = 0;\n+\tsource_port->migration_state = DSW_MIGRATION_STATE_UNPAUSING;\n+}\n+\n+static void\n+dsw_port_handle_confirm(struct dsw_evdev *dsw, struct dsw_port *port)\n+{\n+\tport->cfm_cnt++;\n+\n+\tif (port->cfm_cnt == (dsw->num_ports-1)) {\n+\t\tswitch (port->migration_state) {\n+\t\tcase DSW_MIGRATION_STATE_PAUSING:\n+\t\t\tDSW_LOG_DP_PORT(DEBUG, port->id, \"Going into forwarding \"\n+\t\t\t\t\t\"migration state.\\n\");\n+\t\t\tport->migration_state = DSW_MIGRATION_STATE_FORWARDING;\n+\t\t\tbreak;\n+\t\tcase DSW_MIGRATION_STATE_UNPAUSING:\n+\t\t\tdsw_port_end_migration(dsw, port);\n+\t\t\tbreak;\n+\t\tdefault:\n+\t\t\tRTE_ASSERT(0);\n+\t\t\tbreak;\n+\t\t}\n+\t}\n+}\n+\n+static void\n+dsw_port_ctl_process(struct dsw_evdev *dsw, struct dsw_port *port)\n+{\n+\tstruct dsw_ctl_msg msg;\n+\n+\t/* So any table loads happen before the ring dequeue, in the\n+\t * case of a 'paus' message.\n+\t */\n+\trte_smp_rmb();\n+\n+\tif (dsw_port_ctl_dequeue(port, &msg) == 0) {\n+\t\tswitch (msg.type) {\n+\t\tcase DSW_CTL_PAUS_REQ:\n+\t\t\tdsw_port_handle_pause_flow(dsw, port,\n+\t\t\t\t\t\t   msg.originating_port_id,\n+\t\t\t\t\t\t   msg.queue_id, msg.flow_hash);\n+\t\t\tbreak;\n+\t\tcase DSW_CTL_UNPAUS_REQ:\n+\t\t\tdsw_port_handle_unpause_flow(dsw, port,\n+\t\t\t\t\t\t     msg.originating_port_id,\n+\t\t\t\t\t\t     
msg.queue_id,\n+\t\t\t\t\t\t     msg.flow_hash);\n+\t\t\tbreak;\n+\t\tcase DSW_CTL_CFM:\n+\t\t\tdsw_port_handle_confirm(dsw, port);\n+\t\t\tbreak;\n+\t\t}\n+\t}\n+}\n+\n+static void\n+dsw_port_note_op(struct dsw_port *port, uint16_t num_events)\n+{\n+\t/* To pull the control ring reasonably often on busy ports,\n+\t * each dequeued/enqueued event is considered an 'op' too.\n+\t */\n+\tport->ops_since_bg_task += (num_events+1);\n+}\n+\n+static void\n+dsw_port_bg_process(struct dsw_evdev *dsw, struct dsw_port *port)\n+{\n+\tif (unlikely(port->migration_state == DSW_MIGRATION_STATE_FORWARDING &&\n+\t\t     port->pending_releases == 0))\n+\t\tdsw_port_move_migrating_flow(dsw, port);\n+\n+\t/* Polling the control ring is relatively inexpensive, and\n+\t * polling it often helps bring down migration latency, so\n+\t * do this for every iteration.\n+\t */\n+\tdsw_port_ctl_process(dsw, port);\n+\n+\t/* To avoid considering migration and flushing output buffers,\n+\t * and polling control rings on every dequeue/enqueue call,\n+\t * the scheduler only performs such 'background' tasks every\n+\t * nth (i.e. DSW_MAX_PORT_OPS_PER_BG_TASK) operation.\n+\t */\n+\tif (unlikely(port->ops_since_bg_task >= DSW_MAX_PORT_OPS_PER_BG_TASK)) {\n+\t\tuint64_t now;\n+\n+\t\tnow = rte_get_timer_cycles();\n+\n+\t\tport->last_bg = now;\n+\n+\t\t/* Logic to avoid having events linger in the output\n+\t\t * buffer too long.\n+\t\t */\n+\t\tdsw_port_flush_out_buffers(dsw, port);\n+\n+\t\tdsw_port_consider_load_update(port, now);\n+\n+\t\tdsw_port_consider_migration(dsw, port, now);\n+\n+\t\tport->ops_since_bg_task = 0;\n+\t}\n+}\n+\n+static void\n+dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port)\n+{\n+\tuint16_t dest_port_id;\n+\n+\tfor (dest_port_id = 0; dest_port_id < dsw->num_ports; dest_port_id++)\n+\t\tdsw_port_transmit_buffered(dsw, source_port, dest_port_id);\n+}\n+\n+uint16_t\n+dsw_event_enqueue(void *port, const struct rte_event *ev)\n+{\n+\treturn dsw_event_enqueue_burst(port, ev, unlikely(ev == NULL) ? 0 : 1);\n+}\n+\n+static __rte_always_inline uint16_t\n+dsw_event_enqueue_burst_generic(void *port, const struct rte_event events[],\n+\t\t\t\tuint16_t events_len, bool op_types_known,\n+\t\t\t\tuint16_t num_new, uint16_t num_release,\n+\t\t\t\tuint16_t num_non_release)\n+{\n+\tstruct dsw_port *source_port = port;\n+\tstruct dsw_evdev *dsw = source_port->dsw;\n+\tbool enough_credits;\n+\tuint16_t i;\n+\n+\tDSW_LOG_DP_PORT(DEBUG, source_port->id, \"Attempting to enqueue %d \"\n+\t\t\t\"events to port %d.\\n\", events_len, source_port->id);\n+\n+\tdsw_port_bg_process(dsw, source_port);\n+\n+\t/* XXX: For performance (=ring efficiency) reasons, the\n+\t * scheduler relies on internal non-ring buffers instead of\n+\t * immediately sending the event to the destination ring. For\n+\t * a producer that doesn't intend to produce or consume any\n+\t * more events, the scheduler provides a way to flush the\n+\t * buffer, by means of doing an enqueue of zero events. In\n+\t * addition, a port cannot be left \"unattended\" (e.g. unused)\n+\t * for long periods of time, since that would stall\n+\t * migration. 
Eventdev API extensions to provide a cleaner way\n+\t * to achieve both of these functions should be\n+\t * considered.\n+\t */\n+\tif (unlikely(events_len == 0)) {\n+\t\tdsw_port_note_op(source_port, DSW_MAX_PORT_OPS_PER_BG_TASK);\n+\t\tdsw_port_flush_out_buffers(dsw, source_port);\n+\t\treturn 0;\n+\t}\n+\n+\tif (unlikely(events_len > source_port->enqueue_depth))\n+\t\tevents_len = source_port->enqueue_depth;\n+\n+\tdsw_port_note_op(source_port, events_len);\n+\n+\tif (!op_types_known)\n+\t\tfor (i = 0; i < events_len; i++) {\n+\t\t\tswitch (events[i].op) {\n+\t\t\tcase RTE_EVENT_OP_RELEASE:\n+\t\t\t\tnum_release++;\n+\t\t\t\tbreak;\n+\t\t\tcase RTE_EVENT_OP_NEW:\n+\t\t\t\tnum_new++;\n+\t\t\t\t/* Falls through. */\n+\t\t\tdefault:\n+\t\t\t\tnum_non_release++;\n+\t\t\t\tbreak;\n+\t\t\t}\n+\t\t}\n+\n+\t/* Technically, we could allow the non-new events up to the\n+\t * first new event in the array into the system, but for\n+\t * simplicity reasons, we deny the whole burst if the port is\n+\t * above the water mark.\n+\t */\n+\tif (unlikely(num_new > 0 && rte_atomic32_read(&dsw->credits_on_loan) >\n+\t\t     source_port->new_event_threshold))\n+\t\treturn 0;\n+\n+\tenough_credits = dsw_port_acquire_credits(dsw, source_port,\n+\t\t\t\t\t\t  num_non_release);\n+\tif (unlikely(!enough_credits))\n+\t\treturn 0;\n+\n+\tsource_port->pending_releases -= num_release;\n+\n+\tdsw_port_enqueue_stats(source_port, num_new,\n+\t\t\t       num_non_release-num_new, num_release);\n+\n+\tfor (i = 0; i < events_len; i++) {\n+\t\tconst struct rte_event *event = &events[i];\n+\n+\t\tif (likely(num_release == 0 ||\n+\t\t\t   event->op != RTE_EVENT_OP_RELEASE))\n+\t\t\tdsw_port_buffer_event(dsw, source_port, event);\n+\t\tdsw_port_queue_enqueue_stats(source_port, event->queue_id);\n+\t}\n+\n+\tDSW_LOG_DP_PORT(DEBUG, source_port->id, \"%d non-release events \"\n+\t\t\t\"accepted.\\n\", num_non_release);\n+\n+\treturn num_non_release;\n+}\n+\n+uint16_t\n+dsw_event_enqueue_burst(void *port, const struct rte_event events[],\n+\t\t\tuint16_t events_len)\n+{\n+\treturn dsw_event_enqueue_burst_generic(port, events, events_len, false,\n+\t\t\t\t\t       0, 0, 0);\n+}\n+\n+uint16_t\n+dsw_event_enqueue_new_burst(void *port, const struct rte_event events[],\n+\t\t\t    uint16_t events_len)\n+{\n+\treturn dsw_event_enqueue_burst_generic(port, events, events_len, true,\n+\t\t\t\t\t       events_len, 0, events_len);\n+}\n+\n+uint16_t\n+dsw_event_enqueue_forward_burst(void *port, const struct rte_event events[],\n+\t\t\t\tuint16_t events_len)\n+{\n+\treturn dsw_event_enqueue_burst_generic(port, events, events_len, true,\n+\t\t\t\t\t       0, 0, events_len);\n+}\n+\n+uint16_t\n+dsw_event_dequeue(void *port, struct rte_event *events, uint64_t wait)\n+{\n+\treturn dsw_event_dequeue_burst(port, events, 1, wait);\n+}\n+\n+static void\n+dsw_port_record_seen_events(struct dsw_port *port, struct rte_event *events,\n+\t\t\t    uint16_t num)\n+{\n+\tuint16_t i;\n+\n+\tdsw_port_dequeue_stats(port, num);\n+\n+\tfor (i = 0; i < num; i++) {\n+\t\tuint16_t l_idx = port->seen_events_idx;\n+\t\tstruct dsw_queue_flow *qf = &port->seen_events[l_idx];\n+\t\tstruct rte_event *event = &events[i];\n+\t\tqf->queue_id = event->queue_id;\n+\t\tqf->flow_hash = dsw_flow_id_hash(event->flow_id);\n+\n+\t\tport->seen_events_idx = (l_idx+1) % DSW_MAX_EVENTS_RECORDED;\n+\n+\t\tdsw_port_queue_dequeued_stats(port, event->queue_id);\n+\t}\n+\n+\tif (unlikely(port->seen_events_len != DSW_MAX_EVENTS_RECORDED))\n+\t\tport->seen_events_len 
=\n+\t\t\tRTE_MIN(port->seen_events_len + num,\n+\t\t\t\tDSW_MAX_EVENTS_RECORDED);\n+}\n+\n+#ifdef DSW_SORT_DEQUEUED\n+\n+#define DSW_EVENT_TO_INT(_event)\t\t\t\t\\\n+\t((int)((((_event)->queue_id)<<16)|((_event)->flow_id)))\n+\n+static inline int\n+dsw_cmp_event(const void *v_event_a, const void *v_event_b)\n+{\n+\tconst struct rte_event *event_a = v_event_a;\n+\tconst struct rte_event *event_b = v_event_b;\n+\n+\treturn DSW_EVENT_TO_INT(event_a) - DSW_EVENT_TO_INT(event_b);\n+}\n+#endif\n+\n+static uint16_t\n+dsw_port_dequeue_burst(struct dsw_port *port, struct rte_event *events,\n+\t\t       uint16_t num)\n+{\n+\tstruct dsw_port *source_port = port;\n+\tstruct dsw_evdev *dsw = source_port->dsw;\n+\n+\tdsw_port_ctl_process(dsw, source_port);\n+\n+\tif (unlikely(port->in_buffer_len > 0)) {\n+\t\tuint16_t dequeued = RTE_MIN(num, port->in_buffer_len);\n+\n+\t\trte_memcpy(events, &port->in_buffer[port->in_buffer_start],\n+\t\t\t   dequeued * sizeof(struct rte_event));\n+\n+\t\tport->in_buffer_start += dequeued;\n+\t\tport->in_buffer_len -= dequeued;\n+\n+\t\tif (port->in_buffer_len == 0)\n+\t\t\tport->in_buffer_start = 0;\n+\n+\t\treturn dequeued;\n+\t}\n+\n+\treturn rte_event_ring_dequeue_burst(port->in_ring, events, num, NULL);\n+}\n+\n+uint16_t\n+dsw_event_dequeue_burst(void *port, struct rte_event *events, uint16_t num,\n+\t\t\tuint64_t wait __rte_unused)\n+{\n+\tstruct dsw_port *source_port = port;\n+\tstruct dsw_evdev *dsw = source_port->dsw;\n+\tuint16_t dequeued;\n+\n+\tsource_port->pending_releases = 0;\n+\n+\tdsw_port_bg_process(dsw, source_port);\n+\n+\tif (unlikely(num > source_port->dequeue_depth))\n+\t\tnum = source_port->dequeue_depth;\n+\n+\tdequeued = dsw_port_dequeue_burst(source_port, events, num);\n+\n+\tsource_port->pending_releases = dequeued;\n+\n+\tdsw_port_load_record(source_port, dequeued);\n+\n+\tdsw_port_note_op(source_port, dequeued);\n+\n+\tif (dequeued > 0) {\n+\t\tDSW_LOG_DP_PORT(DEBUG, source_port->id, \"Dequeued %d events.\\n\",\n+\t\t\t\tdequeued);\n+\n+\t\tdsw_port_return_credits(dsw, source_port, dequeued);\n+\n+\t\t/* One potential optimization one might think of is to\n+\t\t * add a migration state (prior to 'pausing'), and\n+\t\t * only record seen events when the port is in this\n+\t\t * state (and transit to 'pausing' when enough events\n+\t\t * have been gathered). 
However, that schema doesn't\n+\t\t * seem to improve performance.\n+\t\t */\n+\t\tdsw_port_record_seen_events(port, events, dequeued);\n+\t}\n+\t/* XXX: Assuming the port can't produce any more work,\n+\t *\tconsider flushing the output buffer, on dequeued ==\n+\t *\t0.\n+\t */\n+\n+#ifdef DSW_SORT_DEQUEUED\n+\tdsw_stable_sort(events, dequeued, sizeof(events[0]), dsw_cmp_event);\n+#endif\n+\n+\treturn dequeued;\n+}\n+\n+void dsw_event_schedule(struct rte_eventdev *dev __rte_unused)\n+{\n+}\ndiff --git a/drivers/event/dsw/dsw_sort.h b/drivers/event/dsw/dsw_sort.h\nnew file mode 100644\nindex 000000000..609767fdf\n--- /dev/null\n+++ b/drivers/event/dsw/dsw_sort.h\n@@ -0,0 +1,48 @@\n+/* SPDX-License-Identifier: BSD-3-Clause\n+ * Copyright(c) 2018 Ericsson AB\n+ */\n+\n+#ifndef _DSW_SORT_\n+#define _DSW_SORT_\n+\n+#include <string.h>\n+\n+#include <rte_common.h>\n+\n+#define DSW_ARY_ELEM_PTR(_ary, _idx, _elem_size)\t\\\n+\tRTE_PTR_ADD(_ary, (_idx) * (_elem_size))\n+\n+#define DSW_ARY_ELEM_SWAP(_ary, _a_idx, _b_idx, _elem_size)\t\t\\\n+\tdo {\t\t\t\t\t\t\t\t\\\n+\t\tchar tmp[_elem_size];\t\t\t\t\t\\\n+\t\tvoid *_a_ptr = DSW_ARY_ELEM_PTR(_ary, _a_idx, _elem_size); \\\n+\t\tvoid *_b_ptr = DSW_ARY_ELEM_PTR(_ary, _b_idx, _elem_size); \\\n+\t\tmemcpy(tmp, _a_ptr, _elem_size);\t\t\t\\\n+\t\tmemcpy(_a_ptr, _b_ptr, _elem_size);\t\t\t\\\n+\t\tmemcpy(_b_ptr, tmp, _elem_size);\t\t\t\\\n+\t} while (0)\n+\n+static inline void\n+dsw_insertion_sort(void *ary, uint16_t len, uint16_t elem_size,\n+\t\t   int (*cmp_fn)(const void *, const void *))\n+{\n+\tuint16_t i;\n+\n+\tfor (i = 1; i < len; i++) {\n+\t\tuint16_t j;\n+\t\tfor (j = i; j > 0 &&\n+\t\t\t     cmp_fn(DSW_ARY_ELEM_PTR(ary, j-1, elem_size),\n+\t\t\t\t    DSW_ARY_ELEM_PTR(ary, j, elem_size)) > 0;\n+\t\t     j--)\n+\t\t\tDSW_ARY_ELEM_SWAP(ary, j, j-1, elem_size);\n+\t}\n+}\n+\n+static inline void\n+dsw_stable_sort(void *ary, uint16_t len, uint16_t elem_size,\n+\t\tint (*cmp_fn)(const void *, const void *))\n+{\n+\tdsw_insertion_sort(ary, len, elem_size, cmp_fn);\n+}\n+\n+#endif\ndiff --git a/drivers/event/dsw/dsw_xstats.c b/drivers/event/dsw/dsw_xstats.c\nnew file mode 100644\nindex 000000000..df81229d7\n--- /dev/null\n+++ b/drivers/event/dsw/dsw_xstats.c\n@@ -0,0 +1,285 @@\n+/* SPDX-License-Identifier: BSD-3-Clause\n+ * Copyright(c) 2018 Ericsson AB\n+ */\n+\n+#include \"dsw_evdev.h\"\n+#include <string.h>\n+#include <rte_debug.h>\n+\n+/* The high bits in the xstats id is used to store an additional\n+ * parameter (beyond the queue or port id already in the xstats\n+ * interface).\n+ */\n+#define DSW_XSTATS_ID_PARAM_BITS (8)\n+#define DSW_XSTATS_ID_STAT_BITS\t\t\t\t\t\\\n+\t(sizeof(unsigned int)*CHAR_BIT - DSW_XSTATS_ID_PARAM_BITS)\n+#define DSW_XSTATS_ID_STAT_MASK ((1 << DSW_XSTATS_ID_STAT_BITS) - 1)\n+\n+#define DSW_XSTATS_ID_GET_PARAM(id)\t\t\\\n+\t((id)>>DSW_XSTATS_ID_STAT_BITS)\n+\n+#define DSW_XSTATS_ID_GET_STAT(id)\t\t\\\n+\t((id) & DSW_XSTATS_ID_STAT_MASK)\n+\n+#define DSW_XSTATS_ID_CREATE(id, param_value)\t\t\t\\\n+\t(((param_value) << DSW_XSTATS_ID_STAT_BITS) | id)\n+\n+typedef\n+uint64_t (*dsw_xstats_dev_get_value_fn)(struct dsw_evdev *dsw);\n+\n+struct dsw_xstat_dev {\n+\tconst char *name;\n+\tdsw_xstats_dev_get_value_fn get_value_fn;\n+};\n+\n+typedef\n+uint64_t (*dsw_xstats_port_get_value_fn)(struct dsw_evdev *dsw,\n+\t\t\t\t\t uint8_t port_id, uint8_t queue_id);\n+\n+struct dsw_xstats_port {\n+\tconst char *name_fmt;\n+\tdsw_xstats_port_get_value_fn get_value_fn;\n+\tbool per_queue;\n+};\n+\n+static 
uint64_t\n+dsw_xstats_dev_credits_on_loan(struct dsw_evdev *dsw)\n+{\n+\treturn rte_atomic32_read(&dsw->credits_on_loan);\n+}\n+\n+static struct dsw_xstat_dev dsw_dev_xstats[] = {\n+\t{ \"dev_credits_on_loan\", dsw_xstats_dev_credits_on_loan }\n+};\n+\n+#define DSW_GEN_PORT_ACCESS_FN(_variable)\t\t\t\t\\\n+\tstatic uint64_t\t\t\t\t\t\t\t\\\n+\tdsw_xstats_port_get_ ## _variable(struct dsw_evdev *dsw,\t\\\n+\t\t\t\t\t  uint8_t port_id,\t\t\\\n+\t\t\t\t\t  uint8_t queue_id __rte_unused) \\\n+\t{\t\t\t\t\t\t\t\t\\\n+\t\treturn dsw->ports[port_id]._variable;\t\t\t\\\n+\t}\n+\n+DSW_GEN_PORT_ACCESS_FN(new_enqueued)\n+DSW_GEN_PORT_ACCESS_FN(forward_enqueued)\n+DSW_GEN_PORT_ACCESS_FN(release_enqueued)\n+\n+static uint64_t\n+dsw_xstats_port_get_queue_enqueued(struct dsw_evdev *dsw, uint8_t port_id,\n+\t\t\t\t   uint8_t queue_id)\n+{\n+\treturn dsw->ports[port_id].queue_enqueued[queue_id];\n+}\n+\n+DSW_GEN_PORT_ACCESS_FN(dequeued)\n+\n+static uint64_t\n+dsw_xstats_port_get_queue_dequeued(struct dsw_evdev *dsw, uint8_t port_id,\n+\t\t\t\t   uint8_t queue_id)\n+{\n+\treturn dsw->ports[port_id].queue_dequeued[queue_id];\n+}\n+\n+DSW_GEN_PORT_ACCESS_FN(migrations)\n+\n+static uint64_t\n+dsw_xstats_port_get_migration_latency(struct dsw_evdev *dsw, uint8_t port_id,\n+\t\t\t\t      uint8_t queue_id __rte_unused)\n+{\n+\tuint64_t total_latency = dsw->ports[port_id].migration_latency;\n+\tuint64_t num_migrations = dsw->ports[port_id].migrations;\n+\n+\treturn num_migrations > 0 ? total_latency / num_migrations : 0;\n+}\n+\n+static uint64_t\n+dsw_xstats_port_get_event_proc_latency(struct dsw_evdev *dsw, uint8_t port_id,\n+\t\t\t\t       uint8_t queue_id __rte_unused)\n+{\n+\tuint64_t total_busy_cycles =\n+\t\tdsw->ports[port_id].total_busy_cycles;\n+\tuint64_t dequeued =\n+\t\tdsw->ports[port_id].dequeued;\n+\n+\treturn dequeued > 0 ? 
total_busy_cycles / dequeued : 0;\n+}\n+\n+DSW_GEN_PORT_ACCESS_FN(inflight_credits)\n+\n+static uint64_t\n+dsw_xstats_port_get_load(struct dsw_evdev *dsw, uint8_t port_id,\n+\t\t\t uint8_t queue_id __rte_unused)\n+{\n+\tint16_t load;\n+\n+\tload = rte_atomic16_read(&dsw->ports[port_id].load);\n+\n+\treturn DSW_LOAD_TO_PERCENT(load);\n+}\n+\n+DSW_GEN_PORT_ACCESS_FN(last_bg)\n+\n+static struct dsw_xstats_port dsw_port_xstats[] = {\n+\t{ \"port_%u_new_enqueued\", dsw_xstats_port_get_new_enqueued,\n+\t  false },\n+\t{ \"port_%u_forward_enqueued\", dsw_xstats_port_get_forward_enqueued,\n+\t  false },\n+\t{ \"port_%u_release_enqueued\", dsw_xstats_port_get_release_enqueued,\n+\t  false },\n+\t{ \"port_%u_queue_%u_enqueued\", dsw_xstats_port_get_queue_enqueued,\n+\t  true },\n+\t{ \"port_%u_dequeued\", dsw_xstats_port_get_dequeued,\n+\t  false },\n+\t{ \"port_%u_queue_%u_dequeued\", dsw_xstats_port_get_queue_dequeued,\n+\t  true },\n+\t{ \"port_%u_migrations\", dsw_xstats_port_get_migrations,\n+\t  false },\n+\t{ \"port_%u_migration_latency\", dsw_xstats_port_get_migration_latency,\n+\t  false },\n+\t{ \"port_%u_event_proc_latency\", dsw_xstats_port_get_event_proc_latency,\n+\t  false },\n+\t{ \"port_%u_inflight_credits\", dsw_xstats_port_get_inflight_credits,\n+\t  false },\n+\t{ \"port_%u_load\", dsw_xstats_port_get_load,\n+\t  false },\n+\t{ \"port_%u_last_bg\", dsw_xstats_port_get_last_bg,\n+\t  false }\n+};\n+\n+static int\n+dsw_xstats_dev_get_names(struct rte_event_dev_xstats_name *xstats_names,\n+\t\t\t unsigned int *ids, unsigned int size)\n+{\n+\tunsigned int i;\n+\n+\tfor (i = 0; i < RTE_DIM(dsw_dev_xstats) && i < size; i++) {\n+\t\tids[i] = i;\n+\t\tstrcpy(xstats_names[i].name, dsw_dev_xstats[i].name);\n+\t}\n+\n+\treturn i;\n+}\n+\n+static int\n+dsw_xstats_port_get_names(struct dsw_evdev *dsw, uint8_t port_id,\n+\t\t\t  struct rte_event_dev_xstats_name *xstats_names,\n+\t\t\t  unsigned int *ids, unsigned int size)\n+{\n+\tuint8_t queue_id = 0;\n+\tunsigned int id_idx;\n+\tunsigned int stat_idx;\n+\n+\tfor (id_idx = 0, stat_idx = 0;\n+\t     id_idx < size && stat_idx < RTE_DIM(dsw_port_xstats);\n+\t     id_idx++) {\n+\t\tstruct dsw_xstats_port *xstat = &dsw_port_xstats[stat_idx];\n+\n+\t\tif (xstat->per_queue) {\n+\t\t\tids[id_idx] = DSW_XSTATS_ID_CREATE(stat_idx, queue_id);\n+\t\t\tsnprintf(xstats_names[id_idx].name,\n+\t\t\t\t RTE_EVENT_DEV_XSTATS_NAME_SIZE,\n+\t\t\t\t dsw_port_xstats[stat_idx].name_fmt, port_id,\n+\t\t\t\t queue_id);\n+\t\t\tqueue_id++;\n+\t\t} else {\n+\t\t\tids[id_idx] = stat_idx;\n+\t\t\tsnprintf(xstats_names[id_idx].name,\n+\t\t\t\t RTE_EVENT_DEV_XSTATS_NAME_SIZE,\n+\t\t\t\t dsw_port_xstats[stat_idx].name_fmt, port_id);\n+\t\t}\n+\n+\t\tif (!(xstat->per_queue && queue_id < dsw->num_queues)) {\n+\t\t\tstat_idx++;\n+\t\t\tqueue_id = 0;\n+\t\t}\n+\t}\n+\treturn id_idx;\n+}\n+\n+int\n+dsw_xstats_get_names(const struct rte_eventdev *dev,\n+\t\t     enum rte_event_dev_xstats_mode mode,\n+\t\t     uint8_t queue_port_id,\n+\t\t     struct rte_event_dev_xstats_name *xstats_names,\n+\t\t     unsigned int *ids, unsigned int size)\n+{\n+\tstruct dsw_evdev *dsw = dsw_pmd_priv(dev);\n+\n+\tswitch (mode) {\n+\tcase RTE_EVENT_DEV_XSTATS_DEVICE:\n+\t\treturn dsw_xstats_dev_get_names(xstats_names, ids, size);\n+\tcase RTE_EVENT_DEV_XSTATS_PORT:\n+\t\treturn dsw_xstats_port_get_names(dsw, queue_port_id,\n+\t\t\t\t\t\t xstats_names, ids, size);\n+\tcase RTE_EVENT_DEV_XSTATS_QUEUE:\n+\t\treturn 0;\n+\tdefault:\n+\t\tRTE_ASSERT(false);\n+\t\treturn -1;\n+\t}\n+}\n+\n+static 
int\n+dsw_xstats_dev_get(const struct rte_eventdev *dev,\n+\t\t   const unsigned int ids[], uint64_t values[], unsigned int n)\n+{\n+\tstruct dsw_evdev *dsw = dsw_pmd_priv(dev);\n+\tunsigned int i;\n+\n+\tfor (i = 0; i < n; i++) {\n+\t\tunsigned int id = ids[i];\n+\t\tstruct dsw_xstat_dev *xstat = &dsw_dev_xstats[id];\n+\t\tvalues[i] = xstat->get_value_fn(dsw);\n+\t}\n+\treturn n;\n+}\n+\n+static int\n+dsw_xstats_port_get(const struct rte_eventdev *dev, uint8_t port_id,\n+\t\t    const unsigned int ids[], uint64_t values[], unsigned int n)\n+{\n+\tstruct dsw_evdev *dsw = dsw_pmd_priv(dev);\n+\tunsigned int i;\n+\n+\tfor (i = 0; i < n; i++) {\n+\t\tunsigned int id = ids[i];\n+\t\tunsigned int stat_idx = DSW_XSTATS_ID_GET_STAT(id);\n+\t\tstruct dsw_xstats_port *xstat = &dsw_port_xstats[stat_idx];\n+\t\tuint8_t queue_id = 0;\n+\n+\t\tif (xstat->per_queue)\n+\t\t\tqueue_id = DSW_XSTATS_ID_GET_PARAM(id);\n+\n+\t\tvalues[i] = xstat->get_value_fn(dsw, port_id, queue_id);\n+\t}\n+\treturn n;\n+}\n+\n+int\n+dsw_xstats_get(const struct rte_eventdev *dev,\n+\t       enum rte_event_dev_xstats_mode mode, uint8_t queue_port_id,\n+\t       const unsigned int ids[], uint64_t values[], unsigned int n)\n+{\n+\tswitch (mode) {\n+\tcase RTE_EVENT_DEV_XSTATS_DEVICE:\n+\t\treturn dsw_xstats_dev_get(dev, ids, values, n);\n+\tcase RTE_EVENT_DEV_XSTATS_PORT:\n+\t\treturn dsw_xstats_port_get(dev, queue_port_id, ids, values, n);\n+\tcase RTE_EVENT_DEV_XSTATS_QUEUE:\n+\t\treturn 0;\n+\tdefault:\n+\t\tRTE_ASSERT(false);\n+\t\treturn -1;\n+\t}\n+\treturn 0;\n+}\n+\n+uint64_t dsw_xstats_get_by_name(const struct rte_eventdev *dev,\n+\t\t\t\tconst char *name, unsigned int *id)\n+{\n+\tRTE_SET_USED(dev);\n+\tRTE_SET_USED(name);\n+\tRTE_SET_USED(id);\n+\treturn 0;\n+}\ndiff --git a/drivers/event/dsw/meson.build b/drivers/event/dsw/meson.build\nnew file mode 100644\nindex 000000000..02dd6e2ae\n--- /dev/null\n+++ b/drivers/event/dsw/meson.build\n@@ -0,0 +1,8 @@\n+# SPDX-License-Identifier: BSD-3-Clause\n+# Copyright(c) 2018 Ericsson AB\n+\n+allow_experimental_apis = true\n+deps += ['bus_vdev']\n+sources = files('dsw_evdev.c',\n+\t\t'dsw_event.c',\n+\t\t'dsw_xstats.c')\ndiff --git a/drivers/event/dsw/rte_pmd_dsw_event_version.map b/drivers/event/dsw/rte_pmd_dsw_event_version.map\nnew file mode 100644\nindex 000000000..24bd5cdb3\n--- /dev/null\n+++ b/drivers/event/dsw/rte_pmd_dsw_event_version.map\n@@ -0,0 +1,3 @@\n+DPDK_18.11 {\n+\tlocal: *;\n+};\ndiff --git a/drivers/event/meson.build b/drivers/event/meson.build\nindex e95119935..4b9fed22b 100644\n--- a/drivers/event/meson.build\n+++ b/drivers/event/meson.build\n@@ -1,7 +1,7 @@\n # SPDX-License-Identifier: BSD-3-Clause\n # Copyright(c) 2017 Intel Corporation\n \n-drivers = ['dpaa', 'dpaa2', 'octeontx', 'skeleton', 'sw']\n+drivers = ['dpaa', 'dpaa2', 'octeontx', 'skeleton', 'sw', 'dsw']\n std_deps = ['eventdev', 'kvargs']\n config_flag_fmt = 'RTE_LIBRTE_@0@_EVENTDEV_PMD'\n driver_name_fmt = 'rte_pmd_@0@_event'\ndiff --git a/mk/rte.app.mk b/mk/rte.app.mk\nindex de33883be..682e224eb 100644\n--- a/mk/rte.app.mk\n+++ b/mk/rte.app.mk\n@@ -236,6 +236,7 @@ endif # CONFIG_RTE_LIBRTE_COMPRESSDEV\n ifeq ($(CONFIG_RTE_LIBRTE_EVENTDEV),y)\n _LDLIBS-$(CONFIG_RTE_LIBRTE_PMD_SKELETON_EVENTDEV) += -lrte_pmd_skeleton_event\n _LDLIBS-$(CONFIG_RTE_LIBRTE_PMD_SW_EVENTDEV) += -lrte_pmd_sw_event\n+_LDLIBS-$(CONFIG_RTE_LIBRTE_PMD_DSW_EVENTDEV) += -lrte_pmd_dsw_event\n _LDLIBS-$(CONFIG_RTE_LIBRTE_PMD_OCTEONTX_SSOVF) += -lrte_pmd_octeontx_ssovf\n ifeq ($(CONFIG_RTE_LIBRTE_DPAA_BUS),y)\n 
_LDLIBS-$(CONFIG_RTE_LIBRTE_PMD_DPAA_EVENTDEV) += -lrte_pmd_dpaa_event\n",
    "prefixes": [
        "RFC",
        "v2",
        "1/1"
    ]
}