[v3,1/2] app/testpmd: do not poll stopped queues

Message ID 20220307125351.697936-2-dkozlyuk@nvidia.com (mailing list archive)
State Accepted, archived
Delegated to: Thomas Monjalon
Headers
Series app/testpmd: skip stopped queues when forwarding |

Checks

Context Check Description
ci/checkpatch warning coding style issues

Commit Message

Dmitry Kozlyuk March 7, 2022, 12:53 p.m. UTC
  Calling Rx/Tx functions on a stopped queue is not supported.
Do not run packet forwarding for streams that use stopped queues.

Each stream has a read-only "disabled" field,
so that lcore function can skip such streams.
Forwarding engines can set this field
using a new "stream_init" callback function
by checking relevant queue states,
which are stored along with queue configurations
(not all PMDs implement rte_eth_rx/tx_queue_info_get()
to query the state from there).

Fixes: 5f4ec54f1d16 ("testpmd: queue start and stop")
Cc: stable@dpdk.org

Signed-off-by: Dmitry Kozlyuk <dkozlyuk@nvidia.com>
Acked-by: Matan Azrad <matan@nvidia.com>
---
 app/test-pmd/5tswap.c         | 13 ++++++
 app/test-pmd/cmdline.c        | 45 ++++++++++--------
 app/test-pmd/config.c         |  8 ++--
 app/test-pmd/csumonly.c       | 13 ++++++
 app/test-pmd/flowgen.c        | 13 ++++++
 app/test-pmd/icmpecho.c       | 13 ++++++
 app/test-pmd/ieee1588fwd.c    | 13 ++++++
 app/test-pmd/iofwd.c          | 13 ++++++
 app/test-pmd/macfwd.c         | 13 ++++++
 app/test-pmd/macswap.c        | 13 ++++++
 app/test-pmd/noisy_vnf.c      | 13 ++++++
 app/test-pmd/rxonly.c         |  8 ++++
 app/test-pmd/shared_rxq_fwd.c |  8 ++++
 app/test-pmd/testpmd.c        | 87 ++++++++++++++++++++++-------------
 app/test-pmd/testpmd.h        | 19 +++++++-
 app/test-pmd/txonly.c         |  8 ++++
 16 files changed, 244 insertions(+), 56 deletions(-)
  

Comments

Dmitry Kozlyuk March 9, 2022, 10:36 a.m. UTC | #1
> Calling Rx/Tx functions on a stopped queue is not supported.
> Do not run packet forwarding for streams that use stopped queues.
> 
> Each stream has a read-only "disabled" field,
> so that lcore function can skip such streams.
> Forwarding engines can set this field
> using a new "stream_init" callback function
> by checking relevant queue states,
> which are stored along with queue configurations
> (not all PMDs implement rte_eth_rx/tx_queue_info_get()
> to query the state from there).
> 
> Fixes: 5f4ec54f1d16 ("testpmd: queue start and stop")
> Cc: stable@dpdk.org
> 
> Signed-off-by: Dmitry Kozlyuk <dkozlyuk@nvidia.com>
> Acked-by: Matan Azrad <matan@nvidia.com>
> ---

Pinging testpmd maintainers for this bugfix.
The sibling ethdev patch will have to wait until the next release cycle.
  
Thomas Monjalon May 25, 2022, 3:46 p.m. UTC | #2
07/03/2022 13:53, Dmitry Kozlyuk:
> Calling Rx/Tx functions on a stopped queue is not supported.
> Do not run packet forwarding for streams that use stopped queues.
> 
> Each stream has a read-only "disabled" field,
> so that lcore function can skip such streams.
> Forwarding engines can set this field
> using a new "stream_init" callback function
> by checking relevant queue states,
> which are stored along with queue configurations
> (not all PMDs implement rte_eth_rx/tx_queue_info_get()
> to query the state from there).
> 
> Fixes: 5f4ec54f1d16 ("testpmd: queue start and stop")
> Cc: stable@dpdk.org
> 
> Signed-off-by: Dmitry Kozlyuk <dkozlyuk@nvidia.com>
> Acked-by: Matan Azrad <matan@nvidia.com>

This patch is waiting for long without any comment.

Applied in next-net, thanks.
  
Yu Jiang June 10, 2022, 11:28 a.m. UTC | #3
Hi Kozlyuk,

Here is bugzilla ticket https://bugs.dpdk.org/show_bug.cgi?id=1028, its bad commit id is this patch.
Could you please have a look? Thanks so much.

Best regards,
Yu Jiang

> -----Original Message-----
> From: Thomas Monjalon <thomas@monjalon.net>
> Sent: Wednesday, May 25, 2022 11:46 PM
> To: Dmitry Kozlyuk <dkozlyuk@nvidia.com>
> Cc: dev@dpdk.org; stable@dpdk.org; Matan Azrad <matan@nvidia.com>; Li,
> Xiaoyun <xiaoyun.li@intel.com>; Singh, Aman Deep
> <aman.deep.singh@intel.com>; Zhang, Yuying <yuying.zhang@intel.com>
> Subject: Re: [PATCH v3 1/2] app/testpmd: do not poll stopped queues
> 
> 07/03/2022 13:53, Dmitry Kozlyuk:
> > Calling Rx/Tx functions on a stopped queue is not supported.
> > Do not run packet forwarding for streams that use stopped queues.
> >
> > Each stream has a read-only "disabled" field, so that lcore function
> > can skip such streams.
> > Forwarding engines can set this field
> > using a new "stream_init" callback function by checking relevant queue
> > states, which are stored along with queue configurations (not all PMDs
> > implement rte_eth_rx/tx_queue_info_get() to query the state from
> > there).
> >
> > Fixes: 5f4ec54f1d16 ("testpmd: queue start and stop")
> > Cc: stable@dpdk.org
> >
> > Signed-off-by: Dmitry Kozlyuk <dkozlyuk@nvidia.com>
> > Acked-by: Matan Azrad <matan@nvidia.com>
> 
> This patch is waiting for long without any comment.
> 
> Applied in next-net, thanks.
>
  
Stephen Hemminger July 8, 2023, 1:54 a.m. UTC | #4
On Wed, 9 Mar 2022 10:36:15 +0000
Dmitry Kozlyuk <dkozlyuk@nvidia.com> wrote:

> > Calling Rx/Tx functions on a stopped queue is not supported.
> > Do not run packet forwarding for streams that use stopped queues.
> > 
> > Each stream has a read-only "disabled" field,
> > so that lcore function can skip such streams.
> > Forwarding engines can set this field
> > using a new "stream_init" callback function
> > by checking relevant queue states,
> > which are stored along with queue configurations
> > (not all PMDs implement rte_eth_rx/tx_queue_info_get()
> > to query the state from there).
> > 
> > Fixes: 5f4ec54f1d16 ("testpmd: queue start and stop")
> > Cc: stable@dpdk.org
> > 
> > Signed-off-by: Dmitry Kozlyuk <dkozlyuk@nvidia.com>
> > Acked-by: Matan Azrad <matan@nvidia.com>
> > ---  
> 
> Pinging testpmd maintainers for this bugfix.
> The sibling ethdev patch will have to wait until the next release cycle.

So you broke a bunch of drivers now that is merged???
Sound like it needs to be reverted. Certainly not something that should
go into stable tree if you expect all drivers to correctly set queue state.
  

Patch

diff --git a/app/test-pmd/5tswap.c b/app/test-pmd/5tswap.c
index 629d3e0d31..f041a5e1d5 100644
--- a/app/test-pmd/5tswap.c
+++ b/app/test-pmd/5tswap.c
@@ -185,9 +185,22 @@  pkt_burst_5tuple_swap(struct fwd_stream *fs)
 	get_end_cycles(fs, start_tsc);
 }
 
+static void
+stream_init_5tuple_swap(struct fwd_stream *fs)
+{
+	bool rx_stopped, tx_stopped;
+
+	rx_stopped = ports[fs->rx_port].rxq[fs->rx_queue].state ==
+						RTE_ETH_QUEUE_STATE_STOPPED;
+	tx_stopped = ports[fs->tx_port].txq[fs->tx_queue].state ==
+						RTE_ETH_QUEUE_STATE_STOPPED;
+	fs->disabled = rx_stopped || tx_stopped;
+}
+
 struct fwd_engine five_tuple_swap_fwd_engine = {
 	.fwd_mode_name  = "5tswap",
 	.port_fwd_begin = NULL,
 	.port_fwd_end   = NULL,
+	.stream_init    = stream_init_5tuple_swap,
 	.packet_fwd     = pkt_burst_5tuple_swap,
 };
diff --git a/app/test-pmd/cmdline.c b/app/test-pmd/cmdline.c
index 7ab0575e64..2ca935f086 100644
--- a/app/test-pmd/cmdline.c
+++ b/app/test-pmd/cmdline.c
@@ -2658,8 +2658,10 @@  cmd_config_rxtx_queue_parsed(void *parsed_result,
 			__rte_unused void *data)
 {
 	struct cmd_config_rxtx_queue *res = parsed_result;
+	struct rte_port *port;
 	uint8_t isrx;
 	uint8_t isstart;
+	uint8_t *state;
 	int ret = 0;
 
 	if (test_done == 0) {
@@ -2707,8 +2709,15 @@  cmd_config_rxtx_queue_parsed(void *parsed_result,
 	else
 		ret = rte_eth_dev_tx_queue_stop(res->portid, res->qid);
 
-	if (ret == -ENOTSUP)
+	if (ret == -ENOTSUP) {
 		fprintf(stderr, "Function not supported in PMD\n");
+		return;
+	}
+
+	port = &ports[res->portid];
+	state = isrx ? &port->rxq[res->qid].state : &port->txq[res->qid].state;
+	*state = isstart ? RTE_ETH_QUEUE_STATE_STARTED :
+			   RTE_ETH_QUEUE_STATE_STOPPED;
 }
 
 cmdline_parse_token_string_t cmd_config_rxtx_queue_port =
@@ -2777,11 +2786,11 @@  cmd_config_deferred_start_rxtx_queue_parsed(void *parsed_result,
 
 	ison = !strcmp(res->state, "on");
 
-	if (isrx && port->rx_conf[res->qid].rx_deferred_start != ison) {
-		port->rx_conf[res->qid].rx_deferred_start = ison;
+	if (isrx && port->rxq[res->qid].conf.rx_deferred_start != ison) {
+		port->rxq[res->qid].conf.rx_deferred_start = ison;
 		needreconfig = 1;
-	} else if (!isrx && port->tx_conf[res->qid].tx_deferred_start != ison) {
-		port->tx_conf[res->qid].tx_deferred_start = ison;
+	} else if (!isrx && port->txq[res->qid].conf.tx_deferred_start != ison) {
+		port->txq[res->qid].conf.tx_deferred_start = ison;
 		needreconfig = 1;
 	}
 
@@ -2899,7 +2908,7 @@  cmd_setup_rxtx_queue_parsed(
 				     res->qid,
 				     port->nb_rx_desc[res->qid],
 				     socket_id,
-				     &port->rx_conf[res->qid],
+				     &port->rxq[res->qid].conf,
 				     mp);
 		if (ret)
 			fprintf(stderr, "Failed to setup RX queue\n");
@@ -2917,7 +2926,7 @@  cmd_setup_rxtx_queue_parsed(
 					     res->qid,
 					     port->nb_tx_desc[res->qid],
 					     socket_id,
-					     &port->tx_conf[res->qid]);
+					     &port->txq[res->qid].conf);
 		if (ret)
 			fprintf(stderr, "Failed to setup TX queue\n");
 	}
@@ -4693,7 +4702,7 @@  cmd_config_queue_tx_offloads(struct rte_port *port)
 
 	/* Apply queue tx offloads configuration */
 	for (k = 0; k < port->dev_info.max_tx_queues; k++)
-		port->tx_conf[k].offloads =
+		port->txq[k].conf.offloads =
 			port->dev_conf.txmode.offloads;
 }
 
@@ -16204,7 +16213,7 @@  cmd_rx_offload_get_configuration_parsed(
 
 	nb_rx_queues = dev_info.nb_rx_queues;
 	for (q = 0; q < nb_rx_queues; q++) {
-		queue_offloads = port->rx_conf[q].offloads;
+		queue_offloads = port->rxq[q].conf.offloads;
 		printf("  Queue[%2d] :", q);
 		print_rx_offloads(queue_offloads);
 		printf("\n");
@@ -16324,11 +16333,11 @@  cmd_config_per_port_rx_offload_parsed(void *parsed_result,
 	if (!strcmp(res->on_off, "on")) {
 		port->dev_conf.rxmode.offloads |= single_offload;
 		for (q = 0; q < nb_rx_queues; q++)
-			port->rx_conf[q].offloads |= single_offload;
+			port->rxq[q].conf.offloads |= single_offload;
 	} else {
 		port->dev_conf.rxmode.offloads &= ~single_offload;
 		for (q = 0; q < nb_rx_queues; q++)
-			port->rx_conf[q].offloads &= ~single_offload;
+			port->rxq[q].conf.offloads &= ~single_offload;
 	}
 
 	cmd_reconfig_device_queue(port_id, 1, 1);
@@ -16434,9 +16443,9 @@  cmd_config_per_queue_rx_offload_parsed(void *parsed_result,
 	}
 
 	if (!strcmp(res->on_off, "on"))
-		port->rx_conf[queue_id].offloads |= single_offload;
+		port->rxq[queue_id].conf.offloads |= single_offload;
 	else
-		port->rx_conf[queue_id].offloads &= ~single_offload;
+		port->rxq[queue_id].conf.offloads &= ~single_offload;
 
 	cmd_reconfig_device_queue(port_id, 1, 1);
 }
@@ -16623,7 +16632,7 @@  cmd_tx_offload_get_configuration_parsed(
 
 	nb_tx_queues = dev_info.nb_tx_queues;
 	for (q = 0; q < nb_tx_queues; q++) {
-		queue_offloads = port->tx_conf[q].offloads;
+		queue_offloads = port->txq[q].conf.offloads;
 		printf("  Queue[%2d] :", q);
 		print_tx_offloads(queue_offloads);
 		printf("\n");
@@ -16747,11 +16756,11 @@  cmd_config_per_port_tx_offload_parsed(void *parsed_result,
 	if (!strcmp(res->on_off, "on")) {
 		port->dev_conf.txmode.offloads |= single_offload;
 		for (q = 0; q < nb_tx_queues; q++)
-			port->tx_conf[q].offloads |= single_offload;
+			port->txq[q].conf.offloads |= single_offload;
 	} else {
 		port->dev_conf.txmode.offloads &= ~single_offload;
 		for (q = 0; q < nb_tx_queues; q++)
-			port->tx_conf[q].offloads &= ~single_offload;
+			port->txq[q].conf.offloads &= ~single_offload;
 	}
 
 	cmd_reconfig_device_queue(port_id, 1, 1);
@@ -16860,9 +16869,9 @@  cmd_config_per_queue_tx_offload_parsed(void *parsed_result,
 	}
 
 	if (!strcmp(res->on_off, "on"))
-		port->tx_conf[queue_id].offloads |= single_offload;
+		port->txq[queue_id].conf.offloads |= single_offload;
 	else
-		port->tx_conf[queue_id].offloads &= ~single_offload;
+		port->txq[queue_id].conf.offloads &= ~single_offload;
 
 	cmd_reconfig_device_queue(port_id, 1, 1);
 }
diff --git a/app/test-pmd/config.c b/app/test-pmd/config.c
index cc8e7aa138..c4ab3f8cae 100644
--- a/app/test-pmd/config.c
+++ b/app/test-pmd/config.c
@@ -3551,8 +3551,8 @@  rxtx_config_display(void)
 	       nb_fwd_lcores, nb_fwd_ports);
 
 	RTE_ETH_FOREACH_DEV(pid) {
-		struct rte_eth_rxconf *rx_conf = &ports[pid].rx_conf[0];
-		struct rte_eth_txconf *tx_conf = &ports[pid].tx_conf[0];
+		struct rte_eth_rxconf *rx_conf = &ports[pid].rxq[0].conf;
+		struct rte_eth_txconf *tx_conf = &ports[pid].txq[0].conf;
 		uint16_t *nb_rx_desc = &ports[pid].nb_rx_desc[0];
 		uint16_t *nb_tx_desc = &ports[pid].nb_tx_desc[0];
 		struct rte_eth_rxq_info rx_qinfo;
@@ -3810,7 +3810,7 @@  fwd_stream_on_other_lcores(uint16_t domain_id, lcoreid_t src_lc,
 			fs = fwd_streams[sm_id];
 			port = &ports[fs->rx_port];
 			dev_info = &port->dev_info;
-			rxq_conf = &port->rx_conf[fs->rx_queue];
+			rxq_conf = &port->rxq[fs->rx_queue].conf;
 			if ((dev_info->dev_capa & RTE_ETH_DEV_CAPA_RXQ_SHARE)
 			    == 0 || rxq_conf->share_group == 0)
 				/* Not shared rxq. */
@@ -3870,7 +3870,7 @@  pkt_fwd_shared_rxq_check(void)
 			fs->lcore = fwd_lcores[lc_id];
 			port = &ports[fs->rx_port];
 			dev_info = &port->dev_info;
-			rxq_conf = &port->rx_conf[fs->rx_queue];
+			rxq_conf = &port->rxq[fs->rx_queue].conf;
 			if ((dev_info->dev_capa & RTE_ETH_DEV_CAPA_RXQ_SHARE)
 			    == 0 || rxq_conf->share_group == 0)
 				/* Not shared rxq. */
diff --git a/app/test-pmd/csumonly.c b/app/test-pmd/csumonly.c
index 5274d498ee..eb58ca1906 100644
--- a/app/test-pmd/csumonly.c
+++ b/app/test-pmd/csumonly.c
@@ -1178,9 +1178,22 @@  pkt_burst_checksum_forward(struct fwd_stream *fs)
 	get_end_cycles(fs, start_tsc);
 }
 
+static void
+stream_init_checksum_forward(struct fwd_stream *fs)
+{
+	bool rx_stopped, tx_stopped;
+
+	rx_stopped = ports[fs->rx_port].rxq[fs->rx_queue].state ==
+						RTE_ETH_QUEUE_STATE_STOPPED;
+	tx_stopped = ports[fs->tx_port].txq[fs->tx_queue].state ==
+						RTE_ETH_QUEUE_STATE_STOPPED;
+	fs->disabled = rx_stopped || tx_stopped;
+}
+
 struct fwd_engine csum_fwd_engine = {
 	.fwd_mode_name  = "csum",
 	.port_fwd_begin = NULL,
 	.port_fwd_end   = NULL,
+	.stream_init    = stream_init_checksum_forward,
 	.packet_fwd     = pkt_burst_checksum_forward,
 };
diff --git a/app/test-pmd/flowgen.c b/app/test-pmd/flowgen.c
index 9ceef3b54a..1e01120ae9 100644
--- a/app/test-pmd/flowgen.c
+++ b/app/test-pmd/flowgen.c
@@ -207,9 +207,22 @@  flowgen_begin(portid_t pi)
 	return 0;
 }
 
+static void
+flowgen_stream_init(struct fwd_stream *fs)
+{
+	bool rx_stopped, tx_stopped;
+
+	rx_stopped = ports[fs->rx_port].rxq[fs->rx_queue].state ==
+						RTE_ETH_QUEUE_STATE_STOPPED;
+	tx_stopped = ports[fs->tx_port].txq[fs->tx_queue].state ==
+						RTE_ETH_QUEUE_STATE_STOPPED;
+	fs->disabled = rx_stopped || tx_stopped;
+}
+
 struct fwd_engine flow_gen_engine = {
 	.fwd_mode_name  = "flowgen",
 	.port_fwd_begin = flowgen_begin,
 	.port_fwd_end   = NULL,
+	.stream_init    = flowgen_stream_init,
 	.packet_fwd     = pkt_burst_flow_gen,
 };
diff --git a/app/test-pmd/icmpecho.c b/app/test-pmd/icmpecho.c
index 99c94cb282..066f2a3ab7 100644
--- a/app/test-pmd/icmpecho.c
+++ b/app/test-pmd/icmpecho.c
@@ -512,9 +512,22 @@  reply_to_icmp_echo_rqsts(struct fwd_stream *fs)
 	get_end_cycles(fs, start_tsc);
 }
 
+static void
+icmpecho_stream_init(struct fwd_stream *fs)
+{
+	bool rx_stopped, tx_stopped;
+
+	rx_stopped = ports[fs->rx_port].rxq[fs->rx_queue].state ==
+						RTE_ETH_QUEUE_STATE_STOPPED;
+	tx_stopped = ports[fs->tx_port].txq[fs->tx_queue].state ==
+						RTE_ETH_QUEUE_STATE_STOPPED;
+	fs->disabled = rx_stopped || tx_stopped;
+}
+
 struct fwd_engine icmp_echo_engine = {
 	.fwd_mode_name  = "icmpecho",
 	.port_fwd_begin = NULL,
 	.port_fwd_end   = NULL,
+	.stream_init    = icmpecho_stream_init,
 	.packet_fwd     = reply_to_icmp_echo_rqsts,
 };
diff --git a/app/test-pmd/ieee1588fwd.c b/app/test-pmd/ieee1588fwd.c
index 9ff817aa68..fc4e2d014c 100644
--- a/app/test-pmd/ieee1588fwd.c
+++ b/app/test-pmd/ieee1588fwd.c
@@ -211,9 +211,22 @@  port_ieee1588_fwd_end(portid_t pi)
 	rte_eth_timesync_disable(pi);
 }
 
+static void
+port_ieee1588_stream_init(struct fwd_stream *fs)
+{
+	bool rx_stopped, tx_stopped;
+
+	rx_stopped = ports[fs->rx_port].rxq[fs->rx_queue].state ==
+						RTE_ETH_QUEUE_STATE_STOPPED;
+	tx_stopped = ports[fs->tx_port].txq[fs->tx_queue].state ==
+						RTE_ETH_QUEUE_STATE_STOPPED;
+	fs->disabled = rx_stopped || tx_stopped;
+}
+
 struct fwd_engine ieee1588_fwd_engine = {
 	.fwd_mode_name  = "ieee1588",
 	.port_fwd_begin = port_ieee1588_fwd_begin,
 	.port_fwd_end   = port_ieee1588_fwd_end,
+	.stream_init    = port_ieee1588_stream_init,
 	.packet_fwd     = ieee1588_packet_fwd,
 };
diff --git a/app/test-pmd/iofwd.c b/app/test-pmd/iofwd.c
index 19cd920f70..71849aaf96 100644
--- a/app/test-pmd/iofwd.c
+++ b/app/test-pmd/iofwd.c
@@ -88,9 +88,22 @@  pkt_burst_io_forward(struct fwd_stream *fs)
 	get_end_cycles(fs, start_tsc);
 }
 
+static void
+stream_init_forward(struct fwd_stream *fs)
+{
+	bool rx_stopped, tx_stopped;
+
+	rx_stopped = ports[fs->rx_port].rxq[fs->rx_queue].state ==
+						RTE_ETH_QUEUE_STATE_STOPPED;
+	tx_stopped = ports[fs->tx_port].txq[fs->tx_queue].state ==
+						RTE_ETH_QUEUE_STATE_STOPPED;
+	fs->disabled = rx_stopped || tx_stopped;
+}
+
 struct fwd_engine io_fwd_engine = {
 	.fwd_mode_name  = "io",
 	.port_fwd_begin = NULL,
 	.port_fwd_end   = NULL,
+	.stream_init    = stream_init_forward,
 	.packet_fwd     = pkt_burst_io_forward,
 };
diff --git a/app/test-pmd/macfwd.c b/app/test-pmd/macfwd.c
index 812a0c721f..79c9241d00 100644
--- a/app/test-pmd/macfwd.c
+++ b/app/test-pmd/macfwd.c
@@ -119,9 +119,22 @@  pkt_burst_mac_forward(struct fwd_stream *fs)
 	get_end_cycles(fs, start_tsc);
 }
 
+static void
+stream_init_mac_forward(struct fwd_stream *fs)
+{
+	bool rx_stopped, tx_stopped;
+
+	rx_stopped = ports[fs->rx_port].rxq[fs->rx_queue].state ==
+						RTE_ETH_QUEUE_STATE_STOPPED;
+	tx_stopped = ports[fs->tx_port].txq[fs->tx_queue].state ==
+						RTE_ETH_QUEUE_STATE_STOPPED;
+	fs->disabled = rx_stopped || tx_stopped;
+}
+
 struct fwd_engine mac_fwd_engine = {
 	.fwd_mode_name  = "mac",
 	.port_fwd_begin = NULL,
 	.port_fwd_end   = NULL,
+	.stream_init    = stream_init_mac_forward,
 	.packet_fwd     = pkt_burst_mac_forward,
 };
diff --git a/app/test-pmd/macswap.c b/app/test-pmd/macswap.c
index 4627ff83e9..acb0fd7fb4 100644
--- a/app/test-pmd/macswap.c
+++ b/app/test-pmd/macswap.c
@@ -97,9 +97,22 @@  pkt_burst_mac_swap(struct fwd_stream *fs)
 	get_end_cycles(fs, start_tsc);
 }
 
+static void
+stream_init_mac_swap(struct fwd_stream *fs)
+{
+	bool rx_stopped, tx_stopped;
+
+	rx_stopped = ports[fs->rx_port].rxq[fs->rx_queue].state ==
+						RTE_ETH_QUEUE_STATE_STOPPED;
+	tx_stopped = ports[fs->tx_port].txq[fs->tx_queue].state ==
+						RTE_ETH_QUEUE_STATE_STOPPED;
+	fs->disabled = rx_stopped || tx_stopped;
+}
+
 struct fwd_engine mac_swap_engine = {
 	.fwd_mode_name  = "macswap",
 	.port_fwd_begin = NULL,
 	.port_fwd_end   = NULL,
+	.stream_init    = stream_init_mac_swap,
 	.packet_fwd     = pkt_burst_mac_swap,
 };
diff --git a/app/test-pmd/noisy_vnf.c b/app/test-pmd/noisy_vnf.c
index e4434bea95..a92e810190 100644
--- a/app/test-pmd/noisy_vnf.c
+++ b/app/test-pmd/noisy_vnf.c
@@ -277,9 +277,22 @@  noisy_fwd_begin(portid_t pi)
 	return 0;
 }
 
+static void
+stream_init_noisy_vnf(struct fwd_stream *fs)
+{
+	bool rx_stopped, tx_stopped;
+
+	rx_stopped = ports[fs->rx_port].rxq[fs->rx_queue].state ==
+						RTE_ETH_QUEUE_STATE_STOPPED;
+	tx_stopped = ports[fs->tx_port].txq[fs->tx_queue].state ==
+						RTE_ETH_QUEUE_STATE_STOPPED;
+	fs->disabled = rx_stopped || tx_stopped;
+}
+
 struct fwd_engine noisy_vnf_engine = {
 	.fwd_mode_name  = "noisy",
 	.port_fwd_begin = noisy_fwd_begin,
 	.port_fwd_end   = noisy_fwd_end,
+	.stream_init    = stream_init_noisy_vnf,
 	.packet_fwd     = pkt_burst_noisy_vnf,
 };
diff --git a/app/test-pmd/rxonly.c b/app/test-pmd/rxonly.c
index d1a579d8d8..04457010f4 100644
--- a/app/test-pmd/rxonly.c
+++ b/app/test-pmd/rxonly.c
@@ -68,9 +68,17 @@  pkt_burst_receive(struct fwd_stream *fs)
 	get_end_cycles(fs, start_tsc);
 }
 
+static void
+stream_init_receive(struct fwd_stream *fs)
+{
+	fs->disabled = ports[fs->rx_port].rxq[fs->rx_queue].state ==
+						RTE_ETH_QUEUE_STATE_STOPPED;
+}
+
 struct fwd_engine rx_only_engine = {
 	.fwd_mode_name  = "rxonly",
 	.port_fwd_begin = NULL,
 	.port_fwd_end   = NULL,
+	.stream_init    = stream_init_receive,
 	.packet_fwd     = pkt_burst_receive,
 };
diff --git a/app/test-pmd/shared_rxq_fwd.c b/app/test-pmd/shared_rxq_fwd.c
index da54a383fd..2e9047804b 100644
--- a/app/test-pmd/shared_rxq_fwd.c
+++ b/app/test-pmd/shared_rxq_fwd.c
@@ -107,9 +107,17 @@  shared_rxq_fwd(struct fwd_stream *fs)
 	get_end_cycles(fs, start_tsc);
 }
 
+static void
+shared_rxq_stream_init(struct fwd_stream *fs)
+{
+	fs->disabled = ports[fs->rx_port].rxq[fs->rx_queue].state ==
+						RTE_ETH_QUEUE_STATE_STOPPED;
+}
+
 struct fwd_engine shared_rxq_engine = {
 	.fwd_mode_name  = "shared_rxq",
 	.port_fwd_begin = NULL,
 	.port_fwd_end   = NULL,
+	.stream_init    = shared_rxq_stream_init,
 	.packet_fwd     = shared_rxq_fwd,
 };
diff --git a/app/test-pmd/testpmd.c b/app/test-pmd/testpmd.c
index fe2ce19f99..52175a6cd2 100644
--- a/app/test-pmd/testpmd.c
+++ b/app/test-pmd/testpmd.c
@@ -1573,10 +1573,10 @@  init_config_port_offloads(portid_t pid, uint32_t socket_id)
 
 	/* Apply Rx offloads configuration */
 	for (i = 0; i < port->dev_info.max_rx_queues; i++)
-		port->rx_conf[i].offloads = port->dev_conf.rxmode.offloads;
+		port->rxq[i].conf.offloads = port->dev_conf.rxmode.offloads;
 	/* Apply Tx offloads configuration */
 	for (i = 0; i < port->dev_info.max_tx_queues; i++)
-		port->tx_conf[i].offloads = port->dev_conf.txmode.offloads;
+		port->txq[i].conf.offloads = port->dev_conf.txmode.offloads;
 
 	if (eth_link_speed)
 		port->dev_conf.link_speeds = eth_link_speed;
@@ -1763,7 +1763,6 @@  reconfig(portid_t new_port_id, unsigned socket_id)
 	init_port_config();
 }
 
-
 int
 init_fwd_streams(void)
 {
@@ -2156,6 +2155,12 @@  flush_fwd_rx_queues(void)
 		for (rxp = 0; rxp < cur_fwd_config.nb_fwd_ports; rxp++) {
 			for (rxq = 0; rxq < nb_rxq; rxq++) {
 				port_id = fwd_ports_ids[rxp];
+
+				/* Polling stopped queues is prohibited. */
+				if (ports[port_id].rxq[rxq].state ==
+				    RTE_ETH_QUEUE_STATE_STOPPED)
+					continue;
+
 				/**
 				* testpmd can stuck in the below do while loop
 				* if rte_eth_rx_burst() always returns nonzero
@@ -2201,7 +2206,8 @@  run_pkt_fwd_on_lcore(struct fwd_lcore *fc, packet_fwd_t pkt_fwd)
 	nb_fs = fc->stream_nb;
 	do {
 		for (sm_id = 0; sm_id < nb_fs; sm_id++)
-			(*pkt_fwd)(fsm[sm_id]);
+			if (!fsm[sm_id]->disabled)
+				(*pkt_fwd)(fsm[sm_id]);
 #ifdef RTE_LIB_BITRATESTATS
 		if (bitrate_enabled != 0 &&
 				bitrate_lcore_id == rte_lcore_id()) {
@@ -2283,6 +2289,7 @@  start_packet_forwarding(int with_tx_first)
 {
 	port_fwd_begin_t port_fwd_begin;
 	port_fwd_end_t  port_fwd_end;
+	stream_init_t stream_init = cur_fwd_eng->stream_init;
 	unsigned int i;
 
 	if (strcmp(cur_fwd_eng->fwd_mode_name, "rxonly") == 0 && !nb_rxq)
@@ -2313,6 +2320,10 @@  start_packet_forwarding(int with_tx_first)
 	if (!pkt_fwd_shared_rxq_check())
 		return;
 
+	if (stream_init != NULL)
+		for (i = 0; i < cur_fwd_config.nb_fwd_streams; i++)
+			stream_init(fwd_streams[i]);
+
 	port_fwd_begin = cur_fwd_config.fwd_eng->port_fwd_begin;
 	if (port_fwd_begin != NULL) {
 		for (i = 0; i < cur_fwd_config.nb_fwd_ports; i++) {
@@ -2574,7 +2585,7 @@  rx_queue_setup(uint16_t port_id, uint16_t rx_queue_id,
 		ret = rte_eth_rx_queue_setup(port_id, rx_queue_id,
 					     nb_rx_desc, socket_id,
 					     rx_conf, mp);
-		return ret;
+		goto exit;
 	}
 	for (i = 0; i < rx_pkt_nb_segs; i++) {
 		struct rte_eth_rxseg_split *rx_seg = &rx_useg[i].split;
@@ -2599,6 +2610,10 @@  rx_queue_setup(uint16_t port_id, uint16_t rx_queue_id,
 				    socket_id, rx_conf, NULL);
 	rx_conf->rx_seg = NULL;
 	rx_conf->rx_nseg = 0;
+exit:
+	ports[port_id].rxq[rx_queue_id].state = rx_conf->rx_deferred_start ?
+						RTE_ETH_QUEUE_STATE_STOPPED :
+						RTE_ETH_QUEUE_STATE_STARTED;
 	return ret;
 }
 
@@ -2801,7 +2816,7 @@  start_port(portid_t pid)
 				for (k = 0;
 				     k < port->dev_info.max_rx_queues;
 				     k++)
-					port->rx_conf[k].offloads |=
+					port->rxq[k].conf.offloads |=
 						dev_conf.rxmode.offloads;
 			}
 			/* Apply Tx offloads configuration */
@@ -2812,7 +2827,7 @@  start_port(portid_t pid)
 				for (k = 0;
 				     k < port->dev_info.max_tx_queues;
 				     k++)
-					port->tx_conf[k].offloads |=
+					port->txq[k].conf.offloads |=
 						dev_conf.txmode.offloads;
 			}
 		}
@@ -2820,20 +2835,28 @@  start_port(portid_t pid)
 			port->need_reconfig_queues = 0;
 			/* setup tx queues */
 			for (qi = 0; qi < nb_txq; qi++) {
+				struct rte_eth_txconf *conf =
+							&port->txq[qi].conf;
+
 				if ((numa_support) &&
 					(txring_numa[pi] != NUMA_NO_CONFIG))
 					diag = rte_eth_tx_queue_setup(pi, qi,
 						port->nb_tx_desc[qi],
 						txring_numa[pi],
-						&(port->tx_conf[qi]));
+						&(port->txq[qi].conf));
 				else
 					diag = rte_eth_tx_queue_setup(pi, qi,
 						port->nb_tx_desc[qi],
 						port->socket_id,
-						&(port->tx_conf[qi]));
+						&(port->txq[qi].conf));
 
-				if (diag == 0)
+				if (diag == 0) {
+					port->txq[qi].state =
+						conf->tx_deferred_start ?
+						RTE_ETH_QUEUE_STATE_STOPPED :
+						RTE_ETH_QUEUE_STATE_STARTED;
 					continue;
+				}
 
 				/* Fail to setup tx queue, return */
 				if (port->port_status == RTE_PORT_HANDLING)
@@ -2866,7 +2889,7 @@  start_port(portid_t pid)
 					diag = rx_queue_setup(pi, qi,
 					     port->nb_rx_desc[qi],
 					     rxring_numa[pi],
-					     &(port->rx_conf[qi]),
+					     &(port->rxq[qi].conf),
 					     mp);
 				} else {
 					struct rte_mempool *mp =
@@ -2881,7 +2904,7 @@  start_port(portid_t pid)
 					diag = rx_queue_setup(pi, qi,
 					     port->nb_rx_desc[qi],
 					     port->socket_id,
-					     &(port->rx_conf[qi]),
+					     &(port->rxq[qi].conf),
 					     mp);
 				}
 				if (diag == 0)
@@ -3656,59 +3679,59 @@  rxtx_port_config(portid_t pid)
 	struct rte_port *port = &ports[pid];
 
 	for (qid = 0; qid < nb_rxq; qid++) {
-		offloads = port->rx_conf[qid].offloads;
-		port->rx_conf[qid] = port->dev_info.default_rxconf;
+		offloads = port->rxq[qid].conf.offloads;
+		port->rxq[qid].conf = port->dev_info.default_rxconf;
 
 		if (rxq_share > 0 &&
 		    (port->dev_info.dev_capa & RTE_ETH_DEV_CAPA_RXQ_SHARE)) {
 			/* Non-zero share group to enable RxQ share. */
-			port->rx_conf[qid].share_group = pid / rxq_share + 1;
-			port->rx_conf[qid].share_qid = qid; /* Equal mapping. */
+			port->rxq[qid].conf.share_group = pid / rxq_share + 1;
+			port->rxq[qid].conf.share_qid = qid; /* Equal mapping. */
 		}
 
 		if (offloads != 0)
-			port->rx_conf[qid].offloads = offloads;
+			port->rxq[qid].conf.offloads = offloads;
 
 		/* Check if any Rx parameters have been passed */
 		if (rx_pthresh != RTE_PMD_PARAM_UNSET)
-			port->rx_conf[qid].rx_thresh.pthresh = rx_pthresh;
+			port->rxq[qid].conf.rx_thresh.pthresh = rx_pthresh;
 
 		if (rx_hthresh != RTE_PMD_PARAM_UNSET)
-			port->rx_conf[qid].rx_thresh.hthresh = rx_hthresh;
+			port->rxq[qid].conf.rx_thresh.hthresh = rx_hthresh;
 
 		if (rx_wthresh != RTE_PMD_PARAM_UNSET)
-			port->rx_conf[qid].rx_thresh.wthresh = rx_wthresh;
+			port->rxq[qid].conf.rx_thresh.wthresh = rx_wthresh;
 
 		if (rx_free_thresh != RTE_PMD_PARAM_UNSET)
-			port->rx_conf[qid].rx_free_thresh = rx_free_thresh;
+			port->rxq[qid].conf.rx_free_thresh = rx_free_thresh;
 
 		if (rx_drop_en != RTE_PMD_PARAM_UNSET)
-			port->rx_conf[qid].rx_drop_en = rx_drop_en;
+			port->rxq[qid].conf.rx_drop_en = rx_drop_en;
 
 		port->nb_rx_desc[qid] = nb_rxd;
 	}
 
 	for (qid = 0; qid < nb_txq; qid++) {
-		offloads = port->tx_conf[qid].offloads;
-		port->tx_conf[qid] = port->dev_info.default_txconf;
+		offloads = port->txq[qid].conf.offloads;
+		port->txq[qid].conf = port->dev_info.default_txconf;
 		if (offloads != 0)
-			port->tx_conf[qid].offloads = offloads;
+			port->txq[qid].conf.offloads = offloads;
 
 		/* Check if any Tx parameters have been passed */
 		if (tx_pthresh != RTE_PMD_PARAM_UNSET)
-			port->tx_conf[qid].tx_thresh.pthresh = tx_pthresh;
+			port->txq[qid].conf.tx_thresh.pthresh = tx_pthresh;
 
 		if (tx_hthresh != RTE_PMD_PARAM_UNSET)
-			port->tx_conf[qid].tx_thresh.hthresh = tx_hthresh;
+			port->txq[qid].conf.tx_thresh.hthresh = tx_hthresh;
 
 		if (tx_wthresh != RTE_PMD_PARAM_UNSET)
-			port->tx_conf[qid].tx_thresh.wthresh = tx_wthresh;
+			port->txq[qid].conf.tx_thresh.wthresh = tx_wthresh;
 
 		if (tx_rs_thresh != RTE_PMD_PARAM_UNSET)
-			port->tx_conf[qid].tx_rs_thresh = tx_rs_thresh;
+			port->txq[qid].conf.tx_rs_thresh = tx_rs_thresh;
 
 		if (tx_free_thresh != RTE_PMD_PARAM_UNSET)
-			port->tx_conf[qid].tx_free_thresh = tx_free_thresh;
+			port->txq[qid].conf.tx_free_thresh = tx_free_thresh;
 
 		port->nb_tx_desc[qid] = nb_txd;
 	}
@@ -3789,7 +3812,7 @@  init_port_config(void)
 				for (i = 0;
 				     i < port->dev_info.nb_rx_queues;
 				     i++)
-					port->rx_conf[i].offloads &=
+					port->rxq[i].conf.offloads &=
 						~RTE_ETH_RX_OFFLOAD_RSS_HASH;
 			}
 		}
@@ -3963,7 +3986,7 @@  init_port_dcb_config(portid_t pid,
 	if (port_conf.rxmode.mq_mode == RTE_ETH_MQ_RX_VMDQ_DCB) {
 		port_conf.rxmode.offloads &= ~RTE_ETH_RX_OFFLOAD_RSS_HASH;
 		for (i = 0; i < nb_rxq; i++)
-			rte_port->rx_conf[i].offloads &=
+			rte_port->rxq[i].conf.offloads &=
 				~RTE_ETH_RX_OFFLOAD_RSS_HASH;
 	}
 
diff --git a/app/test-pmd/testpmd.h b/app/test-pmd/testpmd.h
index 31f766c965..1eda7b97ab 100644
--- a/app/test-pmd/testpmd.h
+++ b/app/test-pmd/testpmd.h
@@ -134,6 +134,7 @@  struct fwd_stream {
 	portid_t   tx_port;   /**< forwarding port of received packets */
 	queueid_t  tx_queue;  /**< TX queue to send forwarded packets */
 	streamid_t peer_addr; /**< index of peer ethernet address of packets */
+	bool       disabled;  /**< the stream is disabled and should not run */
 
 	unsigned int retry_enabled;
 
@@ -238,6 +239,18 @@  struct xstat_display_info {
 	bool	 allocated;
 };
 
+/** RX queue configuration and state. */
+struct port_rxqueue {
+	struct rte_eth_rxconf conf;
+	uint8_t state; /**< RTE_ETH_QUEUE_STATE_* value. */
+};
+
+/** TX queue configuration and state. */
+struct port_txqueue {
+	struct rte_eth_txconf conf;
+	uint8_t state; /**< RTE_ETH_QUEUE_STATE_* value. */
+};
+
 /**
  * The data structure associated with each port.
  */
@@ -260,8 +273,8 @@  struct rte_port {
 	uint8_t                 dcb_flag;   /**< enable dcb */
 	uint16_t                nb_rx_desc[RTE_MAX_QUEUES_PER_PORT+1]; /**< per queue rx desc number */
 	uint16_t                nb_tx_desc[RTE_MAX_QUEUES_PER_PORT+1]; /**< per queue tx desc number */
-	struct rte_eth_rxconf   rx_conf[RTE_MAX_QUEUES_PER_PORT+1]; /**< per queue rx configuration */
-	struct rte_eth_txconf   tx_conf[RTE_MAX_QUEUES_PER_PORT+1]; /**< per queue tx configuration */
+	struct port_rxqueue     rxq[RTE_MAX_QUEUES_PER_PORT+1]; /**< per queue rx configuration and state */
+	struct port_txqueue     txq[RTE_MAX_QUEUES_PER_PORT+1]; /**< per queue tx configuration and state */
 	struct rte_ether_addr   *mc_addr_pool; /**< pool of multicast addrs */
 	uint32_t                mc_addr_nb; /**< nb. of addr. in mc_addr_pool */
 	queueid_t               queue_nb; /**< nb. of queues for flow rules */
@@ -323,12 +336,14 @@  struct fwd_lcore {
  */
 typedef int (*port_fwd_begin_t)(portid_t pi);
 typedef void (*port_fwd_end_t)(portid_t pi);
+typedef void (*stream_init_t)(struct fwd_stream *fs);
 typedef void (*packet_fwd_t)(struct fwd_stream *fs);
 
 struct fwd_engine {
 	const char       *fwd_mode_name; /**< Forwarding mode name. */
 	port_fwd_begin_t port_fwd_begin; /**< NULL if nothing special to do. */
 	port_fwd_end_t   port_fwd_end;   /**< NULL if nothing special to do. */
+	stream_init_t    stream_init;    /**< NULL if nothing special to do. */
 	packet_fwd_t     packet_fwd;     /**< Mandatory. */
 };
 
diff --git a/app/test-pmd/txonly.c b/app/test-pmd/txonly.c
index fc039a622c..e1bc78b73d 100644
--- a/app/test-pmd/txonly.c
+++ b/app/test-pmd/txonly.c
@@ -504,9 +504,17 @@  tx_only_begin(portid_t pi)
 	return 0;
 }
 
+static void
+tx_only_stream_init(struct fwd_stream *fs)
+{
+	fs->disabled = ports[fs->tx_port].txq[fs->tx_queue].state ==
+						RTE_ETH_QUEUE_STATE_STOPPED;
+}
+
 struct fwd_engine tx_only_engine = {
 	.fwd_mode_name  = "txonly",
 	.port_fwd_begin = tx_only_begin,
 	.port_fwd_end   = NULL,
+	.stream_init    = tx_only_stream_init,
 	.packet_fwd     = pkt_burst_transmit,
 };