[1/2] net/mlx5: fix sync queue completion processing

Message ID 20231116080833.336377-2-getelson@nvidia.com (mailing list archive)
State Accepted, archived
Delegated to: Raslan Darawsheh
Headers
Series net/mlx5: fix completions processing |

Checks

Context Check Description
ci/checkpatch success coding style OK

Commit Message

Gregory Etelson Nov. 16, 2023, 8:08 a.m. UTC
  Indirect **SYNC** METER_MARK and CT update actions
do not remove completion after WQE post.
That implementation speeds up update time by avoiding HW timeout.
The completion is remoted before the following WQE post.
However, HWS queue updates do not reflect that behaviour.
Therefore, during port destruction sync queue may have
pending completions although the queue reports empty status.

The patch validates that number of pushed WQEs will not exceed queue
capacity. As the result, it allows to process more completions than
expected.

Fixes: 48fbb0e93d06 ("net/mlx5: support flow meter mark indirect action with HWS")
Cc: stable@dpdk.org

Signed-off-by: Gregory Etelson <getelson@nvidia.com>
Acked-by: Ori Kam <orika@nvidia.com>

---
 drivers/net/mlx5/mlx5_flow_hw.c | 267 +++++++++++++++++---------------
 1 file changed, 142 insertions(+), 125 deletions(-)
  

Patch

diff --git a/drivers/net/mlx5/mlx5_flow_hw.c b/drivers/net/mlx5/mlx5_flow_hw.c
index d72f0a66fb..fb2e6bf67b 100644
--- a/drivers/net/mlx5/mlx5_flow_hw.c
+++ b/drivers/net/mlx5/mlx5_flow_hw.c
@@ -273,6 +273,22 @@  static const struct rte_flow_item_eth ctrl_rx_eth_bcast_spec = {
 	.hdr.src_addr.addr_bytes = "\x00\x00\x00\x00\x00\x00",
 	.hdr.ether_type = 0,
 };
+
+static __rte_always_inline struct mlx5_hw_q_job *
+flow_hw_job_get(struct mlx5_priv *priv, uint32_t queue)
+{
+	MLX5_ASSERT(priv->hw_q[queue].job_idx <= priv->hw_q[queue].size);
+	return priv->hw_q[queue].job_idx ?
+	       priv->hw_q[queue].job[--priv->hw_q[queue].job_idx] : NULL;
+}
+
+static __rte_always_inline void
+flow_hw_job_put(struct mlx5_priv *priv, struct mlx5_hw_q_job *job, uint32_t queue)
+{
+	MLX5_ASSERT(priv->hw_q[queue].job_idx < priv->hw_q[queue].size);
+	priv->hw_q[queue].job[priv->hw_q[queue].job_idx++] = job;
+}
+
 static inline enum mlx5dr_matcher_insert_mode
 flow_hw_matcher_insert_mode_get(enum rte_flow_table_insertion_type insert_type)
 {
@@ -3297,10 +3313,10 @@  flow_hw_async_flow_create(struct rte_eth_dev *dev,
 		.burst = attr->postpone,
 	};
 	struct mlx5dr_rule_action rule_acts[MLX5_HW_MAX_ACTS];
-	struct rte_flow_hw *flow;
-	struct mlx5_hw_q_job *job;
+	struct rte_flow_hw *flow = NULL;
+	struct mlx5_hw_q_job *job = NULL;
 	const struct rte_flow_item *rule_items;
-	uint32_t flow_idx;
+	uint32_t flow_idx = 0;
 	uint32_t res_idx = 0;
 	int ret;
 
@@ -3308,7 +3324,8 @@  flow_hw_async_flow_create(struct rte_eth_dev *dev,
 		rte_errno = EINVAL;
 		goto error;
 	}
-	if (unlikely(!priv->hw_q[queue].job_idx)) {
+	job = flow_hw_job_get(priv, queue);
+	if (!job) {
 		rte_errno = ENOMEM;
 		goto error;
 	}
@@ -3317,16 +3334,15 @@  flow_hw_async_flow_create(struct rte_eth_dev *dev,
 		goto error;
 	mlx5_ipool_malloc(table->resource, &res_idx);
 	if (!res_idx)
-		goto flow_free;
+		goto error;
 	/*
 	 * Set the table here in order to know the destination table
-	 * when free the flow afterwards.
+	 * when free the flow afterward.
 	 */
 	flow->table = table;
 	flow->mt_idx = pattern_template_index;
 	flow->idx = flow_idx;
 	flow->res_idx = res_idx;
-	job = priv->hw_q[queue].job[--priv->hw_q[queue].job_idx];
 	/*
 	 * Set the job type here in order to know if the flow memory
 	 * should be freed or not when get the result from dequeue.
@@ -3354,25 +3370,25 @@  flow_hw_async_flow_create(struct rte_eth_dev *dev,
 				      pattern_template_index, actions,
 				      rule_acts, queue, error)) {
 		rte_errno = EINVAL;
-		goto free;
+		goto error;
 	}
 	rule_items = flow_hw_get_rule_items(dev, table, items,
 					    pattern_template_index, job);
 	if (!rule_items)
-		goto free;
+		goto error;
 	ret = mlx5dr_rule_create(table->matcher,
 				 pattern_template_index, rule_items,
 				 action_template_index, rule_acts,
 				 &rule_attr, (struct mlx5dr_rule *)flow->rule);
 	if (likely(!ret))
 		return (struct rte_flow *)flow;
-free:
-	/* Flow created fail, return the descriptor and flow memory. */
-	priv->hw_q[queue].job_idx++;
-	mlx5_ipool_free(table->resource, res_idx);
-flow_free:
-	mlx5_ipool_free(table->flow, flow_idx);
 error:
+	if (job)
+		flow_hw_job_put(priv, job, queue);
+	if (flow_idx)
+		mlx5_ipool_free(table->flow, flow_idx);
+	if (res_idx)
+		mlx5_ipool_free(table->resource, res_idx);
 	rte_flow_error_set(error, rte_errno,
 			   RTE_FLOW_ERROR_TYPE_UNSPECIFIED, NULL,
 			   "fail to create rte flow");
@@ -3425,9 +3441,9 @@  flow_hw_async_flow_create_by_index(struct rte_eth_dev *dev,
 		.burst = attr->postpone,
 	};
 	struct mlx5dr_rule_action rule_acts[MLX5_HW_MAX_ACTS];
-	struct rte_flow_hw *flow;
-	struct mlx5_hw_q_job *job;
-	uint32_t flow_idx;
+	struct rte_flow_hw *flow = NULL;
+	struct mlx5_hw_q_job *job = NULL;
+	uint32_t flow_idx = 0;
 	uint32_t res_idx = 0;
 	int ret;
 
@@ -3435,7 +3451,8 @@  flow_hw_async_flow_create_by_index(struct rte_eth_dev *dev,
 		rte_errno = EINVAL;
 		goto error;
 	}
-	if (unlikely(!priv->hw_q[queue].job_idx)) {
+	job = flow_hw_job_get(priv, queue);
+	if (!job) {
 		rte_errno = ENOMEM;
 		goto error;
 	}
@@ -3444,7 +3461,7 @@  flow_hw_async_flow_create_by_index(struct rte_eth_dev *dev,
 		goto error;
 	mlx5_ipool_malloc(table->resource, &res_idx);
 	if (!res_idx)
-		goto flow_free;
+		goto error;
 	/*
 	 * Set the table here in order to know the destination table
 	 * when free the flow afterwards.
@@ -3453,7 +3470,6 @@  flow_hw_async_flow_create_by_index(struct rte_eth_dev *dev,
 	flow->mt_idx = 0;
 	flow->idx = flow_idx;
 	flow->res_idx = res_idx;
-	job = priv->hw_q[queue].job[--priv->hw_q[queue].job_idx];
 	/*
 	 * Set the job type here in order to know if the flow memory
 	 * should be freed or not when get the result from dequeue.
@@ -3478,20 +3494,20 @@  flow_hw_async_flow_create_by_index(struct rte_eth_dev *dev,
 				      &table->ats[action_template_index],
 				      0, actions, rule_acts, queue, error)) {
 		rte_errno = EINVAL;
-		goto free;
+		goto error;
 	}
 	ret = mlx5dr_rule_create(table->matcher,
 				 0, items, action_template_index, rule_acts,
 				 &rule_attr, (struct mlx5dr_rule *)flow->rule);
 	if (likely(!ret))
 		return (struct rte_flow *)flow;
-free:
-	/* Flow created fail, return the descriptor and flow memory. */
-	priv->hw_q[queue].job_idx++;
-	mlx5_ipool_free(table->resource, res_idx);
-flow_free:
-	mlx5_ipool_free(table->flow, flow_idx);
 error:
+	if (job)
+		flow_hw_job_put(priv, job, queue);
+	if (res_idx)
+		mlx5_ipool_free(table->resource, res_idx);
+	if (flow_idx)
+		mlx5_ipool_free(table->flow, flow_idx);
 	rte_flow_error_set(error, rte_errno,
 			   RTE_FLOW_ERROR_TYPE_UNSPECIFIED, NULL,
 			   "fail to create rte flow");
@@ -3545,18 +3561,18 @@  flow_hw_async_flow_update(struct rte_eth_dev *dev,
 	struct rte_flow_hw *of = (struct rte_flow_hw *)flow;
 	struct rte_flow_hw *nf;
 	struct rte_flow_template_table *table = of->table;
-	struct mlx5_hw_q_job *job;
+	struct mlx5_hw_q_job *job = NULL;
 	uint32_t res_idx = 0;
 	int ret;
 
-	if (unlikely(!priv->hw_q[queue].job_idx)) {
+	job = flow_hw_job_get(priv, queue);
+	if (!job) {
 		rte_errno = ENOMEM;
 		goto error;
 	}
 	mlx5_ipool_malloc(table->resource, &res_idx);
 	if (!res_idx)
 		goto error;
-	job = priv->hw_q[queue].job[--priv->hw_q[queue].job_idx];
 	nf = job->upd_flow;
 	memset(nf, 0, sizeof(struct rte_flow_hw));
 	/*
@@ -3594,7 +3610,7 @@  flow_hw_async_flow_update(struct rte_eth_dev *dev,
 				      nf->mt_idx, actions,
 				      rule_acts, queue, error)) {
 		rte_errno = EINVAL;
-		goto free;
+		goto error;
 	}
 	/*
 	 * Switch the old flow and the new flow.
@@ -3605,11 +3621,12 @@  flow_hw_async_flow_update(struct rte_eth_dev *dev,
 					action_template_index, rule_acts, &rule_attr);
 	if (likely(!ret))
 		return 0;
-free:
-	/* Flow created fail, return the descriptor and flow memory. */
-	priv->hw_q[queue].job_idx++;
-	mlx5_ipool_free(table->resource, res_idx);
 error:
+	/* Flow created fail, return the descriptor and flow memory. */
+	if (job)
+		flow_hw_job_put(priv, job, queue);
+	if (res_idx)
+		mlx5_ipool_free(table->resource, res_idx);
 	return rte_flow_error_set(error, rte_errno,
 			RTE_FLOW_ERROR_TYPE_UNSPECIFIED, NULL,
 			"fail to update rte flow");
@@ -3656,24 +3673,24 @@  flow_hw_async_flow_destroy(struct rte_eth_dev *dev,
 	struct mlx5_hw_q_job *job;
 	int ret;
 
-	if (unlikely(!priv->hw_q[queue].job_idx)) {
-		rte_errno = ENOMEM;
-		goto error;
-	}
-	job = priv->hw_q[queue].job[--priv->hw_q[queue].job_idx];
+	job = flow_hw_job_get(priv, queue);
+	if (!job)
+		return rte_flow_error_set(error, ENOMEM,
+					  RTE_FLOW_ERROR_TYPE_UNSPECIFIED, NULL,
+					  "fail to destroy rte flow: flow queue full");
 	job->type = MLX5_HW_Q_JOB_TYPE_DESTROY;
 	job->user_data = user_data;
 	job->flow = fh;
 	rule_attr.user_data = job;
 	rule_attr.rule_idx = fh->rule_idx;
 	ret = mlx5dr_rule_destroy((struct mlx5dr_rule *)fh->rule, &rule_attr);
-	if (likely(!ret))
-		return 0;
-	priv->hw_q[queue].job_idx++;
-error:
-	return rte_flow_error_set(error, rte_errno,
-			RTE_FLOW_ERROR_TYPE_UNSPECIFIED, NULL,
-			"fail to destroy rte flow");
+	if (ret) {
+		flow_hw_job_put(priv, job, queue);
+		return rte_flow_error_set(error, rte_errno,
+					  RTE_FLOW_ERROR_TYPE_UNSPECIFIED, NULL,
+					  "fail to destroy rte flow");
+	}
+	return 0;
 }
 
 /**
@@ -3732,7 +3749,7 @@  __flow_hw_pull_indir_action_comp(struct rte_eth_dev *dev,
 {
 	struct mlx5_priv *priv = dev->data->dev_private;
 	struct rte_ring *r = priv->hw_q[queue].indir_cq;
-	struct mlx5_hw_q_job *job;
+	struct mlx5_hw_q_job *job = NULL;
 	void *user_data = NULL;
 	uint32_t type, idx;
 	struct mlx5_aso_mtr *aso_mtr;
@@ -3792,8 +3809,16 @@  __flow_hw_pull_indir_action_comp(struct rte_eth_dev *dev,
 							job->query.hw);
 				aso_ct->state = ASO_CONNTRACK_READY;
 			}
+		} else {
+			/*
+			 * rte_flow_op_result::user data can point to
+			 * struct mlx5_aso_mtr object as well
+			 */
+			if (queue == CTRL_QUEUE_ID(priv))
+				continue;
+			MLX5_ASSERT(false);
 		}
-		priv->hw_q[queue].job[priv->hw_q[queue].job_idx++] = job;
+		flow_hw_job_put(priv, job, queue);
 	}
 	return ret_comp;
 }
@@ -3865,7 +3890,7 @@  flow_hw_pull(struct rte_eth_dev *dev,
 				mlx5_ipool_free(job->flow->table->resource, res_idx);
 			}
 		}
-		priv->hw_q[queue].job[priv->hw_q[queue].job_idx++] = job;
+		flow_hw_job_put(priv, job, queue);
 	}
 	/* 2. Pull indirect action comp. */
 	if (ret < n_res)
@@ -3874,7 +3899,7 @@  flow_hw_pull(struct rte_eth_dev *dev,
 	return ret;
 }
 
-static inline void
+static inline uint32_t
 __flow_hw_push_action(struct rte_eth_dev *dev,
 		    uint32_t queue)
 {
@@ -3889,10 +3914,35 @@  __flow_hw_push_action(struct rte_eth_dev *dev,
 		rte_ring_dequeue(iq, &job);
 		rte_ring_enqueue(cq, job);
 	}
-	if (priv->hws_ctpool)
-		mlx5_aso_push_wqe(priv->sh, &priv->ct_mng->aso_sqs[queue]);
-	if (priv->hws_mpool)
-		mlx5_aso_push_wqe(priv->sh, &priv->hws_mpool->sq[queue]);
+	if (!priv->shared_host) {
+		if (priv->hws_ctpool)
+			mlx5_aso_push_wqe(priv->sh,
+					  &priv->ct_mng->aso_sqs[queue]);
+		if (priv->hws_mpool)
+			mlx5_aso_push_wqe(priv->sh,
+					  &priv->hws_mpool->sq[queue]);
+	}
+	return priv->hw_q[queue].size - priv->hw_q[queue].job_idx;
+}
+
+static int
+__flow_hw_push(struct rte_eth_dev *dev,
+	       uint32_t queue,
+	       struct rte_flow_error *error)
+{
+	struct mlx5_priv *priv = dev->data->dev_private;
+	int ret, num;
+
+	num = __flow_hw_push_action(dev, queue);
+	ret = mlx5dr_send_queue_action(priv->dr_ctx, queue,
+				       MLX5DR_SEND_QUEUE_ACTION_DRAIN_ASYNC);
+	if (ret) {
+		rte_flow_error_set(error, rte_errno,
+				   RTE_FLOW_ERROR_TYPE_UNSPECIFIED, NULL,
+				   "fail to push flows");
+		return ret;
+	}
+	return num;
 }
 
 /**
@@ -3912,22 +3962,11 @@  __flow_hw_push_action(struct rte_eth_dev *dev,
  */
 static int
 flow_hw_push(struct rte_eth_dev *dev,
-	     uint32_t queue,
-	     struct rte_flow_error *error)
+	     uint32_t queue, struct rte_flow_error *error)
 {
-	struct mlx5_priv *priv = dev->data->dev_private;
-	int ret;
+	int ret = __flow_hw_push(dev, queue, error);
 
-	__flow_hw_push_action(dev, queue);
-	ret = mlx5dr_send_queue_action(priv->dr_ctx, queue,
-				       MLX5DR_SEND_QUEUE_ACTION_DRAIN_ASYNC);
-	if (ret) {
-		rte_flow_error_set(error, rte_errno,
-				   RTE_FLOW_ERROR_TYPE_UNSPECIFIED, NULL,
-				   "fail to push flows");
-		return ret;
-	}
-	return 0;
+	return ret >= 0 ? 0 : ret;
 }
 
 /**
@@ -3937,8 +3976,6 @@  flow_hw_push(struct rte_eth_dev *dev,
  *   Pointer to the rte_eth_dev structure.
  * @param[in] queue
  *   The queue to pull the flow.
- * @param[in] pending_rules
- *   The pending flow number.
  * @param[out] error
  *   Pointer to error structure.
  *
@@ -3947,24 +3984,24 @@  flow_hw_push(struct rte_eth_dev *dev,
  */
 static int
 __flow_hw_pull_comp(struct rte_eth_dev *dev,
-		    uint32_t queue,
-		    uint32_t pending_rules,
-		    struct rte_flow_error *error)
+		    uint32_t queue, struct rte_flow_error *error)
 {
 	struct rte_flow_op_result comp[BURST_THR];
 	int ret, i, empty_loop = 0;
+	uint32_t pending_rules;
 
-	ret = flow_hw_push(dev, queue, error);
+	ret = __flow_hw_push(dev, queue, error);
 	if (ret < 0)
 		return ret;
+	pending_rules = ret;
 	while (pending_rules) {
 		ret = flow_hw_pull(dev, queue, comp, BURST_THR, error);
 		if (ret < 0)
 			return -1;
 		if (!ret) {
-			rte_delay_us_sleep(20000);
+			rte_delay_us_sleep(MLX5_ASO_WQE_CQE_RESPONSE_DELAY);
 			if (++empty_loop > 5) {
-				DRV_LOG(WARNING, "No available dequeue, quit.");
+				DRV_LOG(WARNING, "No available dequeue %u, quit.", pending_rules);
 				break;
 			}
 			continue;
@@ -3973,13 +4010,16 @@  __flow_hw_pull_comp(struct rte_eth_dev *dev,
 			if (comp[i].status == RTE_FLOW_OP_ERROR)
 				DRV_LOG(WARNING, "Flow flush get error CQE.");
 		}
-		if ((uint32_t)ret > pending_rules) {
-			DRV_LOG(WARNING, "Flow flush get extra CQE.");
-			return rte_flow_error_set(error, ERANGE,
-					RTE_FLOW_ERROR_TYPE_UNSPECIFIED, NULL,
-					"get extra CQE");
-		}
-		pending_rules -= ret;
+		/*
+		 * Indirect **SYNC** METER_MARK and CT actions do not
+		 * remove completion after WQE post.
+		 * That implementation avoids HW timeout.
+		 * The completion is removed before the following WQE post.
+		 * However, HWS queue updates do not reflect that behaviour.
+		 * Therefore, during port destruction sync queue may have
+		 * pending completions.
+		 */
+		pending_rules -= RTE_MIN(pending_rules, (uint32_t)ret);
 		empty_loop = 0;
 	}
 	return 0;
@@ -4001,7 +4041,7 @@  flow_hw_q_flow_flush(struct rte_eth_dev *dev,
 		     struct rte_flow_error *error)
 {
 	struct mlx5_priv *priv = dev->data->dev_private;
-	struct mlx5_hw_q *hw_q;
+	struct mlx5_hw_q *hw_q = &priv->hw_q[MLX5_DEFAULT_FLUSH_QUEUE];
 	struct rte_flow_template_table *tbl;
 	struct rte_flow_hw *flow;
 	struct rte_flow_op_attr attr = {
@@ -4020,13 +4060,10 @@  flow_hw_q_flow_flush(struct rte_eth_dev *dev,
 	 * be minus value.
 	 */
 	for (queue = 0; queue < priv->nb_queue; queue++) {
-		hw_q = &priv->hw_q[queue];
-		if (__flow_hw_pull_comp(dev, queue, hw_q->size - hw_q->job_idx,
-					error))
+		if (__flow_hw_pull_comp(dev, queue, error))
 			return -1;
 	}
 	/* Flush flow per-table from MLX5_DEFAULT_FLUSH_QUEUE. */
-	hw_q = &priv->hw_q[MLX5_DEFAULT_FLUSH_QUEUE];
 	LIST_FOREACH(tbl, &priv->flow_hw_tbl, next) {
 		if (!tbl->cfg.external)
 			continue;
@@ -4042,8 +4079,8 @@  flow_hw_q_flow_flush(struct rte_eth_dev *dev,
 			/* Drain completion with queue size. */
 			if (pending_rules >= hw_q->size) {
 				if (__flow_hw_pull_comp(dev,
-						MLX5_DEFAULT_FLUSH_QUEUE,
-						pending_rules, error))
+							MLX5_DEFAULT_FLUSH_QUEUE,
+							error))
 					return -1;
 				pending_rules = 0;
 			}
@@ -4051,8 +4088,7 @@  flow_hw_q_flow_flush(struct rte_eth_dev *dev,
 	}
 	/* Drain left completion. */
 	if (pending_rules &&
-	    __flow_hw_pull_comp(dev, MLX5_DEFAULT_FLUSH_QUEUE, pending_rules,
-				error))
+	    __flow_hw_pull_comp(dev, MLX5_DEFAULT_FLUSH_QUEUE, error))
 		return -1;
 	return 0;
 }
@@ -9911,18 +9947,6 @@  flow_hw_action_push(const struct rte_flow_op_attr *attr)
 	return attr ? !attr->postpone : true;
 }
 
-static __rte_always_inline struct mlx5_hw_q_job *
-flow_hw_job_get(struct mlx5_priv *priv, uint32_t queue)
-{
-	return priv->hw_q[queue].job[--priv->hw_q[queue].job_idx];
-}
-
-static __rte_always_inline void
-flow_hw_job_put(struct mlx5_priv *priv, uint32_t queue)
-{
-	priv->hw_q[queue].job_idx++;
-}
-
 static __rte_always_inline struct mlx5_hw_q_job *
 flow_hw_action_job_init(struct mlx5_priv *priv, uint32_t queue,
 			const struct rte_flow_action_handle *handle,
@@ -9933,13 +9957,13 @@  flow_hw_action_job_init(struct mlx5_priv *priv, uint32_t queue,
 	struct mlx5_hw_q_job *job;
 
 	MLX5_ASSERT(queue != MLX5_HW_INV_QUEUE);
-	if (unlikely(!priv->hw_q[queue].job_idx)) {
+	job = flow_hw_job_get(priv, queue);
+	if (!job) {
 		rte_flow_error_set(error, ENOMEM,
 				   RTE_FLOW_ERROR_TYPE_ACTION_NUM, NULL,
 				   "Action destroy failed due to queue full.");
 		return NULL;
 	}
-	job = flow_hw_job_get(priv, queue);
 	job->type = type;
 	job->action = handle;
 	job->user_data = user_data;
@@ -9953,16 +9977,21 @@  flow_hw_action_finalize(struct rte_eth_dev *dev, uint32_t queue,
 			bool push, bool aso, bool status)
 {
 	struct mlx5_priv *priv = dev->data->dev_private;
+
+	if (queue == MLX5_HW_INV_QUEUE)
+		queue = CTRL_QUEUE_ID(priv);
 	if (likely(status)) {
-		if (push)
-			__flow_hw_push_action(dev, queue);
+		/* 1. add new job to a queue */
 		if (!aso)
 			rte_ring_enqueue(push ?
 					 priv->hw_q[queue].indir_cq :
 					 priv->hw_q[queue].indir_iq,
 					 job);
+		/* 2. send pending jobs */
+		if (push)
+			__flow_hw_push_action(dev, queue);
 	} else {
-		flow_hw_job_put(priv, queue);
+		flow_hw_job_put(priv, job, queue);
 	}
 }
 
@@ -11584,13 +11613,7 @@  flow_hw_create_ctrl_flow(struct rte_eth_dev *owner_dev,
 		ret = -rte_errno;
 		goto error;
 	}
-	ret = flow_hw_push(proxy_dev, queue, NULL);
-	if (ret) {
-		DRV_LOG(ERR, "port %u failed to drain control flow queue",
-			proxy_dev->data->port_id);
-		goto error;
-	}
-	ret = __flow_hw_pull_comp(proxy_dev, queue, 1, NULL);
+	ret = __flow_hw_pull_comp(proxy_dev, queue, NULL);
 	if (ret) {
 		DRV_LOG(ERR, "port %u failed to insert control flow",
 			proxy_dev->data->port_id);
@@ -11651,13 +11674,7 @@  flow_hw_destroy_ctrl_flow(struct rte_eth_dev *dev, struct rte_flow *flow)
 			" flow operation", dev->data->port_id);
 		goto exit;
 	}
-	ret = flow_hw_push(dev, queue, NULL);
-	if (ret) {
-		DRV_LOG(ERR, "port %u failed to drain control flow queue",
-			dev->data->port_id);
-		goto exit;
-	}
-	ret = __flow_hw_pull_comp(dev, queue, 1, NULL);
+	ret = __flow_hw_pull_comp(dev, queue, NULL);
 	if (ret) {
 		DRV_LOG(ERR, "port %u failed to destroy control flow",
 			dev->data->port_id);