[dpdk-dev,v2,1/2] LACP control packet filtering offload

Message ID 20170629162022.9984-2-tomaszx.kulasek@intel.com (mailing list archive)
State Superseded, archived
Delegated to: Ferruh Yigit
Checks

Context               Check     Description
ci/checkpatch         warning   coding style issues
ci/Intel-compilation  fail      Compilation issues

Commit Message

Tomasz Kulasek June 29, 2017, 4:20 p.m. UTC
  New API functions implemented:

   rte_eth_bond_8023ad_slow_pkt_hw_filter_enable(uint8_t port_id);
   rte_eth_bond_8023ad_slow_pkt_hw_filter_disable(uint8_t port_id);

rte_eth_bond_8023ad_slow_pkt_hw_filter_enable() should be called before the
bonding port is started, to enable the new path.

When this option is enabled, all slaves must support flow director filtering
by Ethernet type, and each slave must provide one additional RX and one
additional TX queue for the slow packets.
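
The intended call order is sketched below (illustrative only: the device
creation, slave setup and logging shown are assumptions, not part of this
patch):

   /* Headers: rte_ethdev.h, rte_eth_bond.h, rte_eth_bond_8023ad.h */
   int bond_port = rte_eth_bond_create("net_bonding0",
                   BONDING_MODE_8023AD, rte_socket_id());

   /* ... rte_eth_bond_slave_add() for each slave, queue setup ... */

   /* Must be called while the bonded port is stopped; it fails if a
    * slave lacks flow director support or a spare rx/tx queue. */
   if (rte_eth_bond_8023ad_slow_pkt_hw_filter_enable(bond_port) != 0)
           RTE_LOG(WARNING, PMD,
                   "HW filtering unavailable, using software slow path\n");

   rte_eth_dev_start(bond_port);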

Signed-off-by: Tomasz Kulasek <tomaszx.kulasek@intel.com>
---
v2 changes:
 - changed name of rte_eth_bond_8023ad_slow_queue_enable/disable to
   rte_eth_bond_8023ad_slow_pkt_hw_filter_enable/disable,
 - propagated number of tx/rx queues available for bonding based on the
   attached slaves and slow packet filtering requirements,
 - improved validation of slaves,
 - introduced one structure to organize all slow queue settings,
 - use of RTE_BE16() instead of locally defined macro,
 - some comment improvements
---
 drivers/net/bonding/rte_eth_bond_8023ad.c         | 160 ++++++--
 drivers/net/bonding/rte_eth_bond_8023ad.h         |  35 ++
 drivers/net/bonding/rte_eth_bond_8023ad_private.h |  24 ++
 drivers/net/bonding/rte_eth_bond_pmd.c            | 424 +++++++++++++++++++++-
 drivers/net/bonding/rte_eth_bond_version.map      |   9 +
 5 files changed, 610 insertions(+), 42 deletions(-)
  

Patch

diff --git a/drivers/net/bonding/rte_eth_bond_8023ad.c b/drivers/net/bonding/rte_eth_bond_8023ad.c
index d2b7592..6b37a61 100644
--- a/drivers/net/bonding/rte_eth_bond_8023ad.c
+++ b/drivers/net/bonding/rte_eth_bond_8023ad.c
@@ -632,15 +632,25 @@ 
 	lacpdu->tlv_type_terminator = TLV_TYPE_TERMINATOR_INFORMATION;
 	lacpdu->terminator_length = 0;
 
-	if (rte_ring_enqueue(port->tx_ring, lacp_pkt) == -ENOBUFS) {
-		/* If TX ring full, drop packet and free message. Retransmission
-		 * will happen in next function call. */
-		rte_pktmbuf_free(lacp_pkt);
-		set_warning_flags(port, WRN_TX_QUEUE_FULL);
-		return;
+	if (!internals->mode4.slow_pkts.hw_filtering_en) {
+		if (rte_ring_enqueue(port->tx_ring, lacp_pkt) == -ENOBUFS) {
+			/* If TX ring full, drop packet and free message.
+			 * Retransmission will happen in next function call.
+			 */
+			rte_pktmbuf_free(lacp_pkt);
+			set_warning_flags(port, WRN_TX_QUEUE_FULL);
+			return;
+		}
+	} else {
+		if (rte_eth_tx_burst(slave_id,
+				internals->mode4.slow_pkts.tx_queue_id,
+				&lacp_pkt, 1) == 0) {
+			rte_pktmbuf_free(lacp_pkt);
+			set_warning_flags(port, WRN_TX_QUEUE_FULL);
+			return;
+		}
 	}
 
-	MODE4_DEBUG("sending LACP frame\n");
+	MODE4_DEBUG("Sending LACP frame\n");
 	BOND_PRINT_LACP(lacpdu);
 
 	timer_set(&port->tx_machine_timer, internals->mode4.tx_period_timeout);
@@ -741,6 +751,22 @@ 
 }
 
 static void
+rx_machine_update(struct bond_dev_private *internals, uint8_t slave_id,
+		struct rte_mbuf *lacp_pkt)
+{
+	struct lacpdu_header *lacp;
+
+	if (lacp_pkt != NULL) {
+		lacp = rte_pktmbuf_mtod(lacp_pkt, struct lacpdu_header *);
+		RTE_ASSERT(lacp->lacpdu.subtype == SLOW_SUBTYPE_LACP);
+
+		/* This is LACP frame so pass it to rx_machine */
+		rx_machine(internals, slave_id, &lacp->lacpdu);
+		rte_pktmbuf_free(lacp_pkt);
+	} else
+		rx_machine(internals, slave_id, NULL);
+}
+
+static void
 bond_mode_8023ad_periodic_cb(void *arg)
 {
 	struct rte_eth_dev *bond_dev = arg;
@@ -809,20 +835,24 @@ 
 
 		SM_FLAG_SET(port, LACP_ENABLED);
 
-		/* Find LACP packet to this port. Do not check subtype, it is done in
-		 * function that queued packet */
-		if (rte_ring_dequeue(port->rx_ring, &pkt) == 0) {
-			struct rte_mbuf *lacp_pkt = pkt;
-			struct lacpdu_header *lacp;
+		struct rte_mbuf *lacp_pkt = NULL;
 
-			lacp = rte_pktmbuf_mtod(lacp_pkt, struct lacpdu_header *);
-			RTE_ASSERT(lacp->lacpdu.subtype == SLOW_SUBTYPE_LACP);
+		if (!internals->mode4.slow_pkts.hw_filtering_en) {
+			/* Find LACP packet destined to this port. Do not
+			 * check the subtype; that was done by the function
+			 * that queued the packet.
+			 */
+			if (rte_ring_dequeue(port->rx_ring, &pkt) == 0)
+				lacp_pkt = pkt;
 
-			/* This is LACP frame so pass it to rx_machine */
-			rx_machine(internals, slave_id, &lacp->lacpdu);
-			rte_pktmbuf_free(lacp_pkt);
-		} else
-			rx_machine(internals, slave_id, NULL);
+			rx_machine_update(internals, slave_id, lacp_pkt);
+		} else {
+			if (rte_eth_rx_burst(slave_id,
+					internals->mode4.slow_pkts.rx_queue_id,
+					&lacp_pkt, 1) == 1)
+				bond_mode_8023ad_handle_slow_pkt(internals, slave_id, lacp_pkt);
+			else
+				rx_machine_update(internals, slave_id, NULL);
+		}
 
 		periodic_machine(internals, slave_id);
 		mux_machine(internals, slave_id);
@@ -1064,6 +1094,10 @@ 
 	mode4->tx_period_timeout = conf->tx_period_ms * ms_ticks;
 	mode4->rx_marker_timeout = conf->rx_marker_period_ms * ms_ticks;
 	mode4->update_timeout_us = conf->update_timeout_ms * 1000;
+
+	mode4->slow_pkts.hw_filtering_en = 0;
+	mode4->slow_pkts.rx_queue_id = UINT16_MAX;
+	mode4->slow_pkts.tx_queue_id = UINT16_MAX;
 }
 
 static void
@@ -1188,18 +1222,34 @@ 
 		m_hdr->marker.tlv_type_marker = MARKER_TLV_TYPE_RESP;
 		rte_eth_macaddr_get(slave_id, &m_hdr->eth_hdr.s_addr);
 
-		if (unlikely(rte_ring_enqueue(port->tx_ring, pkt) == -ENOBUFS)) {
-			/* reset timer */
-			port->rx_marker_timer = 0;
-			wrn = WRN_TX_QUEUE_FULL;
-			goto free_out;
+		if (internals->mode4.slow_pkts.hw_filtering_en == 0) {
+			if (unlikely(rte_ring_enqueue(port->tx_ring, pkt) ==
+					-ENOBUFS)) {
+				/* reset timer */
+				port->rx_marker_timer = 0;
+				wrn = WRN_TX_QUEUE_FULL;
+				goto free_out;
+			}
+		} else {
+			/* Send packet directly to the slow queue */
+			if (unlikely(rte_eth_tx_burst(slave_id,
+					internals->mode4.slow_pkts.tx_queue_id,
+					&pkt, 1) == 0)) {
+				/* reset timer */
+				port->rx_marker_timer = 0;
+				wrn = WRN_TX_QUEUE_FULL;
+				goto free_out;
+			}
 		}
 	} else if (likely(subtype == SLOW_SUBTYPE_LACP)) {
-		if (unlikely(rte_ring_enqueue(port->rx_ring, pkt) == -ENOBUFS)) {
-			/* If RX fing full free lacpdu message and drop packet */
-			wrn = WRN_RX_QUEUE_FULL;
-			goto free_out;
-		}
+		if (!internals->mode4.slow_pkts.hw_filtering_en) {
+			if (unlikely(rte_ring_enqueue(port->rx_ring, pkt) == -ENOBUFS)) {
+				/* If RX ring full, free lacpdu message and drop packet */
+				wrn = WRN_RX_QUEUE_FULL;
+				goto free_out;
+			}
+		} else
+			rx_machine_update(internals, slave_id, pkt);
 	} else {
 		wrn = WRN_UNKNOWN_SLOW_TYPE;
 		goto free_out;
@@ -1504,3 +1554,55 @@ 
 	rte_eal_alarm_set(internals->mode4.update_timeout_us,
 			bond_mode_8023ad_ext_periodic_cb, arg);
 }
+
+#define MBUF_CACHE_SIZE 250
+#define NUM_MBUFS 8191
+
+int
+rte_eth_bond_8023ad_slow_pkt_hw_filter_enable(uint8_t port)
+{
+	int retval = 0;
+	struct rte_eth_dev *dev = &rte_eth_devices[port];
+	struct bond_dev_private *internals = (struct bond_dev_private *)
+		dev->data->dev_private;
+
+	if (check_for_bonded_ethdev(dev) != 0)
+		return -1;
+
+	if (bond_8023ad_slow_pkt_hw_filter_supported(port) != 0)
+		return -1;
+
+	/* Device must be stopped to set up slow queue */
+	if (dev->data->dev_started)
+		return -1;
+
+	internals->mode4.slow_pkts.hw_filtering_en = 1;
+
+	bond_ethdev_mode_set(dev, internals->mode);
+	return retval;
+}
+
+int
+rte_eth_bond_8023ad_slow_pkt_hw_filter_disable(uint8_t port)
+{
+	int retval = 0;
+	struct rte_eth_dev *dev = &rte_eth_devices[port];
+	struct bond_dev_private *internals = (struct bond_dev_private *)
+		dev->data->dev_private;
+
+	if (check_for_bonded_ethdev(dev) != 0)
+		return -1;
+
+	/* Device must be stopped to change slow queue configuration */
+	if (dev->data->dev_started)
+		return -1;
+
+	internals->mode4.slow_pkts.hw_filtering_en = 0;
+
+	bond_ethdev_mode_set(dev, internals->mode);
+
+	internals->mode4.slow_pkts.rx_queue_id = UINT16_MAX;
+	internals->mode4.slow_pkts.tx_queue_id = UINT16_MAX;
+
+	return retval;
+}
diff --git a/drivers/net/bonding/rte_eth_bond_8023ad.h b/drivers/net/bonding/rte_eth_bond_8023ad.h
index 6b8ff57..d527970 100644
--- a/drivers/net/bonding/rte_eth_bond_8023ad.h
+++ b/drivers/net/bonding/rte_eth_bond_8023ad.h
@@ -302,4 +302,39 @@  struct rte_eth_bond_8023ad_slave_info {
 rte_eth_bond_8023ad_ext_slowtx(uint8_t port_id, uint8_t slave_id,
 		struct rte_mbuf *lacp_pkt);
 
+/**
+ * Enable slow queue on slaves
+ *
+ * This function creates an additional RX and TX queue on each slave and
+ * uses flow director to redirect all slow packets there, so that they can
+ * be processed by the LACP state machine.
+ * To use this feature, every slave must support at least one more RX and
+ * one more TX queue than the bonded device is configured with.
+ *
+ * Bonding port must be stopped to change this configuration.
+ *
+ * @param port_id      Bonding device id
+ *
+ * @return
+ *   0 on success, negative value otherwise.
+ */
+int
+rte_eth_bond_8023ad_slow_pkt_hw_filter_enable(uint8_t port_id);
+
+/**
+ * Disable slow queue on slaves
+ *
+ * This function disables hardware slow packet filter.
+ *
+ * Bonding port must be stopped to change this configuration.
+ *
+ * @see rte_eth_bond_8023ad_slow_pkt_hw_filter_enable
+ *
+ * @param port_id      Bonding device id
+ *
+ * @return
+ *   0 on success, negative value otherwise.
+ */
+int
+rte_eth_bond_8023ad_slow_pkt_hw_filter_disable(uint8_t port_id);
+
 #endif /* RTE_ETH_BOND_8023AD_H_ */
diff --git a/drivers/net/bonding/rte_eth_bond_8023ad_private.h b/drivers/net/bonding/rte_eth_bond_8023ad_private.h
index ca8858b..40a4320 100644
--- a/drivers/net/bonding/rte_eth_bond_8023ad_private.h
+++ b/drivers/net/bonding/rte_eth_bond_8023ad_private.h
@@ -39,6 +39,7 @@ 
 #include <rte_ether.h>
 #include <rte_byteorder.h>
 #include <rte_atomic.h>
+#include <rte_flow.h>
 
 #include "rte_eth_bond_8023ad.h"
 
@@ -162,6 +163,9 @@  struct port {
 
 	uint64_t warning_timer;
 	volatile uint16_t warnings_to_show;
+
+	/** Memory pool used by the slow protocol rx queue */
+	struct rte_mempool *slow_pool;
 };
 
 struct mode8023ad_private {
@@ -175,6 +179,16 @@  struct mode8023ad_private {
 	uint64_t update_timeout_us;
 	rte_eth_bond_8023ad_ext_slowrx_fn slowrx_cb;
 	uint8_t external_sm;
+
+	struct {
+		uint8_t hw_filtering_en;
+
+		struct rte_flow *flow[RTE_MAX_ETHPORTS];
+
+		uint16_t rx_queue_id;
+		uint16_t tx_queue_id;
+	} slow_pkts;
 };
 
 /**
@@ -295,4 +309,14 @@  struct mode8023ad_private {
 void
 bond_mode_8023ad_mac_address_update(struct rte_eth_dev *bond_dev);
 
+int
+bond_ethdev_8023ad_flow_verify(struct rte_eth_dev *bond_dev,
+		uint8_t slave_port);
+
+int
+bond_ethdev_8023ad_flow_set(struct rte_eth_dev *bond_dev, uint8_t slave_port);
+
+int
+bond_8023ad_slow_pkt_hw_filter_supported(uint8_t port_id);
+
 #endif /* RTE_ETH_BOND_8023AD_H_ */
diff --git a/drivers/net/bonding/rte_eth_bond_pmd.c b/drivers/net/bonding/rte_eth_bond_pmd.c
index 37f3d43..46b7d80 100644
--- a/drivers/net/bonding/rte_eth_bond_pmd.c
+++ b/drivers/net/bonding/rte_eth_bond_pmd.c
@@ -133,6 +133,250 @@ 
 		(subtype == SLOW_SUBTYPE_MARKER || subtype == SLOW_SUBTYPE_LACP));
 }
 
+/*****************************************************************************
+ * Flow director's setup for mode 4 optimization
+ */
+
+static struct rte_flow_item_eth flow_item_eth_type_8023ad = {
+	.dst.addr_bytes = { 0 },
+	.src.addr_bytes = { 0 },
+	.type = RTE_BE16(ETHER_TYPE_SLOW),
+};
+
+static struct rte_flow_item_eth flow_item_eth_mask_type_8023ad = {
+	.dst.addr_bytes = { 0 },
+	.src.addr_bytes = { 0 },
+	.type = RTE_BE16(0xFFFF),
+};
+
+static struct rte_flow_item flow_item_8023ad[] = {
+	{
+		.type = RTE_FLOW_ITEM_TYPE_ETH,
+		.spec = &flow_item_eth_type_8023ad,
+		.last = NULL,
+		.mask = &flow_item_eth_mask_type_8023ad,
+	},
+	{
+		.type = RTE_FLOW_ITEM_TYPE_END,
+		.spec = NULL,
+		.last = NULL,
+		.mask = NULL,
+	}
+};
+
+static const struct rte_flow_attr flow_attr_8023ad = {
+	.group = 0,
+	.priority = 0,
+	.ingress = 1,
+	.egress = 0,
+	.reserved = 0,
+};
+
+int
+bond_ethdev_8023ad_flow_verify(struct rte_eth_dev *bond_dev,
+		uint8_t slave_port)
+{
+	struct rte_flow_error error;
+	struct bond_dev_private *internals = (struct bond_dev_private *)
+			(bond_dev->data->dev_private);
+
+	struct rte_flow_action_queue lacp_queue_conf = {
+		.index = internals->mode4.slow_pkts.rx_queue_id,
+	};
+
+	const struct rte_flow_action actions[] = {
+		{
+			.type = RTE_FLOW_ACTION_TYPE_QUEUE,
+			.conf = &lacp_queue_conf
+		},
+		{
+			.type = RTE_FLOW_ACTION_TYPE_END,
+		}
+	};
+
+	int ret = rte_flow_validate(slave_port, &flow_attr_8023ad,
+			flow_item_8023ad, actions, &error);
+	if (ret < 0)
+		return -1;
+
+	return 0;
+}
+
+int
+bond_8023ad_slow_pkt_hw_filter_supported(uint8_t port_id)
+{
+	struct rte_eth_dev *bond_dev = &rte_eth_devices[port_id];
+	struct bond_dev_private *internals = (struct bond_dev_private *)
+			(bond_dev->data->dev_private);
+	struct rte_eth_dev_info bond_info, slave_info;
+	uint8_t idx;
+
+	/* Verify that all slaves in bonding support flow director and
+	 * can accommodate the additional slow rx/tx queue. */
+	if (internals->slave_count > 0) {
+		rte_eth_dev_info_get(bond_dev->data->port_id, &bond_info);
+		internals->mode4.slow_pkts.rx_queue_id = bond_info.nb_rx_queues;
+		internals->mode4.slow_pkts.tx_queue_id = bond_info.nb_tx_queues;
+		for (idx = 0; idx < internals->slave_count; idx++) {
+			rte_eth_dev_info_get(internals->slaves[idx].port_id,
+					&slave_info);
+			if ((slave_info.max_rx_queues < bond_info.nb_rx_queues)
+					|| (slave_info.max_tx_queues <
+						bond_info.nb_tx_queues))
+				return -1;
+
+			if (bond_ethdev_8023ad_flow_verify(bond_dev,
+					internals->slaves[idx].port_id) != 0)
+				return -1;
+		}
+	}
+
+	return 0;
+}
+
+int
+bond_ethdev_8023ad_flow_set(struct rte_eth_dev *bond_dev, uint8_t slave_port)
+{
+	struct rte_flow_error error;
+	struct bond_dev_private *internals = (struct bond_dev_private *)
+			(bond_dev->data->dev_private);
+
+	struct rte_flow_action_queue lacp_queue_conf = {
+		.index = internals->mode4.slow_pkts.rx_queue_id,
+	};
+
+	const struct rte_flow_action actions[] = {
+		{
+			.type = RTE_FLOW_ACTION_TYPE_QUEUE,
+			.conf = &lacp_queue_conf
+		},
+		{
+			.type = RTE_FLOW_ACTION_TYPE_END,
+		}
+	};
+
+	internals->mode4.slow_pkts.flow[slave_port] = rte_flow_create(slave_port,
+			&flow_attr_8023ad, flow_item_8023ad, actions, &error);
+	if (internals->mode4.slow_pkts.flow[slave_port] == NULL) {
+		RTE_BOND_LOG(ERR, "bond_ethdev_8023ad_flow_set: %s "
+				"(slave_port=%d queue_id=%d)",
+				error.message, slave_port,
+				internals->mode4.slow_pkts.rx_queue_id);
+		return -1;
+	}
+
+	return 0;
+}
+
+static uint16_t
+bond_ethdev_rx_burst_8023ad_fast_queue(void *queue, struct rte_mbuf **bufs,
+		uint16_t nb_pkts)
+{
+	struct bond_rx_queue *bd_rx_q = (struct bond_rx_queue *)queue;
+	struct bond_dev_private *internals = bd_rx_q->dev_private;
+	uint16_t num_rx_total = 0;	/* Total number of received packets */
+	uint8_t slaves[RTE_MAX_ETHPORTS];
+	uint8_t slave_count;
+
+	uint8_t i;
+
+	/* Copy slave list to protect against slave up/down changes during rx
+	 * bursting */
+	slave_count = internals->active_slave_count;
+	memcpy(slaves, internals->active_slaves,
+			sizeof(internals->active_slaves[0]) * slave_count);
+
+	for (i = 0; i < slave_count && num_rx_total < nb_pkts; i++) {
+		/* Read packets from this slave */
+		num_rx_total += rte_eth_rx_burst(slaves[i], bd_rx_q->queue_id,
+				&bufs[num_rx_total], nb_pkts - num_rx_total);
+	}
+
+	return num_rx_total;
+}
+
+static uint16_t
+bond_ethdev_tx_burst_8023ad_fast_queue(void *queue, struct rte_mbuf **bufs,
+		uint16_t nb_pkts)
+{
+	struct bond_dev_private *internals;
+	struct bond_tx_queue *bd_tx_q;
+
+	uint8_t num_of_slaves;
+	uint8_t slaves[RTE_MAX_ETHPORTS];
+	/* Positions in the slaves array, not port ids */
+	uint8_t distributing_offsets[RTE_MAX_ETHPORTS];
+	uint8_t distributing_count;
+
+	uint16_t num_tx_slave, num_tx_total = 0, num_tx_fail_total = 0;
+	uint16_t i, op_slave_idx;
+
+	struct rte_mbuf *slave_bufs[RTE_MAX_ETHPORTS][nb_pkts];
+
+	/* Number of packets placed in slave_bufs, per slave */
+	uint16_t slave_nb_pkts[RTE_MAX_ETHPORTS] = { 0 };
+
+	if (unlikely(nb_pkts == 0))
+		return 0;
+
+	bd_tx_q = (struct bond_tx_queue *)queue;
+	internals = bd_tx_q->dev_private;
+
+	/* Copy slave list to protect against slave up/down changes during tx
+	 * bursting */
+	num_of_slaves = internals->active_slave_count;
+	if (num_of_slaves < 1)
+		return num_tx_total;
+
+	memcpy(slaves, internals->active_slaves, sizeof(slaves[0]) *
+			num_of_slaves);
+
+	distributing_count = 0;
+	for (i = 0; i < num_of_slaves; i++) {
+		struct port *port = &mode_8023ad_ports[slaves[i]];
+		if (ACTOR_STATE(port, DISTRIBUTING))
+			distributing_offsets[distributing_count++] = i;
+	}
+
+	if (likely(distributing_count > 0)) {
+		/* Populate slave mbuf arrays with the packets to be sent */
+		for (i = 0; i < nb_pkts; i++) {
+			/* Select output slave using hash based on xmit policy */
+			op_slave_idx = internals->xmit_hash(bufs[i],
+					distributing_count);
+
+			/* Populate slave mbuf arrays with mbufs for that slave.
+			 * Use only slaves that are currently distributing.
+			 */
+			uint8_t slave_offset =
+					distributing_offsets[op_slave_idx];
+			slave_bufs[slave_offset][slave_nb_pkts[slave_offset]] =
+					bufs[i];
+			slave_nb_pkts[slave_offset]++;
+		}
+	}
+
+	/* Send packet burst on each slave device */
+	for (i = 0; i < num_of_slaves; i++) {
+		if (slave_nb_pkts[i] == 0)
+			continue;
+
+		num_tx_slave = rte_eth_tx_burst(slaves[i], bd_tx_q->queue_id,
+				slave_bufs[i], slave_nb_pkts[i]);
+
+		num_tx_total += num_tx_slave;
+		num_tx_fail_total += slave_nb_pkts[i] - num_tx_slave;
+
+		/* If tx burst fails move packets to end of bufs */
+		if (unlikely(num_tx_slave < slave_nb_pkts[i])) {
+			uint16_t j = nb_pkts - num_tx_fail_total;
+			for ( ; num_tx_slave < slave_nb_pkts[i]; j++,
+					num_tx_slave++)
+				bufs[j] = slave_bufs[i][num_tx_slave];
+		}
+	}
+
+	return num_tx_total;
+}
+
 static uint16_t
 bond_ethdev_rx_burst_8023ad(void *queue, struct rte_mbuf **bufs,
 		uint16_t nb_pkts)
@@ -180,6 +424,13 @@ 
 
 		/* Handle slow protocol packets. */
 		while (j < num_rx_total) {
+
+			/* if packet is not pure L2 and is known, skip it */
+			if ((bufs[j]->packet_type & ~RTE_PTYPE_L2_ETHER) != 0) {
+				j++;
+				continue;
+			}
+
 			if (j + 3 < num_rx_total)
 				rte_prefetch0(rte_pktmbuf_mtod(bufs[j + 3], void *));
 
@@ -187,7 +438,7 @@ 
 			subtype = ((struct slow_protocol_frame *)hdr)->slow_protocol.subtype;
 
 			/* Remove packet from array if it is slow packet or slave is not
-			 * in collecting state or bondign interface is not in promiscus
+			 * in collecting state or bonding interface is not in promiscuous
 			 * mode and packet address does not match. */
 			if (unlikely(is_lacp_packets(hdr->ether_type, subtype, bufs[j]->vlan_tci) ||
 				!collecting || (!promisc &&
@@ -204,7 +455,8 @@ 
 				num_rx_total--;
 				if (j < num_rx_total) {
 					memmove(&bufs[j], &bufs[j + 1], sizeof(bufs[0]) *
-						(num_rx_total - j));
+							(num_rx_total - j));
 				}
 			} else
 				j++;
@@ -1295,11 +1547,19 @@  struct bwg_slave {
 		if (bond_mode_8023ad_enable(eth_dev) != 0)
 			return -1;
 
-		eth_dev->rx_pkt_burst = bond_ethdev_rx_burst_8023ad;
-		eth_dev->tx_pkt_burst = bond_ethdev_tx_burst_8023ad;
-		RTE_LOG(WARNING, PMD,
-				"Using mode 4, it is necessary to do TX burst and RX burst "
-				"at least every 100ms.\n");
+		if (!internals->mode4.slow_pkts.hw_filtering_en) {
+			eth_dev->rx_pkt_burst = bond_ethdev_rx_burst_8023ad;
+			eth_dev->tx_pkt_burst = bond_ethdev_tx_burst_8023ad;
+			RTE_LOG(WARNING, PMD,
+				"Using mode 4, it is necessary to do TX burst "
+				"and RX burst at least every 100ms.\n");
+		} else {
+			/* Use flow director's optimization */
+			eth_dev->rx_pkt_burst =
+					bond_ethdev_rx_burst_8023ad_fast_queue;
+			eth_dev->tx_pkt_burst =
+					bond_ethdev_tx_burst_8023ad_fast_queue;
+		}
 		break;
 	case BONDING_MODE_TLB:
 		eth_dev->tx_pkt_burst = bond_ethdev_tx_burst_tlb;
@@ -1321,15 +1581,80 @@  struct bwg_slave {
 	return 0;
 }
 
+static int
+slave_configure_slow_queue(struct rte_eth_dev *bonded_eth_dev,
+		struct rte_eth_dev *slave_eth_dev)
+{
+	int errval = 0;
+	struct bond_dev_private *internals = (struct bond_dev_private *)
+		bonded_eth_dev->data->dev_private;
+	struct port *port = &mode_8023ad_ports[slave_eth_dev->data->port_id];
+
+	if (port->slow_pool == NULL) {
+		char mem_name[256];
+		int slave_id = slave_eth_dev->data->port_id;
+
+		snprintf(mem_name, RTE_DIM(mem_name), "slave_port%u_slow_pool",
+				slave_id);
+		port->slow_pool = rte_pktmbuf_pool_create(mem_name, 8191,
+			250, 0, RTE_MBUF_DEFAULT_BUF_SIZE,
+			slave_eth_dev->data->numa_node);
+
+		/* Any memory allocation failure in initialization is critical
+		 * because resources cannot be freed, so reinitialization is
+		 * impossible. */
+		if (port->slow_pool == NULL) {
+			rte_panic("Slave %u: Failed to create memory pool '%s': %s\n",
+				slave_id, mem_name, rte_strerror(rte_errno));
+		}
+	}
+
+	if (internals->mode4.slow_pkts.hw_filtering_en) {
+		/* Configure slow Rx queue */
+
+		errval = rte_eth_rx_queue_setup(slave_eth_dev->data->port_id,
+				internals->mode4.slow_pkts.rx_queue_id, 128,
+				rte_eth_dev_socket_id(slave_eth_dev->data->port_id),
+				NULL, port->slow_pool);
+		if (errval != 0) {
+			RTE_BOND_LOG(ERR,
+					"rte_eth_rx_queue_setup: port=%d queue_id %d, err (%d)",
+					slave_eth_dev->data->port_id,
+					internals->mode4.slow_pkts.rx_queue_id,
+					errval);
+			return errval;
+		}
+
+		errval = rte_eth_tx_queue_setup(slave_eth_dev->data->port_id,
+				internals->mode4.slow_pkts.tx_queue_id, 512,
+				rte_eth_dev_socket_id(slave_eth_dev->data->port_id),
+				NULL);
+		if (errval != 0) {
+			RTE_BOND_LOG(ERR,
+				"rte_eth_tx_queue_setup: port=%d queue_id %d, err (%d)",
+				slave_eth_dev->data->port_id,
+				internals->mode4.slow_pkts.tx_queue_id,
+				errval);
+			return errval;
+		}
+	}
+	return 0;
+}
+
 int
 slave_configure(struct rte_eth_dev *bonded_eth_dev,
 		struct rte_eth_dev *slave_eth_dev)
 {
 	struct bond_rx_queue *bd_rx_q;
 	struct bond_tx_queue *bd_tx_q;
+	uint16_t nb_rx_queues;
+	uint16_t nb_tx_queues;
 
 	int errval;
 	uint16_t q_id;
+	struct rte_flow_error flow_error;
+
+	struct bond_dev_private *internals = (struct bond_dev_private *)
+		bonded_eth_dev->data->dev_private;
 
 	/* Stop slave */
 	rte_eth_dev_stop(slave_eth_dev->data->port_id);
@@ -1359,10 +1684,19 @@  struct bwg_slave {
 	slave_eth_dev->data->dev_conf.rxmode.hw_vlan_filter =
 			bonded_eth_dev->data->dev_conf.rxmode.hw_vlan_filter;
 
+	nb_rx_queues = bonded_eth_dev->data->nb_rx_queues;
+	nb_tx_queues = bonded_eth_dev->data->nb_tx_queues;
+
+	if (internals->mode == BONDING_MODE_8023AD) {
+		if (internals->mode4.slow_pkts.hw_filtering_en) {
+			nb_rx_queues++;
+			nb_tx_queues++;
+		}
+	}
+
 	/* Configure device */
 	errval = rte_eth_dev_configure(slave_eth_dev->data->port_id,
-			bonded_eth_dev->data->nb_rx_queues,
-			bonded_eth_dev->data->nb_tx_queues,
+			nb_rx_queues, nb_tx_queues,
 			&(slave_eth_dev->data->dev_conf));
 	if (errval != 0) {
 		RTE_BOND_LOG(ERR, "Cannot configure slave device: port %u , err (%d)",
@@ -1396,10 +1730,33 @@  struct bwg_slave {
 				&bd_tx_q->tx_conf);
 		if (errval != 0) {
 			RTE_BOND_LOG(ERR,
-					"rte_eth_tx_queue_setup: port=%d queue_id %d, err (%d)",
-					slave_eth_dev->data->port_id, q_id, errval);
+				"rte_eth_tx_queue_setup: port=%d queue_id %d, err (%d)",
+				slave_eth_dev->data->port_id, q_id, errval);
+			return errval;
+		}
+	}
+
+	if (internals->mode == BONDING_MODE_8023AD &&
+			internals->mode4.slow_pkts.hw_filtering_en) {
+		errval = slave_configure_slow_queue(bonded_eth_dev,
+				slave_eth_dev);
+		if (errval != 0)
 			return errval;
+
+		if (bond_ethdev_8023ad_flow_verify(bonded_eth_dev,
+				slave_eth_dev->data->port_id) != 0) {
+			RTE_BOND_LOG(ERR,
+				"bond_ethdev_8023ad_flow_verify: port=%d failed",
+				slave_eth_dev->data->port_id);
+			return -1;
 		}
+
+		if (internals->mode4.slow_pkts.flow[slave_eth_dev->data->port_id] != NULL)
+			rte_flow_destroy(slave_eth_dev->data->port_id,
+					internals->mode4.slow_pkts.flow[slave_eth_dev->data->port_id],
+					&flow_error);
+
+		if (bond_ethdev_8023ad_flow_set(bonded_eth_dev,
+				slave_eth_dev->data->port_id) != 0)
+			return -1;
 	}
 
 	/* Start device */
@@ -1559,6 +1916,15 @@  struct bwg_slave {
 	if (internals->promiscuous_en)
 		bond_ethdev_promiscuous_enable(eth_dev);
 
+	if (internals->mode == BONDING_MODE_8023AD) {
+		if (internals->mode4.slow_pkts.hw_filtering_en) {
+			internals->mode4.slow_pkts.rx_queue_id =
+					eth_dev->data->nb_rx_queues;
+			internals->mode4.slow_pkts.tx_queue_id =
+					eth_dev->data->nb_tx_queues;
+		}
+	}
+
 	/* Reconfigure each slave device if starting bonded device */
 	for (i = 0; i < internals->slave_count; i++) {
 		if (slave_configure(eth_dev,
@@ -1688,8 +2054,10 @@  struct bwg_slave {
 
 static void
 bond_ethdev_info(struct rte_eth_dev *dev, struct rte_eth_dev_info *dev_info)
 {
 	struct bond_dev_private *internals = dev->data->dev_private;
+	uint16_t max_nb_rx_queues = 0, max_nb_tx_queues = 0;
 
 	dev_info->max_mac_addrs = 1;
 
@@ -1697,8 +2065,38 @@  struct bwg_slave {
 				  ? internals->candidate_max_rx_pktlen
 				  : ETHER_MAX_JUMBO_FRAME_LEN;
 
-	dev_info->max_rx_queues = (uint16_t)128;
-	dev_info->max_tx_queues = (uint16_t)512;
+	if (internals->slave_count > 0) {
+		/* Max number of tx/rx queues that the bonded device can
+		 * support is the minimum over all of its slaves */
+		struct rte_eth_dev_info slave_info;
+		uint8_t idx;
+
+		max_nb_rx_queues = UINT16_MAX;
+		max_nb_tx_queues = UINT16_MAX;
+		for (idx = 0; idx < internals->slave_count; idx++) {
+			rte_eth_dev_info_get(internals->slaves[idx].port_id,
+					&slave_info);
+
+			if (slave_info.max_rx_queues < max_nb_rx_queues)
+				max_nb_rx_queues = slave_info.max_rx_queues;
+
+			if (slave_info.max_tx_queues < max_nb_tx_queues)
+				max_nb_tx_queues = slave_info.max_tx_queues;
+		}
+		dev_info->max_rx_queues = max_nb_rx_queues;
+		dev_info->max_tx_queues = max_nb_tx_queues;
+	} else {
+		dev_info->max_rx_queues = (uint16_t)128;
+		dev_info->max_tx_queues = (uint16_t)512;
+	}
+
+	if (internals->mode == BONDING_MODE_8023AD &&
+		internals->mode4.slow_pkts.hw_filtering_en) {
+		dev_info->max_rx_queues--;
+		dev_info->max_tx_queues--;
+	}
 
 	dev_info->min_rx_bufsize = 0;
 
diff --git a/drivers/net/bonding/rte_eth_bond_version.map b/drivers/net/bonding/rte_eth_bond_version.map
index 2de0a7d..0ad2ba4 100644
--- a/drivers/net/bonding/rte_eth_bond_version.map
+++ b/drivers/net/bonding/rte_eth_bond_version.map
@@ -43,3 +43,12 @@  DPDK_16.07 {
 	rte_eth_bond_8023ad_setup;
 
 } DPDK_16.04;
+
+DPDK_17.08 {
+	global:
+
+	rte_eth_bond_8023ad_slow_pkt_hw_filter_enable;
+	rte_eth_bond_8023ad_slow_pkt_hw_filter_disable;
+
+	local: *;
+} DPDK_16.07;