get:
Show a patch.

patch:
Partially update a patch.

put:
Update a patch.
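
For illustration, a minimal client sketch that fetches this patch over the REST API and reads a few of the fields shown in the example response below. The use of the third-party requests library and the Accept header for JSON content negotiation are assumptions for this sketch, not part of this page:

# Minimal sketch: fetch patch 44561 from the Patchwork REST API and
# read a few of the fields shown in the example response below.
# Assumes the third-party "requests" library is installed.
import requests

resp = requests.get("http://patches.dpdk.org/api/patches/44561/",
                    headers={"Accept": "application/json"},
                    timeout=30)
resp.raise_for_status()
patch = resp.json()

print(patch["name"])               # "[v3,05/10] event/dsw: add DSW event scheduling ..."
print(patch["state"])              # "changes-requested"
print(patch["submitter"]["name"])  # "Mattias Rönnblom"
print(patch["mbox"])               # URL of the patch in mbox format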

GET /api/patches/44561/?format=api
HTTP 200 OK
Allow: GET, PUT, PATCH, HEAD, OPTIONS
Content-Type: application/json
Vary: Accept

{
    "id": 44561,
    "url": "http://patches.dpdk.org/api/patches/44561/?format=api",
    "web_url": "http://patches.dpdk.org/project/dpdk/patch/20180911080216.3017-6-mattias.ronnblom@ericsson.com/",
    "project": {
        "id": 1,
        "url": "http://patches.dpdk.org/api/projects/1/?format=api",
        "name": "DPDK",
        "link_name": "dpdk",
        "list_id": "dev.dpdk.org",
        "list_email": "dev@dpdk.org",
        "web_url": "http://core.dpdk.org",
        "scm_url": "git://dpdk.org/dpdk",
        "webscm_url": "http://git.dpdk.org/dpdk",
        "list_archive_url": "https://inbox.dpdk.org/dev",
        "list_archive_url_format": "https://inbox.dpdk.org/dev/{}",
        "commit_url_format": ""
    },
    "msgid": "<20180911080216.3017-6-mattias.ronnblom@ericsson.com>",
    "list_archive_url": "https://inbox.dpdk.org/dev/20180911080216.3017-6-mattias.ronnblom@ericsson.com",
    "date": "2018-09-11T08:02:11",
    "name": "[v3,05/10] event/dsw: add DSW event scheduling and device start/stop",
    "commit_ref": null,
    "pull_url": null,
    "state": "changes-requested",
    "archived": true,
    "hash": "f019aaa6fbac782d450efe09d1698aafe76d5f8b",
    "submitter": {
        "id": 1077,
        "url": "http://patches.dpdk.org/api/people/1077/?format=api",
        "name": "Mattias Rönnblom",
        "email": "mattias.ronnblom@ericsson.com"
    },
    "delegate": {
        "id": 310,
        "url": "http://patches.dpdk.org/api/users/310/?format=api",
        "username": "jerin",
        "first_name": "Jerin",
        "last_name": "Jacob",
        "email": "jerinj@marvell.com"
    },
    "mbox": "http://patches.dpdk.org/project/dpdk/patch/20180911080216.3017-6-mattias.ronnblom@ericsson.com/mbox/",
    "series": [
        {
            "id": 1264,
            "url": "http://patches.dpdk.org/api/series/1264/?format=api",
            "web_url": "http://patches.dpdk.org/project/dpdk/list/?series=1264",
            "date": "2018-09-11T08:02:07",
            "name": "A Distributed Software Event Device",
            "version": 3,
            "mbox": "http://patches.dpdk.org/series/1264/mbox/"
        }
    ],
    "comments": "http://patches.dpdk.org/api/patches/44561/comments/",
    "check": "success",
    "checks": "http://patches.dpdk.org/api/patches/44561/checks/",
    "tags": {},
    "related": [],
    "headers": {
        "Return-Path": "<dev-bounces@dpdk.org>",
        "X-Original-To": "patchwork@dpdk.org",
        "Delivered-To": "patchwork@dpdk.org",
        "Received": [
            "from [92.243.14.124] (localhost [127.0.0.1])\n\tby dpdk.org (Postfix) with ESMTP id 0DA81568A;\n\tTue, 11 Sep 2018 10:03:24 +0200 (CEST)",
            "from mail.lysator.liu.se (mail.lysator.liu.se [130.236.254.3])\n\tby dpdk.org (Postfix) with ESMTP id 730F44CB3\n\tfor <dev@dpdk.org>; Tue, 11 Sep 2018 10:03:16 +0200 (CEST)",
            "from mail.lysator.liu.se (localhost [127.0.0.1])\n\tby mail.lysator.liu.se (Postfix) with ESMTP id 20F6540083\n\tfor <dev@dpdk.org>; Tue, 11 Sep 2018 10:03:16 +0200 (CEST)",
            "by mail.lysator.liu.se (Postfix, from userid 1004)\n\tid 0D8DD40012; Tue, 11 Sep 2018 10:03:15 +0200 (CEST)",
            "from isengard.friendlyfire.se\n\t(host-90-232-156-190.mobileonline.telia.com [90.232.156.190])\n\t(using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256\n\tbits)) (No client certificate requested)\n\tby mail.lysator.liu.se (Postfix) with ESMTPSA id 1EC2C4007F;\n\tTue, 11 Sep 2018 10:03:12 +0200 (CEST)"
        ],
        "X-Spam-Checker-Version": "SpamAssassin 3.4.1 (2015-04-28) on\n\tbernadotte.lysator.liu.se",
        "X-Spam-Level": "",
        "X-Spam-Status": "No, score=-0.9 required=5.0 tests=ALL_TRUSTED,AWL\n\tautolearn=disabled version=3.4.1",
        "X-Spam-Score": "-0.9",
        "From": "=?utf-8?q?Mattias_R=C3=B6nnblom?= <mattias.ronnblom@ericsson.com>",
        "To": "jerin.jacob@caviumnetworks.com",
        "Cc": "bruce.richardson@intel.com, dev@dpdk.org, =?utf-8?q?Mattias_R=C3=B6nnb?=\n\t=?utf-8?q?lom?= <mattias.ronnblom@ericsson.com>",
        "Date": "Tue, 11 Sep 2018 10:02:11 +0200",
        "Message-Id": "<20180911080216.3017-6-mattias.ronnblom@ericsson.com>",
        "X-Mailer": "git-send-email 2.17.1",
        "In-Reply-To": "<20180911080216.3017-1-mattias.ronnblom@ericsson.com>",
        "References": "<20180911080216.3017-1-mattias.ronnblom@ericsson.com>",
        "MIME-Version": "1.0",
        "Content-Type": "text/plain; charset=UTF-8",
        "Content-Transfer-Encoding": "8bit",
        "X-Virus-Scanned": "ClamAV using ClamSMTP",
        "Subject": "[dpdk-dev] [PATCH v3 05/10] event/dsw: add DSW event scheduling and\n\tdevice start/stop",
        "X-BeenThere": "dev@dpdk.org",
        "X-Mailman-Version": "2.1.15",
        "Precedence": "list",
        "List-Id": "DPDK patches and discussions <dev.dpdk.org>",
        "List-Unsubscribe": "<https://mails.dpdk.org/options/dev>,\n\t<mailto:dev-request@dpdk.org?subject=unsubscribe>",
        "List-Archive": "<http://mails.dpdk.org/archives/dev/>",
        "List-Post": "<mailto:dev@dpdk.org>",
        "List-Help": "<mailto:dev-request@dpdk.org?subject=help>",
        "List-Subscribe": "<https://mails.dpdk.org/listinfo/dev>,\n\t<mailto:dev-request@dpdk.org?subject=subscribe>",
        "Errors-To": "dev-bounces@dpdk.org",
        "Sender": "\"dev\" <dev-bounces@dpdk.org>"
    },
    "content": "With this patch, the DSW event device can be started and stopped,\nand also supports scheduling events between ports.\n\nSigned-off-by: Mattias Rönnblom <mattias.ronnblom@ericsson.com>\n---\n drivers/event/dsw/Makefile    |   2 +-\n drivers/event/dsw/dsw_evdev.c | 125 ++++++++++++\n drivers/event/dsw/dsw_evdev.h |  56 ++++++\n drivers/event/dsw/dsw_event.c | 359 ++++++++++++++++++++++++++++++++++\n drivers/event/dsw/meson.build |   2 +-\n 5 files changed, 542 insertions(+), 2 deletions(-)\n create mode 100644 drivers/event/dsw/dsw_event.c",
    "diff": "diff --git a/drivers/event/dsw/Makefile b/drivers/event/dsw/Makefile\nindex 5cbf488ff..6374a454e 100644\n--- a/drivers/event/dsw/Makefile\n+++ b/drivers/event/dsw/Makefile\n@@ -21,6 +21,6 @@ LIBABIVER := 1\n \n EXPORT_MAP := rte_pmd_dsw_event_version.map\n \n-SRCS-$(CONFIG_RTE_LIBRTE_PMD_DSW_EVENTDEV) += dsw_evdev.c\n+SRCS-$(CONFIG_RTE_LIBRTE_PMD_DSW_EVENTDEV) += dsw_evdev.c dsw_event.c\n \n include $(RTE_SDK)/mk/rte.lib.mk\ndiff --git a/drivers/event/dsw/dsw_evdev.c b/drivers/event/dsw/dsw_evdev.c\nindex 5dccc232a..40a7435be 100644\n--- a/drivers/event/dsw/dsw_evdev.c\n+++ b/drivers/event/dsw/dsw_evdev.c\n@@ -6,6 +6,7 @@\n \n #include <rte_eventdev_pmd.h>\n #include <rte_eventdev_pmd_vdev.h>\n+#include <rte_random.h>\n \n #include \"dsw_evdev.h\"\n \n@@ -201,10 +202,125 @@ dsw_configure(const struct rte_eventdev *dev)\n {\n \tstruct dsw_evdev *dsw = dsw_pmd_priv(dev);\n \tconst struct rte_event_dev_config *conf = &dev->data->dev_conf;\n+\tint32_t min_max_in_flight;\n \n \tdsw->num_ports = conf->nb_event_ports;\n \tdsw->num_queues = conf->nb_event_queues;\n \n+\t/* Avoid a situation where consumer ports are holding all the\n+\t * credits, without making use of them.\n+\t */\n+\tmin_max_in_flight = conf->nb_event_ports * DSW_PORT_MAX_CREDITS;\n+\n+\tdsw->max_inflight = RTE_MAX(conf->nb_events_limit, min_max_in_flight);\n+\n+\treturn 0;\n+}\n+\n+\n+static void\n+initial_flow_to_port_assignment(struct dsw_evdev *dsw)\n+{\n+\tuint8_t queue_id;\n+\tfor (queue_id = 0; queue_id < dsw->num_queues; queue_id++) {\n+\t\tstruct dsw_queue *queue = &dsw->queues[queue_id];\n+\t\tuint16_t flow_hash;\n+\t\tfor (flow_hash = 0; flow_hash < DSW_MAX_FLOWS; flow_hash++) {\n+\t\t\tuint8_t port_idx =\n+\t\t\t\trte_rand() % queue->num_serving_ports;\n+\t\t\tuint8_t port_id =\n+\t\t\t\tqueue->serving_ports[port_idx];\n+\t\t\tdsw->queues[queue_id].flow_to_port_map[flow_hash] =\n+\t\t\t\tport_id;\n+\t\t}\n+\t}\n+}\n+\n+static int\n+dsw_start(struct rte_eventdev *dev)\n+{\n+\tstruct dsw_evdev *dsw = dsw_pmd_priv(dev);\n+\n+\trte_atomic32_init(&dsw->credits_on_loan);\n+\n+\tinitial_flow_to_port_assignment(dsw);\n+\n+\treturn 0;\n+}\n+\n+static void\n+dsw_port_drain_buf(uint8_t dev_id, struct rte_event *buf, uint16_t buf_len,\n+\t\t   eventdev_stop_flush_t flush, void *flush_arg)\n+{\n+\tuint16_t i;\n+\n+\tfor (i = 0; i < buf_len; i++)\n+\t\tflush(dev_id, buf[i], flush_arg);\n+}\n+\n+static void\n+dsw_port_drain_out(uint8_t dev_id, struct dsw_evdev *dsw, struct dsw_port *port,\n+\t\t   eventdev_stop_flush_t flush, void *flush_arg)\n+{\n+\tuint16_t dport_id;\n+\n+\tfor (dport_id = 0; dport_id < dsw->num_ports; dport_id++)\n+\t\tif (dport_id != port->id)\n+\t\t\tdsw_port_drain_buf(dev_id, port->out_buffer[dport_id],\n+\t\t\t\t\t   port->out_buffer_len[dport_id],\n+\t\t\t\t\t   flush, flush_arg);\n+}\n+\n+static void\n+dsw_port_drain_in_ring(uint8_t dev_id, struct dsw_port *port,\n+\t\t       eventdev_stop_flush_t flush, void *flush_arg)\n+{\n+\tstruct rte_event ev;\n+\n+\twhile (rte_event_ring_dequeue_burst(port->in_ring, &ev, 1, NULL))\n+\t\tflush(dev_id, ev, flush_arg);\n+}\n+\n+static void\n+dsw_drain(uint8_t dev_id, struct dsw_evdev *dsw,\n+\t  eventdev_stop_flush_t flush, void *flush_arg)\n+{\n+\tuint16_t port_id;\n+\n+\tif (flush == NULL)\n+\t\treturn;\n+\n+\tfor (port_id = 0; port_id < dsw->num_ports; port_id++) {\n+\t\tstruct dsw_port *port = &dsw->ports[port_id];\n+\n+\t\tdsw_port_drain_out(dev_id, dsw, port, flush, flush_arg);\n+\t\tdsw_port_drain_in_ring(dev_id, port, flush, 
flush_arg);\n+\t}\n+}\n+\n+static void\n+dsw_stop(struct rte_eventdev *dev)\n+{\n+\tstruct dsw_evdev *dsw = dsw_pmd_priv(dev);\n+\tuint8_t dev_id;\n+\teventdev_stop_flush_t flush;\n+\tvoid *flush_arg;\n+\n+\tdev_id = dev->data->dev_id;\n+\tflush = dev->dev_ops->dev_stop_flush;\n+\tflush_arg = dev->data->dev_stop_flush_arg;\n+\n+\tdsw_drain(dev_id, dsw, flush, flush_arg);\n+}\n+\n+static int\n+dsw_close(struct rte_eventdev *dev)\n+{\n+\tstruct dsw_evdev *dsw = dsw_pmd_priv(dev);\n+\n+\tdsw->num_ports = 0;\n+\tdsw->num_queues = 0;\n+\n \treturn 0;\n }\n \n@@ -219,6 +335,9 @@ static struct rte_eventdev_ops dsw_evdev_ops = {\n \t.port_unlink = dsw_port_unlink,\n \t.dev_infos_get = dsw_info_get,\n \t.dev_configure = dsw_configure,\n+\t.dev_start = dsw_start,\n+\t.dev_stop = dsw_stop,\n+\t.dev_close = dsw_close\n };\n \n static int\n@@ -236,6 +355,12 @@ dsw_probe(struct rte_vdev_device *vdev)\n \t\treturn -EFAULT;\n \n \tdev->dev_ops = &dsw_evdev_ops;\n+\tdev->enqueue = dsw_event_enqueue;\n+\tdev->enqueue_burst = dsw_event_enqueue_burst;\n+\tdev->enqueue_new_burst = dsw_event_enqueue_new_burst;\n+\tdev->enqueue_forward_burst = dsw_event_enqueue_forward_burst;\n+\tdev->dequeue = dsw_event_dequeue;\n+\tdev->dequeue_burst = dsw_event_dequeue_burst;\n \n \tif (rte_eal_process_type() != RTE_PROC_PRIMARY)\n \t\treturn 0;\ndiff --git a/drivers/event/dsw/dsw_evdev.h b/drivers/event/dsw/dsw_evdev.h\nindex ad0f857cc..f8e94e4a4 100644\n--- a/drivers/event/dsw/dsw_evdev.h\n+++ b/drivers/event/dsw/dsw_evdev.h\n@@ -14,6 +14,7 @@\n #define DSW_MAX_PORTS (64)\n #define DSW_MAX_PORT_DEQUEUE_DEPTH (128)\n #define DSW_MAX_PORT_ENQUEUE_DEPTH (128)\n+#define DSW_MAX_PORT_OUT_BUFFER (32)\n \n #define DSW_MAX_QUEUES (16)\n \n@@ -24,6 +25,24 @@\n #define DSW_MAX_FLOWS (1<<(DSW_MAX_FLOWS_BITS))\n #define DSW_MAX_FLOWS_MASK (DSW_MAX_FLOWS-1)\n \n+/* Eventdev RTE_SCHED_TYPE_PARALLEL doesn't have a concept of flows,\n+ * but the 'dsw' scheduler (more or less) randomly assign flow id to\n+ * events on parallel queues, to be able to reuse some of the\n+ * migration mechanism and scheduling logic from\n+ * RTE_SCHED_TYPE_ATOMIC. 
By moving one of the parallel \"flows\" from a\n+ * particular port, the likely-hood of events being scheduled to this\n+ * port is reduced, and thus a kind of statistical load balancing is\n+ * achieved.\n+ */\n+#define DSW_PARALLEL_FLOWS (1024)\n+\n+/* Avoid making small 'loans' from the central in-flight event credit\n+ * pool, to improve efficiency.\n+ */\n+#define DSW_MIN_CREDIT_LOAN (64)\n+#define DSW_PORT_MAX_CREDITS (2*DSW_MIN_CREDIT_LOAN)\n+#define DSW_PORT_MIN_CREDITS (DSW_MIN_CREDIT_LOAN)\n+\n /* The rings are dimensioned so that all in-flight events can reside\n  * on any one of the port rings, to avoid the trouble of having to\n  * care about the case where there's no room on the destination port's\n@@ -44,8 +63,17 @@ struct dsw_port {\n \tuint16_t dequeue_depth;\n \tuint16_t enqueue_depth;\n \n+\tint32_t inflight_credits;\n+\n \tint32_t new_event_threshold;\n \n+\tuint16_t pending_releases;\n+\n+\tuint16_t next_parallel_flow_id;\n+\n+\tuint16_t out_buffer_len[DSW_MAX_PORTS];\n+\tstruct rte_event out_buffer[DSW_MAX_PORTS][DSW_MAX_PORT_OUT_BUFFER];\n+\n \tstruct rte_event_ring *in_ring __rte_cache_aligned;\n } __rte_cache_aligned;\n \n@@ -53,6 +81,8 @@ struct dsw_queue {\n \tuint8_t schedule_type;\n \tuint8_t serving_ports[DSW_MAX_PORTS];\n \tuint16_t num_serving_ports;\n+\n+\tuint8_t flow_to_port_map[DSW_MAX_FLOWS] __rte_cache_aligned;\n };\n \n struct dsw_evdev {\n@@ -62,12 +92,38 @@ struct dsw_evdev {\n \tuint16_t num_ports;\n \tstruct dsw_queue queues[DSW_MAX_QUEUES];\n \tuint8_t num_queues;\n+\tint32_t max_inflight;\n+\n+\trte_atomic32_t credits_on_loan __rte_cache_aligned;\n };\n \n+uint16_t dsw_event_enqueue(void *port, const struct rte_event *event);\n+uint16_t dsw_event_enqueue_burst(void *port,\n+\t\t\t\t const struct rte_event events[],\n+\t\t\t\t uint16_t events_len);\n+uint16_t dsw_event_enqueue_new_burst(void *port,\n+\t\t\t\t     const struct rte_event events[],\n+\t\t\t\t     uint16_t events_len);\n+uint16_t dsw_event_enqueue_forward_burst(void *port,\n+\t\t\t\t\t const struct rte_event events[],\n+\t\t\t\t\t uint16_t events_len);\n+\n+uint16_t dsw_event_dequeue(void *port, struct rte_event *ev, uint64_t wait);\n+uint16_t dsw_event_dequeue_burst(void *port, struct rte_event *events,\n+\t\t\t\t uint16_t num, uint64_t wait);\n+\n static inline struct dsw_evdev *\n dsw_pmd_priv(const struct rte_eventdev *eventdev)\n {\n \treturn eventdev->data->dev_private;\n }\n \n+#define DSW_LOG_DP(level, fmt, args...)\t\t\t\t\t\\\n+\tRTE_LOG_DP(level, EVENTDEV, \"[%s] %s() line %u: \" fmt,\t\t\\\n+\t\t   DSW_PMD_NAME,\t\t\t\t\t\\\n+\t\t   __func__, __LINE__, ## args)\n+\n+#define DSW_LOG_DP_PORT(level, port_id, fmt, args...)\t\t\\\n+\tDSW_LOG_DP(level, \"<Port %d> \" fmt, port_id, ## args)\n+\n #endif\ndiff --git a/drivers/event/dsw/dsw_event.c b/drivers/event/dsw/dsw_event.c\nnew file mode 100644\nindex 000000000..4a3af8ecd\n--- /dev/null\n+++ b/drivers/event/dsw/dsw_event.c\n@@ -0,0 +1,359 @@\n+/* SPDX-License-Identifier: BSD-3-Clause\n+ * Copyright(c) 2018 Ericsson AB\n+ */\n+\n+#include \"dsw_evdev.h\"\n+\n+#include <stdbool.h>\n+\n+#include <rte_atomic.h>\n+#include <rte_random.h>\n+\n+static bool\n+dsw_port_acquire_credits(struct dsw_evdev *dsw, struct dsw_port *port,\n+\t\t\t int32_t credits)\n+{\n+\tint32_t inflight_credits = port->inflight_credits;\n+\tint32_t missing_credits = credits - inflight_credits;\n+\tint32_t total_on_loan;\n+\tint32_t available;\n+\tint32_t acquired_credits;\n+\tint32_t new_total_on_loan;\n+\n+\tif (likely(missing_credits <= 0)) 
{\n+\t\tport->inflight_credits -= credits;\n+\t\treturn true;\n+\t}\n+\n+\ttotal_on_loan = rte_atomic32_read(&dsw->credits_on_loan);\n+\tavailable = dsw->max_inflight - total_on_loan;\n+\tacquired_credits = RTE_MAX(missing_credits, DSW_PORT_MIN_CREDITS);\n+\n+\tif (available < acquired_credits)\n+\t\treturn false;\n+\n+\t/* This is a race, no locks are involved, and thus some other\n+\t * thread can allocate tokens in between the check and the\n+\t * allocation.\n+\t */\n+\tnew_total_on_loan = rte_atomic32_add_return(&dsw->credits_on_loan,\n+\t\t\t\t\t\t    acquired_credits);\n+\n+\tif (unlikely(new_total_on_loan > dsw->max_inflight)) {\n+\t\t/* Some other port took the last credits */\n+\t\trte_atomic32_sub(&dsw->credits_on_loan, acquired_credits);\n+\t\treturn false;\n+\t}\n+\n+\tDSW_LOG_DP_PORT(DEBUG, port->id, \"Acquired %d tokens from pool.\\n\",\n+\t\t\tacquired_credits);\n+\n+\tport->inflight_credits += acquired_credits;\n+\tport->inflight_credits -= credits;\n+\n+\treturn true;\n+}\n+\n+static void\n+dsw_port_return_credits(struct dsw_evdev *dsw, struct dsw_port *port,\n+\t\t\tint32_t credits)\n+{\n+\tport->inflight_credits += credits;\n+\n+\tif (unlikely(port->inflight_credits > DSW_PORT_MAX_CREDITS)) {\n+\t\tint32_t leave_credits = DSW_PORT_MIN_CREDITS;\n+\t\tint32_t return_credits =\n+\t\t\tport->inflight_credits - leave_credits;\n+\n+\t\tport->inflight_credits = leave_credits;\n+\n+\t\trte_atomic32_sub(&dsw->credits_on_loan, return_credits);\n+\n+\t\tDSW_LOG_DP_PORT(DEBUG, port->id,\n+\t\t\t\t\"Returned %d tokens to pool.\\n\",\n+\t\t\t\treturn_credits);\n+\t}\n+}\n+\n+static uint8_t\n+dsw_schedule(struct dsw_evdev *dsw, uint8_t queue_id, uint16_t flow_hash)\n+{\n+\tstruct dsw_queue *queue = &dsw->queues[queue_id];\n+\tuint8_t port_id;\n+\n+\tif (queue->num_serving_ports > 1)\n+\t\tport_id = queue->flow_to_port_map[flow_hash];\n+\telse\n+\t\t/* A single-link queue, or atomic/ordered/parallel but\n+\t\t * with just a single serving port.\n+\t\t */\n+\t\tport_id = queue->serving_ports[0];\n+\n+\tDSW_LOG_DP(DEBUG, \"Event with queue_id %d flow_hash %d is scheduled \"\n+\t\t   \"to port %d.\\n\", queue_id, flow_hash, port_id);\n+\n+\treturn port_id;\n+}\n+\n+static void\n+dsw_port_transmit_buffered(struct dsw_evdev *dsw, struct dsw_port *source_port,\n+\t\t\t   uint8_t dest_port_id)\n+{\n+\tstruct dsw_port *dest_port = &(dsw->ports[dest_port_id]);\n+\tuint16_t *buffer_len = &source_port->out_buffer_len[dest_port_id];\n+\tstruct rte_event *buffer = source_port->out_buffer[dest_port_id];\n+\tuint16_t enqueued = 0;\n+\n+\tif (*buffer_len == 0)\n+\t\treturn;\n+\n+\t/* The rings are dimensioned to fit all in-flight events (even\n+\t * on a single ring), so looping will work.\n+\t */\n+\tdo {\n+\t\tenqueued +=\n+\t\t\trte_event_ring_enqueue_burst(dest_port->in_ring,\n+\t\t\t\t\t\t     buffer+enqueued,\n+\t\t\t\t\t\t     *buffer_len-enqueued,\n+\t\t\t\t\t\t     NULL);\n+\t} while (unlikely(enqueued != *buffer_len));\n+\n+\t(*buffer_len) = 0;\n+}\n+\n+static uint16_t\n+dsw_port_get_parallel_flow_id(struct dsw_port *port)\n+{\n+\tuint16_t flow_id = port->next_parallel_flow_id;\n+\n+\tport->next_parallel_flow_id =\n+\t\t(port->next_parallel_flow_id + 1) % DSW_PARALLEL_FLOWS;\n+\n+\treturn flow_id;\n+}\n+\n+static void\n+dsw_port_buffer_non_paused(struct dsw_evdev *dsw, struct dsw_port *source_port,\n+\t\t\t   uint8_t dest_port_id, const struct rte_event *event)\n+{\n+\tstruct rte_event *buffer = source_port->out_buffer[dest_port_id];\n+\tuint16_t *buffer_len = 
&source_port->out_buffer_len[dest_port_id];\n+\n+\tif (*buffer_len == DSW_MAX_PORT_OUT_BUFFER)\n+\t\tdsw_port_transmit_buffered(dsw, source_port, dest_port_id);\n+\n+\tbuffer[*buffer_len] = *event;\n+\n+\t(*buffer_len)++;\n+}\n+\n+#define DSW_FLOW_ID_BITS (24)\n+static uint16_t\n+dsw_flow_id_hash(uint32_t flow_id)\n+{\n+\tuint16_t hash = 0;\n+\tuint16_t offset = 0;\n+\n+\tdo {\n+\t\thash ^= ((flow_id >> offset) & DSW_MAX_FLOWS_MASK);\n+\t\toffset += DSW_MAX_FLOWS_BITS;\n+\t} while (offset < DSW_FLOW_ID_BITS);\n+\n+\treturn hash;\n+}\n+\n+static void\n+dsw_port_buffer_parallel(struct dsw_evdev *dsw, struct dsw_port *source_port,\n+\t\t\t struct rte_event event)\n+{\n+\tuint8_t dest_port_id;\n+\n+\tevent.flow_id = dsw_port_get_parallel_flow_id(source_port);\n+\n+\tdest_port_id = dsw_schedule(dsw, event.queue_id,\n+\t\t\t\t    dsw_flow_id_hash(event.flow_id));\n+\n+\tdsw_port_buffer_non_paused(dsw, source_port, dest_port_id, &event);\n+}\n+\n+static void\n+dsw_port_buffer_event(struct dsw_evdev *dsw, struct dsw_port *source_port,\n+\t\t      const struct rte_event *event)\n+{\n+\tuint16_t flow_hash;\n+\tuint8_t dest_port_id;\n+\n+\tif (unlikely(dsw->queues[event->queue_id].schedule_type ==\n+\t\t     RTE_SCHED_TYPE_PARALLEL)) {\n+\t\tdsw_port_buffer_parallel(dsw, source_port, *event);\n+\t\treturn;\n+\t}\n+\n+\tflow_hash = dsw_flow_id_hash(event->flow_id);\n+\n+\tdest_port_id = dsw_schedule(dsw, event->queue_id, flow_hash);\n+\n+\tdsw_port_buffer_non_paused(dsw, source_port, dest_port_id, event);\n+}\n+\n+static void\n+dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port)\n+{\n+\tuint16_t dest_port_id;\n+\n+\tfor (dest_port_id = 0; dest_port_id < dsw->num_ports; dest_port_id++)\n+\t\tdsw_port_transmit_buffered(dsw, source_port, dest_port_id);\n+}\n+\n+uint16_t\n+dsw_event_enqueue(void *port, const struct rte_event *ev)\n+{\n+\treturn dsw_event_enqueue_burst(port, ev, unlikely(ev == NULL) ? 0 : 1);\n+}\n+\n+static __rte_always_inline uint16_t\n+dsw_event_enqueue_burst_generic(void *port, const struct rte_event events[],\n+\t\t\t\tuint16_t events_len, bool op_types_known,\n+\t\t\t\tuint16_t num_new, uint16_t num_release,\n+\t\t\t\tuint16_t num_non_release)\n+{\n+\tstruct dsw_port *source_port = port;\n+\tstruct dsw_evdev *dsw = source_port->dsw;\n+\tbool enough_credits;\n+\tuint16_t i;\n+\n+\tDSW_LOG_DP_PORT(DEBUG, source_port->id, \"Attempting to enqueue %d \"\n+\t\t\t\"events to port %d.\\n\", events_len, source_port->id);\n+\n+\t/* XXX: For performance (=ring efficiency) reasons, the\n+\t * scheduler relies on internal non-ring buffers instead of\n+\t * immediately sending the event to the destination ring. For\n+\t * a producer that doesn't intend to produce or consume any\n+\t * more events, the scheduler provides a way to flush the\n+\t * buffer, by means of doing an enqueue of zero events. In\n+\t * addition, a port cannot be left \"unattended\" (e.g. unused)\n+\t * for long periods of time, since that would stall\n+\t * migration. 
Eventdev API extensions to provide a cleaner way\n+\t * to archieve both of these functions should be\n+\t * considered.\n+\t */\n+\tif (unlikely(events_len == 0)) {\n+\t\tdsw_port_flush_out_buffers(dsw, source_port);\n+\t\treturn 0;\n+\t}\n+\n+\tif (unlikely(events_len > source_port->enqueue_depth))\n+\t\tevents_len = source_port->enqueue_depth;\n+\n+\tif (!op_types_known)\n+\t\tfor (i = 0; i < events_len; i++) {\n+\t\t\tswitch (events[i].op) {\n+\t\t\tcase RTE_EVENT_OP_RELEASE:\n+\t\t\t\tnum_release++;\n+\t\t\t\tbreak;\n+\t\t\tcase RTE_EVENT_OP_NEW:\n+\t\t\t\tnum_new++;\n+\t\t\t\t/* Falls through. */\n+\t\t\tdefault:\n+\t\t\t\tnum_non_release++;\n+\t\t\t\tbreak;\n+\t\t\t}\n+\t\t}\n+\n+\t/* Technically, we could allow the non-new events up to the\n+\t * first new event in the array into the system, but for\n+\t * simplicity reasons, we deny the whole burst if the port is\n+\t * above the water mark.\n+\t */\n+\tif (unlikely(num_new > 0 && rte_atomic32_read(&dsw->credits_on_loan) >\n+\t\t     source_port->new_event_threshold))\n+\t\treturn 0;\n+\n+\tenough_credits = dsw_port_acquire_credits(dsw, source_port,\n+\t\t\t\t\t\t  num_non_release);\n+\tif (unlikely(!enough_credits))\n+\t\treturn 0;\n+\n+\tsource_port->pending_releases -= num_release;\n+\n+\tfor (i = 0; i < events_len; i++) {\n+\t\tconst struct rte_event *event = &events[i];\n+\n+\t\tif (likely(num_release == 0 ||\n+\t\t\t   event->op != RTE_EVENT_OP_RELEASE))\n+\t\t\tdsw_port_buffer_event(dsw, source_port, event);\n+\t}\n+\n+\tDSW_LOG_DP_PORT(DEBUG, source_port->id, \"%d non-release events \"\n+\t\t\t\"accepted.\\n\", num_non_release);\n+\n+\treturn num_non_release;\n+}\n+\n+uint16_t\n+dsw_event_enqueue_burst(void *port, const struct rte_event events[],\n+\t\t\tuint16_t events_len)\n+{\n+\treturn dsw_event_enqueue_burst_generic(port, events, events_len, false,\n+\t\t\t\t\t       0, 0, 0);\n+}\n+\n+uint16_t\n+dsw_event_enqueue_new_burst(void *port, const struct rte_event events[],\n+\t\t\t    uint16_t events_len)\n+{\n+\treturn dsw_event_enqueue_burst_generic(port, events, events_len, true,\n+\t\t\t\t\t       events_len, 0, events_len);\n+}\n+\n+uint16_t\n+dsw_event_enqueue_forward_burst(void *port, const struct rte_event events[],\n+\t\t\t\tuint16_t events_len)\n+{\n+\treturn dsw_event_enqueue_burst_generic(port, events, events_len, true,\n+\t\t\t\t\t       0, 0, events_len);\n+}\n+\n+uint16_t\n+dsw_event_dequeue(void *port, struct rte_event *events, uint64_t wait)\n+{\n+\treturn dsw_event_dequeue_burst(port, events, 1, wait);\n+}\n+\n+static uint16_t\n+dsw_port_dequeue_burst(struct dsw_port *port, struct rte_event *events,\n+\t\t       uint16_t num)\n+{\n+\treturn rte_event_ring_dequeue_burst(port->in_ring, events, num, NULL);\n+}\n+\n+uint16_t\n+dsw_event_dequeue_burst(void *port, struct rte_event *events, uint16_t num,\n+\t\t\tuint64_t wait __rte_unused)\n+{\n+\tstruct dsw_port *source_port = port;\n+\tstruct dsw_evdev *dsw = source_port->dsw;\n+\tuint16_t dequeued;\n+\n+\tsource_port->pending_releases = 0;\n+\n+\tif (unlikely(num > source_port->dequeue_depth))\n+\t\tnum = source_port->dequeue_depth;\n+\n+\tdequeued = dsw_port_dequeue_burst(source_port, events, num);\n+\n+\tsource_port->pending_releases = dequeued;\n+\n+\tif (dequeued > 0) {\n+\t\tDSW_LOG_DP_PORT(DEBUG, source_port->id, \"Dequeued %d events.\\n\",\n+\t\t\t\tdequeued);\n+\n+\t\tdsw_port_return_credits(dsw, source_port, dequeued);\n+\t}\n+\t/* XXX: Assuming the port can't produce any more work,\n+\t *\tconsider flushing the output buffer, on dequeued ==\n+\t 
*\t0.\n+\t */\n+\n+\treturn dequeued;\n+}\ndiff --git a/drivers/event/dsw/meson.build b/drivers/event/dsw/meson.build\nindex 275d051c3..bd2e4c809 100644\n--- a/drivers/event/dsw/meson.build\n+++ b/drivers/event/dsw/meson.build\n@@ -3,4 +3,4 @@\n \n allow_experimental_apis = true\n deps += ['bus_vdev']\n-sources = files('dsw_evdev.c')\n+sources = files('dsw_evdev.c', 'dsw_event.c')\n",
    "prefixes": [
        "v3",
        "05/10"
    ]
}
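
The Allow header above also lists PUT and PATCH for updating a patch. As a hedged sketch only: the snippet below sends a partial update with PATCH, assuming the requests library is available and that the server accepts token-based authentication via the Authorization header; the token value and the target state are placeholders, and the accepted state names depend on the server's configuration.

# Minimal sketch: partially update the patch via the PATCH method listed
# in the Allow header above. Write access requires authentication; the
# token below is a placeholder, and token-based authentication via the
# Authorization header is an assumption about this server's setup.
import requests

API_TOKEN = "0123456789abcdef"  # hypothetical token, replace with your own

resp = requests.patch(
    "http://patches.dpdk.org/api/patches/44561/",
    headers={"Authorization": f"Token {API_TOKEN}",
             "Accept": "application/json"},
    json={"state": "accepted"},  # partial update: only the fields being changed
    timeout=30,
)
resp.raise_for_status()
print(resp.json()["state"])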