@@ -128,7 +128,6 @@ struct dsw_queue_flow {
enum dsw_migration_state {
DSW_MIGRATION_STATE_IDLE,
DSW_MIGRATION_STATE_PAUSING,
- DSW_MIGRATION_STATE_FORWARDING,
DSW_MIGRATION_STATE_UNPAUSING
};
@@ -192,6 +191,13 @@ struct dsw_port {
uint16_t paused_events_len;
struct rte_event paused_events[DSW_MAX_EVENTS];
+ uint16_t emigrating_events_len;
+ /* Buffer for not-yet-processed events pertaining to flows
+ * emigrating from this port. These events will be forwarded
+ * to the target port(s).
+ */
+ struct rte_event emigrating_events[DSW_MAX_EVENTS];
+
uint16_t seen_events_len;
uint16_t seen_events_idx;
struct dsw_queue_flow seen_events[DSW_MAX_EVENTS_RECORDED];
@@ -237,6 +237,15 @@ dsw_port_is_flow_paused(struct dsw_port *port, uint8_t queue_id,
queue_id, flow_hash);
}
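+/* Check if the given queue/flow pair is among the flows currently
+ * being emigrated from this port.
+ */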
+static __rte_always_inline bool
+dsw_port_is_flow_migrating(struct dsw_port *port, uint8_t queue_id,
+ uint16_t flow_hash)
+{
+ return dsw_is_queue_flow_in_ary(port->emigration_target_qfs,
+ port->emigration_targets_len,
+ queue_id, flow_hash);
+}
+
static void
dsw_port_add_paused_flows(struct dsw_port *port, struct dsw_queue_flow *qfs,
uint8_t qfs_len)
@@ -271,9 +280,19 @@ dsw_port_remove_paused_flow(struct dsw_port *port,
port->paused_flows[i] =
port->paused_flows[last_idx];
port->paused_flows_len--;
- break;
+
+ DSW_LOG_DP_PORT(DEBUG, port->id,
+ "Unpausing queue_id %d flow_hash %d.\n",
+ target_qf->queue_id,
+ target_qf->flow_hash);
+
+ return;
}
}
+
+ DSW_LOG_DP_PORT(ERR, port->id,
+ "Failed to unpause queue_id %d flow_hash %d.\n",
+ target_qf->queue_id, target_qf->flow_hash);
}
static void
@@ -284,7 +303,6 @@ dsw_port_remove_paused_flows(struct dsw_port *port,
for (i = 0; i < qfs_len; i++)
dsw_port_remove_paused_flow(port, &qfs[i]);
-
}
static void
@@ -439,14 +457,15 @@ dsw_is_serving_port(struct dsw_evdev *dsw, uint8_t port_id, uint8_t queue_id)
static bool
dsw_select_emigration_target(struct dsw_evdev *dsw,
- struct dsw_queue_flow_burst *bursts,
- uint16_t num_bursts, uint8_t source_port_id,
- int16_t *port_loads, uint16_t num_ports,
- uint8_t *target_port_ids,
- struct dsw_queue_flow *target_qfs,
- uint8_t *targets_len)
-{
- int16_t source_port_load = port_loads[source_port_id];
+ struct dsw_port *source_port,
+ struct dsw_queue_flow_burst *bursts,
+ uint16_t num_bursts,
+ int16_t *port_loads, uint16_t num_ports,
+ uint8_t *target_port_ids,
+ struct dsw_queue_flow *target_qfs,
+ uint8_t *targets_len)
+{
+ int16_t source_port_load = port_loads[source_port->id];
struct dsw_queue_flow *candidate_qf = NULL;
uint8_t candidate_port_id = 0;
int16_t candidate_weight = -1;
@@ -471,7 +490,7 @@ dsw_select_emigration_target(struct dsw_evdev *dsw,
for (port_id = 0; port_id < num_ports; port_id++) {
int16_t weight;
- if (port_id == source_port_id)
+ if (port_id == source_port->id)
continue;
if (!dsw_is_serving_port(dsw, port_id, qf->queue_id))
@@ -493,7 +512,7 @@ dsw_select_emigration_target(struct dsw_evdev *dsw,
if (candidate_weight < 0)
return false;
- DSW_LOG_DP_PORT(DEBUG, source_port_id, "Selected queue_id %d "
+ DSW_LOG_DP_PORT(DEBUG, source_port->id, "Selected queue_id %d "
"flow_hash %d (with flow load %d) for migration "
"to port %d.\n", candidate_qf->queue_id,
candidate_qf->flow_hash,
@@ -501,7 +520,7 @@ dsw_select_emigration_target(struct dsw_evdev *dsw,
candidate_port_id);
port_loads[candidate_port_id] += candidate_flow_load;
- port_loads[source_port_id] -= candidate_flow_load;
+ port_loads[source_port->id] -= candidate_flow_load;
target_port_ids[*targets_len] = candidate_port_id;
target_qfs[*targets_len] = *candidate_qf;
@@ -527,8 +546,8 @@ dsw_select_emigration_targets(struct dsw_evdev *dsw,
for (i = 0; i < DSW_MAX_FLOWS_PER_MIGRATION; i++) {
bool found;
- found = dsw_select_emigration_target(dsw, bursts, num_bursts,
- source_port->id,
+ found = dsw_select_emigration_target(dsw, source_port,
+ bursts, num_bursts,
port_loads, dsw->num_ports,
target_port_ids,
target_qfs,
@@ -608,6 +627,7 @@ dsw_port_buffer_paused(struct dsw_port *port,
port->paused_events_len++;
}
+
static void
dsw_port_buffer_non_paused(struct dsw_evdev *dsw, struct dsw_port *source_port,
uint8_t dest_port_id, const struct rte_event *event)
@@ -679,40 +699,39 @@ dsw_port_buffer_event(struct dsw_evdev *dsw, struct dsw_port *source_port,
}
static void
-dsw_port_flush_paused_events(struct dsw_evdev *dsw,
- struct dsw_port *source_port,
- const struct dsw_queue_flow *qf)
+dsw_port_flush_no_longer_paused_events(struct dsw_evdev *dsw,
+ struct dsw_port *source_port)
{
uint16_t paused_events_len = source_port->paused_events_len;
struct rte_event paused_events[paused_events_len];
- uint8_t dest_port_id;
uint16_t i;
if (paused_events_len == 0)
return;
- if (dsw_port_is_flow_paused(source_port, qf->queue_id, qf->flow_hash))
- return;
-
rte_memcpy(paused_events, source_port->paused_events,
paused_events_len * sizeof(struct rte_event));
source_port->paused_events_len = 0;
- dest_port_id = dsw_schedule(dsw, qf->queue_id, qf->flow_hash);
-
for (i = 0; i < paused_events_len; i++) {
struct rte_event *event = &paused_events[i];
uint16_t flow_hash;
flow_hash = dsw_flow_id_hash(event->flow_id);
- if (event->queue_id == qf->queue_id &&
- flow_hash == qf->flow_hash)
+ if (dsw_port_is_flow_paused(source_port, event->queue_id,
+ flow_hash))
+ dsw_port_buffer_paused(source_port, event);
+ else {
+ uint8_t dest_port_id;
+
+ dest_port_id = dsw_schedule(dsw, event->queue_id,
+ flow_hash);
+
dsw_port_buffer_non_paused(dsw, source_port,
dest_port_id, event);
- else
- dsw_port_buffer_paused(source_port, event);
+ }
}
}
@@ -755,11 +774,6 @@ dsw_port_end_emigration(struct dsw_evdev *dsw, struct dsw_port *port,
DSW_LOG_DP_PORT(DEBUG, port->id, "Migration completed for "
"queue_id %d flow_hash %d.\n", queue_id,
flow_hash);
-
- if (queue_schedule_type == RTE_SCHED_TYPE_ATOMIC) {
- dsw_port_remove_paused_flow(port, qf);
- dsw_port_flush_paused_events(dsw, port, qf);
- }
}
finished = port->emigration_targets_len - left_qfs_len;
@@ -826,10 +840,31 @@ dsw_port_consider_emigration(struct dsw_evdev *dsw,
if (dsw->num_ports == 1)
return;
- if (seen_events_len < DSW_MAX_EVENTS_RECORDED)
+ DSW_LOG_DP_PORT(DEBUG, source_port->id, "Considering emigration.\n");
+
+ if (seen_events_len < DSW_MAX_EVENTS_RECORDED) {
+ DSW_LOG_DP_PORT(DEBUG, source_port->id, "Not enough events "
+ "are recorded to allow for a migration.\n");
return;
+ }
- DSW_LOG_DP_PORT(DEBUG, source_port->id, "Considering emigration.\n");
+ /* A flow migration cannot be initiated if there are paused
+ * events, since some or all of those events may have been
+ * produced as a result of processing the flow(s) selected
+ * for migration. Moving such a flow could introduce
+ * reordering, since processing the migrated flow on the
+ * receiving port may commence before the to-be-enqueued-to
+ * flows are unpaused, leading to paused events on the
+ * second port as well, destined for the same paused
+ * flow(s). When those flows are unpaused, the resulting
+ * events are delivered to the owning port in an undefined
+ * order.
+ */
+ if (source_port->paused_events_len > 0) {
+ DSW_LOG_DP_PORT(DEBUG, source_port->id, "There are "
+ "events in the pause buffer.\n");
+ return;
+ }
/* Randomize interval to avoid having all threads considering
* emigration at the same point in time, which might lead
@@ -927,9 +962,8 @@ dsw_port_consider_emigration(struct dsw_evdev *dsw,
}
static void
-dsw_port_flush_paused_events(struct dsw_evdev *dsw,
- struct dsw_port *source_port,
- const struct dsw_queue_flow *qf);
+dsw_port_flush_no_longer_paused_events(struct dsw_evdev *dsw,
+ struct dsw_port *source_port);
static void
dsw_port_handle_unpause_flows(struct dsw_evdev *dsw, struct dsw_port *port,
@@ -954,62 +988,123 @@ dsw_port_handle_unpause_flows(struct dsw_evdev *dsw, struct dsw_port *port,
if (dsw_schedule(dsw, qf->queue_id, qf->flow_hash) == port->id)
port->immigrations++;
+ }
+
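+ /* Unpausing the flows above may render events in the
+ * pause buffer eligible for forwarding.
+ */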
+ dsw_port_flush_no_longer_paused_events(dsw, port);
+}
+
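+/* Stash an event in the port's in_buffer, to be included in a future
+ * dequeue burst on this port.
+ */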
+static void
+dsw_port_buffer_in_buffer(struct dsw_port *port,
+ const struct rte_event *event)
+{
+ RTE_ASSERT(port->in_buffer_start == 0);
+
+ port->in_buffer[port->in_buffer_len] = *event;
+ port->in_buffer_len++;
+}
+
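+/* Enqueue an event on its new target port's in_ring, in case the
+ * event belongs to one of the emigrating flows. Events belonging to
+ * other flows are put back on the source port's in_buffer.
+ */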
+static void
+dsw_port_forward_emigrated_event(struct dsw_evdev *dsw,
+ struct dsw_port *source_port,
+ struct rte_event *event)
+{
+ uint16_t i;
+
+ for (i = 0; i < source_port->emigration_targets_len; i++) {
+ struct dsw_queue_flow *qf =
+ &source_port->emigration_target_qfs[i];
+ uint8_t dest_port_id =
+ source_port->emigration_target_port_ids[i];
+ struct dsw_port *dest_port = &dsw->ports[dest_port_id];
- dsw_port_flush_paused_events(dsw, port, qf);
+ if (event->queue_id == qf->queue_id &&
+ dsw_flow_id_hash(event->flow_id) == qf->flow_hash) {
+ /* There is no need to batch events forwarded
+ * to the destination port's in_ring, since
+ * migration doesn't happen very often, and
+ * the majority of the dequeued events will
+ * likely *not* be forwarded.
+ */
+ while (rte_event_ring_enqueue_burst(dest_port->in_ring,
+ event, 1,
+ NULL) != 1)
+ rte_pause();
+ return;
+ }
}
+
+ /* The event does not belong to any of the emigrating flows. */
+ dsw_port_buffer_in_buffer(source_port, event);
}
-#define FORWARD_BURST_SIZE (32)
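+/* Set aside an event belonging to a migrating flow, for later
+ * forwarding to the flow's new target port.
+ */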
+static void
+dsw_port_stash_migrating_event(struct dsw_port *port,
+ const struct rte_event *event)
+{
+ port->emigrating_events[port->emigrating_events_len] = *event;
+ port->emigrating_events_len++;
+}
+
+#define DRAIN_DEQUEUE_BURST_SIZE (32)
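+/* Drain the port's in_ring, stashing events that belong to migrating
+ * flows and moving all other events to the in_buffer, to be dequeued
+ * at a later time.
+ */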
static void
-dsw_port_forward_emigrated_flow(struct dsw_port *source_port,
- struct rte_event_ring *dest_ring,
- uint8_t queue_id,
- uint16_t flow_hash)
+dsw_port_drain_in_ring(struct dsw_port *source_port)
{
- uint16_t events_left;
+ uint16_t num_events;
+ uint16_t dequeued;
/* Control ring message should be seen before the ring count
* is read on the port's in_ring.
*/
rte_smp_rmb();
- events_left = rte_event_ring_count(source_port->in_ring);
+ num_events = rte_event_ring_count(source_port->in_ring);
- while (events_left > 0) {
- uint16_t in_burst_size =
- RTE_MIN(FORWARD_BURST_SIZE, events_left);
- struct rte_event in_burst[in_burst_size];
- uint16_t in_len;
+ for (dequeued = 0; dequeued < num_events; ) {
+ uint16_t burst_size = RTE_MIN(DRAIN_DEQUEUE_BURST_SIZE,
+ num_events - dequeued);
+ struct rte_event events[burst_size];
+ uint16_t len;
uint16_t i;
- in_len = rte_event_ring_dequeue_burst(source_port->in_ring,
- in_burst,
- in_burst_size, NULL);
- /* No need to care about bursting forwarded events (to
- * the destination port's in_ring), since migration
- * doesn't happen very often, and also the majority of
- * the dequeued events will likely *not* be forwarded.
- */
- for (i = 0; i < in_len; i++) {
- struct rte_event *e = &in_burst[i];
- if (e->queue_id == queue_id &&
- dsw_flow_id_hash(e->flow_id) == flow_hash) {
- while (rte_event_ring_enqueue_burst(dest_ring,
- e, 1,
- NULL) != 1)
- rte_pause();
- } else {
- uint16_t last_idx = source_port->in_buffer_len;
- source_port->in_buffer[last_idx] = *e;
- source_port->in_buffer_len++;
- }
+ len = rte_event_ring_dequeue_burst(source_port->in_ring,
+ events, burst_size,
+ NULL);
+
+ for (i = 0; i < len; i++) {
+ struct rte_event *event = &events[i];
+ uint16_t flow_hash;
+
+ flow_hash = dsw_flow_id_hash(event->flow_id);
+
+ if (unlikely(dsw_port_is_flow_migrating(source_port,
+ event->queue_id,
+ flow_hash)))
+ dsw_port_stash_migrating_event(source_port,
+ event);
+ else
+ dsw_port_buffer_in_buffer(source_port, event);
}
- events_left -= in_len;
+ dequeued += len;
}
}
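+/* Forward the events stashed during in_ring draining to their
+ * respective target ports.
+ */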
+static void
+dsw_port_forward_emigrated_flows(struct dsw_evdev *dsw,
+ struct dsw_port *source_port)
+{
+ uint16_t i;
+
+ for (i = 0; i < source_port->emigrating_events_len; i++) {
+ struct rte_event *event = &source_port->emigrating_events[i];
+
+ dsw_port_forward_emigrated_event(dsw, source_port, event);
+ }
+ source_port->emigrating_events_len = 0;
+}
+
static void
dsw_port_move_emigrating_flows(struct dsw_evdev *dsw,
struct dsw_port *source_port)
@@ -1018,22 +1113,27 @@ dsw_port_move_emigrating_flows(struct dsw_evdev *dsw,
dsw_port_flush_out_buffers(dsw, source_port);
- rte_smp_wmb();
-
for (i = 0; i < source_port->emigration_targets_len; i++) {
struct dsw_queue_flow *qf =
&source_port->emigration_target_qfs[i];
uint8_t dest_port_id =
source_port->emigration_target_port_ids[i];
- struct dsw_port *dest_port = &dsw->ports[dest_port_id];
dsw->queues[qf->queue_id].flow_to_port_map[qf->flow_hash] =
- dest_port_id;
-
- dsw_port_forward_emigrated_flow(source_port, dest_port->in_ring,
- qf->queue_id, qf->flow_hash);
+ dest_port_id;
}
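+ /* Make sure the flow table updates are visible to other
+ * lcores before any events are forwarded to their new
+ * destination port.
+ */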
+ rte_smp_wmb();
+
+ dsw_port_drain_in_ring(source_port);
+ dsw_port_forward_emigrated_flows(dsw, source_port);
+
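+ /* With the flow table updated, the migrating flows are no
+ * longer owned by this port, and the corresponding entries
+ * may be removed from the paused-flows list.
+ */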
+ dsw_port_remove_paused_flows(source_port,
+ source_port->emigration_target_qfs,
+ source_port->emigration_targets_len);
+
+ dsw_port_flush_no_longer_paused_events(dsw, source_port);
+
/* Flow table update and migration destination port's enqueues
* must be seen before the control message.
*/
@@ -1054,9 +1154,7 @@ dsw_port_handle_confirm(struct dsw_evdev *dsw, struct dsw_port *port)
if (port->cfm_cnt == (dsw->num_ports-1)) {
switch (port->migration_state) {
case DSW_MIGRATION_STATE_PAUSING:
- DSW_LOG_DP_PORT(DEBUG, port->id, "Going into forwarding "
- "migration state.\n");
- port->migration_state = DSW_MIGRATION_STATE_FORWARDING;
+ dsw_port_move_emigrating_flows(dsw, port);
break;
case DSW_MIGRATION_STATE_UNPAUSING:
dsw_port_end_emigration(dsw, port,
@@ -1096,18 +1194,18 @@ dsw_port_ctl_process(struct dsw_evdev *dsw, struct dsw_port *port)
static void
dsw_port_note_op(struct dsw_port *port, uint16_t num_events)
{
- /* To pull the control ring reasonably often on busy ports,
- * each dequeued/enqueued event is considered an 'op' too.
- */
port->ops_since_bg_task += (num_events+1);
}
static void
dsw_port_bg_process(struct dsw_evdev *dsw, struct dsw_port *port)
{
- if (unlikely(port->migration_state == DSW_MIGRATION_STATE_FORWARDING &&
- port->pending_releases == 0))
- dsw_port_move_emigrating_flows(dsw, port);
+ /* For simplicity (in the migration logic), avoid all
+ * background processing while event processing is in
+ * progress.
+ */
+ if (port->pending_releases > 0)
+ return;
/* Polling the control ring is relatively inexpensive, and
* polling it often helps bring down migration latency, so
@@ -1167,7 +1265,7 @@ dsw_event_enqueue_burst_generic(struct dsw_port *source_port,
uint16_t i;
DSW_LOG_DP_PORT(DEBUG, source_port->id, "Attempting to enqueue %d "
- "events to port %d.\n", events_len, source_port->id);
+ "events.\n", events_len);
dsw_port_bg_process(dsw, source_port);
@@ -1351,6 +1449,38 @@ dsw_port_dequeue_burst(struct dsw_port *port, struct rte_event *events,
return rte_event_ring_dequeue_burst(port->in_ring, events, num, NULL);
}
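+/* Remove any events belonging to migrating flows from a burst of
+ * dequeued events, stashing them for later forwarding to their new
+ * target port.
+ */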
+static void
+dsw_port_stash_migrating_events(struct dsw_port *port,
+ struct rte_event *events, uint16_t *num)
+{
+ uint16_t i;
+
+ /* The assumption here - performance-wise - is that events
+ * belonging to migrating flows are relatively rare.
+ */
+ for (i = 0; i < (*num); ) {
+ struct rte_event *event = &events[i];
+ uint16_t flow_hash;
+
+ flow_hash = dsw_flow_id_hash(event->flow_id);
+
+ if (unlikely(dsw_port_is_flow_migrating(port, event->queue_id,
+ flow_hash))) {
+ uint16_t left;
+
+ dsw_port_stash_migrating_event(port, event);
+
+ (*num)--;
+ left = *num - i;
+
+ if (left > 0)
+ memmove(event, event + 1,
+ left * sizeof(struct rte_event));
+ } else
+ i++;
+ }
+}
+
uint16_t
dsw_event_dequeue_burst(void *port, struct rte_event *events, uint16_t num,
uint64_t wait __rte_unused)
@@ -1368,6 +1498,11 @@ dsw_event_dequeue_burst(void *port, struct rte_event *events, uint16_t num,
dequeued = dsw_port_dequeue_burst(source_port, events, num);
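+ /* Events belonging to flows being emigrated must be withheld
+ * from the application until the migration has completed, and
+ * are instead stashed for later forwarding.
+ */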
+ if (unlikely(source_port->migration_state ==
+ DSW_MIGRATION_STATE_PAUSING))
+ dsw_port_stash_migrating_events(source_port, events,
+ &dequeued);
+
source_port->pending_releases = dequeued;
dsw_port_load_record(source_port, dequeued);