net/mlx5: fix sync meter processing in HWS setup

Message ID 20240229105614.593391-1-getelson@nvidia.com (mailing list archive)
State Superseded, archived
Delegated to: Raslan Darawsheh
Headers
Series net/mlx5: fix sync meter processing in HWS setup |

Checks

Context Check Description
ci/checkpatch warning coding style issues
ci/loongarch-compilation success Compilation OK
ci/loongarch-unit-testing success Unit Testing PASS
ci/Intel-compilation success Compilation OK
ci/intel-Testing success Testing PASS
ci/iol-intel-Performance success Performance Testing PASS
ci/iol-mellanox-Performance success Performance Testing PASS
ci/iol-compile-amd64-testing fail Testing issues
ci/iol-abi-testing success Testing PASS
ci/intel-Functional success Functional PASS
ci/iol-unit-amd64-testing success Testing PASS
ci/iol-unit-arm64-testing success Testing PASS
ci/iol-compile-arm64-testing success Testing PASS
ci/iol-intel-Functional success Functional Testing PASS
ci/iol-broadcom-Functional success Functional Testing PASS
ci/iol-broadcom-Performance success Performance Testing PASS
ci/iol-sample-apps-testing success Testing PASS

Commit Message

Gregory Etelson Feb. 29, 2024, 10:56 a.m. UTC
Synchronous calls for meter ASO try to pull pending completions
from the CQ, submit a WR and return to the caller. That avoids delays
between the WR post and the HW response.
If the template API is activated, the PMD uses the control queue for
sync operations.

The PMD has different formats for the `user_data` context in sync and
async meter ASO calls.
The PMD port destruction procedure submits async operations to the port
control queue and polls the queue CQs to clean up HW responses.

Port destruction can pull a meter ASO completion from the control CQ.
Such a completion has the sync format, but is processed by the async handler.

The patch implements the sync meter ASO interface with async calls
in the template API environment.

Fixes: 48fbb0e93d06 ("net/mlx5: support flow meter mark indirect action with HWS")

Cc: stable@dpdk.org

Signed-off-by: Gregory Etelson <getelson@nvidia.com>
Acked-by: Ori Kam <orika@nvidia.com>
---
 drivers/net/mlx5/mlx5.h            | 108 ++++++++++--------
 drivers/net/mlx5/mlx5_flow_aso.c   | 170 ++++++++++++++++++-----------
 drivers/net/mlx5/mlx5_flow_hw.c    |  98 +++++++++--------
 drivers/net/mlx5/mlx5_flow_meter.c |  27 +++--
 4 files changed, 242 insertions(+), 161 deletions(-)
  

Patch

diff --git a/drivers/net/mlx5/mlx5.h b/drivers/net/mlx5/mlx5.h
index bb1853e797..1575876a46 100644
--- a/drivers/net/mlx5/mlx5.h
+++ b/drivers/net/mlx5/mlx5.h
@@ -392,44 +392,6 @@  enum mlx5_hw_indirect_type {
 
 #define MLX5_HW_MAX_ITEMS (16)
 
-/* HW steering flow management job descriptor. */
-struct mlx5_hw_q_job {
-	uint32_t type; /* Job type. */
-	uint32_t indirect_type;
-	union {
-		struct rte_flow_hw *flow; /* Flow attached to the job. */
-		const void *action; /* Indirect action attached to the job. */
-	};
-	void *user_data; /* Job user data. */
-	uint8_t *encap_data; /* Encap data. */
-	uint8_t *push_data; /* IPv6 routing push data. */
-	struct mlx5_modification_cmd *mhdr_cmd;
-	struct rte_flow_item *items;
-	union {
-		struct {
-			/* User memory for query output */
-			void *user;
-			/* Data extracted from hardware */
-			void *hw;
-		} __rte_packed query;
-		struct rte_flow_item_ethdev port_spec;
-		struct rte_flow_item_tag tag_spec;
-	} __rte_packed;
-	struct rte_flow_hw *upd_flow; /* Flow with updated values. */
-};
-
-/* HW steering job descriptor LIFO pool. */
-struct mlx5_hw_q {
-	uint32_t job_idx; /* Free job index. */
-	uint32_t size; /* LIFO size. */
-	struct mlx5_hw_q_job **job; /* LIFO header. */
-	struct rte_ring *indir_cq; /* Indirect action SW completion queue. */
-	struct rte_ring *indir_iq; /* Indirect action SW in progress queue. */
-	struct rte_ring *flow_transfer_pending;
-	struct rte_ring *flow_transfer_completed;
-} __rte_cache_aligned;
-
-
 #define MLX5_COUNTER_POOLS_MAX_NUM (1 << 15)
 #define MLX5_COUNTERS_PER_POOL 512
 #define MLX5_MAX_PENDING_QUERIES 4
@@ -2025,6 +1987,65 @@  enum dr_dump_rec_type {
 	DR_DUMP_REC_TYPE_PMD_COUNTER = 4430,
 };
 
+/* HW steering flow management job descriptor. */
+struct mlx5_hw_q_job {
+	uint32_t type; /* Job type. */
+	uint32_t indirect_type;
+	union {
+		struct rte_flow_hw *flow; /* Flow attached to the job. */
+		const void *action; /* Indirect action attached to the job. */
+	};
+	void *user_data; /* Job user data. */
+	uint8_t *encap_data; /* Encap data. */
+	uint8_t *push_data; /* IPv6 routing push data. */
+	struct mlx5_modification_cmd *mhdr_cmd;
+	struct rte_flow_item *items;
+	union {
+		struct {
+			/* User memory for query output */
+			void *user;
+			/* Data extracted from hardware */
+			void *hw;
+		} __rte_packed query;
+		struct rte_flow_item_ethdev port_spec;
+		struct rte_flow_item_tag tag_spec;
+	} __rte_packed;
+	struct rte_flow_hw *upd_flow; /* Flow with updated values. */
+};
+
+/* HW steering job descriptor LIFO pool. */
+struct mlx5_hw_q {
+	uint32_t job_idx; /* Free job index. */
+	uint32_t size; /* LIFO size. */
+	struct mlx5_hw_q_job **job; /* LIFO header. */
+	struct rte_ring *indir_cq; /* Indirect action SW completion queue. */
+	struct rte_ring *indir_iq; /* Indirect action SW in progress queue. */
+	struct rte_ring *flow_transfer_pending;
+	struct rte_ring *flow_transfer_completed;
+} __rte_cache_aligned;
+
+static __rte_always_inline struct mlx5_hw_q_job *
+flow_hw_job_get(struct mlx5_priv *priv, uint32_t queue)
+{
+	MLX5_ASSERT(priv->hw_q[queue].job_idx <= priv->hw_q[queue].size);
+	return priv->hw_q[queue].job_idx ?
+	       priv->hw_q[queue].job[--priv->hw_q[queue].job_idx] : NULL;
+}
+
+static __rte_always_inline void
+flow_hw_job_put(struct mlx5_priv *priv, struct mlx5_hw_q_job *job, uint32_t queue)
+{
+	MLX5_ASSERT(priv->hw_q[queue].job_idx < priv->hw_q[queue].size);
+	priv->hw_q[queue].job[priv->hw_q[queue].job_idx++] = job;
+}
+
+struct mlx5_hw_q_job *
+mlx5_flow_action_job_init(struct mlx5_priv *priv, uint32_t queue,
+			  const struct rte_flow_action_handle *handle,
+			  void *user_data, void *query_data,
+			  enum mlx5_hw_job_type type,
+			  struct rte_flow_error *error);
+
 /**
  * Indicates whether HW objects operations can be created by DevX.
  *
@@ -2434,11 +2455,12 @@  int mlx5_aso_flow_hit_queue_poll_start(struct mlx5_dev_ctx_shared *sh);
 int mlx5_aso_flow_hit_queue_poll_stop(struct mlx5_dev_ctx_shared *sh);
 void mlx5_aso_queue_uninit(struct mlx5_dev_ctx_shared *sh,
 			   enum mlx5_access_aso_opc_mod aso_opc_mod);
-int mlx5_aso_meter_update_by_wqe(struct mlx5_dev_ctx_shared *sh, uint32_t queue,
-		struct mlx5_aso_mtr *mtr, struct mlx5_mtr_bulk *bulk,
-		void *user_data, bool push);
-int mlx5_aso_mtr_wait(struct mlx5_dev_ctx_shared *sh, uint32_t queue,
-		struct mlx5_aso_mtr *mtr);
+int mlx5_aso_meter_update_by_wqe(struct mlx5_priv *priv, uint32_t queue,
+				 struct mlx5_aso_mtr *mtr,
+				 struct mlx5_mtr_bulk *bulk,
+				 struct mlx5_hw_q_job *job, bool push);
+int mlx5_aso_mtr_wait(struct mlx5_priv *priv,
+		      struct mlx5_aso_mtr *mtr, bool is_tmpl_api);
 int mlx5_aso_ct_update_by_wqe(struct mlx5_dev_ctx_shared *sh, uint32_t queue,
 			      struct mlx5_aso_ct_action *ct,
 			      const struct rte_flow_action_conntrack *profile,
diff --git a/drivers/net/mlx5/mlx5_flow_aso.c b/drivers/net/mlx5/mlx5_flow_aso.c
index f311443472..6b28dd63ce 100644
--- a/drivers/net/mlx5/mlx5_flow_aso.c
+++ b/drivers/net/mlx5/mlx5_flow_aso.c
@@ -792,7 +792,7 @@  mlx5_aso_mtr_sq_enqueue_single(struct mlx5_dev_ctx_shared *sh,
 			       struct mlx5_aso_mtr *aso_mtr,
 			       struct mlx5_mtr_bulk *bulk,
 			       bool need_lock,
-			       void *user_data,
+			       struct mlx5_hw_q_job *job,
 			       bool push)
 {
 	volatile struct mlx5_aso_wqe *wqe = NULL;
@@ -819,7 +819,7 @@  mlx5_aso_mtr_sq_enqueue_single(struct mlx5_dev_ctx_shared *sh,
 	rte_prefetch0(&sq->sq_obj.aso_wqes[(sq->head + 1) & mask]);
 	/* Fill next WQE. */
 	fm = &aso_mtr->fm;
-	sq->elts[sq->head & mask].mtr = user_data ? user_data : aso_mtr;
+	sq->elts[sq->head & mask].user_data = job ? job : (void *)aso_mtr;
 	if (aso_mtr->type == ASO_METER_INDIRECT) {
 		if (likely(sh->config.dv_flow_en == 2))
 			pool = aso_mtr->pool;
@@ -897,24 +897,6 @@  mlx5_aso_mtr_sq_enqueue_single(struct mlx5_dev_ctx_shared *sh,
 	return 1;
 }
 
-static void
-mlx5_aso_mtrs_status_update(struct mlx5_aso_sq *sq, uint16_t aso_mtrs_nums)
-{
-	uint16_t size = 1 << sq->log_desc_n;
-	uint16_t mask = size - 1;
-	uint16_t i;
-	struct mlx5_aso_mtr *aso_mtr = NULL;
-	uint8_t exp_state = ASO_METER_WAIT;
-
-	for (i = 0; i < aso_mtrs_nums; ++i) {
-		aso_mtr = sq->elts[(sq->tail + i) & mask].mtr;
-		MLX5_ASSERT(aso_mtr);
-		(void)__atomic_compare_exchange_n(&aso_mtr->state,
-				&exp_state, ASO_METER_READY,
-				false, __ATOMIC_RELAXED, __ATOMIC_RELAXED);
-	}
-}
-
 static void
 mlx5_aso_mtr_completion_handle(struct mlx5_aso_sq *sq, bool need_lock)
 {
@@ -925,7 +907,7 @@  mlx5_aso_mtr_completion_handle(struct mlx5_aso_sq *sq, bool need_lock)
 	uint32_t idx;
 	uint32_t next_idx = cq->cq_ci & mask;
 	uint16_t max;
-	uint16_t n = 0;
+	uint16_t i, n = 0;
 	int ret;
 
 	if (need_lock)
@@ -957,7 +939,19 @@  mlx5_aso_mtr_completion_handle(struct mlx5_aso_sq *sq, bool need_lock)
 		cq->cq_ci++;
 	} while (1);
 	if (likely(n)) {
-		mlx5_aso_mtrs_status_update(sq, n);
+		uint8_t exp_state = ASO_METER_WAIT;
+		struct mlx5_aso_mtr *aso_mtr;
+		__rte_unused bool verdict;
+
+		for (i = 0; i < n; ++i) {
+			aso_mtr = sq->elts[(sq->tail + i) & mask].mtr;
+			MLX5_ASSERT(aso_mtr);
+			verdict = __atomic_compare_exchange_n(&aso_mtr->state,
+						    &exp_state, ASO_METER_READY,
+						    false, __ATOMIC_RELAXED,
+						    __ATOMIC_RELAXED);
+			MLX5_ASSERT(verdict);
+		}
 		sq->tail += n;
 		rte_io_wmb();
 		cq->cq_obj.db_rec[0] = rte_cpu_to_be_32(cq->cq_ci);
@@ -966,6 +960,74 @@  mlx5_aso_mtr_completion_handle(struct mlx5_aso_sq *sq, bool need_lock)
 		rte_spinlock_unlock(&sq->sqsl);
 }
 
+static __rte_always_inline struct mlx5_aso_sq *
+mlx5_aso_mtr_select_sq(struct mlx5_dev_ctx_shared *sh, uint32_t queue,
+		       struct mlx5_aso_mtr *mtr, bool *need_lock)
+{
+	struct mlx5_aso_sq *sq;
+
+	if (likely(sh->config.dv_flow_en == 2) &&
+	    mtr->type == ASO_METER_INDIRECT) {
+		if (queue == MLX5_HW_INV_QUEUE) {
+			sq = &mtr->pool->sq[mtr->pool->nb_sq - 1];
+			*need_lock = true;
+		} else {
+			sq = &mtr->pool->sq[queue];
+			*need_lock = false;
+		}
+	} else {
+		sq = &sh->mtrmng->pools_mng.sq;
+		*need_lock = true;
+	}
+	return sq;
+}
+
+static void
+mlx5_aso_poll_cq_mtr_hws(struct mlx5_priv *priv, struct mlx5_aso_sq *sq)
+{
+#define MLX5_HWS_MTR_CMPL_NUM 4
+
+	int i, ret;
+	struct mlx5_aso_mtr *mtr;
+	uint8_t exp_state = ASO_METER_WAIT;
+	struct rte_flow_op_result res[MLX5_HWS_MTR_CMPL_NUM];
+	__rte_unused bool verdict;
+
+	rte_spinlock_lock(&sq->sqsl);
+repeat:
+	ret = mlx5_aso_pull_completion(sq, res, MLX5_HWS_MTR_CMPL_NUM);
+	if (ret) {
+		for (i = 0; i < ret; i++) {
+			struct mlx5_hw_q_job *job = res[i].user_data;
+
+			MLX5_ASSERT(job);
+			mtr = mlx5_ipool_get(priv->hws_mpool->idx_pool,
+					     MLX5_INDIRECT_ACTION_IDX_GET(job->action));
+			MLX5_ASSERT(mtr);
+			verdict = __atomic_compare_exchange_n(&mtr->state,
+						    &exp_state, ASO_METER_READY,
+						    false, __ATOMIC_RELAXED,
+						    __ATOMIC_RELAXED);
+			MLX5_ASSERT(verdict);
+			flow_hw_job_put(priv, job, CTRL_QUEUE_ID(priv));
+		}
+		if (ret == MLX5_HWS_MTR_CMPL_NUM)
+			goto repeat;
+	}
+	rte_spinlock_unlock(&sq->sqsl);
+
+#undef MLX5_HWS_MTR_CMPL_NUM
+}
+
+static void
+mlx5_aso_poll_cq_mtr_sws(__rte_unused struct mlx5_priv *priv,
+			 struct mlx5_aso_sq *sq)
+{
+	mlx5_aso_mtr_completion_handle(sq, true);
+}
+
+typedef void (*poll_cq_t)(struct mlx5_priv *, struct mlx5_aso_sq *);
+
 /**
  * Update meter parameter by send WQE.
  *
@@ -980,39 +1042,29 @@  mlx5_aso_mtr_completion_handle(struct mlx5_aso_sq *sq, bool need_lock)
  *   0 on success, a negative errno value otherwise and rte_errno is set.
  */
 int
-mlx5_aso_meter_update_by_wqe(struct mlx5_dev_ctx_shared *sh, uint32_t queue,
-			struct mlx5_aso_mtr *mtr,
-			struct mlx5_mtr_bulk *bulk,
-			void *user_data,
-			bool push)
+mlx5_aso_meter_update_by_wqe(struct mlx5_priv *priv, uint32_t queue,
+			     struct mlx5_aso_mtr *mtr,
+			     struct mlx5_mtr_bulk *bulk,
+			     struct mlx5_hw_q_job *job, bool push)
 {
-	struct mlx5_aso_sq *sq;
-	uint32_t poll_wqe_times = MLX5_MTR_POLL_WQE_CQE_TIMES;
 	bool need_lock;
+	struct mlx5_dev_ctx_shared *sh = priv->sh;
+	struct mlx5_aso_sq *sq =
+		mlx5_aso_mtr_select_sq(sh, queue, mtr, &need_lock);
+	uint32_t poll_wqe_times = MLX5_MTR_POLL_WQE_CQE_TIMES;
+	poll_cq_t poll_mtr_cq =
+		job ? mlx5_aso_poll_cq_mtr_hws : mlx5_aso_poll_cq_mtr_sws;
 	int ret;
 
-	if (likely(sh->config.dv_flow_en == 2) &&
-	    mtr->type == ASO_METER_INDIRECT) {
-		if (queue == MLX5_HW_INV_QUEUE) {
-			sq = &mtr->pool->sq[mtr->pool->nb_sq - 1];
-			need_lock = true;
-		} else {
-			sq = &mtr->pool->sq[queue];
-			need_lock = false;
-		}
-	} else {
-		sq = &sh->mtrmng->pools_mng.sq;
-		need_lock = true;
-	}
 	if (queue != MLX5_HW_INV_QUEUE) {
 		ret = mlx5_aso_mtr_sq_enqueue_single(sh, sq, mtr, bulk,
-						     need_lock, user_data, push);
+						     need_lock, job, push);
 		return ret > 0 ? 0 : -1;
 	}
 	do {
-		mlx5_aso_mtr_completion_handle(sq, need_lock);
+		poll_mtr_cq(priv, sq);
 		if (mlx5_aso_mtr_sq_enqueue_single(sh, sq, mtr, bulk,
-						   need_lock, NULL, true))
+						   need_lock, job, true))
 			return 0;
 		/* Waiting for wqe resource. */
 		rte_delay_us_sleep(MLX5_ASO_WQE_CQE_RESPONSE_DELAY);
@@ -1036,32 +1088,22 @@  mlx5_aso_meter_update_by_wqe(struct mlx5_dev_ctx_shared *sh, uint32_t queue,
  *   0 on success, a negative errno value otherwise and rte_errno is set.
  */
 int
-mlx5_aso_mtr_wait(struct mlx5_dev_ctx_shared *sh, uint32_t queue,
-			struct mlx5_aso_mtr *mtr)
+mlx5_aso_mtr_wait(struct mlx5_priv *priv,
+			struct mlx5_aso_mtr *mtr, bool is_tmpl_api)
 {
+	bool need_lock;
 	struct mlx5_aso_sq *sq;
+	struct mlx5_dev_ctx_shared *sh = priv->sh;
 	uint32_t poll_cqe_times = MLX5_MTR_POLL_WQE_CQE_TIMES;
-	uint8_t state;
-	bool need_lock;
+	uint8_t state = __atomic_load_n(&mtr->state, __ATOMIC_RELAXED);
+	poll_cq_t poll_mtr_cq =
+		is_tmpl_api ? mlx5_aso_poll_cq_mtr_hws : mlx5_aso_poll_cq_mtr_sws;
 
-	if (likely(sh->config.dv_flow_en == 2) &&
-	    mtr->type == ASO_METER_INDIRECT) {
-		if (queue == MLX5_HW_INV_QUEUE) {
-			sq = &mtr->pool->sq[mtr->pool->nb_sq - 1];
-			need_lock = true;
-		} else {
-			sq = &mtr->pool->sq[queue];
-			need_lock = false;
-		}
-	} else {
-		sq = &sh->mtrmng->pools_mng.sq;
-		need_lock = true;
-	}
-	state = __atomic_load_n(&mtr->state, __ATOMIC_RELAXED);
 	if (state == ASO_METER_READY || state == ASO_METER_WAIT_ASYNC)
 		return 0;
+	sq = mlx5_aso_mtr_select_sq(sh, MLX5_HW_INV_QUEUE, mtr, &need_lock);
 	do {
-		mlx5_aso_mtr_completion_handle(sq, need_lock);
+		poll_mtr_cq(priv, sq);
 		if (__atomic_load_n(&mtr->state, __ATOMIC_RELAXED) ==
 					    ASO_METER_READY)
 			return 0;
diff --git a/drivers/net/mlx5/mlx5_flow_hw.c b/drivers/net/mlx5/mlx5_flow_hw.c
index f778fd0698..92894bf266 100644
--- a/drivers/net/mlx5/mlx5_flow_hw.c
+++ b/drivers/net/mlx5/mlx5_flow_hw.c
@@ -79,6 +79,12 @@  struct mlx5_indlst_legacy {
 #define MLX5_CONST_ENCAP_ITEM(encap_type, ptr) \
 (((const struct encap_type *)(ptr))->definition)
 
+static __rte_always_inline struct mlx5_hw_q_job *
+flow_hw_action_job_init(struct mlx5_priv *priv, uint32_t queue,
+			const struct rte_flow_action_handle *handle,
+			void *user_data, void *query_data,
+			enum mlx5_hw_job_type type,
+			struct rte_flow_error *error);
 static int
 mlx5_tbl_multi_pattern_process(struct rte_eth_dev *dev,
 			       struct rte_flow_template_table *tbl,
@@ -252,21 +258,6 @@  static const struct rte_flow_item_eth ctrl_rx_eth_bcast_spec = {
 	.hdr.ether_type = 0,
 };
 
-static __rte_always_inline struct mlx5_hw_q_job *
-flow_hw_job_get(struct mlx5_priv *priv, uint32_t queue)
-{
-	MLX5_ASSERT(priv->hw_q[queue].job_idx <= priv->hw_q[queue].size);
-	return priv->hw_q[queue].job_idx ?
-	       priv->hw_q[queue].job[--priv->hw_q[queue].job_idx] : NULL;
-}
-
-static __rte_always_inline void
-flow_hw_job_put(struct mlx5_priv *priv, struct mlx5_hw_q_job *job, uint32_t queue)
-{
-	MLX5_ASSERT(priv->hw_q[queue].job_idx < priv->hw_q[queue].size);
-	priv->hw_q[queue].job[priv->hw_q[queue].job_idx++] = job;
-}
-
 static inline enum mlx5dr_matcher_insert_mode
 flow_hw_matcher_insert_mode_get(enum rte_flow_table_insertion_type insert_type)
 {
@@ -1428,7 +1419,7 @@  flow_hw_meter_compile(struct rte_eth_dev *dev,
 	acts->rule_acts[jump_pos].action = (!!group) ?
 				    acts->jump->hws_action :
 				    acts->jump->root_action;
-	if (mlx5_aso_mtr_wait(priv->sh, MLX5_HW_INV_QUEUE, aso_mtr))
+	if (mlx5_aso_mtr_wait(priv, aso_mtr, true))
 		return -ENOMEM;
 	return 0;
 }
@@ -1505,7 +1496,7 @@  static rte_be32_t vlan_hdr_to_be32(const struct rte_flow_action *actions)
 static __rte_always_inline struct mlx5_aso_mtr *
 flow_hw_meter_mark_alloc(struct rte_eth_dev *dev, uint32_t queue,
 			 const struct rte_flow_action *action,
-			 void *user_data, bool push,
+			 struct mlx5_hw_q_job *job, bool push,
 			 struct rte_flow_error *error)
 {
 	struct mlx5_priv *priv = dev->data->dev_private;
@@ -1514,6 +1505,8 @@  flow_hw_meter_mark_alloc(struct rte_eth_dev *dev, uint32_t queue,
 	struct mlx5_aso_mtr *aso_mtr;
 	struct mlx5_flow_meter_info *fm;
 	uint32_t mtr_id;
+	uintptr_t handle = (uintptr_t)MLX5_INDIRECT_ACTION_TYPE_METER_MARK <<
+					MLX5_INDIRECT_ACTION_TYPE_OFFSET;
 
 	if (priv->shared_host) {
 		rte_flow_error_set(error, ENOTSUP, RTE_FLOW_ERROR_TYPE_UNSPECIFIED, NULL,
@@ -1537,15 +1530,16 @@  flow_hw_meter_mark_alloc(struct rte_eth_dev *dev, uint32_t queue,
 			  ASO_METER_WAIT : ASO_METER_WAIT_ASYNC;
 	aso_mtr->offset = mtr_id - 1;
 	aso_mtr->init_color = fm->color_aware ? RTE_COLORS : RTE_COLOR_GREEN;
+	job->action = (void *)(handle | mtr_id);
 	/* Update ASO flow meter by wqe. */
-	if (mlx5_aso_meter_update_by_wqe(priv->sh, queue, aso_mtr,
-					 &priv->mtr_bulk, user_data, push)) {
+	if (mlx5_aso_meter_update_by_wqe(priv, queue, aso_mtr,
+					 &priv->mtr_bulk, job, push)) {
 		mlx5_ipool_free(pool->idx_pool, mtr_id);
 		return NULL;
 	}
 	/* Wait for ASO object completion. */
 	if (queue == MLX5_HW_INV_QUEUE &&
-	    mlx5_aso_mtr_wait(priv->sh, MLX5_HW_INV_QUEUE, aso_mtr)) {
+	    mlx5_aso_mtr_wait(priv, aso_mtr, true)) {
 		mlx5_ipool_free(pool->idx_pool, mtr_id);
 		return NULL;
 	}
@@ -1564,10 +1558,17 @@  flow_hw_meter_mark_compile(struct rte_eth_dev *dev,
 	struct mlx5_priv *priv = dev->data->dev_private;
 	struct mlx5_aso_mtr_pool *pool = priv->hws_mpool;
 	struct mlx5_aso_mtr *aso_mtr;
+	struct mlx5_hw_q_job *job =
+		flow_hw_action_job_init(priv, queue, NULL, NULL, NULL,
+					MLX5_HW_Q_JOB_TYPE_CREATE, NULL);
 
-	aso_mtr = flow_hw_meter_mark_alloc(dev, queue, action, NULL, true, error);
-	if (!aso_mtr)
+	if (!job)
+		return -1;
+	aso_mtr = flow_hw_meter_mark_alloc(dev, queue, action, job, true, error);
+	if (!aso_mtr) {
+		flow_hw_job_put(priv, job, queue);
 		return -1;
+	}
 
 	/* Compile METER_MARK action */
 	acts[aso_mtr_pos].action = pool->action;
@@ -3090,7 +3091,7 @@  flow_hw_actions_construct(struct rte_eth_dev *dev,
 							 jump->root_action;
 			job->flow->jump = jump;
 			job->flow->fate_type = MLX5_FLOW_FATE_JUMP;
-			if (mlx5_aso_mtr_wait(priv->sh, MLX5_HW_INV_QUEUE, aso_mtr))
+			if (mlx5_aso_mtr_wait(priv, aso_mtr, true))
 				return -1;
 			break;
 		case RTE_FLOW_ACTION_TYPE_AGE:
@@ -3813,13 +3814,6 @@  flow_hw_pull_legacy_indirect_comp(struct rte_eth_dev *dev, struct mlx5_hw_q_job
 						job->query.hw);
 			aso_ct->state = ASO_CONNTRACK_READY;
 		}
-	} else {
-		/*
-		 * rte_flow_op_result::user data can point to
-		 * struct mlx5_aso_mtr object as well
-		 */
-		if (queue != CTRL_QUEUE_ID(priv))
-			MLX5_ASSERT(false);
 	}
 }
 
@@ -10617,7 +10611,8 @@  flow_hw_action_job_init(struct mlx5_priv *priv, uint32_t queue,
 {
 	struct mlx5_hw_q_job *job;
 
-	MLX5_ASSERT(queue != MLX5_HW_INV_QUEUE);
+	if (queue == MLX5_HW_INV_QUEUE)
+		queue = CTRL_QUEUE_ID(priv);
 	job = flow_hw_job_get(priv, queue);
 	if (!job) {
 		rte_flow_error_set(error, ENOMEM,
@@ -10632,6 +10627,17 @@  flow_hw_action_job_init(struct mlx5_priv *priv, uint32_t queue,
 	return job;
 }
 
+struct mlx5_hw_q_job *
+mlx5_flow_action_job_init(struct mlx5_priv *priv, uint32_t queue,
+			  const struct rte_flow_action_handle *handle,
+			  void *user_data, void *query_data,
+			  enum mlx5_hw_job_type type,
+			  struct rte_flow_error *error)
+{
+	return flow_hw_action_job_init(priv, queue, handle, user_data, query_data,
+				       type, error);
+}
+
 static __rte_always_inline void
 flow_hw_action_finalize(struct rte_eth_dev *dev, uint32_t queue,
 			struct mlx5_hw_q_job *job,
@@ -10691,12 +10697,12 @@  flow_hw_action_handle_create(struct rte_eth_dev *dev, uint32_t queue,
 	const struct rte_flow_action_age *age;
 	struct mlx5_aso_mtr *aso_mtr;
 	cnt_id_t cnt_id;
-	uint32_t mtr_id;
 	uint32_t age_idx;
 	bool push = flow_hw_action_push(attr);
 	bool aso = false;
+	bool force_job = action->type == RTE_FLOW_ACTION_TYPE_METER_MARK;
 
-	if (attr) {
+	if (attr || force_job) {
 		job = flow_hw_action_job_init(priv, queue, NULL, user_data,
 					      NULL, MLX5_HW_Q_JOB_TYPE_CREATE,
 					      error);
@@ -10751,9 +10757,7 @@  flow_hw_action_handle_create(struct rte_eth_dev *dev, uint32_t queue,
 		aso_mtr = flow_hw_meter_mark_alloc(dev, queue, action, job, push, error);
 		if (!aso_mtr)
 			break;
-		mtr_id = (MLX5_INDIRECT_ACTION_TYPE_METER_MARK <<
-			MLX5_INDIRECT_ACTION_TYPE_OFFSET) | (aso_mtr->fm.meter_id);
-		handle = (struct rte_flow_action_handle *)(uintptr_t)mtr_id;
+		handle = (void *)(uintptr_t)job->action;
 		break;
 	case RTE_FLOW_ACTION_TYPE_RSS:
 		handle = flow_dv_action_create(dev, conf, action, error);
@@ -10768,7 +10772,7 @@  flow_hw_action_handle_create(struct rte_eth_dev *dev, uint32_t queue,
 				   NULL, "action type not supported");
 		break;
 	}
-	if (job) {
+	if (job && !force_job) {
 		job->action = handle;
 		job->indirect_type = MLX5_HW_INDIRECT_TYPE_LEGACY;
 		flow_hw_action_finalize(dev, queue, job, push, aso,
@@ -10801,15 +10805,17 @@  mlx5_flow_update_meter_mark(struct rte_eth_dev *dev, uint32_t queue,
 		fm->color_aware = meter_mark->color_mode;
 	if (upd_meter_mark->state_valid)
 		fm->is_enable = meter_mark->state;
+	aso_mtr->state = (queue == MLX5_HW_INV_QUEUE) ?
+			 ASO_METER_WAIT : ASO_METER_WAIT_ASYNC;
 	/* Update ASO flow meter by wqe. */
-	if (mlx5_aso_meter_update_by_wqe(priv->sh, queue,
+	if (mlx5_aso_meter_update_by_wqe(priv, queue,
 					 aso_mtr, &priv->mtr_bulk, job, push))
 		return rte_flow_error_set(error, EINVAL,
 					  RTE_FLOW_ERROR_TYPE_UNSPECIFIED,
 					  NULL, "Unable to update ASO meter WQE");
 	/* Wait for ASO object completion. */
 	if (queue == MLX5_HW_INV_QUEUE &&
-	    mlx5_aso_mtr_wait(priv->sh, MLX5_HW_INV_QUEUE, aso_mtr))
+	    mlx5_aso_mtr_wait(priv, aso_mtr, true))
 		return rte_flow_error_set(error, EINVAL,
 					  RTE_FLOW_ERROR_TYPE_UNSPECIFIED,
 					  NULL, "Unable to wait for ASO meter CQE");
@@ -10855,8 +10861,9 @@  flow_hw_action_handle_update(struct rte_eth_dev *dev, uint32_t queue,
 	int ret = 0;
 	bool push = flow_hw_action_push(attr);
 	bool aso = false;
+	bool force_job = type == MLX5_INDIRECT_ACTION_TYPE_METER_MARK;
 
-	if (attr) {
+	if (attr || force_job) {
 		job = flow_hw_action_job_init(priv, queue, handle, user_data,
 					      NULL, MLX5_HW_Q_JOB_TYPE_UPDATE,
 					      error);
@@ -10893,7 +10900,7 @@  flow_hw_action_handle_update(struct rte_eth_dev *dev, uint32_t queue,
 					  "action type not supported");
 		break;
 	}
-	if (job)
+	if (job && !force_job)
 		flow_hw_action_finalize(dev, queue, job, push, aso, ret == 0);
 	return ret;
 }
@@ -10936,8 +10943,9 @@  flow_hw_action_handle_destroy(struct rte_eth_dev *dev, uint32_t queue,
 	bool push = flow_hw_action_push(attr);
 	bool aso = false;
 	int ret = 0;
+	bool force_job = type == MLX5_INDIRECT_ACTION_TYPE_METER_MARK;
 
-	if (attr) {
+	if (attr || force_job) {
 		job = flow_hw_action_job_init(priv, queue, handle, user_data,
 					      NULL, MLX5_HW_Q_JOB_TYPE_DESTROY,
 					      error);
@@ -10973,7 +10981,7 @@  flow_hw_action_handle_destroy(struct rte_eth_dev *dev, uint32_t queue,
 		fm = &aso_mtr->fm;
 		fm->is_enable = 0;
 		/* Update ASO flow meter by wqe. */
-		if (mlx5_aso_meter_update_by_wqe(priv->sh, queue, aso_mtr,
+		if (mlx5_aso_meter_update_by_wqe(priv, queue, aso_mtr,
 						 &priv->mtr_bulk, job, push)) {
 			ret = -EINVAL;
 			rte_flow_error_set(error, EINVAL,
@@ -10983,7 +10991,7 @@  flow_hw_action_handle_destroy(struct rte_eth_dev *dev, uint32_t queue,
 		}
 		/* Wait for ASO object completion. */
 		if (queue == MLX5_HW_INV_QUEUE &&
-		    mlx5_aso_mtr_wait(priv->sh, MLX5_HW_INV_QUEUE, aso_mtr)) {
+		    mlx5_aso_mtr_wait(priv, aso_mtr, true)) {
 			ret = -EINVAL;
 			rte_flow_error_set(error, EINVAL,
 				RTE_FLOW_ERROR_TYPE_UNSPECIFIED,
@@ -11007,7 +11015,7 @@  flow_hw_action_handle_destroy(struct rte_eth_dev *dev, uint32_t queue,
 					  "action type not supported");
 		break;
 	}
-	if (job)
+	if (job && !force_job)
 		flow_hw_action_finalize(dev, queue, job, push, aso, ret == 0);
 	return ret;
 }
diff --git a/drivers/net/mlx5/mlx5_flow_meter.c b/drivers/net/mlx5/mlx5_flow_meter.c
index c0578ce6e9..0f81877714 100644
--- a/drivers/net/mlx5/mlx5_flow_meter.c
+++ b/drivers/net/mlx5/mlx5_flow_meter.c
@@ -1892,12 +1892,12 @@  mlx5_flow_meter_action_modify(struct mlx5_priv *priv,
 	if (sh->meter_aso_en) {
 		fm->is_enable = !!is_enable;
 		aso_mtr = container_of(fm, struct mlx5_aso_mtr, fm);
-		ret = mlx5_aso_meter_update_by_wqe(sh, MLX5_HW_INV_QUEUE,
+		ret = mlx5_aso_meter_update_by_wqe(priv, MLX5_HW_INV_QUEUE,
 						   aso_mtr, &priv->mtr_bulk,
 						   NULL, true);
 		if (ret)
 			return ret;
-		ret = mlx5_aso_mtr_wait(sh, MLX5_HW_INV_QUEUE, aso_mtr);
+		ret = mlx5_aso_mtr_wait(priv, aso_mtr, false);
 		if (ret)
 			return ret;
 	} else {
@@ -2143,7 +2143,7 @@  mlx5_flow_meter_create(struct rte_eth_dev *dev, uint32_t meter_id,
 	/* If ASO meter supported, update ASO flow meter by wqe. */
 	if (priv->sh->meter_aso_en) {
 		aso_mtr = container_of(fm, struct mlx5_aso_mtr, fm);
-		ret = mlx5_aso_meter_update_by_wqe(priv->sh, MLX5_HW_INV_QUEUE,
+		ret = mlx5_aso_meter_update_by_wqe(priv, MLX5_HW_INV_QUEUE,
 						   aso_mtr, &priv->mtr_bulk, NULL, true);
 		if (ret)
 			goto error;
@@ -2204,6 +2204,7 @@  mlx5_flow_meter_hws_create(struct rte_eth_dev *dev, uint32_t meter_id,
 	struct mlx5_flow_meter_info *fm;
 	struct mlx5_flow_meter_policy *policy = NULL;
 	struct mlx5_aso_mtr *aso_mtr;
+	struct mlx5_hw_q_job *job;
 	int ret;
 
 	if (!priv->mtr_profile_arr ||
@@ -2249,12 +2250,20 @@  mlx5_flow_meter_hws_create(struct rte_eth_dev *dev, uint32_t meter_id,
 	fm->shared = !!shared;
 	fm->initialized = 1;
 	/* Update ASO flow meter by wqe. */
-	ret = mlx5_aso_meter_update_by_wqe(priv->sh, MLX5_HW_INV_QUEUE, aso_mtr,
-					   &priv->mtr_bulk, NULL, true);
-	if (ret)
+	job = mlx5_flow_action_job_init(priv, MLX5_HW_INV_QUEUE, NULL, NULL,
+					NULL, MLX5_HW_Q_JOB_TYPE_CREATE, NULL);
+	if (!job)
+		return -rte_mtr_error_set(error, ENOMEM,
+					  RTE_MTR_ERROR_TYPE_MTR_ID,
+					  NULL, "No job context.");
+	ret = mlx5_aso_meter_update_by_wqe(priv, MLX5_HW_INV_QUEUE, aso_mtr,
+					   &priv->mtr_bulk, job, true);
+	if (ret) {
+		flow_hw_job_put(priv, job, MLX5_HW_INV_QUEUE);
 		return -rte_mtr_error_set(error, ENOTSUP,
-			RTE_MTR_ERROR_TYPE_UNSPECIFIED,
-			NULL, "Failed to create devx meter.");
+					  RTE_MTR_ERROR_TYPE_UNSPECIFIED,
+					  NULL, "Failed to create devx meter.");
+	}
 	fm->active_state = params->meter_enable;
 	__atomic_fetch_add(&fm->profile->ref_cnt, 1, __ATOMIC_RELAXED);
 	__atomic_fetch_add(&policy->ref_cnt, 1, __ATOMIC_RELAXED);
@@ -2898,7 +2907,7 @@  mlx5_flow_meter_attach(struct mlx5_priv *priv,
 		struct mlx5_aso_mtr *aso_mtr;
 
 		aso_mtr = container_of(fm, struct mlx5_aso_mtr, fm);
-		if (mlx5_aso_mtr_wait(priv->sh, MLX5_HW_INV_QUEUE, aso_mtr)) {
+		if (mlx5_aso_mtr_wait(priv, aso_mtr, false)) {
 			return rte_flow_error_set(error, ENOENT,
 					RTE_FLOW_ERROR_TYPE_UNSPECIFIED,
 					NULL,