diff mbox series

[v4,2/5] vhost: add unsafe API to drain pkts in async vhost

Message ID 20210716025923.27164-3-cheng1.jiang@intel.com (mailing list archive)
State Superseded, archived
Delegated to: Maxime Coquelin
Headers show
Series vhost: handle memory hotplug for async vhost | expand

Checks

Context Check Description
ci/checkpatch success coding style OK

Commit Message

Cheng Jiang July 16, 2021, 2:59 a.m. UTC
Applications need to stop DMA transfers and finish all the in-flight
pkts when in VM memory hot-plug case and async vhost is used. This
patch is to provide an unsafe API to drain in-flight pkts which are
submitted to DMA engine in vhost async data path.

Signed-off-by: Cheng Jiang <cheng1.jiang@intel.com>
---
 lib/vhost/rte_vhost_async.h | 24 ++++++++++
 lib/vhost/version.map       |  3 ++
 lib/vhost/virtio_net.c      | 90 +++++++++++++++++++++++++++----------
 3 files changed, 94 insertions(+), 23 deletions(-)

Comments

Xia, Chenbo July 16, 2021, 8:56 a.m. UTC | #1
Hi Cheng,

> -----Original Message-----
> From: Jiang, Cheng1 <cheng1.jiang@intel.com>
> Sent: Friday, July 16, 2021 10:59 AM
> To: maxime.coquelin@redhat.com; Xia, Chenbo <chenbo.xia@intel.com>
> Cc: dev@dpdk.org; Hu, Jiayu <jiayu.hu@intel.com>; Yang, YvonneX
> <yvonnex.yang@intel.com>; Jiang, Cheng1 <cheng1.jiang@intel.com>
> Subject: [PATCH v4 2/5] vhost: add unsafe API to drain pkts in async vhost
> 
> Applications need to stop DMA transfers and finish all the in-flight
> pkts when in VM memory hot-plug case and async vhost is used. This

Pkts -> packets

> patch is to provide an unsafe API to drain in-flight pkts which are

Ditto

> submitted to DMA engine in vhost async data path.
> 
> Signed-off-by: Cheng Jiang <cheng1.jiang@intel.com>
> ---
>  lib/vhost/rte_vhost_async.h | 24 ++++++++++
>  lib/vhost/version.map       |  3 ++
>  lib/vhost/virtio_net.c      | 90 +++++++++++++++++++++++++++----------
>  3 files changed, 94 insertions(+), 23 deletions(-)
> 
> diff --git a/lib/vhost/rte_vhost_async.h b/lib/vhost/rte_vhost_async.h
> index bc81cd0caa..fd622631b2 100644
> --- a/lib/vhost/rte_vhost_async.h
> +++ b/lib/vhost/rte_vhost_async.h
> @@ -193,4 +193,28 @@ __rte_experimental
>  uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
>  		struct rte_mbuf **pkts, uint16_t count);
> 
> +/**
> + * This function checks async completion status and empty all pakcets

Pakcets -> packets

> + * for a specific vhost device queue. Packets which are inflight will
> + * be returned in an array.
> + *
> + * @note This function does not perform any locking
> + *
> + * @param vid
> + *  id of vhost device to enqueue data

id -> ID

to drain data?

> + * @param queue_id
> + *  queue id to enqueue data

Ditto

> + * @param pkts
> + *  blank array to get return packet pointer

Return -> returned

> + * @param count
> + *  size of the packet array
> + * @param times
> + *  max number of poll attempts
> + * @return
> + *  num of packets returned

num -> Number

And please use capital for first character in each line of param description.

> + */
> +__rte_experimental
> +uint16_t rte_vhost_try_drain_queue_thread_unsafe(int vid, uint16_t queue_id,

I think 'rte_vhost_drain_queue_thread_unsafe' is better?

> +		struct rte_mbuf **pkts, uint16_t count, uint16_t times);
> +
>  #endif /* _RTE_VHOST_ASYNC_H_ */
> diff --git a/lib/vhost/version.map b/lib/vhost/version.map
> index 9103a23cd4..b8fc8770dd 100644
> --- a/lib/vhost/version.map
> +++ b/lib/vhost/version.map
> @@ -79,4 +79,7 @@ EXPERIMENTAL {
> 
>  	# added in 21.05
>  	rte_vhost_get_negotiated_protocol_features;
> +
> +	# added in 21.08
> +	rte_vhost_try_drain_queue_thread_unsafe;
>  };
> diff --git a/lib/vhost/virtio_net.c b/lib/vhost/virtio_net.c
> index 8156796a46..9f541679b9 100644
> --- a/lib/vhost/virtio_net.c
> +++ b/lib/vhost/virtio_net.c
> @@ -2115,10 +2115,10 @@ write_back_completed_descs_packed(struct
> vhost_virtqueue *vq,
>  	} while (nr_left > 0);
>  }
> 
> -uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
> +static __rte_always_inline uint16_t
> +vhost_poll_enqueue_completed(struct virtio_net *dev, uint16_t queue_id,
>  		struct rte_mbuf **pkts, uint16_t count)
>  {
> -	struct virtio_net *dev = get_device(vid);
>  	struct vhost_virtqueue *vq;
>  	uint16_t n_pkts_cpl = 0, n_pkts_put = 0, n_descs = 0, n_buffers = 0;
>  	uint16_t start_idx, pkts_idx, vq_size;
> @@ -2126,26 +2126,8 @@ uint16_t rte_vhost_poll_enqueue_completed(int vid,
> uint16_t queue_id,
>  	uint16_t from, i;
>  	int32_t n_poll;
> 
> -	if (!dev)
> -		return 0;
> -
> -	VHOST_LOG_DATA(DEBUG, "(%d) %s\n", dev->vid, __func__);
> -	if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->nr_vring))) {
> -		VHOST_LOG_DATA(ERR, "(%d) %s: invalid virtqueue idx %d.\n",
> -			dev->vid, __func__, queue_id);
> -		return 0;
> -	}
> -
>  	vq = dev->virtqueue[queue_id];
> 
> -	if (unlikely(!vq->async_registered)) {
> -		VHOST_LOG_DATA(ERR, "(%d) %s: async not registered for queue
> id %d.\n",
> -			dev->vid, __func__, queue_id);
> -		return 0;
> -	}
> -
> -	rte_spinlock_lock(&vq->access_lock);
> -
>  	pkts_idx = vq->async_pkts_idx % vq->size;
>  	pkts_info = vq->async_pkts_info;
>  	vq_size = vq->size;
> @@ -2153,7 +2135,7 @@ uint16_t rte_vhost_poll_enqueue_completed(int vid,
> uint16_t queue_id,
>  		vq_size, vq->async_pkts_inflight_n);
> 
>  	if (count > vq->async_last_pkts_n) {
> -		n_poll = vq->async_ops.check_completed_copies(vid,
> +		n_poll = vq->async_ops.check_completed_copies(dev->vid,
>  			queue_id, 0, count - vq->async_last_pkts_n);
>  		if (n_poll >= 0) {
>  			n_pkts_cpl = n_poll;
> @@ -2168,7 +2150,7 @@ uint16_t rte_vhost_poll_enqueue_completed(int vid,
> uint16_t queue_id,
>  	n_pkts_put = RTE_MIN(count, n_pkts_cpl);
>  	if (unlikely(n_pkts_put == 0)) {
>  		vq->async_last_pkts_n = n_pkts_cpl;
> -		goto done;
> +		return 0;
>  	}
> 
>  	if (vq_is_packed(dev)) {
> @@ -2207,12 +2189,74 @@ uint16_t rte_vhost_poll_enqueue_completed(int vid,
> uint16_t queue_id,
>  			vq->last_async_desc_idx_split += n_descs;
>  	}
> 
> -done:
> +	return n_pkts_put;
> +}
> +
> +uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
> +		struct rte_mbuf **pkts, uint16_t count)

Based on DPDK coding style, things like return value should be in another new line.

https://doc.dpdk.org/guides/contributing/coding_style.html#definitions

For similar changes, please check.

> +{
> +	struct virtio_net *dev = get_device(vid);
> +	struct vhost_virtqueue *vq;
> +	uint16_t n_pkts_put = 0;

Since this val is for recording pkts completed, maybe n_pkts_cpl?

> +
> +	if (!dev)
> +		return 0;
> +
> +	VHOST_LOG_DATA(DEBUG, "(%d) %s\n", dev->vid, __func__);
> +	if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->nr_vring))) {
> +		VHOST_LOG_DATA(ERR, "(%d) %s: invalid virtqueue idx %d.\n",
> +			dev->vid, __func__, queue_id);
> +		return 0;
> +	}
> +
> +	vq = dev->virtqueue[queue_id];
> +
> +	if (unlikely(!vq->async_registered)) {
> +		VHOST_LOG_DATA(ERR, "(%d) %s: async not registered for queue
> id %d.\n",
> +			dev->vid, __func__, queue_id);
> +		return 0;
> +	}
> +
> +	rte_spinlock_lock(&vq->access_lock);
> +
> +	n_pkts_put = vhost_poll_enqueue_completed(dev, queue_id, pkts, count);
> +
>  	rte_spinlock_unlock(&vq->access_lock);
> 
>  	return n_pkts_put;
>  }
> 
> +uint16_t rte_vhost_try_drain_queue_thread_unsafe(int vid, uint16_t queue_id,
> +		struct rte_mbuf **pkts, uint16_t count, uint16_t times)
> +{
> +	struct virtio_net *dev = get_device(vid);
> +	struct vhost_virtqueue *vq;
> +	uint16_t n_cpl = 0;

Make the name same as above

> +
> +	if (!dev)
> +		return 0;
> +
> +	VHOST_LOG_DATA(DEBUG, "(%d) %s\n", dev->vid, __func__);
> +	if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->nr_vring))) {
> +		VHOST_LOG_DATA(ERR, "(%d) %s: invalid virtqueue idx %d.\n",
> +			dev->vid, __func__, queue_id);
> +		return 0;
> +	}
> +
> +	vq = dev->virtqueue[queue_id];
> +
> +	if (unlikely(!vq->async_registered)) {
> +		VHOST_LOG_DATA(ERR, "(%d) %s: async not registered for queue
> id %d.\n",
> +			dev->vid, __func__, queue_id);
> +		return 0;
> +	}
> +
> +	while ((n_cpl < count) && times--)

'while (n_cpl < count && times--)' is enough

Thanks,
Chenbo

> +		n_cpl += vhost_poll_enqueue_completed(dev, queue_id, pkts + n_cpl,
> count);
> +
> +	return n_cpl;
> +}
> +
>  static __rte_always_inline uint32_t
>  virtio_dev_rx_async_submit(struct virtio_net *dev, uint16_t queue_id,
>  	struct rte_mbuf **pkts, uint32_t count,
> --
> 2.29.2
Cheng Jiang July 19, 2021, 3:28 a.m. UTC | #2
Hi Chenbo,

I'll fix these issues in next version.
For the name, I think maybe we can use 'rte_vhost_clear_queue_thread_unsafe'.

Thanks,
Cheng

> -----Original Message-----
> From: Xia, Chenbo <chenbo.xia@intel.com>
> Sent: Friday, July 16, 2021 4:56 PM
> To: Jiang, Cheng1 <cheng1.jiang@intel.com>; maxime.coquelin@redhat.com
> Cc: dev@dpdk.org; Hu, Jiayu <jiayu.hu@intel.com>; Yang, YvonneX
> <yvonnex.yang@intel.com>
> Subject: RE: [PATCH v4 2/5] vhost: add unsafe API to drain pkts in async vhost
> 
> Hi Cheng,
> 
> > -----Original Message-----
> > From: Jiang, Cheng1 <cheng1.jiang@intel.com>
> > Sent: Friday, July 16, 2021 10:59 AM
> > To: maxime.coquelin@redhat.com; Xia, Chenbo <chenbo.xia@intel.com>
> > Cc: dev@dpdk.org; Hu, Jiayu <jiayu.hu@intel.com>; Yang, YvonneX
> > <yvonnex.yang@intel.com>; Jiang, Cheng1 <cheng1.jiang@intel.com>
> > Subject: [PATCH v4 2/5] vhost: add unsafe API to drain pkts in async
> > vhost
> >
> > Applications need to stop DMA transfers and finish all the in-flight
> > pkts when in VM memory hot-plug case and async vhost is used. This
> 
> Pkts -> packets
> 
> > patch is to provide an unsafe API to drain in-flight pkts which are
> 
> Ditto
> 
> > submitted to DMA engine in vhost async data path.
> >
> > Signed-off-by: Cheng Jiang <cheng1.jiang@intel.com>
> > ---
> >  lib/vhost/rte_vhost_async.h | 24 ++++++++++
> >  lib/vhost/version.map       |  3 ++
> >  lib/vhost/virtio_net.c      | 90 +++++++++++++++++++++++++++----------
> >  3 files changed, 94 insertions(+), 23 deletions(-)
> >
> > diff --git a/lib/vhost/rte_vhost_async.h b/lib/vhost/rte_vhost_async.h
> > index bc81cd0caa..fd622631b2 100644
> > --- a/lib/vhost/rte_vhost_async.h
> > +++ b/lib/vhost/rte_vhost_async.h
> > @@ -193,4 +193,28 @@ __rte_experimental  uint16_t
> > rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
> >  		struct rte_mbuf **pkts, uint16_t count);
> >
> > +/**
> > + * This function checks async completion status and empty all pakcets
> 
> Pakcets -> packets
> 
> > + * for a specific vhost device queue. Packets which are inflight will
> > + * be returned in an array.
> > + *
> > + * @note This function does not perform any locking
> > + *
> > + * @param vid
> > + *  id of vhost device to enqueue data
> 
> id -> ID
> 
> to drain data?
> 
> > + * @param queue_id
> > + *  queue id to enqueue data
> 
> Ditto
> 
> > + * @param pkts
> > + *  blank array to get return packet pointer
> 
> Return -> returned
> 
> > + * @param count
> > + *  size of the packet array
> > + * @param times
> > + *  max number of poll attempts
> > + * @return
> > + *  num of packets returned
> 
> num -> Number
> 
> And please use capital for first character in each line of param description.
> 
> > + */
> > +__rte_experimental
> > +uint16_t rte_vhost_try_drain_queue_thread_unsafe(int vid, uint16_t
> > +queue_id,
> 
> I think 'rte_vhost_drain_queue_thread_unsafe' is better?
> 
> > +		struct rte_mbuf **pkts, uint16_t count, uint16_t times);
> > +
> >  #endif /* _RTE_VHOST_ASYNC_H_ */
> > diff --git a/lib/vhost/version.map b/lib/vhost/version.map index
> > 9103a23cd4..b8fc8770dd 100644
> > --- a/lib/vhost/version.map
> > +++ b/lib/vhost/version.map
> > @@ -79,4 +79,7 @@ EXPERIMENTAL {
> >
> >  	# added in 21.05
> >  	rte_vhost_get_negotiated_protocol_features;
> > +
> > +	# added in 21.08
> > +	rte_vhost_try_drain_queue_thread_unsafe;
> >  };
> > diff --git a/lib/vhost/virtio_net.c b/lib/vhost/virtio_net.c index
> > 8156796a46..9f541679b9 100644
> > --- a/lib/vhost/virtio_net.c
> > +++ b/lib/vhost/virtio_net.c
> > @@ -2115,10 +2115,10 @@ write_back_completed_descs_packed(struct
> > vhost_virtqueue *vq,
> >  	} while (nr_left > 0);
> >  }
> >
> > -uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
> > +static __rte_always_inline uint16_t
> > +vhost_poll_enqueue_completed(struct virtio_net *dev, uint16_t
> > +queue_id,
> >  		struct rte_mbuf **pkts, uint16_t count)  {
> > -	struct virtio_net *dev = get_device(vid);
> >  	struct vhost_virtqueue *vq;
> >  	uint16_t n_pkts_cpl = 0, n_pkts_put = 0, n_descs = 0, n_buffers = 0;
> >  	uint16_t start_idx, pkts_idx, vq_size; @@ -2126,26 +2126,8 @@
> > uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
> >  	uint16_t from, i;
> >  	int32_t n_poll;
> >
> > -	if (!dev)
> > -		return 0;
> > -
> > -	VHOST_LOG_DATA(DEBUG, "(%d) %s\n", dev->vid, __func__);
> > -	if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->nr_vring))) {
> > -		VHOST_LOG_DATA(ERR, "(%d) %s: invalid virtqueue
> idx %d.\n",
> > -			dev->vid, __func__, queue_id);
> > -		return 0;
> > -	}
> > -
> >  	vq = dev->virtqueue[queue_id];
> >
> > -	if (unlikely(!vq->async_registered)) {
> > -		VHOST_LOG_DATA(ERR, "(%d) %s: async not registered for
> queue
> > id %d.\n",
> > -			dev->vid, __func__, queue_id);
> > -		return 0;
> > -	}
> > -
> > -	rte_spinlock_lock(&vq->access_lock);
> > -
> >  	pkts_idx = vq->async_pkts_idx % vq->size;
> >  	pkts_info = vq->async_pkts_info;
> >  	vq_size = vq->size;
> > @@ -2153,7 +2135,7 @@ uint16_t rte_vhost_poll_enqueue_completed(int
> > vid, uint16_t queue_id,
> >  		vq_size, vq->async_pkts_inflight_n);
> >
> >  	if (count > vq->async_last_pkts_n) {
> > -		n_poll = vq->async_ops.check_completed_copies(vid,
> > +		n_poll = vq->async_ops.check_completed_copies(dev->vid,
> >  			queue_id, 0, count - vq->async_last_pkts_n);
> >  		if (n_poll >= 0) {
> >  			n_pkts_cpl = n_poll;
> > @@ -2168,7 +2150,7 @@ uint16_t rte_vhost_poll_enqueue_completed(int
> > vid, uint16_t queue_id,
> >  	n_pkts_put = RTE_MIN(count, n_pkts_cpl);
> >  	if (unlikely(n_pkts_put == 0)) {
> >  		vq->async_last_pkts_n = n_pkts_cpl;
> > -		goto done;
> > +		return 0;
> >  	}
> >
> >  	if (vq_is_packed(dev)) {
> > @@ -2207,12 +2189,74 @@ uint16_t
> rte_vhost_poll_enqueue_completed(int
> > vid, uint16_t queue_id,
> >  			vq->last_async_desc_idx_split += n_descs;
> >  	}
> >
> > -done:
> > +	return n_pkts_put;
> > +}
> > +
> > +uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
> > +		struct rte_mbuf **pkts, uint16_t count)
> 
> Based on DPDK coding style, things like return value should be in another
> new line.
> 
> https://doc.dpdk.org/guides/contributing/coding_style.html#definitions
> 
> For similar changes, please check.
> 
> > +{
> > +	struct virtio_net *dev = get_device(vid);
> > +	struct vhost_virtqueue *vq;
> > +	uint16_t n_pkts_put = 0;
> 
> Since this val is for recording pkts completed, maybe n_pkts_cpl?
> 
> > +
> > +	if (!dev)
> > +		return 0;
> > +
> > +	VHOST_LOG_DATA(DEBUG, "(%d) %s\n", dev->vid, __func__);
> > +	if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->nr_vring))) {
> > +		VHOST_LOG_DATA(ERR, "(%d) %s: invalid virtqueue
> idx %d.\n",
> > +			dev->vid, __func__, queue_id);
> > +		return 0;
> > +	}
> > +
> > +	vq = dev->virtqueue[queue_id];
> > +
> > +	if (unlikely(!vq->async_registered)) {
> > +		VHOST_LOG_DATA(ERR, "(%d) %s: async not registered for
> queue
> > id %d.\n",
> > +			dev->vid, __func__, queue_id);
> > +		return 0;
> > +	}
> > +
> > +	rte_spinlock_lock(&vq->access_lock);
> > +
> > +	n_pkts_put = vhost_poll_enqueue_completed(dev, queue_id, pkts,
> > +count);
> > +
> >  	rte_spinlock_unlock(&vq->access_lock);
> >
> >  	return n_pkts_put;
> >  }
> >
> > +uint16_t rte_vhost_try_drain_queue_thread_unsafe(int vid, uint16_t
> queue_id,
> > +		struct rte_mbuf **pkts, uint16_t count, uint16_t times) {
> > +	struct virtio_net *dev = get_device(vid);
> > +	struct vhost_virtqueue *vq;
> > +	uint16_t n_cpl = 0;
> 
> Make the name same as above
> 
> > +
> > +	if (!dev)
> > +		return 0;
> > +
> > +	VHOST_LOG_DATA(DEBUG, "(%d) %s\n", dev->vid, __func__);
> > +	if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->nr_vring))) {
> > +		VHOST_LOG_DATA(ERR, "(%d) %s: invalid virtqueue
> idx %d.\n",
> > +			dev->vid, __func__, queue_id);
> > +		return 0;
> > +	}
> > +
> > +	vq = dev->virtqueue[queue_id];
> > +
> > +	if (unlikely(!vq->async_registered)) {
> > +		VHOST_LOG_DATA(ERR, "(%d) %s: async not registered for
> queue
> > id %d.\n",
> > +			dev->vid, __func__, queue_id);
> > +		return 0;
> > +	}
> > +
> > +	while ((n_cpl < count) && times--)
> 
> 'while (n_cpl < count && times--)' is enough
> 
> Thanks,
> Chenbo
> 
> > +		n_cpl += vhost_poll_enqueue_completed(dev, queue_id,
> pkts + n_cpl,
> > count);
> > +
> > +	return n_cpl;
> > +}
> > +
> >  static __rte_always_inline uint32_t
> >  virtio_dev_rx_async_submit(struct virtio_net *dev, uint16_t queue_id,
> >  	struct rte_mbuf **pkts, uint32_t count,
> > --
> > 2.29.2
diff mbox series

Patch

diff --git a/lib/vhost/rte_vhost_async.h b/lib/vhost/rte_vhost_async.h
index bc81cd0caa..fd622631b2 100644
--- a/lib/vhost/rte_vhost_async.h
+++ b/lib/vhost/rte_vhost_async.h
@@ -193,4 +193,28 @@  __rte_experimental
 uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
 		struct rte_mbuf **pkts, uint16_t count);
 
+/**
+ * This function checks async completion status and empty all pakcets
+ * for a specific vhost device queue. Packets which are inflight will
+ * be returned in an array.
+ *
+ * @note This function does not perform any locking
+ *
+ * @param vid
+ *  id of vhost device to enqueue data
+ * @param queue_id
+ *  queue id to enqueue data
+ * @param pkts
+ *  blank array to get return packet pointer
+ * @param count
+ *  size of the packet array
+ * @param times
+ *  max number of poll attempts
+ * @return
+ *  num of packets returned
+ */
+__rte_experimental
+uint16_t rte_vhost_try_drain_queue_thread_unsafe(int vid, uint16_t queue_id,
+		struct rte_mbuf **pkts, uint16_t count, uint16_t times);
+
 #endif /* _RTE_VHOST_ASYNC_H_ */
diff --git a/lib/vhost/version.map b/lib/vhost/version.map
index 9103a23cd4..b8fc8770dd 100644
--- a/lib/vhost/version.map
+++ b/lib/vhost/version.map
@@ -79,4 +79,7 @@  EXPERIMENTAL {
 
 	# added in 21.05
 	rte_vhost_get_negotiated_protocol_features;
+
+	# added in 21.08
+	rte_vhost_try_drain_queue_thread_unsafe;
 };
diff --git a/lib/vhost/virtio_net.c b/lib/vhost/virtio_net.c
index 8156796a46..9f541679b9 100644
--- a/lib/vhost/virtio_net.c
+++ b/lib/vhost/virtio_net.c
@@ -2115,10 +2115,10 @@  write_back_completed_descs_packed(struct vhost_virtqueue *vq,
 	} while (nr_left > 0);
 }
 
-uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
+static __rte_always_inline uint16_t
+vhost_poll_enqueue_completed(struct virtio_net *dev, uint16_t queue_id,
 		struct rte_mbuf **pkts, uint16_t count)
 {
-	struct virtio_net *dev = get_device(vid);
 	struct vhost_virtqueue *vq;
 	uint16_t n_pkts_cpl = 0, n_pkts_put = 0, n_descs = 0, n_buffers = 0;
 	uint16_t start_idx, pkts_idx, vq_size;
@@ -2126,26 +2126,8 @@  uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
 	uint16_t from, i;
 	int32_t n_poll;
 
-	if (!dev)
-		return 0;
-
-	VHOST_LOG_DATA(DEBUG, "(%d) %s\n", dev->vid, __func__);
-	if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->nr_vring))) {
-		VHOST_LOG_DATA(ERR, "(%d) %s: invalid virtqueue idx %d.\n",
-			dev->vid, __func__, queue_id);
-		return 0;
-	}
-
 	vq = dev->virtqueue[queue_id];
 
-	if (unlikely(!vq->async_registered)) {
-		VHOST_LOG_DATA(ERR, "(%d) %s: async not registered for queue id %d.\n",
-			dev->vid, __func__, queue_id);
-		return 0;
-	}
-
-	rte_spinlock_lock(&vq->access_lock);
-
 	pkts_idx = vq->async_pkts_idx % vq->size;
 	pkts_info = vq->async_pkts_info;
 	vq_size = vq->size;
@@ -2153,7 +2135,7 @@  uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
 		vq_size, vq->async_pkts_inflight_n);
 
 	if (count > vq->async_last_pkts_n) {
-		n_poll = vq->async_ops.check_completed_copies(vid,
+		n_poll = vq->async_ops.check_completed_copies(dev->vid,
 			queue_id, 0, count - vq->async_last_pkts_n);
 		if (n_poll >= 0) {
 			n_pkts_cpl = n_poll;
@@ -2168,7 +2150,7 @@  uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
 	n_pkts_put = RTE_MIN(count, n_pkts_cpl);
 	if (unlikely(n_pkts_put == 0)) {
 		vq->async_last_pkts_n = n_pkts_cpl;
-		goto done;
+		return 0;
 	}
 
 	if (vq_is_packed(dev)) {
@@ -2207,12 +2189,74 @@  uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
 			vq->last_async_desc_idx_split += n_descs;
 	}
 
-done:
+	return n_pkts_put;
+}
+
+uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
+		struct rte_mbuf **pkts, uint16_t count)
+{
+	struct virtio_net *dev = get_device(vid);
+	struct vhost_virtqueue *vq;
+	uint16_t n_pkts_put = 0;
+
+	if (!dev)
+		return 0;
+
+	VHOST_LOG_DATA(DEBUG, "(%d) %s\n", dev->vid, __func__);
+	if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->nr_vring))) {
+		VHOST_LOG_DATA(ERR, "(%d) %s: invalid virtqueue idx %d.\n",
+			dev->vid, __func__, queue_id);
+		return 0;
+	}
+
+	vq = dev->virtqueue[queue_id];
+
+	if (unlikely(!vq->async_registered)) {
+		VHOST_LOG_DATA(ERR, "(%d) %s: async not registered for queue id %d.\n",
+			dev->vid, __func__, queue_id);
+		return 0;
+	}
+
+	rte_spinlock_lock(&vq->access_lock);
+
+	n_pkts_put = vhost_poll_enqueue_completed(dev, queue_id, pkts, count);
+
 	rte_spinlock_unlock(&vq->access_lock);
 
 	return n_pkts_put;
 }
 
+uint16_t rte_vhost_try_drain_queue_thread_unsafe(int vid, uint16_t queue_id,
+		struct rte_mbuf **pkts, uint16_t count, uint16_t times)
+{
+	struct virtio_net *dev = get_device(vid);
+	struct vhost_virtqueue *vq;
+	uint16_t n_cpl = 0;
+
+	if (!dev)
+		return 0;
+
+	VHOST_LOG_DATA(DEBUG, "(%d) %s\n", dev->vid, __func__);
+	if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->nr_vring))) {
+		VHOST_LOG_DATA(ERR, "(%d) %s: invalid virtqueue idx %d.\n",
+			dev->vid, __func__, queue_id);
+		return 0;
+	}
+
+	vq = dev->virtqueue[queue_id];
+
+	if (unlikely(!vq->async_registered)) {
+		VHOST_LOG_DATA(ERR, "(%d) %s: async not registered for queue id %d.\n",
+			dev->vid, __func__, queue_id);
+		return 0;
+	}
+
+	while ((n_cpl < count) && times--)
+		n_cpl += vhost_poll_enqueue_completed(dev, queue_id, pkts + n_cpl, count);
+
+	return n_cpl;
+}
+
 static __rte_always_inline uint32_t
 virtio_dev_rx_async_submit(struct virtio_net *dev, uint16_t queue_id,
 	struct rte_mbuf **pkts, uint32_t count,