get:
Show a patch.

patch:
Partially update a patch.

put:
Update a patch.

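For reference, the patch resource shown below can be fetched with any HTTP client. A minimal sketch in Python follows; the use of the requests library and the helper name get_patch are illustrative assumptions, not part of the Patchwork page itself:

import requests

BASE_URL = "http://patches.dpdk.org/api"

def get_patch(patch_id):
    # Fetch one patch; the JSON fields match the response rendered below.
    resp = requests.get(f"{BASE_URL}/patches/{patch_id}/")
    resp.raise_for_status()
    return resp.json()

patch = get_patch(44564)
print(patch["name"], patch["state"])  # e.g. "[v3,07/10] ..." / "changes-requested"
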
GET /api/patches/44564/?format=api
HTTP 200 OK
Allow: GET, PUT, PATCH, HEAD, OPTIONS
Content-Type: application/json
Vary: Accept

{
    "id": 44564,
    "url": "http://patches.dpdk.org/api/patches/44564/?format=api",
    "web_url": "http://patches.dpdk.org/project/dpdk/patch/20180911080216.3017-8-mattias.ronnblom@ericsson.com/",
    "project": {
        "id": 1,
        "url": "http://patches.dpdk.org/api/projects/1/?format=api",
        "name": "DPDK",
        "link_name": "dpdk",
        "list_id": "dev.dpdk.org",
        "list_email": "dev@dpdk.org",
        "web_url": "http://core.dpdk.org",
        "scm_url": "git://dpdk.org/dpdk",
        "webscm_url": "http://git.dpdk.org/dpdk",
        "list_archive_url": "https://inbox.dpdk.org/dev",
        "list_archive_url_format": "https://inbox.dpdk.org/dev/{}",
        "commit_url_format": ""
    },
    "msgid": "<20180911080216.3017-8-mattias.ronnblom@ericsson.com>",
    "list_archive_url": "https://inbox.dpdk.org/dev/20180911080216.3017-8-mattias.ronnblom@ericsson.com",
    "date": "2018-09-11T08:02:13",
    "name": "[v3,07/10] event/dsw: add load balancing to the DSW event device",
    "commit_ref": null,
    "pull_url": null,
    "state": "changes-requested",
    "archived": true,
    "hash": "8e70a82d14b2a325fb3533fb07c17413ea5c390f",
    "submitter": {
        "id": 1077,
        "url": "http://patches.dpdk.org/api/people/1077/?format=api",
        "name": "Mattias Rönnblom",
        "email": "mattias.ronnblom@ericsson.com"
    },
    "delegate": {
        "id": 310,
        "url": "http://patches.dpdk.org/api/users/310/?format=api",
        "username": "jerin",
        "first_name": "Jerin",
        "last_name": "Jacob",
        "email": "jerinj@marvell.com"
    },
    "mbox": "http://patches.dpdk.org/project/dpdk/patch/20180911080216.3017-8-mattias.ronnblom@ericsson.com/mbox/",
    "series": [
        {
            "id": 1264,
            "url": "http://patches.dpdk.org/api/series/1264/?format=api",
            "web_url": "http://patches.dpdk.org/project/dpdk/list/?series=1264",
            "date": "2018-09-11T08:02:07",
            "name": "A Distributed Software Event Device",
            "version": 3,
            "mbox": "http://patches.dpdk.org/series/1264/mbox/"
        }
    ],
    "comments": "http://patches.dpdk.org/api/patches/44564/comments/",
    "check": "success",
    "checks": "http://patches.dpdk.org/api/patches/44564/checks/",
    "tags": {},
    "related": [],
    "headers": {
        "Return-Path": "<dev-bounces@dpdk.org>",
        "X-Original-To": "patchwork@dpdk.org",
        "Delivered-To": "patchwork@dpdk.org",
        "Received": [
            "from [92.243.14.124] (localhost [127.0.0.1])\n\tby dpdk.org (Postfix) with ESMTP id 4BC9F5B30;\n\tTue, 11 Sep 2018 10:03:29 +0200 (CEST)",
            "from mail.lysator.liu.se (mail.lysator.liu.se [130.236.254.3])\n\tby dpdk.org (Postfix) with ESMTP id 4464F4D3A\n\tfor <dev@dpdk.org>; Tue, 11 Sep 2018 10:03:19 +0200 (CEST)",
            "from mail.lysator.liu.se (localhost [127.0.0.1])\n\tby mail.lysator.liu.se (Postfix) with ESMTP id 049434007C\n\tfor <dev@dpdk.org>; Tue, 11 Sep 2018 10:03:19 +0200 (CEST)",
            "by mail.lysator.liu.se (Postfix, from userid 1004)\n\tid E241340012; Tue, 11 Sep 2018 10:03:18 +0200 (CEST)",
            "from isengard.friendlyfire.se\n\t(host-90-232-156-190.mobileonline.telia.com [90.232.156.190])\n\t(using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256\n\tbits)) (No client certificate requested)\n\tby mail.lysator.liu.se (Postfix) with ESMTPSA id 230D940081;\n\tTue, 11 Sep 2018 10:03:13 +0200 (CEST)"
        ],
        "X-Spam-Checker-Version": "SpamAssassin 3.4.1 (2015-04-28) on\n\tbernadotte.lysator.liu.se",
        "X-Spam-Level": "",
        "X-Spam-Status": "No, score=-0.9 required=5.0 tests=ALL_TRUSTED,AWL\n\tautolearn=disabled version=3.4.1",
        "X-Spam-Score": "-0.9",
        "From": "=?utf-8?q?Mattias_R=C3=B6nnblom?= <mattias.ronnblom@ericsson.com>",
        "To": "jerin.jacob@caviumnetworks.com",
        "Cc": "bruce.richardson@intel.com, dev@dpdk.org, =?utf-8?q?Mattias_R=C3=B6nnb?=\n\t=?utf-8?q?lom?= <mattias.ronnblom@ericsson.com>",
        "Date": "Tue, 11 Sep 2018 10:02:13 +0200",
        "Message-Id": "<20180911080216.3017-8-mattias.ronnblom@ericsson.com>",
        "X-Mailer": "git-send-email 2.17.1",
        "In-Reply-To": "<20180911080216.3017-1-mattias.ronnblom@ericsson.com>",
        "References": "<20180911080216.3017-1-mattias.ronnblom@ericsson.com>",
        "MIME-Version": "1.0",
        "Content-Type": "text/plain; charset=UTF-8",
        "Content-Transfer-Encoding": "8bit",
        "X-Virus-Scanned": "ClamAV using ClamSMTP",
        "Subject": "[dpdk-dev] [PATCH v3 07/10] event/dsw: add load balancing to the\n\tDSW event device",
        "X-BeenThere": "dev@dpdk.org",
        "X-Mailman-Version": "2.1.15",
        "Precedence": "list",
        "List-Id": "DPDK patches and discussions <dev.dpdk.org>",
        "List-Unsubscribe": "<https://mails.dpdk.org/options/dev>,\n\t<mailto:dev-request@dpdk.org?subject=unsubscribe>",
        "List-Archive": "<http://mails.dpdk.org/archives/dev/>",
        "List-Post": "<mailto:dev@dpdk.org>",
        "List-Help": "<mailto:dev-request@dpdk.org?subject=help>",
        "List-Subscribe": "<https://mails.dpdk.org/listinfo/dev>,\n\t<mailto:dev-request@dpdk.org?subject=subscribe>",
        "Errors-To": "dev-bounces@dpdk.org",
        "Sender": "\"dev\" <dev-bounces@dpdk.org>"
    },
    "content": "The DSW event device will now attempt to migrate (move) flows between\nports in order to balance the load.\n\nSigned-off-by: Mattias Rönnblom <mattias.ronnblom@ericsson.com>\n---\n drivers/event/dsw/dsw_evdev.c |  27 ++\n drivers/event/dsw/dsw_evdev.h |  80 ++++\n drivers/event/dsw/dsw_event.c | 735 +++++++++++++++++++++++++++++++++-\n 3 files changed, 838 insertions(+), 4 deletions(-)",
    "diff": "diff --git a/drivers/event/dsw/dsw_evdev.c b/drivers/event/dsw/dsw_evdev.c\nindex bcfa17bab..2ecb365ba 100644\n--- a/drivers/event/dsw/dsw_evdev.c\n+++ b/drivers/event/dsw/dsw_evdev.c\n@@ -20,6 +20,7 @@ dsw_port_setup(struct rte_eventdev *dev, uint8_t port_id,\n \tstruct dsw_evdev *dsw = dsw_pmd_priv(dev);\n \tstruct dsw_port *port;\n \tstruct rte_event_ring *in_ring;\n+\tstruct rte_ring *ctl_in_ring;\n \tchar ring_name[RTE_RING_NAMESIZE];\n \n \tport = &dsw->ports[port_id];\n@@ -42,13 +43,29 @@ dsw_port_setup(struct rte_eventdev *dev, uint8_t port_id,\n \tif (in_ring == NULL)\n \t\treturn -ENOMEM;\n \n+\tsnprintf(ring_name, sizeof(ring_name), \"dswctl%d_p%u\",\n+\t\t dev->data->dev_id, port_id);\n+\n+\tctl_in_ring = rte_ring_create(ring_name, DSW_CTL_IN_RING_SIZE,\n+\t\t\t\t      dev->data->socket_id,\n+\t\t\t\t      RING_F_SC_DEQ|RING_F_EXACT_SZ);\n+\n+\tif (ctl_in_ring == NULL) {\n+\t\trte_event_ring_free(in_ring);\n+\t\treturn -ENOMEM;\n+\t}\n+\n \tport->in_ring = in_ring;\n+\tport->ctl_in_ring = ctl_in_ring;\n \n \trte_atomic16_init(&port->load);\n \n \tport->load_update_interval =\n \t\t(DSW_LOAD_UPDATE_INTERVAL * rte_get_timer_hz()) / US_PER_S;\n \n+\tport->migration_interval =\n+\t\t(DSW_MIGRATION_INTERVAL * rte_get_timer_hz()) / US_PER_S;\n+\n \tdev->data->ports[port_id] = port;\n \n \treturn 0;\n@@ -72,6 +89,7 @@ dsw_port_release(void *p)\n \tstruct dsw_port *port = p;\n \n \trte_event_ring_free(port->in_ring);\n+\trte_ring_free(port->ctl_in_ring);\n }\n \n static int\n@@ -272,6 +290,14 @@ dsw_port_drain_buf(uint8_t dev_id, struct rte_event *buf, uint16_t buf_len,\n \t\tflush(dev_id, buf[i], flush_arg);\n }\n \n+static void\n+dsw_port_drain_paused(uint8_t dev_id, struct dsw_port *port,\n+\t\t      eventdev_stop_flush_t flush, void *flush_arg)\n+{\n+\tdsw_port_drain_buf(dev_id, port->paused_events, port->paused_events_len,\n+\t\t\t   flush, flush_arg);\n+}\n+\n static void\n dsw_port_drain_out(uint8_t dev_id, struct dsw_evdev *dsw, struct dsw_port *port,\n \t\t   eventdev_stop_flush_t flush, void *flush_arg)\n@@ -308,6 +334,7 @@ dsw_drain(uint8_t dev_id, struct dsw_evdev *dsw,\n \t\tstruct dsw_port *port = &dsw->ports[port_id];\n \n \t\tdsw_port_drain_out(dev_id, dsw, port, flush, flush_arg);\n+\t\tdsw_port_drain_paused(dev_id, port, flush, flush_arg);\n \t\tdsw_port_drain_in_ring(dev_id, port, flush, flush_arg);\n \t}\n }\ndiff --git a/drivers/event/dsw/dsw_evdev.h b/drivers/event/dsw/dsw_evdev.h\nindex a5399dda5..783c418bf 100644\n--- a/drivers/event/dsw/dsw_evdev.h\n+++ b/drivers/event/dsw/dsw_evdev.h\n@@ -73,7 +73,37 @@\n #define DSW_LOAD_UPDATE_INTERVAL (DSW_MIGRATION_INTERVAL/4)\n #define DSW_OLD_LOAD_WEIGHT (1)\n \n+/* The minimum time (in us) between two flow migrations. 
What puts an\n+ * upper limit on the actual migration rate is primarily the pace in\n+ * which the ports send and receive control messages, which in turn is\n+ * largely a function of how much cycles are spent the processing of\n+ * an event burst.\n+ */\n #define DSW_MIGRATION_INTERVAL (1000)\n+#define DSW_MIN_SOURCE_LOAD_FOR_MIGRATION (DSW_LOAD_FROM_PERCENT(70))\n+#define DSW_MAX_TARGET_LOAD_FOR_MIGRATION (DSW_LOAD_FROM_PERCENT(95))\n+\n+#define DSW_MAX_EVENTS_RECORDED (128)\n+\n+/* Only one outstanding migration per port is allowed */\n+#define DSW_MAX_PAUSED_FLOWS (DSW_MAX_PORTS)\n+\n+/* Enough room for paus request/confirm and unpaus request/confirm for\n+ * all possible senders.\n+ */\n+#define DSW_CTL_IN_RING_SIZE ((DSW_MAX_PORTS-1)*4)\n+\n+struct dsw_queue_flow {\n+\tuint8_t queue_id;\n+\tuint16_t flow_hash;\n+};\n+\n+enum dsw_migration_state {\n+\tDSW_MIGRATION_STATE_IDLE,\n+\tDSW_MIGRATION_STATE_PAUSING,\n+\tDSW_MIGRATION_STATE_FORWARDING,\n+\tDSW_MIGRATION_STATE_UNPAUSING\n+};\n \n struct dsw_port {\n \tuint16_t id;\n@@ -98,6 +128,7 @@ struct dsw_port {\n \n \tuint16_t ops_since_bg_task;\n \n+\t/* most recent 'background' processing */\n \tuint64_t last_bg;\n \n \t/* For port load measurement. */\n@@ -108,11 +139,46 @@ struct dsw_port {\n \tuint64_t busy_cycles;\n \tuint64_t total_busy_cycles;\n \n+\t/* For the ctl interface and flow migration mechanism. */\n+\tuint64_t next_migration;\n+\tuint64_t migration_interval;\n+\tenum dsw_migration_state migration_state;\n+\n+\tuint64_t migration_start;\n+\tuint64_t migrations;\n+\tuint64_t migration_latency;\n+\n+\tuint8_t migration_target_port_id;\n+\tstruct dsw_queue_flow migration_target_qf;\n+\tuint8_t cfm_cnt;\n+\n+\tuint16_t paused_flows_len;\n+\tstruct dsw_queue_flow paused_flows[DSW_MAX_PAUSED_FLOWS];\n+\n+\t/* In a very contrived worst case all inflight events can be\n+\t * laying around paused here.\n+\t */\n+\tuint16_t paused_events_len;\n+\tstruct rte_event paused_events[DSW_MAX_EVENTS];\n+\n+\tuint16_t seen_events_len;\n+\tuint16_t seen_events_idx;\n+\tstruct dsw_queue_flow seen_events[DSW_MAX_EVENTS_RECORDED];\n+\n \tuint16_t out_buffer_len[DSW_MAX_PORTS];\n \tstruct rte_event out_buffer[DSW_MAX_PORTS][DSW_MAX_PORT_OUT_BUFFER];\n \n+\tuint16_t in_buffer_len;\n+\tuint16_t in_buffer_start;\n+\t/* This buffer may contain events that were read up from the\n+\t * in_ring during the flow migration process.\n+\t */\n+\tstruct rte_event in_buffer[DSW_MAX_EVENTS];\n+\n \tstruct rte_event_ring *in_ring __rte_cache_aligned;\n \n+\tstruct rte_ring *ctl_in_ring __rte_cache_aligned;\n+\n \t/* Estimate of current port load. 
*/\n \trte_atomic16_t load __rte_cache_aligned;\n } __rte_cache_aligned;\n@@ -137,6 +203,20 @@ struct dsw_evdev {\n \trte_atomic32_t credits_on_loan __rte_cache_aligned;\n };\n \n+#define DSW_CTL_PAUS_REQ (0)\n+#define DSW_CTL_UNPAUS_REQ (1)\n+#define DSW_CTL_CFM (2)\n+\n+/* sizeof(struct dsw_ctl_msg) must be equal or less than\n+ * sizeof(void *), to fit on the control ring.\n+ */\n+struct dsw_ctl_msg {\n+\tuint8_t type:2;\n+\tuint8_t originating_port_id:6;\n+\tuint8_t queue_id;\n+\tuint16_t flow_hash;\n+} __rte_packed;\n+\n uint16_t dsw_event_enqueue(void *port, const struct rte_event *event);\n uint16_t dsw_event_enqueue_burst(void *port,\n \t\t\t\t const struct rte_event events[],\ndiff --git a/drivers/event/dsw/dsw_event.c b/drivers/event/dsw/dsw_event.c\nindex f326147c9..f0347592d 100644\n--- a/drivers/event/dsw/dsw_event.c\n+++ b/drivers/event/dsw/dsw_event.c\n@@ -5,9 +5,11 @@\n #include \"dsw_evdev.h\"\n \n #include <stdbool.h>\n+#include <string.h>\n \n #include <rte_atomic.h>\n #include <rte_cycles.h>\n+#include <rte_memcpy.h>\n #include <rte_random.h>\n \n static bool\n@@ -140,6 +142,269 @@ dsw_port_consider_load_update(struct dsw_port *port, uint64_t now)\n \tdsw_port_load_update(port, now);\n }\n \n+static void\n+dsw_port_ctl_enqueue(struct dsw_port *port, struct dsw_ctl_msg *msg)\n+{\n+\tvoid *raw_msg;\n+\n+\tmemcpy(&raw_msg, msg, sizeof(*msg));\n+\n+\t/* there's always room on the ring */\n+\twhile (rte_ring_enqueue(port->ctl_in_ring, raw_msg) != 0)\n+\t\trte_pause();\n+}\n+\n+static int\n+dsw_port_ctl_dequeue(struct dsw_port *port, struct dsw_ctl_msg *msg)\n+{\n+\tvoid *raw_msg;\n+\tint rc;\n+\n+\trc = rte_ring_dequeue(port->ctl_in_ring, &raw_msg);\n+\n+\tif (rc == 0)\n+\t\tmemcpy(msg, &raw_msg, sizeof(*msg));\n+\n+\treturn rc;\n+}\n+\n+static void\n+dsw_port_ctl_broadcast(struct dsw_evdev *dsw, struct dsw_port *source_port,\n+\t\t       uint8_t type, uint8_t queue_id, uint16_t flow_hash)\n+{\n+\tuint16_t port_id;\n+\tstruct dsw_ctl_msg msg = {\n+\t\t.type = type,\n+\t\t.originating_port_id = source_port->id,\n+\t\t.queue_id = queue_id,\n+\t\t.flow_hash = flow_hash\n+\t};\n+\n+\tfor (port_id = 0; port_id < dsw->num_ports; port_id++)\n+\t\tif (port_id != source_port->id)\n+\t\t\tdsw_port_ctl_enqueue(&dsw->ports[port_id], &msg);\n+}\n+\n+static bool\n+dsw_port_is_flow_paused(struct dsw_port *port, uint8_t queue_id,\n+\t\t\tuint16_t flow_hash)\n+{\n+\tuint16_t i;\n+\n+\tfor (i = 0; i < port->paused_flows_len; i++) {\n+\t\tstruct dsw_queue_flow *qf = &port->paused_flows[i];\n+\t\tif (qf->queue_id == queue_id &&\n+\t\t    qf->flow_hash == flow_hash)\n+\t\t\treturn true;\n+\t}\n+\treturn false;\n+}\n+\n+static void\n+dsw_port_add_paused_flow(struct dsw_port *port, uint8_t queue_id,\n+\t\t\t uint16_t paused_flow_hash)\n+{\n+\tport->paused_flows[port->paused_flows_len] = (struct dsw_queue_flow) {\n+\t\t.queue_id = queue_id,\n+\t\t.flow_hash = paused_flow_hash\n+\t};\n+\tport->paused_flows_len++;\n+}\n+\n+static void\n+dsw_port_remove_paused_flow(struct dsw_port *port, uint8_t queue_id,\n+\t\t\t    uint16_t paused_flow_hash)\n+{\n+\tuint16_t i;\n+\n+\tfor (i = 0; i < port->paused_flows_len; i++) {\n+\t\tstruct dsw_queue_flow *qf = &port->paused_flows[i];\n+\n+\t\tif (qf->queue_id == queue_id &&\n+\t\t    qf->flow_hash == paused_flow_hash) {\n+\t\t\tuint16_t last_idx = port->paused_flows_len-1;\n+\t\t\tif (i != last_idx)\n+\t\t\t\tport->paused_flows[i] =\n+\t\t\t\t\tport->paused_flows[last_idx];\n+\t\t\tport->paused_flows_len--;\n+\t\t\tbreak;\n+\t\t}\n+\t}\n+}\n+\n+static 
void\n+dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port);\n+\n+static void\n+dsw_port_handle_pause_flow(struct dsw_evdev *dsw, struct dsw_port *port,\n+\t\t\t   uint8_t originating_port_id, uint8_t queue_id,\n+\t\t\t   uint16_t paused_flow_hash)\n+{\n+\tstruct dsw_ctl_msg cfm = {\n+\t\t.type = DSW_CTL_CFM,\n+\t\t.originating_port_id = port->id,\n+\t\t.queue_id = queue_id,\n+\t\t.flow_hash = paused_flow_hash\n+\t};\n+\n+\tDSW_LOG_DP_PORT(DEBUG, port->id, \"Pausing queue_id %d flow_hash %d.\\n\",\n+\t\t\tqueue_id, paused_flow_hash);\n+\n+\t/* There might be already-scheduled events belonging to the\n+\t * paused flow in the output buffers.\n+\t */\n+\tdsw_port_flush_out_buffers(dsw, port);\n+\n+\tdsw_port_add_paused_flow(port, queue_id, paused_flow_hash);\n+\n+\t/* Make sure any stores to the original port's in_ring is seen\n+\t * before the ctl message.\n+\t */\n+\trte_smp_wmb();\n+\n+\tdsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm);\n+}\n+\n+static void\n+dsw_find_lowest_load_port(uint8_t *port_ids, uint16_t num_port_ids,\n+\t\t\t  uint8_t exclude_port_id, int16_t *port_loads,\n+\t\t\t  uint8_t *target_port_id, int16_t *target_load)\n+{\n+\tint16_t candidate_port_id = -1;\n+\tint16_t candidate_load = DSW_MAX_LOAD;\n+\tuint16_t i;\n+\n+\tfor (i = 0; i < num_port_ids; i++) {\n+\t\tuint8_t port_id = port_ids[i];\n+\t\tif (port_id != exclude_port_id) {\n+\t\t\tint16_t load = port_loads[port_id];\n+\t\t\tif (candidate_port_id == -1 ||\n+\t\t\t    load < candidate_load) {\n+\t\t\t\tcandidate_port_id = port_id;\n+\t\t\t\tcandidate_load = load;\n+\t\t\t}\n+\t\t}\n+\t}\n+\t*target_port_id = candidate_port_id;\n+\t*target_load = candidate_load;\n+}\n+\n+struct dsw_queue_flow_burst {\n+\tstruct dsw_queue_flow queue_flow;\n+\tuint16_t count;\n+};\n+\n+static inline int\n+dsw_cmp_burst(const void *v_burst_a, const void *v_burst_b)\n+{\n+\tconst struct dsw_queue_flow_burst *burst_a = v_burst_a;\n+\tconst struct dsw_queue_flow_burst *burst_b = v_burst_b;\n+\n+\tint a_count = burst_a->count;\n+\tint b_count = burst_b->count;\n+\n+\treturn a_count - b_count;\n+}\n+\n+#define DSW_QF_TO_INT(_qf)\t\t\t\t\t\\\n+\t((int)((((_qf)->queue_id)<<16)|((_qf)->flow_hash)))\n+\n+static inline int\n+dsw_cmp_qf(const void *v_qf_a, const void *v_qf_b)\n+{\n+\tconst struct dsw_queue_flow *qf_a = v_qf_a;\n+\tconst struct dsw_queue_flow *qf_b = v_qf_b;\n+\n+\treturn DSW_QF_TO_INT(qf_a) - DSW_QF_TO_INT(qf_b);\n+}\n+\n+static uint16_t\n+dsw_sort_qfs_to_bursts(struct dsw_queue_flow *qfs, uint16_t qfs_len,\n+\t\t       struct dsw_queue_flow_burst *bursts)\n+{\n+\tuint16_t i;\n+\tstruct dsw_queue_flow_burst *current_burst = NULL;\n+\tuint16_t num_bursts = 0;\n+\n+\t/* We don't need the stable property, and the list is likely\n+\t * large enough for qsort() to outperform dsw_stable_sort(),\n+\t * so we use qsort() here.\n+\t */\n+\tqsort(qfs, qfs_len, sizeof(qfs[0]), dsw_cmp_qf);\n+\n+\t/* arrange the (now-consecutive) events into bursts */\n+\tfor (i = 0; i < qfs_len; i++) {\n+\t\tif (i == 0 ||\n+\t\t    dsw_cmp_qf(&qfs[i], &current_burst->queue_flow) != 0) {\n+\t\t\tcurrent_burst = &bursts[num_bursts];\n+\t\t\tcurrent_burst->queue_flow = qfs[i];\n+\t\t\tcurrent_burst->count = 0;\n+\t\t\tnum_bursts++;\n+\t\t}\n+\t\tcurrent_burst->count++;\n+\t}\n+\n+\tqsort(bursts, num_bursts, sizeof(bursts[0]), dsw_cmp_burst);\n+\n+\treturn num_bursts;\n+}\n+\n+static bool\n+dsw_retrieve_port_loads(struct dsw_evdev *dsw, int16_t *port_loads,\n+\t\t\tint16_t load_limit)\n+{\n+\tbool below_limit = 
false;\n+\tuint16_t i;\n+\n+\tfor (i = 0; i < dsw->num_ports; i++) {\n+\t\tint16_t load = rte_atomic16_read(&dsw->ports[i].load);\n+\t\tif (load < load_limit)\n+\t\t\tbelow_limit = true;\n+\t\tport_loads[i] = load;\n+\t}\n+\treturn below_limit;\n+}\n+\n+static bool\n+dsw_select_migration_target(struct dsw_evdev *dsw,\n+\t\t\t    struct dsw_port *source_port,\n+\t\t\t    struct dsw_queue_flow_burst *bursts,\n+\t\t\t    uint16_t num_bursts, int16_t *port_loads,\n+\t\t\t    int16_t max_load, struct dsw_queue_flow *target_qf,\n+\t\t\t    uint8_t *target_port_id)\n+{\n+\tuint16_t source_load = port_loads[source_port->id];\n+\tuint16_t i;\n+\n+\tfor (i = 0; i < num_bursts; i++) {\n+\t\tstruct dsw_queue_flow *qf = &bursts[i].queue_flow;\n+\n+\t\tif (dsw_port_is_flow_paused(source_port, qf->queue_id,\n+\t\t\t\t\t    qf->flow_hash))\n+\t\t\tcontinue;\n+\n+\t\tstruct dsw_queue *queue = &dsw->queues[qf->queue_id];\n+\t\tint16_t target_load;\n+\n+\t\tdsw_find_lowest_load_port(queue->serving_ports,\n+\t\t\t\t\t  queue->num_serving_ports,\n+\t\t\t\t\t  source_port->id, port_loads,\n+\t\t\t\t\t  target_port_id, &target_load);\n+\n+\t\tif (target_load < source_load &&\n+\t\t    target_load < max_load) {\n+\t\t\t*target_qf = *qf;\n+\t\t\treturn true;\n+\t\t}\n+\t}\n+\n+\tDSW_LOG_DP_PORT(DEBUG, source_port->id, \"For the %d flows considered, \"\n+\t\t\t\"no target port found with load less than %d.\\n\",\n+\t\t\tnum_bursts, DSW_LOAD_TO_PERCENT(max_load));\n+\n+\treturn false;\n+}\n+\n static uint8_t\n dsw_schedule(struct dsw_evdev *dsw, uint8_t queue_id, uint16_t flow_hash)\n {\n@@ -197,6 +462,14 @@ dsw_port_get_parallel_flow_id(struct dsw_port *port)\n \treturn flow_id;\n }\n \n+static void\n+dsw_port_buffer_paused(struct dsw_port *port,\n+\t\t       const struct rte_event *paused_event)\n+{\n+\tport->paused_events[port->paused_events_len] = *paused_event;\n+\tport->paused_events_len++;\n+}\n+\n static void\n dsw_port_buffer_non_paused(struct dsw_evdev *dsw, struct dsw_port *source_port,\n \t\t\t   uint8_t dest_port_id, const struct rte_event *event)\n@@ -256,11 +529,401 @@ dsw_port_buffer_event(struct dsw_evdev *dsw, struct dsw_port *source_port,\n \n \tflow_hash = dsw_flow_id_hash(event->flow_id);\n \n+\tif (unlikely(dsw_port_is_flow_paused(source_port, event->queue_id,\n+\t\t\t\t\t     flow_hash))) {\n+\t\tdsw_port_buffer_paused(source_port, event);\n+\t\treturn;\n+\t}\n+\n \tdest_port_id = dsw_schedule(dsw, event->queue_id, flow_hash);\n \n \tdsw_port_buffer_non_paused(dsw, source_port, dest_port_id, event);\n }\n \n+static void\n+dsw_port_flush_paused_events(struct dsw_evdev *dsw,\n+\t\t\t     struct dsw_port *source_port,\n+\t\t\t     uint8_t queue_id, uint16_t paused_flow_hash)\n+{\n+\tuint16_t paused_events_len = source_port->paused_events_len;\n+\tstruct rte_event paused_events[paused_events_len];\n+\tuint8_t dest_port_id;\n+\tuint16_t i;\n+\n+\tif (paused_events_len == 0)\n+\t\treturn;\n+\n+\tif (dsw_port_is_flow_paused(source_port, queue_id, paused_flow_hash))\n+\t\treturn;\n+\n+\trte_memcpy(paused_events, source_port->paused_events,\n+\t\t   paused_events_len * sizeof(struct rte_event));\n+\n+\tsource_port->paused_events_len = 0;\n+\n+\tdest_port_id = dsw_schedule(dsw, queue_id, paused_flow_hash);\n+\n+\tfor (i = 0; i < paused_events_len; i++) {\n+\t\tstruct rte_event *event = &paused_events[i];\n+\t\tuint16_t flow_hash;\n+\n+\t\tflow_hash = dsw_flow_id_hash(event->flow_id);\n+\n+\t\tif (event->queue_id == queue_id &&\n+\t\t    flow_hash == 
paused_flow_hash)\n+\t\t\tdsw_port_buffer_non_paused(dsw, source_port,\n+\t\t\t\t\t\t   dest_port_id, event);\n+\t\telse\n+\t\t\tdsw_port_buffer_paused(source_port, event);\n+\t}\n+}\n+\n+static void\n+dsw_port_migration_stats(struct dsw_port *port)\n+{\n+\tuint64_t migration_latency;\n+\n+\tmigration_latency = (rte_get_timer_cycles() - port->migration_start);\n+\tport->migration_latency += migration_latency;\n+\tport->migrations++;\n+}\n+\n+static void\n+dsw_port_end_migration(struct dsw_evdev *dsw, struct dsw_port *port)\n+{\n+\tuint8_t queue_id = port->migration_target_qf.queue_id;\n+\tuint16_t flow_hash = port->migration_target_qf.flow_hash;\n+\n+\tport->migration_state = DSW_MIGRATION_STATE_IDLE;\n+\tport->seen_events_len = 0;\n+\n+\tdsw_port_migration_stats(port);\n+\n+\tif (dsw->queues[queue_id].schedule_type != RTE_SCHED_TYPE_PARALLEL) {\n+\t\tdsw_port_remove_paused_flow(port, queue_id, flow_hash);\n+\t\tdsw_port_flush_paused_events(dsw, port, queue_id, flow_hash);\n+\t}\n+\n+\tDSW_LOG_DP_PORT(DEBUG, port->id, \"Migration completed for queue_id \"\n+\t\t\t\"%d flow_hash %d.\\n\", queue_id, flow_hash);\n+}\n+\n+static void\n+dsw_port_consider_migration(struct dsw_evdev *dsw,\n+\t\t\t    struct dsw_port *source_port,\n+\t\t\t    uint64_t now)\n+{\n+\tbool any_port_below_limit;\n+\tstruct dsw_queue_flow *seen_events = source_port->seen_events;\n+\tuint16_t seen_events_len = source_port->seen_events_len;\n+\tstruct dsw_queue_flow_burst bursts[DSW_MAX_EVENTS_RECORDED];\n+\tuint16_t num_bursts;\n+\tint16_t source_port_load;\n+\tint16_t port_loads[dsw->num_ports];\n+\n+\tif (now < source_port->next_migration)\n+\t\treturn;\n+\n+\tif (dsw->num_ports == 1)\n+\t\treturn;\n+\n+\tDSW_LOG_DP_PORT(DEBUG, source_port->id, \"Considering migration.\\n\");\n+\n+\t/* Randomize interval to avoid having all threads considering\n+\t * migration at the same in point in time, which might lead to\n+\t * all choosing the same target port.\n+\t */\n+\tsource_port->next_migration = now +\n+\t\tsource_port->migration_interval / 2 +\n+\t\trte_rand() % source_port->migration_interval;\n+\n+\tif (source_port->migration_state != DSW_MIGRATION_STATE_IDLE) {\n+\t\tDSW_LOG_DP_PORT(DEBUG, source_port->id,\n+\t\t\t\t\"Migration already in progress.\\n\");\n+\t\treturn;\n+\t}\n+\n+\t/* For simplicity, avoid migration in the unlikely case there\n+\t * is still events to consume in the in_buffer (from the last\n+\t * migration).\n+\t */\n+\tif (source_port->in_buffer_len > 0) {\n+\t\tDSW_LOG_DP_PORT(DEBUG, source_port->id, \"There are still \"\n+\t\t\t\t\"events in the input buffer.\\n\");\n+\t\treturn;\n+\t}\n+\n+\tsource_port_load = rte_atomic16_read(&source_port->load);\n+\tif (source_port_load < DSW_MIN_SOURCE_LOAD_FOR_MIGRATION) {\n+\t\tDSW_LOG_DP_PORT(DEBUG, source_port->id,\n+\t\t\t\t\"Load %d is below threshold level %d.\\n\",\n+\t\t\t\tDSW_LOAD_TO_PERCENT(source_port_load),\n+\t\t       DSW_LOAD_TO_PERCENT(DSW_MIN_SOURCE_LOAD_FOR_MIGRATION));\n+\t\treturn;\n+\t}\n+\n+\t/* Avoid starting any expensive operations (sorting etc), in\n+\t * case of a scenario with all ports above the load limit.\n+\t */\n+\tany_port_below_limit =\n+\t\tdsw_retrieve_port_loads(dsw, port_loads,\n+\t\t\t\t\tDSW_MAX_TARGET_LOAD_FOR_MIGRATION);\n+\tif (!any_port_below_limit) {\n+\t\tDSW_LOG_DP_PORT(DEBUG, source_port->id,\n+\t\t\t\t\"Candidate target ports are all too highly \"\n+\t\t\t\t\"loaded.\\n\");\n+\t\treturn;\n+\t}\n+\n+\t/* Sort flows into 'bursts' to allow attempting to migrating\n+\t * small (but still active) flows first - 
this it to avoid\n+\t * having large flows moving around the worker cores too much\n+\t * (to avoid cache misses, among other things). Of course, the\n+\t * number of recorded events (queue+flow ids) are limited, and\n+\t * provides only a snapshot, so only so many conclusions can\n+\t * be drawn from this data.\n+\t */\n+\tnum_bursts = dsw_sort_qfs_to_bursts(seen_events, seen_events_len,\n+\t\t\t\t\t    bursts);\n+\t/* For non-big-little systems, there's no point in moving the\n+\t * only (known) flow.\n+\t */\n+\tif (num_bursts < 2) {\n+\t\tDSW_LOG_DP_PORT(DEBUG, source_port->id, \"Only a single flow \"\n+\t\t\t\t\"queue_id %d flow_hash %d has been seen.\\n\",\n+\t\t\t\tbursts[0].queue_flow.queue_id,\n+\t\t\t\tbursts[0].queue_flow.flow_hash);\n+\t\treturn;\n+\t}\n+\n+\t/* The strategy is to first try to find a flow to move to a\n+\t * port with low load (below the migration-attempt\n+\t * threshold). If that fails, we try to find a port which is\n+\t * below the max threshold, and also less loaded than this\n+\t * port is.\n+\t */\n+\tif (!dsw_select_migration_target(dsw, source_port, bursts, num_bursts,\n+\t\t\t\t\t port_loads,\n+\t\t\t\t\t DSW_MIN_SOURCE_LOAD_FOR_MIGRATION,\n+\t\t\t\t\t &source_port->migration_target_qf,\n+\t\t\t\t\t &source_port->migration_target_port_id)\n+\t    &&\n+\t    !dsw_select_migration_target(dsw, source_port, bursts, num_bursts,\n+\t\t\t\t\t port_loads,\n+\t\t\t\t\t DSW_MAX_TARGET_LOAD_FOR_MIGRATION,\n+\t\t\t\t\t &source_port->migration_target_qf,\n+\t\t\t\t       &source_port->migration_target_port_id))\n+\t\treturn;\n+\n+\tDSW_LOG_DP_PORT(DEBUG, source_port->id, \"Migrating queue_id %d \"\n+\t\t\t\"flow_hash %d from port %d to port %d.\\n\",\n+\t\t\tsource_port->migration_target_qf.queue_id,\n+\t\t\tsource_port->migration_target_qf.flow_hash,\n+\t\t\tsource_port->id, source_port->migration_target_port_id);\n+\n+\t/* We have a winner. */\n+\n+\tsource_port->migration_state = DSW_MIGRATION_STATE_PAUSING;\n+\tsource_port->migration_start = rte_get_timer_cycles();\n+\n+\t/* No need to go through the whole pause procedure for\n+\t * parallel queues, since atomic/ordered semantics need not to\n+\t * be maintained.\n+\t */\n+\n+\tif (dsw->queues[source_port->migration_target_qf.queue_id].schedule_type\n+\t    == RTE_SCHED_TYPE_PARALLEL) {\n+\t\tuint8_t queue_id = source_port->migration_target_qf.queue_id;\n+\t\tuint16_t flow_hash = source_port->migration_target_qf.flow_hash;\n+\t\tuint8_t dest_port_id = source_port->migration_target_port_id;\n+\n+\t\t/* Single byte-sized stores are always atomic. 
*/\n+\t\tdsw->queues[queue_id].flow_to_port_map[flow_hash] =\n+\t\t\tdest_port_id;\n+\t\trte_smp_wmb();\n+\n+\t\tdsw_port_end_migration(dsw, source_port);\n+\n+\t\treturn;\n+\t}\n+\n+\t/* There might be 'loopback' events already scheduled in the\n+\t * output buffers.\n+\t */\n+\tdsw_port_flush_out_buffers(dsw, source_port);\n+\n+\tdsw_port_add_paused_flow(source_port,\n+\t\t\t\t source_port->migration_target_qf.queue_id,\n+\t\t\t\t source_port->migration_target_qf.flow_hash);\n+\n+\tdsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_PAUS_REQ,\n+\t\t\t       source_port->migration_target_qf.queue_id,\n+\t\t\t       source_port->migration_target_qf.flow_hash);\n+\tsource_port->cfm_cnt = 0;\n+}\n+\n+static void\n+dsw_port_flush_paused_events(struct dsw_evdev *dsw,\n+\t\t\t     struct dsw_port *source_port,\n+\t\t\t     uint8_t queue_id, uint16_t paused_flow_hash);\n+\n+static void\n+dsw_port_handle_unpause_flow(struct dsw_evdev *dsw, struct dsw_port *port,\n+\t\t\t     uint8_t originating_port_id, uint8_t queue_id,\n+\t\t\t     uint16_t paused_flow_hash)\n+{\n+\tstruct dsw_ctl_msg cfm = {\n+\t\t.type = DSW_CTL_CFM,\n+\t\t.originating_port_id = port->id,\n+\t\t.queue_id = queue_id,\n+\t\t.flow_hash = paused_flow_hash\n+\t};\n+\n+\tDSW_LOG_DP_PORT(DEBUG, port->id, \"Un-pausing queue_id %d flow_hash %d.\\n\",\n+\t\t\tqueue_id, paused_flow_hash);\n+\n+\tdsw_port_remove_paused_flow(port, queue_id, paused_flow_hash);\n+\n+\trte_smp_rmb();\n+\n+\tdsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm);\n+\n+\tdsw_port_flush_paused_events(dsw, port, queue_id, paused_flow_hash);\n+}\n+\n+#define FORWARD_BURST_SIZE (32)\n+\n+static void\n+dsw_port_forward_migrated_flow(struct dsw_port *source_port,\n+\t\t\t       struct rte_event_ring *dest_ring,\n+\t\t\t       uint8_t queue_id,\n+\t\t\t       uint16_t flow_hash)\n+{\n+\tuint16_t events_left;\n+\n+\t/* Control ring message should been seen before the ring count\n+\t * is read on the port's in_ring.\n+\t */\n+\trte_smp_rmb();\n+\n+\tevents_left = rte_event_ring_count(source_port->in_ring);\n+\n+\twhile (events_left > 0) {\n+\t\tuint16_t in_burst_size =\n+\t\t\tRTE_MIN(FORWARD_BURST_SIZE, events_left);\n+\t\tstruct rte_event in_burst[in_burst_size];\n+\t\tuint16_t in_len;\n+\t\tuint16_t i;\n+\n+\t\tin_len = rte_event_ring_dequeue_burst(source_port->in_ring,\n+\t\t\t\t\t\t      in_burst,\n+\t\t\t\t\t\t      in_burst_size, NULL);\n+\t\t/* No need to care about bursting forwarded events (to\n+\t\t * the destination port's in_ring), since migration\n+\t\t * doesn't happen very often, and also the majority of\n+\t\t * the dequeued events will likely *not* be forwarded.\n+\t\t */\n+\t\tfor (i = 0; i < in_len; i++) {\n+\t\t\tstruct rte_event *e = &in_burst[i];\n+\t\t\tif (e->queue_id == queue_id &&\n+\t\t\t    dsw_flow_id_hash(e->flow_id) == flow_hash) {\n+\t\t\t\twhile (rte_event_ring_enqueue_burst(dest_ring,\n+\t\t\t\t\t\t\t\t    e, 1,\n+\t\t\t\t\t\t\t\t    NULL) != 1)\n+\t\t\t\t\trte_pause();\n+\t\t\t} else {\n+\t\t\t\tuint16_t last_idx = source_port->in_buffer_len;\n+\t\t\t\tsource_port->in_buffer[last_idx] = *e;\n+\t\t\t\tsource_port->in_buffer_len++;\n+\t\t\t}\n+\t\t}\n+\n+\t\tevents_left -= in_len;\n+\t}\n+}\n+\n+static void\n+dsw_port_move_migrating_flow(struct dsw_evdev *dsw,\n+\t\t\t     struct dsw_port *source_port)\n+{\n+\tuint8_t queue_id = source_port->migration_target_qf.queue_id;\n+\tuint16_t flow_hash = source_port->migration_target_qf.flow_hash;\n+\tuint8_t dest_port_id = source_port->migration_target_port_id;\n+\tstruct dsw_port 
*dest_port = &dsw->ports[dest_port_id];\n+\n+\tdsw_port_flush_out_buffers(dsw, source_port);\n+\n+\trte_smp_wmb();\n+\n+\tdsw->queues[queue_id].flow_to_port_map[flow_hash] =\n+\t\tdest_port_id;\n+\n+\tdsw_port_forward_migrated_flow(source_port, dest_port->in_ring,\n+\t\t\t\t       queue_id, flow_hash);\n+\n+\t/* Flow table update and migration destination port's enqueues\n+\t * must be seen before the control message.\n+\t */\n+\trte_smp_wmb();\n+\n+\tdsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_UNPAUS_REQ, queue_id,\n+\t\t\t       flow_hash);\n+\tsource_port->cfm_cnt = 0;\n+\tsource_port->migration_state = DSW_MIGRATION_STATE_UNPAUSING;\n+}\n+\n+static void\n+dsw_port_handle_confirm(struct dsw_evdev *dsw, struct dsw_port *port)\n+{\n+\tport->cfm_cnt++;\n+\n+\tif (port->cfm_cnt == (dsw->num_ports-1)) {\n+\t\tswitch (port->migration_state) {\n+\t\tcase DSW_MIGRATION_STATE_PAUSING:\n+\t\t\tDSW_LOG_DP_PORT(DEBUG, port->id, \"Going into forwarding \"\n+\t\t\t\t\t\"migration state.\\n\");\n+\t\t\tport->migration_state = DSW_MIGRATION_STATE_FORWARDING;\n+\t\t\tbreak;\n+\t\tcase DSW_MIGRATION_STATE_UNPAUSING:\n+\t\t\tdsw_port_end_migration(dsw, port);\n+\t\t\tbreak;\n+\t\tdefault:\n+\t\t\tRTE_ASSERT(0);\n+\t\t\tbreak;\n+\t\t}\n+\t}\n+}\n+\n+static void\n+dsw_port_ctl_process(struct dsw_evdev *dsw, struct dsw_port *port)\n+{\n+\tstruct dsw_ctl_msg msg;\n+\n+\t/* So any table loads happens before the ring dequeue, in the\n+\t * case of a 'paus' message.\n+\t */\n+\trte_smp_rmb();\n+\n+\tif (dsw_port_ctl_dequeue(port, &msg) == 0) {\n+\t\tswitch (msg.type) {\n+\t\tcase DSW_CTL_PAUS_REQ:\n+\t\t\tdsw_port_handle_pause_flow(dsw, port,\n+\t\t\t\t\t\t   msg.originating_port_id,\n+\t\t\t\t\t\t   msg.queue_id, msg.flow_hash);\n+\t\t\tbreak;\n+\t\tcase DSW_CTL_UNPAUS_REQ:\n+\t\t\tdsw_port_handle_unpause_flow(dsw, port,\n+\t\t\t\t\t\t     msg.originating_port_id,\n+\t\t\t\t\t\t     msg.queue_id,\n+\t\t\t\t\t\t     msg.flow_hash);\n+\t\t\tbreak;\n+\t\tcase DSW_CTL_CFM:\n+\t\t\tdsw_port_handle_confirm(dsw, port);\n+\t\t\tbreak;\n+\t\t}\n+\t}\n+}\n+\n static void\n dsw_port_note_op(struct dsw_port *port, uint16_t num_events)\n {\n@@ -270,12 +933,24 @@ dsw_port_note_op(struct dsw_port *port, uint16_t num_events)\n \tport->ops_since_bg_task += (num_events+1);\n }\n \n-static void\n-dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port);\n-\n static void\n dsw_port_bg_process(struct dsw_evdev *dsw, struct dsw_port *port)\n {\n+\tif (unlikely(port->migration_state == DSW_MIGRATION_STATE_FORWARDING &&\n+\t\t     port->pending_releases == 0))\n+\t\tdsw_port_move_migrating_flow(dsw, port);\n+\n+\t/* Polling the control ring is relatively inexpensive, and\n+\t * polling it often helps bringing down migration latency, so\n+\t * do this for every iteration.\n+\t */\n+\tdsw_port_ctl_process(dsw, port);\n+\n+\t/* To avoid considering migration and flushing output buffers\n+\t * on every dequeue/enqueue call, the scheduler only performs\n+\t * such 'background' tasks every nth\n+\t * (i.e. 
DSW_MAX_PORT_OPS_PER_BG_TASK) operation.\n+\t */\n \tif (unlikely(port->ops_since_bg_task >= DSW_MAX_PORT_OPS_PER_BG_TASK)) {\n \t\tuint64_t now;\n \n@@ -290,6 +965,8 @@ dsw_port_bg_process(struct dsw_evdev *dsw, struct dsw_port *port)\n \n \t\tdsw_port_consider_load_update(port, now);\n \n+\t\tdsw_port_consider_migration(dsw, port, now);\n+\n \t\tport->ops_since_bg_task = 0;\n \t}\n }\n@@ -339,7 +1016,6 @@ dsw_event_enqueue_burst_generic(void *port, const struct rte_event events[],\n \t */\n \tif (unlikely(events_len == 0)) {\n \t\tdsw_port_note_op(source_port, DSW_MAX_PORT_OPS_PER_BG_TASK);\n-\t\tdsw_port_flush_out_buffers(dsw, source_port);\n \t\treturn 0;\n \t}\n \n@@ -423,10 +1099,52 @@ dsw_event_dequeue(void *port, struct rte_event *events, uint64_t wait)\n \treturn dsw_event_dequeue_burst(port, events, 1, wait);\n }\n \n+static void\n+dsw_port_record_seen_events(struct dsw_port *port, struct rte_event *events,\n+\t\t\t    uint16_t num)\n+{\n+\tuint16_t i;\n+\n+\tfor (i = 0; i < num; i++) {\n+\t\tuint16_t l_idx = port->seen_events_idx;\n+\t\tstruct dsw_queue_flow *qf = &port->seen_events[l_idx];\n+\t\tstruct rte_event *event = &events[i];\n+\t\tqf->queue_id = event->queue_id;\n+\t\tqf->flow_hash = dsw_flow_id_hash(event->flow_id);\n+\n+\t\tport->seen_events_idx = (l_idx+1) % DSW_MAX_EVENTS_RECORDED;\n+\t}\n+\n+\tif (unlikely(port->seen_events_len != DSW_MAX_EVENTS_RECORDED))\n+\t\tport->seen_events_len =\n+\t\t\tRTE_MIN(port->seen_events_len + num,\n+\t\t\t\tDSW_MAX_EVENTS_RECORDED);\n+}\n+\n static uint16_t\n dsw_port_dequeue_burst(struct dsw_port *port, struct rte_event *events,\n \t\t       uint16_t num)\n {\n+\tstruct dsw_port *source_port = port;\n+\tstruct dsw_evdev *dsw = source_port->dsw;\n+\n+\tdsw_port_ctl_process(dsw, source_port);\n+\n+\tif (unlikely(port->in_buffer_len > 0)) {\n+\t\tuint16_t dequeued = RTE_MIN(num, port->in_buffer_len);\n+\n+\t\trte_memcpy(events, &port->in_buffer[port->in_buffer_start],\n+\t\t\t   dequeued * sizeof(struct rte_event));\n+\n+\t\tport->in_buffer_start += dequeued;\n+\t\tport->in_buffer_len -= dequeued;\n+\n+\t\tif (port->in_buffer_len == 0)\n+\t\t\tport->in_buffer_start = 0;\n+\n+\t\treturn dequeued;\n+\t}\n+\n \treturn rte_event_ring_dequeue_burst(port->in_ring, events, num, NULL);\n }\n \n@@ -458,6 +1176,15 @@ dsw_event_dequeue_burst(void *port, struct rte_event *events, uint16_t num,\n \t\t\t\tdequeued);\n \n \t\tdsw_port_return_credits(dsw, source_port, dequeued);\n+\n+\t\t/* One potential optimization one might think of is to\n+\t\t * add a migration state (prior to 'pausing'), and\n+\t\t * only record seen events when the port is in this\n+\t\t * state (and transit to 'pausing' when enough events\n+\t\t * have been gathered). However, that schema doesn't\n+\t\t * seem to improve performance.\n+\t\t */\n+\t\tdsw_port_record_seen_events(port, events, dequeued);\n \t}\n \t/* XXX: Assuming the port can't produce any more work,\n \t *\tconsider flushing the output buffer, on dequeued ==\n",
    "prefixes": [
        "v3",
        "07/10"
    ]
}
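
The patch/put operations listed at the top modify the same resource. A hedged sketch of updating the patch state is shown here: the token value is a placeholder, the "accepted" state is only an example, and such writes require maintainer rights on the project.

import requests

resp = requests.patch(
    "http://patches.dpdk.org/api/patches/44564/",
    headers={"Authorization": "Token <your-api-token>"},  # Patchwork token auth; placeholder value
    json={"state": "accepted"},  # example state change only
)
resp.raise_for_status()
print(resp.json()["state"])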