[v4,07/10] event/dsw: add load balancing to the DSW event device

Message ID 20180918124514.10615-8-mattias.ronnblom@ericsson.com (mailing list archive)
State Accepted, archived
Delegated to: Jerin Jacob
Headers
Series A Distributed Software Event Device |

Checks

Context Check Description
ci/checkpatch success coding style OK
ci/Intel-compilation success Compilation OK

Commit Message

Mattias Rönnblom Sept. 18, 2018, 12:45 p.m. UTC
  The DSW event device will now attempt to migrate (move) flows between
ports in order to balance the load.

Signed-off-by: Mattias Rönnblom <mattias.ronnblom@ericsson.com>
---
 drivers/event/dsw/dsw_evdev.c |  27 ++
 drivers/event/dsw/dsw_evdev.h |  80 ++++
 drivers/event/dsw/dsw_event.c | 735 +++++++++++++++++++++++++++++++++-
 3 files changed, 838 insertions(+), 4 deletions(-)
  

Patch

diff --git a/drivers/event/dsw/dsw_evdev.c b/drivers/event/dsw/dsw_evdev.c
index bcfa17bab..2ecb365ba 100644
--- a/drivers/event/dsw/dsw_evdev.c
+++ b/drivers/event/dsw/dsw_evdev.c
@@ -20,6 +20,7 @@  dsw_port_setup(struct rte_eventdev *dev, uint8_t port_id,
 	struct dsw_evdev *dsw = dsw_pmd_priv(dev);
 	struct dsw_port *port;
 	struct rte_event_ring *in_ring;
+	struct rte_ring *ctl_in_ring;
 	char ring_name[RTE_RING_NAMESIZE];
 
 	port = &dsw->ports[port_id];
@@ -42,13 +43,29 @@  dsw_port_setup(struct rte_eventdev *dev, uint8_t port_id,
 	if (in_ring == NULL)
 		return -ENOMEM;
 
+	snprintf(ring_name, sizeof(ring_name), "dswctl%d_p%u",
+		 dev->data->dev_id, port_id);
+
+	ctl_in_ring = rte_ring_create(ring_name, DSW_CTL_IN_RING_SIZE,
+				      dev->data->socket_id,
+				      RING_F_SC_DEQ|RING_F_EXACT_SZ);
+
+	if (ctl_in_ring == NULL) {
+		rte_event_ring_free(in_ring);
+		return -ENOMEM;
+	}
+
 	port->in_ring = in_ring;
+	port->ctl_in_ring = ctl_in_ring;
 
 	rte_atomic16_init(&port->load);
 
 	port->load_update_interval =
 		(DSW_LOAD_UPDATE_INTERVAL * rte_get_timer_hz()) / US_PER_S;
 
+	port->migration_interval =
+		(DSW_MIGRATION_INTERVAL * rte_get_timer_hz()) / US_PER_S;
+
 	dev->data->ports[port_id] = port;
 
 	return 0;
@@ -72,6 +89,7 @@  dsw_port_release(void *p)
 	struct dsw_port *port = p;
 
 	rte_event_ring_free(port->in_ring);
+	rte_ring_free(port->ctl_in_ring);
 }
 
 static int
@@ -272,6 +290,14 @@  dsw_port_drain_buf(uint8_t dev_id, struct rte_event *buf, uint16_t buf_len,
 		flush(dev_id, buf[i], flush_arg);
 }
 
+static void
+dsw_port_drain_paused(uint8_t dev_id, struct dsw_port *port,
+		      eventdev_stop_flush_t flush, void *flush_arg)
+{
+	dsw_port_drain_buf(dev_id, port->paused_events, port->paused_events_len,
+			   flush, flush_arg);
+}
+
 static void
 dsw_port_drain_out(uint8_t dev_id, struct dsw_evdev *dsw, struct dsw_port *port,
 		   eventdev_stop_flush_t flush, void *flush_arg)
@@ -308,6 +334,7 @@  dsw_drain(uint8_t dev_id, struct dsw_evdev *dsw,
 		struct dsw_port *port = &dsw->ports[port_id];
 
 		dsw_port_drain_out(dev_id, dsw, port, flush, flush_arg);
+		dsw_port_drain_paused(dev_id, port, flush, flush_arg);
 		dsw_port_drain_in_ring(dev_id, port, flush, flush_arg);
 	}
 }
diff --git a/drivers/event/dsw/dsw_evdev.h b/drivers/event/dsw/dsw_evdev.h
index a5399dda5..783c418bf 100644
--- a/drivers/event/dsw/dsw_evdev.h
+++ b/drivers/event/dsw/dsw_evdev.h
@@ -73,7 +73,37 @@ 
 #define DSW_LOAD_UPDATE_INTERVAL (DSW_MIGRATION_INTERVAL/4)
 #define DSW_OLD_LOAD_WEIGHT (1)
 
+/* The minimum time (in us) between two flow migrations. What puts an
+ * upper limit on the actual migration rate is primarily the pace in
+ * which the ports send and receive control messages, which in turn is
+ * largely a function of how much cycles are spent the processing of
+ * an event burst.
+ */
 #define DSW_MIGRATION_INTERVAL (1000)
+#define DSW_MIN_SOURCE_LOAD_FOR_MIGRATION (DSW_LOAD_FROM_PERCENT(70))
+#define DSW_MAX_TARGET_LOAD_FOR_MIGRATION (DSW_LOAD_FROM_PERCENT(95))
+
+#define DSW_MAX_EVENTS_RECORDED (128)
+
+/* Only one outstanding migration per port is allowed */
+#define DSW_MAX_PAUSED_FLOWS (DSW_MAX_PORTS)
+
+/* Enough room for paus request/confirm and unpaus request/confirm for
+ * all possible senders.
+ */
+#define DSW_CTL_IN_RING_SIZE ((DSW_MAX_PORTS-1)*4)
+
+struct dsw_queue_flow {
+	uint8_t queue_id;
+	uint16_t flow_hash;
+};
+
+enum dsw_migration_state {
+	DSW_MIGRATION_STATE_IDLE,
+	DSW_MIGRATION_STATE_PAUSING,
+	DSW_MIGRATION_STATE_FORWARDING,
+	DSW_MIGRATION_STATE_UNPAUSING
+};
 
 struct dsw_port {
 	uint16_t id;
@@ -98,6 +128,7 @@  struct dsw_port {
 
 	uint16_t ops_since_bg_task;
 
+	/* most recent 'background' processing */
 	uint64_t last_bg;
 
 	/* For port load measurement. */
@@ -108,11 +139,46 @@  struct dsw_port {
 	uint64_t busy_cycles;
 	uint64_t total_busy_cycles;
 
+	/* For the ctl interface and flow migration mechanism. */
+	uint64_t next_migration;
+	uint64_t migration_interval;
+	enum dsw_migration_state migration_state;
+
+	uint64_t migration_start;
+	uint64_t migrations;
+	uint64_t migration_latency;
+
+	uint8_t migration_target_port_id;
+	struct dsw_queue_flow migration_target_qf;
+	uint8_t cfm_cnt;
+
+	uint16_t paused_flows_len;
+	struct dsw_queue_flow paused_flows[DSW_MAX_PAUSED_FLOWS];
+
+	/* In a very contrived worst case all inflight events can be
+	 * laying around paused here.
+	 */
+	uint16_t paused_events_len;
+	struct rte_event paused_events[DSW_MAX_EVENTS];
+
+	uint16_t seen_events_len;
+	uint16_t seen_events_idx;
+	struct dsw_queue_flow seen_events[DSW_MAX_EVENTS_RECORDED];
+
 	uint16_t out_buffer_len[DSW_MAX_PORTS];
 	struct rte_event out_buffer[DSW_MAX_PORTS][DSW_MAX_PORT_OUT_BUFFER];
 
+	uint16_t in_buffer_len;
+	uint16_t in_buffer_start;
+	/* This buffer may contain events that were read up from the
+	 * in_ring during the flow migration process.
+	 */
+	struct rte_event in_buffer[DSW_MAX_EVENTS];
+
 	struct rte_event_ring *in_ring __rte_cache_aligned;
 
+	struct rte_ring *ctl_in_ring __rte_cache_aligned;
+
 	/* Estimate of current port load. */
 	rte_atomic16_t load __rte_cache_aligned;
 } __rte_cache_aligned;
@@ -137,6 +203,20 @@  struct dsw_evdev {
 	rte_atomic32_t credits_on_loan __rte_cache_aligned;
 };
 
+#define DSW_CTL_PAUS_REQ (0)
+#define DSW_CTL_UNPAUS_REQ (1)
+#define DSW_CTL_CFM (2)
+
+/* sizeof(struct dsw_ctl_msg) must be equal or less than
+ * sizeof(void *), to fit on the control ring.
+ */
+struct dsw_ctl_msg {
+	uint8_t type:2;
+	uint8_t originating_port_id:6;
+	uint8_t queue_id;
+	uint16_t flow_hash;
+} __rte_packed;
+
 uint16_t dsw_event_enqueue(void *port, const struct rte_event *event);
 uint16_t dsw_event_enqueue_burst(void *port,
 				 const struct rte_event events[],
diff --git a/drivers/event/dsw/dsw_event.c b/drivers/event/dsw/dsw_event.c
index f326147c9..f0347592d 100644
--- a/drivers/event/dsw/dsw_event.c
+++ b/drivers/event/dsw/dsw_event.c
@@ -5,9 +5,11 @@ 
 #include "dsw_evdev.h"
 
 #include <stdbool.h>
+#include <string.h>
 
 #include <rte_atomic.h>
 #include <rte_cycles.h>
+#include <rte_memcpy.h>
 #include <rte_random.h>
 
 static bool
@@ -140,6 +142,269 @@  dsw_port_consider_load_update(struct dsw_port *port, uint64_t now)
 	dsw_port_load_update(port, now);
 }
 
+static void
+dsw_port_ctl_enqueue(struct dsw_port *port, struct dsw_ctl_msg *msg)
+{
+	void *raw_msg;
+
+	memcpy(&raw_msg, msg, sizeof(*msg));
+
+	/* there's always room on the ring */
+	while (rte_ring_enqueue(port->ctl_in_ring, raw_msg) != 0)
+		rte_pause();
+}
+
+static int
+dsw_port_ctl_dequeue(struct dsw_port *port, struct dsw_ctl_msg *msg)
+{
+	void *raw_msg;
+	int rc;
+
+	rc = rte_ring_dequeue(port->ctl_in_ring, &raw_msg);
+
+	if (rc == 0)
+		memcpy(msg, &raw_msg, sizeof(*msg));
+
+	return rc;
+}
+
+static void
+dsw_port_ctl_broadcast(struct dsw_evdev *dsw, struct dsw_port *source_port,
+		       uint8_t type, uint8_t queue_id, uint16_t flow_hash)
+{
+	uint16_t port_id;
+	struct dsw_ctl_msg msg = {
+		.type = type,
+		.originating_port_id = source_port->id,
+		.queue_id = queue_id,
+		.flow_hash = flow_hash
+	};
+
+	for (port_id = 0; port_id < dsw->num_ports; port_id++)
+		if (port_id != source_port->id)
+			dsw_port_ctl_enqueue(&dsw->ports[port_id], &msg);
+}
+
+static bool
+dsw_port_is_flow_paused(struct dsw_port *port, uint8_t queue_id,
+			uint16_t flow_hash)
+{
+	uint16_t i;
+
+	for (i = 0; i < port->paused_flows_len; i++) {
+		struct dsw_queue_flow *qf = &port->paused_flows[i];
+		if (qf->queue_id == queue_id &&
+		    qf->flow_hash == flow_hash)
+			return true;
+	}
+	return false;
+}
+
+static void
+dsw_port_add_paused_flow(struct dsw_port *port, uint8_t queue_id,
+			 uint16_t paused_flow_hash)
+{
+	port->paused_flows[port->paused_flows_len] = (struct dsw_queue_flow) {
+		.queue_id = queue_id,
+		.flow_hash = paused_flow_hash
+	};
+	port->paused_flows_len++;
+}
+
+static void
+dsw_port_remove_paused_flow(struct dsw_port *port, uint8_t queue_id,
+			    uint16_t paused_flow_hash)
+{
+	uint16_t i;
+
+	for (i = 0; i < port->paused_flows_len; i++) {
+		struct dsw_queue_flow *qf = &port->paused_flows[i];
+
+		if (qf->queue_id == queue_id &&
+		    qf->flow_hash == paused_flow_hash) {
+			uint16_t last_idx = port->paused_flows_len-1;
+			if (i != last_idx)
+				port->paused_flows[i] =
+					port->paused_flows[last_idx];
+			port->paused_flows_len--;
+			break;
+		}
+	}
+}
+
+static void
+dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port);
+
+static void
+dsw_port_handle_pause_flow(struct dsw_evdev *dsw, struct dsw_port *port,
+			   uint8_t originating_port_id, uint8_t queue_id,
+			   uint16_t paused_flow_hash)
+{
+	struct dsw_ctl_msg cfm = {
+		.type = DSW_CTL_CFM,
+		.originating_port_id = port->id,
+		.queue_id = queue_id,
+		.flow_hash = paused_flow_hash
+	};
+
+	DSW_LOG_DP_PORT(DEBUG, port->id, "Pausing queue_id %d flow_hash %d.\n",
+			queue_id, paused_flow_hash);
+
+	/* There might be already-scheduled events belonging to the
+	 * paused flow in the output buffers.
+	 */
+	dsw_port_flush_out_buffers(dsw, port);
+
+	dsw_port_add_paused_flow(port, queue_id, paused_flow_hash);
+
+	/* Make sure any stores to the original port's in_ring is seen
+	 * before the ctl message.
+	 */
+	rte_smp_wmb();
+
+	dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm);
+}
+
+static void
+dsw_find_lowest_load_port(uint8_t *port_ids, uint16_t num_port_ids,
+			  uint8_t exclude_port_id, int16_t *port_loads,
+			  uint8_t *target_port_id, int16_t *target_load)
+{
+	int16_t candidate_port_id = -1;
+	int16_t candidate_load = DSW_MAX_LOAD;
+	uint16_t i;
+
+	for (i = 0; i < num_port_ids; i++) {
+		uint8_t port_id = port_ids[i];
+		if (port_id != exclude_port_id) {
+			int16_t load = port_loads[port_id];
+			if (candidate_port_id == -1 ||
+			    load < candidate_load) {
+				candidate_port_id = port_id;
+				candidate_load = load;
+			}
+		}
+	}
+	*target_port_id = candidate_port_id;
+	*target_load = candidate_load;
+}
+
+struct dsw_queue_flow_burst {
+	struct dsw_queue_flow queue_flow;
+	uint16_t count;
+};
+
+static inline int
+dsw_cmp_burst(const void *v_burst_a, const void *v_burst_b)
+{
+	const struct dsw_queue_flow_burst *burst_a = v_burst_a;
+	const struct dsw_queue_flow_burst *burst_b = v_burst_b;
+
+	int a_count = burst_a->count;
+	int b_count = burst_b->count;
+
+	return a_count - b_count;
+}
+
+#define DSW_QF_TO_INT(_qf)					\
+	((int)((((_qf)->queue_id)<<16)|((_qf)->flow_hash)))
+
+static inline int
+dsw_cmp_qf(const void *v_qf_a, const void *v_qf_b)
+{
+	const struct dsw_queue_flow *qf_a = v_qf_a;
+	const struct dsw_queue_flow *qf_b = v_qf_b;
+
+	return DSW_QF_TO_INT(qf_a) - DSW_QF_TO_INT(qf_b);
+}
+
+static uint16_t
+dsw_sort_qfs_to_bursts(struct dsw_queue_flow *qfs, uint16_t qfs_len,
+		       struct dsw_queue_flow_burst *bursts)
+{
+	uint16_t i;
+	struct dsw_queue_flow_burst *current_burst = NULL;
+	uint16_t num_bursts = 0;
+
+	/* We don't need the stable property, and the list is likely
+	 * large enough for qsort() to outperform dsw_stable_sort(),
+	 * so we use qsort() here.
+	 */
+	qsort(qfs, qfs_len, sizeof(qfs[0]), dsw_cmp_qf);
+
+	/* arrange the (now-consecutive) events into bursts */
+	for (i = 0; i < qfs_len; i++) {
+		if (i == 0 ||
+		    dsw_cmp_qf(&qfs[i], &current_burst->queue_flow) != 0) {
+			current_burst = &bursts[num_bursts];
+			current_burst->queue_flow = qfs[i];
+			current_burst->count = 0;
+			num_bursts++;
+		}
+		current_burst->count++;
+	}
+
+	qsort(bursts, num_bursts, sizeof(bursts[0]), dsw_cmp_burst);
+
+	return num_bursts;
+}
+
+static bool
+dsw_retrieve_port_loads(struct dsw_evdev *dsw, int16_t *port_loads,
+			int16_t load_limit)
+{
+	bool below_limit = false;
+	uint16_t i;
+
+	for (i = 0; i < dsw->num_ports; i++) {
+		int16_t load = rte_atomic16_read(&dsw->ports[i].load);
+		if (load < load_limit)
+			below_limit = true;
+		port_loads[i] = load;
+	}
+	return below_limit;
+}
+
+static bool
+dsw_select_migration_target(struct dsw_evdev *dsw,
+			    struct dsw_port *source_port,
+			    struct dsw_queue_flow_burst *bursts,
+			    uint16_t num_bursts, int16_t *port_loads,
+			    int16_t max_load, struct dsw_queue_flow *target_qf,
+			    uint8_t *target_port_id)
+{
+	uint16_t source_load = port_loads[source_port->id];
+	uint16_t i;
+
+	for (i = 0; i < num_bursts; i++) {
+		struct dsw_queue_flow *qf = &bursts[i].queue_flow;
+
+		if (dsw_port_is_flow_paused(source_port, qf->queue_id,
+					    qf->flow_hash))
+			continue;
+
+		struct dsw_queue *queue = &dsw->queues[qf->queue_id];
+		int16_t target_load;
+
+		dsw_find_lowest_load_port(queue->serving_ports,
+					  queue->num_serving_ports,
+					  source_port->id, port_loads,
+					  target_port_id, &target_load);
+
+		if (target_load < source_load &&
+		    target_load < max_load) {
+			*target_qf = *qf;
+			return true;
+		}
+	}
+
+	DSW_LOG_DP_PORT(DEBUG, source_port->id, "For the %d flows considered, "
+			"no target port found with load less than %d.\n",
+			num_bursts, DSW_LOAD_TO_PERCENT(max_load));
+
+	return false;
+}
+
 static uint8_t
 dsw_schedule(struct dsw_evdev *dsw, uint8_t queue_id, uint16_t flow_hash)
 {
@@ -197,6 +462,14 @@  dsw_port_get_parallel_flow_id(struct dsw_port *port)
 	return flow_id;
 }
 
+static void
+dsw_port_buffer_paused(struct dsw_port *port,
+		       const struct rte_event *paused_event)
+{
+	port->paused_events[port->paused_events_len] = *paused_event;
+	port->paused_events_len++;
+}
+
 static void
 dsw_port_buffer_non_paused(struct dsw_evdev *dsw, struct dsw_port *source_port,
 			   uint8_t dest_port_id, const struct rte_event *event)
@@ -256,11 +529,401 @@  dsw_port_buffer_event(struct dsw_evdev *dsw, struct dsw_port *source_port,
 
 	flow_hash = dsw_flow_id_hash(event->flow_id);
 
+	if (unlikely(dsw_port_is_flow_paused(source_port, event->queue_id,
+					     flow_hash))) {
+		dsw_port_buffer_paused(source_port, event);
+		return;
+	}
+
 	dest_port_id = dsw_schedule(dsw, event->queue_id, flow_hash);
 
 	dsw_port_buffer_non_paused(dsw, source_port, dest_port_id, event);
 }
 
+static void
+dsw_port_flush_paused_events(struct dsw_evdev *dsw,
+			     struct dsw_port *source_port,
+			     uint8_t queue_id, uint16_t paused_flow_hash)
+{
+	uint16_t paused_events_len = source_port->paused_events_len;
+	struct rte_event paused_events[paused_events_len];
+	uint8_t dest_port_id;
+	uint16_t i;
+
+	if (paused_events_len == 0)
+		return;
+
+	if (dsw_port_is_flow_paused(source_port, queue_id, paused_flow_hash))
+		return;
+
+	rte_memcpy(paused_events, source_port->paused_events,
+		   paused_events_len * sizeof(struct rte_event));
+
+	source_port->paused_events_len = 0;
+
+	dest_port_id = dsw_schedule(dsw, queue_id, paused_flow_hash);
+
+	for (i = 0; i < paused_events_len; i++) {
+		struct rte_event *event = &paused_events[i];
+		uint16_t flow_hash;
+
+		flow_hash = dsw_flow_id_hash(event->flow_id);
+
+		if (event->queue_id == queue_id &&
+		    flow_hash == paused_flow_hash)
+			dsw_port_buffer_non_paused(dsw, source_port,
+						   dest_port_id, event);
+		else
+			dsw_port_buffer_paused(source_port, event);
+	}
+}
+
+static void
+dsw_port_migration_stats(struct dsw_port *port)
+{
+	uint64_t migration_latency;
+
+	migration_latency = (rte_get_timer_cycles() - port->migration_start);
+	port->migration_latency += migration_latency;
+	port->migrations++;
+}
+
+static void
+dsw_port_end_migration(struct dsw_evdev *dsw, struct dsw_port *port)
+{
+	uint8_t queue_id = port->migration_target_qf.queue_id;
+	uint16_t flow_hash = port->migration_target_qf.flow_hash;
+
+	port->migration_state = DSW_MIGRATION_STATE_IDLE;
+	port->seen_events_len = 0;
+
+	dsw_port_migration_stats(port);
+
+	if (dsw->queues[queue_id].schedule_type != RTE_SCHED_TYPE_PARALLEL) {
+		dsw_port_remove_paused_flow(port, queue_id, flow_hash);
+		dsw_port_flush_paused_events(dsw, port, queue_id, flow_hash);
+	}
+
+	DSW_LOG_DP_PORT(DEBUG, port->id, "Migration completed for queue_id "
+			"%d flow_hash %d.\n", queue_id, flow_hash);
+}
+
+static void
+dsw_port_consider_migration(struct dsw_evdev *dsw,
+			    struct dsw_port *source_port,
+			    uint64_t now)
+{
+	bool any_port_below_limit;
+	struct dsw_queue_flow *seen_events = source_port->seen_events;
+	uint16_t seen_events_len = source_port->seen_events_len;
+	struct dsw_queue_flow_burst bursts[DSW_MAX_EVENTS_RECORDED];
+	uint16_t num_bursts;
+	int16_t source_port_load;
+	int16_t port_loads[dsw->num_ports];
+
+	if (now < source_port->next_migration)
+		return;
+
+	if (dsw->num_ports == 1)
+		return;
+
+	DSW_LOG_DP_PORT(DEBUG, source_port->id, "Considering migration.\n");
+
+	/* Randomize interval to avoid having all threads considering
+	 * migration at the same in point in time, which might lead to
+	 * all choosing the same target port.
+	 */
+	source_port->next_migration = now +
+		source_port->migration_interval / 2 +
+		rte_rand() % source_port->migration_interval;
+
+	if (source_port->migration_state != DSW_MIGRATION_STATE_IDLE) {
+		DSW_LOG_DP_PORT(DEBUG, source_port->id,
+				"Migration already in progress.\n");
+		return;
+	}
+
+	/* For simplicity, avoid migration in the unlikely case there
+	 * is still events to consume in the in_buffer (from the last
+	 * migration).
+	 */
+	if (source_port->in_buffer_len > 0) {
+		DSW_LOG_DP_PORT(DEBUG, source_port->id, "There are still "
+				"events in the input buffer.\n");
+		return;
+	}
+
+	source_port_load = rte_atomic16_read(&source_port->load);
+	if (source_port_load < DSW_MIN_SOURCE_LOAD_FOR_MIGRATION) {
+		DSW_LOG_DP_PORT(DEBUG, source_port->id,
+				"Load %d is below threshold level %d.\n",
+				DSW_LOAD_TO_PERCENT(source_port_load),
+		       DSW_LOAD_TO_PERCENT(DSW_MIN_SOURCE_LOAD_FOR_MIGRATION));
+		return;
+	}
+
+	/* Avoid starting any expensive operations (sorting etc), in
+	 * case of a scenario with all ports above the load limit.
+	 */
+	any_port_below_limit =
+		dsw_retrieve_port_loads(dsw, port_loads,
+					DSW_MAX_TARGET_LOAD_FOR_MIGRATION);
+	if (!any_port_below_limit) {
+		DSW_LOG_DP_PORT(DEBUG, source_port->id,
+				"Candidate target ports are all too highly "
+				"loaded.\n");
+		return;
+	}
+
+	/* Sort flows into 'bursts' to allow attempting to migrating
+	 * small (but still active) flows first - this it to avoid
+	 * having large flows moving around the worker cores too much
+	 * (to avoid cache misses, among other things). Of course, the
+	 * number of recorded events (queue+flow ids) are limited, and
+	 * provides only a snapshot, so only so many conclusions can
+	 * be drawn from this data.
+	 */
+	num_bursts = dsw_sort_qfs_to_bursts(seen_events, seen_events_len,
+					    bursts);
+	/* For non-big-little systems, there's no point in moving the
+	 * only (known) flow.
+	 */
+	if (num_bursts < 2) {
+		DSW_LOG_DP_PORT(DEBUG, source_port->id, "Only a single flow "
+				"queue_id %d flow_hash %d has been seen.\n",
+				bursts[0].queue_flow.queue_id,
+				bursts[0].queue_flow.flow_hash);
+		return;
+	}
+
+	/* The strategy is to first try to find a flow to move to a
+	 * port with low load (below the migration-attempt
+	 * threshold). If that fails, we try to find a port which is
+	 * below the max threshold, and also less loaded than this
+	 * port is.
+	 */
+	if (!dsw_select_migration_target(dsw, source_port, bursts, num_bursts,
+					 port_loads,
+					 DSW_MIN_SOURCE_LOAD_FOR_MIGRATION,
+					 &source_port->migration_target_qf,
+					 &source_port->migration_target_port_id)
+	    &&
+	    !dsw_select_migration_target(dsw, source_port, bursts, num_bursts,
+					 port_loads,
+					 DSW_MAX_TARGET_LOAD_FOR_MIGRATION,
+					 &source_port->migration_target_qf,
+				       &source_port->migration_target_port_id))
+		return;
+
+	DSW_LOG_DP_PORT(DEBUG, source_port->id, "Migrating queue_id %d "
+			"flow_hash %d from port %d to port %d.\n",
+			source_port->migration_target_qf.queue_id,
+			source_port->migration_target_qf.flow_hash,
+			source_port->id, source_port->migration_target_port_id);
+
+	/* We have a winner. */
+
+	source_port->migration_state = DSW_MIGRATION_STATE_PAUSING;
+	source_port->migration_start = rte_get_timer_cycles();
+
+	/* No need to go through the whole pause procedure for
+	 * parallel queues, since atomic/ordered semantics need not to
+	 * be maintained.
+	 */
+
+	if (dsw->queues[source_port->migration_target_qf.queue_id].schedule_type
+	    == RTE_SCHED_TYPE_PARALLEL) {
+		uint8_t queue_id = source_port->migration_target_qf.queue_id;
+		uint16_t flow_hash = source_port->migration_target_qf.flow_hash;
+		uint8_t dest_port_id = source_port->migration_target_port_id;
+
+		/* Single byte-sized stores are always atomic. */
+		dsw->queues[queue_id].flow_to_port_map[flow_hash] =
+			dest_port_id;
+		rte_smp_wmb();
+
+		dsw_port_end_migration(dsw, source_port);
+
+		return;
+	}
+
+	/* There might be 'loopback' events already scheduled in the
+	 * output buffers.
+	 */
+	dsw_port_flush_out_buffers(dsw, source_port);
+
+	dsw_port_add_paused_flow(source_port,
+				 source_port->migration_target_qf.queue_id,
+				 source_port->migration_target_qf.flow_hash);
+
+	dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_PAUS_REQ,
+			       source_port->migration_target_qf.queue_id,
+			       source_port->migration_target_qf.flow_hash);
+	source_port->cfm_cnt = 0;
+}
+
+static void
+dsw_port_flush_paused_events(struct dsw_evdev *dsw,
+			     struct dsw_port *source_port,
+			     uint8_t queue_id, uint16_t paused_flow_hash);
+
+static void
+dsw_port_handle_unpause_flow(struct dsw_evdev *dsw, struct dsw_port *port,
+			     uint8_t originating_port_id, uint8_t queue_id,
+			     uint16_t paused_flow_hash)
+{
+	struct dsw_ctl_msg cfm = {
+		.type = DSW_CTL_CFM,
+		.originating_port_id = port->id,
+		.queue_id = queue_id,
+		.flow_hash = paused_flow_hash
+	};
+
+	DSW_LOG_DP_PORT(DEBUG, port->id, "Un-pausing queue_id %d flow_hash %d.\n",
+			queue_id, paused_flow_hash);
+
+	dsw_port_remove_paused_flow(port, queue_id, paused_flow_hash);
+
+	rte_smp_rmb();
+
+	dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm);
+
+	dsw_port_flush_paused_events(dsw, port, queue_id, paused_flow_hash);
+}
+
+#define FORWARD_BURST_SIZE (32)
+
+static void
+dsw_port_forward_migrated_flow(struct dsw_port *source_port,
+			       struct rte_event_ring *dest_ring,
+			       uint8_t queue_id,
+			       uint16_t flow_hash)
+{
+	uint16_t events_left;
+
+	/* Control ring message should been seen before the ring count
+	 * is read on the port's in_ring.
+	 */
+	rte_smp_rmb();
+
+	events_left = rte_event_ring_count(source_port->in_ring);
+
+	while (events_left > 0) {
+		uint16_t in_burst_size =
+			RTE_MIN(FORWARD_BURST_SIZE, events_left);
+		struct rte_event in_burst[in_burst_size];
+		uint16_t in_len;
+		uint16_t i;
+
+		in_len = rte_event_ring_dequeue_burst(source_port->in_ring,
+						      in_burst,
+						      in_burst_size, NULL);
+		/* No need to care about bursting forwarded events (to
+		 * the destination port's in_ring), since migration
+		 * doesn't happen very often, and also the majority of
+		 * the dequeued events will likely *not* be forwarded.
+		 */
+		for (i = 0; i < in_len; i++) {
+			struct rte_event *e = &in_burst[i];
+			if (e->queue_id == queue_id &&
+			    dsw_flow_id_hash(e->flow_id) == flow_hash) {
+				while (rte_event_ring_enqueue_burst(dest_ring,
+								    e, 1,
+								    NULL) != 1)
+					rte_pause();
+			} else {
+				uint16_t last_idx = source_port->in_buffer_len;
+				source_port->in_buffer[last_idx] = *e;
+				source_port->in_buffer_len++;
+			}
+		}
+
+		events_left -= in_len;
+	}
+}
+
+static void
+dsw_port_move_migrating_flow(struct dsw_evdev *dsw,
+			     struct dsw_port *source_port)
+{
+	uint8_t queue_id = source_port->migration_target_qf.queue_id;
+	uint16_t flow_hash = source_port->migration_target_qf.flow_hash;
+	uint8_t dest_port_id = source_port->migration_target_port_id;
+	struct dsw_port *dest_port = &dsw->ports[dest_port_id];
+
+	dsw_port_flush_out_buffers(dsw, source_port);
+
+	rte_smp_wmb();
+
+	dsw->queues[queue_id].flow_to_port_map[flow_hash] =
+		dest_port_id;
+
+	dsw_port_forward_migrated_flow(source_port, dest_port->in_ring,
+				       queue_id, flow_hash);
+
+	/* Flow table update and migration destination port's enqueues
+	 * must be seen before the control message.
+	 */
+	rte_smp_wmb();
+
+	dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_UNPAUS_REQ, queue_id,
+			       flow_hash);
+	source_port->cfm_cnt = 0;
+	source_port->migration_state = DSW_MIGRATION_STATE_UNPAUSING;
+}
+
+static void
+dsw_port_handle_confirm(struct dsw_evdev *dsw, struct dsw_port *port)
+{
+	port->cfm_cnt++;
+
+	if (port->cfm_cnt == (dsw->num_ports-1)) {
+		switch (port->migration_state) {
+		case DSW_MIGRATION_STATE_PAUSING:
+			DSW_LOG_DP_PORT(DEBUG, port->id, "Going into forwarding "
+					"migration state.\n");
+			port->migration_state = DSW_MIGRATION_STATE_FORWARDING;
+			break;
+		case DSW_MIGRATION_STATE_UNPAUSING:
+			dsw_port_end_migration(dsw, port);
+			break;
+		default:
+			RTE_ASSERT(0);
+			break;
+		}
+	}
+}
+
+static void
+dsw_port_ctl_process(struct dsw_evdev *dsw, struct dsw_port *port)
+{
+	struct dsw_ctl_msg msg;
+
+	/* So any table loads happens before the ring dequeue, in the
+	 * case of a 'paus' message.
+	 */
+	rte_smp_rmb();
+
+	if (dsw_port_ctl_dequeue(port, &msg) == 0) {
+		switch (msg.type) {
+		case DSW_CTL_PAUS_REQ:
+			dsw_port_handle_pause_flow(dsw, port,
+						   msg.originating_port_id,
+						   msg.queue_id, msg.flow_hash);
+			break;
+		case DSW_CTL_UNPAUS_REQ:
+			dsw_port_handle_unpause_flow(dsw, port,
+						     msg.originating_port_id,
+						     msg.queue_id,
+						     msg.flow_hash);
+			break;
+		case DSW_CTL_CFM:
+			dsw_port_handle_confirm(dsw, port);
+			break;
+		}
+	}
+}
+
 static void
 dsw_port_note_op(struct dsw_port *port, uint16_t num_events)
 {
@@ -270,12 +933,24 @@  dsw_port_note_op(struct dsw_port *port, uint16_t num_events)
 	port->ops_since_bg_task += (num_events+1);
 }
 
-static void
-dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port);
-
 static void
 dsw_port_bg_process(struct dsw_evdev *dsw, struct dsw_port *port)
 {
+	if (unlikely(port->migration_state == DSW_MIGRATION_STATE_FORWARDING &&
+		     port->pending_releases == 0))
+		dsw_port_move_migrating_flow(dsw, port);
+
+	/* Polling the control ring is relatively inexpensive, and
+	 * polling it often helps bringing down migration latency, so
+	 * do this for every iteration.
+	 */
+	dsw_port_ctl_process(dsw, port);
+
+	/* To avoid considering migration and flushing output buffers
+	 * on every dequeue/enqueue call, the scheduler only performs
+	 * such 'background' tasks every nth
+	 * (i.e. DSW_MAX_PORT_OPS_PER_BG_TASK) operation.
+	 */
 	if (unlikely(port->ops_since_bg_task >= DSW_MAX_PORT_OPS_PER_BG_TASK)) {
 		uint64_t now;
 
@@ -290,6 +965,8 @@  dsw_port_bg_process(struct dsw_evdev *dsw, struct dsw_port *port)
 
 		dsw_port_consider_load_update(port, now);
 
+		dsw_port_consider_migration(dsw, port, now);
+
 		port->ops_since_bg_task = 0;
 	}
 }
@@ -339,7 +1016,6 @@  dsw_event_enqueue_burst_generic(void *port, const struct rte_event events[],
 	 */
 	if (unlikely(events_len == 0)) {
 		dsw_port_note_op(source_port, DSW_MAX_PORT_OPS_PER_BG_TASK);
-		dsw_port_flush_out_buffers(dsw, source_port);
 		return 0;
 	}
 
@@ -423,10 +1099,52 @@  dsw_event_dequeue(void *port, struct rte_event *events, uint64_t wait)
 	return dsw_event_dequeue_burst(port, events, 1, wait);
 }
 
+static void
+dsw_port_record_seen_events(struct dsw_port *port, struct rte_event *events,
+			    uint16_t num)
+{
+	uint16_t i;
+
+	for (i = 0; i < num; i++) {
+		uint16_t l_idx = port->seen_events_idx;
+		struct dsw_queue_flow *qf = &port->seen_events[l_idx];
+		struct rte_event *event = &events[i];
+		qf->queue_id = event->queue_id;
+		qf->flow_hash = dsw_flow_id_hash(event->flow_id);
+
+		port->seen_events_idx = (l_idx+1) % DSW_MAX_EVENTS_RECORDED;
+	}
+
+	if (unlikely(port->seen_events_len != DSW_MAX_EVENTS_RECORDED))
+		port->seen_events_len =
+			RTE_MIN(port->seen_events_len + num,
+				DSW_MAX_EVENTS_RECORDED);
+}
+
 static uint16_t
 dsw_port_dequeue_burst(struct dsw_port *port, struct rte_event *events,
 		       uint16_t num)
 {
+	struct dsw_port *source_port = port;
+	struct dsw_evdev *dsw = source_port->dsw;
+
+	dsw_port_ctl_process(dsw, source_port);
+
+	if (unlikely(port->in_buffer_len > 0)) {
+		uint16_t dequeued = RTE_MIN(num, port->in_buffer_len);
+
+		rte_memcpy(events, &port->in_buffer[port->in_buffer_start],
+			   dequeued * sizeof(struct rte_event));
+
+		port->in_buffer_start += dequeued;
+		port->in_buffer_len -= dequeued;
+
+		if (port->in_buffer_len == 0)
+			port->in_buffer_start = 0;
+
+		return dequeued;
+	}
+
 	return rte_event_ring_dequeue_burst(port->in_ring, events, num, NULL);
 }
 
@@ -458,6 +1176,15 @@  dsw_event_dequeue_burst(void *port, struct rte_event *events, uint16_t num,
 				dequeued);
 
 		dsw_port_return_credits(dsw, source_port, dequeued);
+
+		/* One potential optimization one might think of is to
+		 * add a migration state (prior to 'pausing'), and
+		 * only record seen events when the port is in this
+		 * state (and transit to 'pausing' when enough events
+		 * have been gathered). However, that schema doesn't
+		 * seem to improve performance.
+		 */
+		dsw_port_record_seen_events(port, events, dequeued);
 	}
 	/* XXX: Assuming the port can't produce any more work,
 	 *	consider flushing the output buffer, on dequeued ==