diff mbox series

[v1,2/2] vhost: introduce async enqueue for split ring

Message ID 1591869725-13331-3-git-send-email-patrick.fu@intel.com (mailing list archive)
State Superseded, archived
Delegated to: Maxime Coquelin
Headers show
Series introduce asynchronous data path for vhost | expand

Checks

Context Check Description
ci/Intel-compilation success Compilation OK
ci/travis-robot success Travis build: errored
ci/checkpatch success coding style OK

Commit Message

Patrick Fu June 11, 2020, 10:02 a.m. UTC
From: Patrick <patrick.fu@intel.com>

This patch implement async enqueue data path for split ring.

Signed-off-by: Patrick <patrick.fu@intel.com>
---
 lib/librte_vhost/rte_vhost_async.h |  38 +++
 lib/librte_vhost/virtio_net.c      | 538 ++++++++++++++++++++++++++++++++++++-
 2 files changed, 574 insertions(+), 2 deletions(-)

Comments

Liu, Yong June 18, 2020, 6:56 a.m. UTC | #1
Thanks, Patrick. Some comments are inline.

> -----Original Message-----
> From: dev <dev-bounces@dpdk.org> On Behalf Of patrick.fu@intel.com
> Sent: Thursday, June 11, 2020 6:02 PM
> To: dev@dpdk.org; maxime.coquelin@redhat.com; Xia, Chenbo
> <chenbo.xia@intel.com>; Wang, Zhihong <zhihong.wang@intel.com>; Ye,
> Xiaolong <xiaolong.ye@intel.com>
> Cc: Fu, Patrick <patrick.fu@intel.com>; Jiang, Cheng1
> <cheng1.jiang@intel.com>; Liang, Cunming <cunming.liang@intel.com>
> Subject: [dpdk-dev] [PATCH v1 2/2] vhost: introduce async enqueue for split
> ring
> 
> From: Patrick <patrick.fu@intel.com>
> 
> This patch implement async enqueue data path for split ring.
> 
> Signed-off-by: Patrick <patrick.fu@intel.com>
> ---
>  lib/librte_vhost/rte_vhost_async.h |  38 +++
>  lib/librte_vhost/virtio_net.c      | 538
> ++++++++++++++++++++++++++++++++++++-
>  2 files changed, 574 insertions(+), 2 deletions(-)
> 
> diff --git a/lib/librte_vhost/rte_vhost_async.h
> b/lib/librte_vhost/rte_vhost_async.h
> index 82f2ebe..efcba0a 100644
> --- a/lib/librte_vhost/rte_vhost_async.h
> +++ b/lib/librte_vhost/rte_vhost_async.h
> @@ -131,4 +131,42 @@ int rte_vhost_async_channel_register(int vid,
> uint16_t queue_id,
>   */
>  int rte_vhost_async_channel_unregister(int vid, uint16_t queue_id);
> 
> +/**
> + * This function submit enqueue data to DMA. This function has no
> + * guranttee to the transfer completion upon return. Applications should
> + * poll transfer status by rte_vhost_poll_enqueue_completed()
> + *
> + * @param vid
> + *  id of vhost device to enqueue data
> + * @param queue_id
> + *  queue id to enqueue data
> + * @param pkts
> + *  array of packets to be enqueued
> + * @param count
> + *  packets num to be enqueued
> + * @return
> + *  num of packets enqueued
> + */
> +uint16_t rte_vhost_submit_enqueue_burst(int vid, uint16_t queue_id,
> +		struct rte_mbuf **pkts, uint16_t count);
> +
> +/**
> + * This function check DMA completion status for a specific vhost
> + * device queue. Packets which finish copying (enqueue) operation
> + * will be returned in an array.
> + *
> + * @param vid
> + *  id of vhost device to enqueue data
> + * @param queue_id
> + *  queue id to enqueue data
> + * @param pkts
> + *  blank array to get return packet pointer
> + * @param count
> + *  size of the packet array
> + * @return
> + *  num of packets returned
> + */
> +uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
> +		struct rte_mbuf **pkts, uint16_t count);
> +
>  #endif /* _RTE_VDPA_H_ */
> diff --git a/lib/librte_vhost/virtio_net.c b/lib/librte_vhost/virtio_net.c
> index 751c1f3..cf9f884 100644
> --- a/lib/librte_vhost/virtio_net.c
> +++ b/lib/librte_vhost/virtio_net.c
> @@ -17,14 +17,15 @@
>  #include <rte_arp.h>
>  #include <rte_spinlock.h>
>  #include <rte_malloc.h>
> +#include <rte_vhost_async.h>
> 
>  #include "iotlb.h"
>  #include "vhost.h"
> 
> -#define MAX_PKT_BURST 32
> -
>  #define MAX_BATCH_LEN 256
> 
> +#define VHOST_ASYNC_BATCH_THRESHOLD 8
> +
>  static  __rte_always_inline bool
>  rxvq_is_mergeable(struct virtio_net *dev)
>  {
> @@ -117,6 +118,35 @@
>  }
> 
>  static __rte_always_inline void
> +async_flush_shadow_used_ring_split(struct virtio_net *dev,
> +	struct vhost_virtqueue *vq)
> +{
> +	uint16_t used_idx = vq->last_used_idx & (vq->size - 1);
> +
> +	if (used_idx + vq->shadow_used_idx <= vq->size) {
> +		do_flush_shadow_used_ring_split(dev, vq, used_idx, 0,
> +					  vq->shadow_used_idx);
> +	} else {
> +		uint16_t size;
> +
> +		/* update used ring interval [used_idx, vq->size] */
> +		size = vq->size - used_idx;
> +		do_flush_shadow_used_ring_split(dev, vq, used_idx, 0, size);
> +
> +		/* update the left half used ring interval [0, left_size] */
> +		do_flush_shadow_used_ring_split(dev, vq, 0, size,
> +					  vq->shadow_used_idx - size);
> +	}
> +	vq->last_used_idx += vq->shadow_used_idx;
> +
> +	rte_smp_wmb();
> +
> +	vhost_log_cache_sync(dev, vq);
> +
> +	vq->shadow_used_idx = 0;
> +}
> +
> +static __rte_always_inline void
>  update_shadow_used_ring_split(struct vhost_virtqueue *vq,
>  			 uint16_t desc_idx, uint32_t len)
>  {
> @@ -905,6 +935,199 @@
>  	return error;
>  }
> 
> +static __rte_always_inline void
> +async_fill_vec(struct iovec *v, void *base, size_t len)
> +{
> +	v->iov_base = base;
> +	v->iov_len = len;
> +}
> +
> +static __rte_always_inline void
> +async_fill_it(struct iov_it *it, size_t count,
> +	struct iovec *vec, unsigned long nr_seg)
> +{
> +	it->offset = 0;
> +	it->count = count;
> +
> +	if (count) {
> +		it->iov = vec;
> +		it->nr_segs = nr_seg;
> +	} else {
> +		it->iov = 0;
> +		it->nr_segs = 0;
> +	}
> +}
> +
> +static __rte_always_inline void
> +async_fill_des(struct dma_trans_desc *desc,
> +	struct iov_it *src, struct iov_it *dst)
> +{
> +	desc->src = src;
> +	desc->dst = dst;
> +}
> +
> +static __rte_always_inline int
> +async_mbuf_to_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
> +			struct rte_mbuf *m, struct buf_vector *buf_vec,
> +			uint16_t nr_vec, uint16_t num_buffers,
> +			struct iovec *src_iovec, struct iovec *dst_iovec,
> +			struct iov_it *src_it, struct iov_it *dst_it)
> +{

There're too much arguments in this function, please check whether it will impact performance. 

> +	uint32_t vec_idx = 0;
> +	uint32_t mbuf_offset, mbuf_avail;
> +	uint32_t buf_offset, buf_avail;
> +	uint64_t buf_addr, buf_iova, buf_len;
> +	uint32_t cpy_len, cpy_threshold;
> +	uint64_t hdr_addr;
> +	struct rte_mbuf *hdr_mbuf;
> +	struct batch_copy_elem *batch_copy = vq->batch_copy_elems;
> +	struct virtio_net_hdr_mrg_rxbuf tmp_hdr, *hdr = NULL;
> +	int error = 0;
> +
> +	uint32_t tlen = 0;
> +	int tvec_idx = 0;
> +	void *hpa;
> +
> +	if (unlikely(m == NULL)) {
> +		error = -1;
> +		goto out;
> +	}
> +
> +	cpy_threshold = vq->async_threshold;
> +
> +	buf_addr = buf_vec[vec_idx].buf_addr;
> +	buf_iova = buf_vec[vec_idx].buf_iova;
> +	buf_len = buf_vec[vec_idx].buf_len;
> +
> +	if (unlikely(buf_len < dev->vhost_hlen && nr_vec <= 1)) {
> +		error = -1;
> +		goto out;
> +	}
> +
> +	hdr_mbuf = m;
> +	hdr_addr = buf_addr;
> +	if (unlikely(buf_len < dev->vhost_hlen))
> +		hdr = &tmp_hdr;
> +	else
> +		hdr = (struct virtio_net_hdr_mrg_rxbuf
> *)(uintptr_t)hdr_addr;
> +
> +	VHOST_LOG_DATA(DEBUG, "(%d) RX: num merge buffers %d\n",
> +		dev->vid, num_buffers);
> +
> +	if (unlikely(buf_len < dev->vhost_hlen)) {
> +		buf_offset = dev->vhost_hlen - buf_len;
> +		vec_idx++;
> +		buf_addr = buf_vec[vec_idx].buf_addr;
> +		buf_iova = buf_vec[vec_idx].buf_iova;
> +		buf_len = buf_vec[vec_idx].buf_len;
> +		buf_avail = buf_len - buf_offset;
> +	} else {
> +		buf_offset = dev->vhost_hlen;
> +		buf_avail = buf_len - dev->vhost_hlen;
> +	}
> +
> +	mbuf_avail  = rte_pktmbuf_data_len(m);
> +	mbuf_offset = 0;
> +
> +	while (mbuf_avail != 0 || m->next != NULL) {
> +		/* done with current buf, get the next one */
> +		if (buf_avail == 0) {
> +			vec_idx++;
> +			if (unlikely(vec_idx >= nr_vec)) {
> +				error = -1;
> +				goto out;
> +			}
> +
> +			buf_addr = buf_vec[vec_idx].buf_addr;
> +			buf_iova = buf_vec[vec_idx].buf_iova;
> +			buf_len = buf_vec[vec_idx].buf_len;
> +
> +			buf_offset = 0;
> +			buf_avail  = buf_len;
> +		}
> +
> +		/* done with current mbuf, get the next one */
> +		if (mbuf_avail == 0) {
> +			m = m->next;
> +
> +			mbuf_offset = 0;
> +			mbuf_avail  = rte_pktmbuf_data_len(m);
> +		}
> +
> +		if (hdr_addr) {
> +			virtio_enqueue_offload(hdr_mbuf, &hdr->hdr);
> +			if (rxvq_is_mergeable(dev))
> +				ASSIGN_UNLESS_EQUAL(hdr->num_buffers,
> +						num_buffers);
> +
> +			if (unlikely(hdr == &tmp_hdr)) {
> +				copy_vnet_hdr_to_desc(dev, vq, buf_vec,
> hdr);
> +			} else {
> +				PRINT_PACKET(dev, (uintptr_t)hdr_addr,
> +						dev->vhost_hlen, 0);
> +				vhost_log_cache_write_iova(dev, vq,
> +						buf_vec[0].buf_iova,
> +						dev->vhost_hlen);
> +			}
> +
> +			hdr_addr = 0;
> +		}
> +
> +		cpy_len = RTE_MIN(buf_avail, mbuf_avail);
> +
> +		if (unlikely(cpy_len >= cpy_threshold)) {
> +			hpa = (void *)(uintptr_t)gpa_to_hpa(dev,
> +					buf_iova + buf_offset, cpy_len);

I have one question here. If user has called async copy directly, should vhost library still check copy threshold for software fallback?  
If need software fallback, IMHO it will be more suitable to handle it in copy device driver.

IMHO, the cost will be too high for checking and fix virtio header in async copy function. 
Since this is async copy datapath, could it possible that eliminate the cost in calculation of segmented addresses? 

> +
> +			if (unlikely(!hpa)) {
> +				error = -1;
> +				goto out;
> +			}
> +
> +			async_fill_vec(src_iovec + tvec_idx,
> +				(void
> *)(uintptr_t)rte_pktmbuf_iova_offset(m,
> +						mbuf_offset), cpy_len);
> +
> +			async_fill_vec(dst_iovec + tvec_idx, hpa, cpy_len);
> +
> +			tlen += cpy_len;
> +			tvec_idx++;
> +		} else {
> +			if (unlikely(vq->batch_copy_nb_elems >= vq->size)) {
> +				rte_memcpy(
> +				(void *)((uintptr_t)(buf_addr + buf_offset)),
> +				rte_pktmbuf_mtod_offset(m, void *,
> mbuf_offset),
> +				cpy_len);
> +
> +				PRINT_PACKET(dev,
> +					(uintptr_t)(buf_addr + buf_offset),
> +					cpy_len, 0);
> +			} else {
> +				batch_copy[vq->batch_copy_nb_elems].dst =
> +				(void *)((uintptr_t)(buf_addr + buf_offset));
> +				batch_copy[vq->batch_copy_nb_elems].src =
> +				rte_pktmbuf_mtod_offset(m, void *,
> mbuf_offset);
> +				batch_copy[vq-
> >batch_copy_nb_elems].log_addr =
> +					buf_iova + buf_offset;
> +				batch_copy[vq->batch_copy_nb_elems].len =
> +					cpy_len;
> +				vq->batch_copy_nb_elems++;
> +			}
> +		}
> +
> +		mbuf_avail  -= cpy_len;
> +		mbuf_offset += cpy_len;
> +		buf_avail  -= cpy_len;
> +		buf_offset += cpy_len;
> +	}
> +
> +out:
> +	async_fill_it(src_it, tlen, src_iovec, tvec_idx);
> +	async_fill_it(dst_it, tlen, dst_iovec, tvec_idx);
> +
> +	return error;
> +}
> +
>  static __rte_always_inline int
>  vhost_enqueue_single_packed(struct virtio_net *dev,
>  			    struct vhost_virtqueue *vq,
> @@ -1236,6 +1459,317 @@
>  	return virtio_dev_rx(dev, queue_id, pkts, count);
>  }
> 
> +static __rte_always_inline void
> +virtio_dev_rx_async_submit_split_err(struct virtio_net *dev,
> +	struct vhost_virtqueue *vq, uint16_t queue_id,
> +	uint16_t last_idx, uint16_t shadow_idx)
> +{
> +	while (vq->async_pkts_inflight_n) {
> +		int er = vq->async_ops.check_completed_copies(dev->vid,
> +			queue_id, 0, MAX_PKT_BURST);
> +
> +		if (er < 0) {
> +			vq->async_pkts_inflight_n = 0;
> +			break;
> +		}
> +
> +		vq->async_pkts_inflight_n -= er;
> +	}
> +
> +	vq->shadow_used_idx = shadow_idx;
> +	vq->last_avail_idx = last_idx;
> +}
> +
> +static __rte_noinline uint32_t
> +virtio_dev_rx_async_submit_split(struct virtio_net *dev,
> +	struct vhost_virtqueue *vq, uint16_t queue_id,
> +	struct rte_mbuf **pkts, uint32_t count)
> +{
> +	uint32_t pkt_idx = 0, pkt_burst_idx = 0;
> +	uint16_t num_buffers;
> +	struct buf_vector buf_vec[BUF_VECTOR_MAX];
> +	uint16_t avail_head, last_idx, shadow_idx;
> +
> +	struct iov_it *it_pool = vq->it_pool;
> +	struct iovec *vec_pool = vq->vec_pool;
> +	struct dma_trans_desc tdes[MAX_PKT_BURST];
> +	struct iovec *src_iovec = vec_pool;
> +	struct iovec *dst_iovec = vec_pool + (VHOST_MAX_ASYNC_VEC >> 1);
> +	struct iov_it *src_it = it_pool;
> +	struct iov_it *dst_it = it_pool + 1;
> +	uint16_t n_free_slot, slot_idx;
> +	int n_pkts = 0;
> +
> +	avail_head = *((volatile uint16_t *)&vq->avail->idx);
> +	last_idx = vq->last_avail_idx;
> +	shadow_idx = vq->shadow_used_idx;
> +
> +	/*
> +	 * The ordering between avail index and
> +	 * desc reads needs to be enforced.
> +	 */
> +	rte_smp_rmb();
> +
> +	rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]);
> +
> +	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
> +		uint32_t pkt_len = pkts[pkt_idx]->pkt_len + dev->vhost_hlen;
> +		uint16_t nr_vec = 0;
> +
> +		if (unlikely(reserve_avail_buf_split(dev, vq,
> +						pkt_len, buf_vec,
> &num_buffers,
> +						avail_head, &nr_vec) < 0)) {
> +			VHOST_LOG_DATA(DEBUG,
> +				"(%d) failed to get enough desc from
> vring\n",
> +				dev->vid);
> +			vq->shadow_used_idx -= num_buffers;
> +			break;
> +		}
> +
> +		VHOST_LOG_DATA(DEBUG, "(%d) current index %d | end
> index %d\n",
> +			dev->vid, vq->last_avail_idx,
> +			vq->last_avail_idx + num_buffers);
> +
> +		if (async_mbuf_to_desc(dev, vq, pkts[pkt_idx],
> +				buf_vec, nr_vec, num_buffers,
> +				src_iovec, dst_iovec, src_it, dst_it) < 0) {
> +			vq->shadow_used_idx -= num_buffers;
> +			break;
> +		}
> +
> +		slot_idx = (vq->async_pkts_idx + pkt_idx) & (vq->size - 1);
> +		if (src_it->count) {
> +			async_fill_des(&tdes[pkt_burst_idx], src_it, dst_it);
> +			pkt_burst_idx++;
> +			vq->async_pending_info[slot_idx] =
> +				num_buffers | (src_it->nr_segs << 16);
> +			src_iovec += src_it->nr_segs;
> +			dst_iovec += dst_it->nr_segs;
> +			src_it += 2;
> +			dst_it += 2;
> +		} else {
> +			vq->async_pending_info[slot_idx] = num_buffers;
> +			vq->async_pkts_inflight_n++;
> +		}
> +
> +		vq->last_avail_idx += num_buffers;
> +
> +		if (pkt_burst_idx >= VHOST_ASYNC_BATCH_THRESHOLD ||
> +				(pkt_idx == count - 1 && pkt_burst_idx)) {
> +			n_pkts = vq->async_ops.transfer_data(dev->vid,
> +					queue_id, tdes, 0, pkt_burst_idx);
> +			src_iovec = vec_pool;
> +			dst_iovec = vec_pool + (VHOST_MAX_ASYNC_VEC >>
> 1);
> +			src_it = it_pool;
> +			dst_it = it_pool + 1;
> +
> +			if (unlikely(n_pkts < (int)pkt_burst_idx)) {
> +				vq->async_pkts_inflight_n +=
> +					n_pkts > 0 ? n_pkts : 0;
> +				virtio_dev_rx_async_submit_split_err(dev,
> +					vq, queue_id, last_idx, shadow_idx);
> +				return 0;
> +			}
> +
> +			pkt_burst_idx = 0;
> +			vq->async_pkts_inflight_n += n_pkts;
> +		}
> +	}
> +
> +	if (pkt_burst_idx) {
> +		n_pkts = vq->async_ops.transfer_data(dev->vid,
> +				queue_id, tdes, 0, pkt_burst_idx);
> +		if (unlikely(n_pkts <= (int)pkt_burst_idx)) {
> +			vq->async_pkts_inflight_n += n_pkts > 0 ? n_pkts : 0;
> +			virtio_dev_rx_async_submit_split_err(dev, vq,
> queue_id,
> +			last_idx, shadow_idx);
> +			return 0;
> +		}
> +
> +		vq->async_pkts_inflight_n += n_pkts;
> +	}
> +
> +	do_data_copy_enqueue(dev, vq);
> +
> +	n_free_slot = vq->size - vq->async_pkts_idx;
> +	if (n_free_slot > pkt_idx) {
> +		rte_memcpy(&vq->async_pkts_pending[vq->async_pkts_idx],
> +			pkts, pkt_idx * sizeof(uintptr_t));
> +		vq->async_pkts_idx += pkt_idx;
> +	} else {
> +		rte_memcpy(&vq->async_pkts_pending[vq->async_pkts_idx],
> +			pkts, n_free_slot * sizeof(uintptr_t));
> +		rte_memcpy(&vq->async_pkts_pending[0],
> +			&pkts[n_free_slot],
> +			(pkt_idx - n_free_slot) * sizeof(uintptr_t));
> +		vq->async_pkts_idx = pkt_idx - n_free_slot;
> +	}
> +
> +	if (likely(vq->shadow_used_idx))
> +		async_flush_shadow_used_ring_split(dev, vq);
> +
> +	return pkt_idx;
> +}
> +
> +uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
> +		struct rte_mbuf **pkts, uint16_t count)
> +{
> +	struct virtio_net *dev = get_device(vid);
> +	struct vhost_virtqueue *vq;
> +	uint16_t n_pkts_cpl, n_pkts_put = 0, n_descs = 0;
> +	uint16_t start_idx, pkts_idx, vq_size;
> +	uint64_t *async_pending_info;
> +
> +	VHOST_LOG_DATA(DEBUG, "(%d) %s\n", dev->vid, __func__);
> +	if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->nr_vring))) {
> +		VHOST_LOG_DATA(ERR, "(%d) %s: invalid virtqueue
> idx %d.\n",
> +			dev->vid, __func__, queue_id);
> +		return 0;
> +	}
> +
> +	vq = dev->virtqueue[queue_id];
> +
Should check whether this device or queue support async copy, vq->async_pending_info is NULL if queue not enable async_copy.

> +	rte_spinlock_lock(&vq->access_lock);
> +
> +	pkts_idx = vq->async_pkts_idx;
> +	async_pending_info = vq->async_pending_info;
> +	vq_size = vq->size;
> +	start_idx = pkts_idx > vq->async_pkts_inflight_n ?
> +		pkts_idx - vq->async_pkts_inflight_n :
> +		(vq_size - vq->async_pkts_inflight_n + pkts_idx) &
> +		(vq_size - 1);
> +
> +	n_pkts_cpl =
> +		vq->async_ops.check_completed_copies(vid, queue_id, 0,
> count);
> +
> +	rte_smp_wmb();
> +
> +	while (likely(((start_idx + n_pkts_put) & (vq_size - 1)) != pkts_idx)) {
> +		uint64_t info = async_pending_info[
> +			(start_idx + n_pkts_put) & (vq_size - 1)];
> +		uint64_t n_segs;
> +		n_pkts_put++;
> +		n_descs += info & ASYNC_PENDING_INFO_N_MSK;
> +		n_segs = info >> ASYNC_PENDING_INFO_N_SFT;
> +
> +		if (n_segs) {
> +			if (!n_pkts_cpl || n_pkts_cpl < n_segs) {
> +				n_pkts_put--;
> +				n_descs -= info &
> ASYNC_PENDING_INFO_N_MSK;
> +				if (n_pkts_cpl) {
> +					async_pending_info[
> +						(start_idx + n_pkts_put) &
> +						(vq_size - 1)] =
> +					((n_segs - n_pkts_cpl) <<
> +					 ASYNC_PENDING_INFO_N_SFT) |
> +					(info &
> ASYNC_PENDING_INFO_N_MSK);
> +					n_pkts_cpl = 0;
> +				}
> +				break;
> +			}
> +			n_pkts_cpl -= n_segs;
> +		}
> +	}
> +
> +	if (n_pkts_put) {
> +		vq->async_pkts_inflight_n -= n_pkts_put;
> +		*(volatile uint16_t *)&vq->used->idx += n_descs;
> +
> +		vhost_vring_call_split(dev, vq);
> +	}
> +
> +	if (start_idx + n_pkts_put <= vq_size) {
> +		rte_memcpy(pkts, &vq->async_pkts_pending[start_idx],
> +			n_pkts_put * sizeof(uintptr_t));
> +	} else {
> +		rte_memcpy(pkts, &vq->async_pkts_pending[start_idx],
> +			(vq_size - start_idx) * sizeof(uintptr_t));
> +		rte_memcpy(&pkts[vq_size - start_idx], vq-
> >async_pkts_pending,
> +			(n_pkts_put - vq_size + start_idx) * sizeof(uintptr_t));
> +	}
> +
> +	rte_spinlock_unlock(&vq->access_lock);
> +
> +	return n_pkts_put;
> +}
> +
> +static __rte_always_inline uint32_t
> +virtio_dev_rx_async_submit(struct virtio_net *dev, uint16_t queue_id,
> +	struct rte_mbuf **pkts, uint32_t count)
> +{
> +	struct vhost_virtqueue *vq;
> +	uint32_t nb_tx = 0;
> +	bool drawback = false;
> +
> +	VHOST_LOG_DATA(DEBUG, "(%d) %s\n", dev->vid, __func__);
> +	if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->nr_vring))) {
> +		VHOST_LOG_DATA(ERR, "(%d) %s: invalid virtqueue
> idx %d.\n",
> +			dev->vid, __func__, queue_id);
> +		return 0;
> +	}
> +
> +	vq = dev->virtqueue[queue_id];
> +
> +	rte_spinlock_lock(&vq->access_lock);
> +
> +	if (unlikely(vq->enabled == 0))
> +		goto out_access_unlock;
> +
> +	if (unlikely(!vq->async_registered)) {
> +		drawback = true;
> +		goto out_access_unlock;
> +	}
> +
> +	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
> +		vhost_user_iotlb_rd_lock(vq);
> +
> +	if (unlikely(vq->access_ok == 0))
> +		if (unlikely(vring_translate(dev, vq) < 0))
> +			goto out;
> +
> +	count = RTE_MIN((uint32_t)MAX_PKT_BURST, count);
> +	if (count == 0)
> +		goto out;
> +
> +	/* TODO: packed queue not implemented */
> +	if (vq_is_packed(dev))
> +		nb_tx = 0;
> +	else
> +		nb_tx = virtio_dev_rx_async_submit_split(dev,
> +				vq, queue_id, pkts, count);
> +
> +out:
> +	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
> +		vhost_user_iotlb_rd_unlock(vq);
> +
> +out_access_unlock:
> +	rte_spinlock_unlock(&vq->access_lock);
> +
> +	if (drawback)
> +		return rte_vhost_enqueue_burst(dev->vid, queue_id, pkts,
> count);
> +
> +	return nb_tx;
> +}
> +
> +uint16_t
> +rte_vhost_submit_enqueue_burst(int vid, uint16_t queue_id,
> +		struct rte_mbuf **pkts, uint16_t count)
> +{
> +	struct virtio_net *dev = get_device(vid);
> +
> +	if (!dev)
> +		return 0;
> +
> +	if (unlikely(!(dev->flags & VIRTIO_DEV_BUILTIN_VIRTIO_NET))) {
> +		VHOST_LOG_DATA(ERR,
> +			"(%d) %s: built-in vhost net backend is disabled.\n",
> +			dev->vid, __func__);
> +		return 0;
> +	}
> +
> +	return virtio_dev_rx_async_submit(dev, queue_id, pkts, count);
> +}
> +
>  static inline bool
>  virtio_net_with_host_offload(struct virtio_net *dev)
>  {
> --
> 1.8.3.1
Patrick Fu June 18, 2020, 11:36 a.m. UTC | #2
Hi,

> -----Original Message-----
> From: Liu, Yong <yong.liu@intel.com>
> Sent: Thursday, June 18, 2020 2:57 PM
> To: Fu, Patrick <patrick.fu@intel.com>
> Cc: Fu, Patrick <patrick.fu@intel.com>; Jiang, Cheng1
> <cheng1.jiang@intel.com>; Liang, Cunming <cunming.liang@intel.com>;
> dev@dpdk.org; maxime.coquelin@redhat.com; Xia, Chenbo
> <chenbo.xia@intel.com>; Wang, Zhihong <zhihong.wang@intel.com>; Ye,
> Xiaolong <xiaolong.ye@intel.com>
> Subject: RE: [dpdk-dev] [PATCH v1 2/2] vhost: introduce async enqueue for
> split ring
> 
> Thanks, Patrick. Some comments are inline.
> 
> >
> > From: Patrick <patrick.fu@intel.com>
> >
> > This patch implement async enqueue data path for split ring.
> >
> > Signed-off-by: Patrick <patrick.fu@intel.com>
> > ---
> >  lib/librte_vhost/rte_vhost_async.h |  38 +++
> >  lib/librte_vhost/virtio_net.c      | 538
> > ++++++++++++++++++++++++++++++++++++-
> >  2 files changed, 574 insertions(+), 2 deletions(-)
> >
> > diff --git a/lib/librte_vhost/rte_vhost_async.h
> > b/lib/librte_vhost/rte_vhost_async.h
> > index 82f2ebe..efcba0a 100644
> > --- a/lib/librte_vhost/rte_vhost_async.h
> > +++ b/lib/librte_vhost/rte_vhost_async.h
> > +
> > +static __rte_always_inline int
> > +async_mbuf_to_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
> > +			struct rte_mbuf *m, struct buf_vector *buf_vec,
> > +			uint16_t nr_vec, uint16_t num_buffers,
> > +			struct iovec *src_iovec, struct iovec *dst_iovec,
> > +			struct iov_it *src_it, struct iov_it *dst_it) {
> 
> There're too much arguments in this function, please check whether it will
> impact performance.
> 
I guess src_iovec & dst_iovec could be removed from the parameter list. 

> > +	uint32_t vec_idx = 0;
> > +	uint32_t mbuf_offset, mbuf_avail;
> > +	uint32_t buf_offset, buf_avail;
> > +	uint64_t buf_addr, buf_iova, buf_len;
> > +	uint32_t cpy_len, cpy_threshold;
> > +	uint64_t hdr_addr;
> > +	struct rte_mbuf *hdr_mbuf;
> > +	struct batch_copy_elem *batch_copy = vq->batch_copy_elems;
> > +	struct virtio_net_hdr_mrg_rxbuf tmp_hdr, *hdr = NULL;
> > +	int error = 0;
> > +
> > +	uint32_t tlen = 0;
> > +	int tvec_idx = 0;
> > +	void *hpa;
> > +
> > +	if (unlikely(m == NULL)) {
> > +		error = -1;
> > +		goto out;
> > +	}
> > +
> > +	cpy_threshold = vq->async_threshold;
> > +
> > +	buf_addr = buf_vec[vec_idx].buf_addr;
> > +	buf_iova = buf_vec[vec_idx].buf_iova;
> > +	buf_len = buf_vec[vec_idx].buf_len;
> > +
> > +	if (unlikely(buf_len < dev->vhost_hlen && nr_vec <= 1)) {
> > +		error = -1;
> > +		goto out;
> > +	}
> > +
> > +	hdr_mbuf = m;
> > +	hdr_addr = buf_addr;
> > +	if (unlikely(buf_len < dev->vhost_hlen))
> > +		hdr = &tmp_hdr;
> > +	else
> > +		hdr = (struct virtio_net_hdr_mrg_rxbuf
> > *)(uintptr_t)hdr_addr;
> > +
> > +	VHOST_LOG_DATA(DEBUG, "(%d) RX: num merge buffers %d\n",
> > +		dev->vid, num_buffers);
> > +
> > +	if (unlikely(buf_len < dev->vhost_hlen)) {
> > +		buf_offset = dev->vhost_hlen - buf_len;
> > +		vec_idx++;
> > +		buf_addr = buf_vec[vec_idx].buf_addr;
> > +		buf_iova = buf_vec[vec_idx].buf_iova;
> > +		buf_len = buf_vec[vec_idx].buf_len;
> > +		buf_avail = buf_len - buf_offset;
> > +	} else {
> > +		buf_offset = dev->vhost_hlen;
> > +		buf_avail = buf_len - dev->vhost_hlen;
> > +	}
> > +
> > +	mbuf_avail  = rte_pktmbuf_data_len(m);
> > +	mbuf_offset = 0;
> > +
> > +	while (mbuf_avail != 0 || m->next != NULL) {
> > +		/* done with current buf, get the next one */
> > +		if (buf_avail == 0) {
> > +			vec_idx++;
> > +			if (unlikely(vec_idx >= nr_vec)) {
> > +				error = -1;
> > +				goto out;
> > +			}
> > +
> > +			buf_addr = buf_vec[vec_idx].buf_addr;
> > +			buf_iova = buf_vec[vec_idx].buf_iova;
> > +			buf_len = buf_vec[vec_idx].buf_len;
> > +
> > +			buf_offset = 0;
> > +			buf_avail  = buf_len;
> > +		}
> > +
> > +		/* done with current mbuf, get the next one */
> > +		if (mbuf_avail == 0) {
> > +			m = m->next;
> > +
> > +			mbuf_offset = 0;
> > +			mbuf_avail  = rte_pktmbuf_data_len(m);
> > +		}
> > +
> > +		if (hdr_addr) {
> > +			virtio_enqueue_offload(hdr_mbuf, &hdr->hdr);
> > +			if (rxvq_is_mergeable(dev))
> > +				ASSIGN_UNLESS_EQUAL(hdr->num_buffers,
> > +						num_buffers);
> > +
> > +			if (unlikely(hdr == &tmp_hdr)) {
> > +				copy_vnet_hdr_to_desc(dev, vq, buf_vec,
> > hdr);
> > +			} else {
> > +				PRINT_PACKET(dev, (uintptr_t)hdr_addr,
> > +						dev->vhost_hlen, 0);
> > +				vhost_log_cache_write_iova(dev, vq,
> > +						buf_vec[0].buf_iova,
> > +						dev->vhost_hlen);
> > +			}
> > +
> > +			hdr_addr = 0;
> > +		}
> > +
> > +		cpy_len = RTE_MIN(buf_avail, mbuf_avail);
> > +
> > +		if (unlikely(cpy_len >= cpy_threshold)) {
> > +			hpa = (void *)(uintptr_t)gpa_to_hpa(dev,
> > +					buf_iova + buf_offset, cpy_len);
> 
> I have one question here. If user has called async copy directly, should vhost
> library still check copy threshold for software fallback?
> If need software fallback, IMHO it will be more suitable to handle it in copy
> device driver.
> 
Technically, we can delegate the threshold judgement from vhost to application callbacks. 
This will significantly increase the complexity of the callback implementations, which have to maintain
correct ordering between dma copied data and CPU copies data. Meanwhile, it actually doesn't help to 
boost performance comparing with current vhost implementation. Considering this threshold is a 
generic design, I would still prefer to keep it in vhost.

> IMHO, the cost will be too high for checking and fix virtio header in async
> copy function.
> Since this is async copy datapath, could it possible that eliminate the cost in
> calculation of segmented addresses?
> 
Yes, I believe async data path certainly brings new opportunity to apply more optimizations. 
However, at current time frame, settling down the overall async framework might be the priority. 

Thanks,

Patrick
Maxime Coquelin June 26, 2020, 2:39 p.m. UTC | #3
On 6/11/20 12:02 PM, patrick.fu@intel.com wrote:
> From: Patrick <patrick.fu@intel.com>
> 
> This patch implement async enqueue data path for split ring.

A bit more verbose commit message would be helpful since the cover
letter won't be in the git history. Duplicating relevant parts from
the cover letter would be sufficient.

> Signed-off-by: Patrick <patrick.fu@intel.com>
> ---
>  lib/librte_vhost/rte_vhost_async.h |  38 +++
>  lib/librte_vhost/virtio_net.c      | 538 ++++++++++++++++++++++++++++++++++++-
>  2 files changed, 574 insertions(+), 2 deletions(-)
> 
> diff --git a/lib/librte_vhost/rte_vhost_async.h b/lib/librte_vhost/rte_vhost_async.h
> index 82f2ebe..efcba0a 100644
> --- a/lib/librte_vhost/rte_vhost_async.h
> +++ b/lib/librte_vhost/rte_vhost_async.h
> @@ -131,4 +131,42 @@ int rte_vhost_async_channel_register(int vid, uint16_t queue_id,
>   */
>  int rte_vhost_async_channel_unregister(int vid, uint16_t queue_id);
>  
> +/**
> + * This function submit enqueue data to DMA. This function has no
> + * guranttee to the transfer completion upon return. Applications should
> + * poll transfer status by rte_vhost_poll_enqueue_completed()
> + *
> + * @param vid
> + *  id of vhost device to enqueue data
> + * @param queue_id
> + *  queue id to enqueue data
> + * @param pkts
> + *  array of packets to be enqueued
> + * @param count
> + *  packets num to be enqueued
> + * @return
> + *  num of packets enqueued
> + */
> +uint16_t rte_vhost_submit_enqueue_burst(int vid, uint16_t queue_id,
> +		struct rte_mbuf **pkts, uint16_t count);
> +
> +/**
> + * This function check DMA completion status for a specific vhost
> + * device queue. Packets which finish copying (enqueue) operation
> + * will be returned in an array.
> + *
> + * @param vid
> + *  id of vhost device to enqueue data
> + * @param queue_id
> + *  queue id to enqueue data
> + * @param pkts
> + *  blank array to get return packet pointer
> + * @param count
> + *  size of the packet array
> + * @return
> + *  num of packets returned
> + */
> +uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
> +		struct rte_mbuf **pkts, uint16_t count);
> +
>  #endif /* _RTE_VDPA_H_ */
> diff --git a/lib/librte_vhost/virtio_net.c b/lib/librte_vhost/virtio_net.c
> index 751c1f3..cf9f884 100644
> --- a/lib/librte_vhost/virtio_net.c
> +++ b/lib/librte_vhost/virtio_net.c
> @@ -17,14 +17,15 @@
>  #include <rte_arp.h>
>  #include <rte_spinlock.h>
>  #include <rte_malloc.h>
> +#include <rte_vhost_async.h>
>  
>  #include "iotlb.h"
>  #include "vhost.h"
>  
> -#define MAX_PKT_BURST 32
> -
>  #define MAX_BATCH_LEN 256
>  
> +#define VHOST_ASYNC_BATCH_THRESHOLD 8
> +
>  static  __rte_always_inline bool
>  rxvq_is_mergeable(struct virtio_net *dev)
>  {
> @@ -117,6 +118,35 @@
>  }
>  
>  static __rte_always_inline void
> +async_flush_shadow_used_ring_split(struct virtio_net *dev,
> +	struct vhost_virtqueue *vq)
> +{
> +	uint16_t used_idx = vq->last_used_idx & (vq->size - 1);
> +
> +	if (used_idx + vq->shadow_used_idx <= vq->size) {
> +		do_flush_shadow_used_ring_split(dev, vq, used_idx, 0,
> +					  vq->shadow_used_idx);
> +	} else {
> +		uint16_t size;
> +
> +		/* update used ring interval [used_idx, vq->size] */
> +		size = vq->size - used_idx;
> +		do_flush_shadow_used_ring_split(dev, vq, used_idx, 0, size);
> +
> +		/* update the left half used ring interval [0, left_size] */
> +		do_flush_shadow_used_ring_split(dev, vq, 0, size,
> +					  vq->shadow_used_idx - size);
> +	}
> +	vq->last_used_idx += vq->shadow_used_idx;
> +
> +	rte_smp_wmb();
> +
> +	vhost_log_cache_sync(dev, vq);
> +
> +	vq->shadow_used_idx = 0;
> +}
> +
> +static __rte_always_inline void
>  update_shadow_used_ring_split(struct vhost_virtqueue *vq,
>  			 uint16_t desc_idx, uint32_t len)
>  {
> @@ -905,6 +935,199 @@
>  	return error;
>  }
>  
> +static __rte_always_inline void
> +async_fill_vec(struct iovec *v, void *base, size_t len)
> +{
> +	v->iov_base = base;
> +	v->iov_len = len;
> +}
> +
> +static __rte_always_inline void
> +async_fill_it(struct iov_it *it, size_t count,
> +	struct iovec *vec, unsigned long nr_seg)
> +{
> +	it->offset = 0;
> +	it->count = count;
> +
> +	if (count) {
> +		it->iov = vec;
> +		it->nr_segs = nr_seg;
> +	} else {
> +		it->iov = 0;
> +		it->nr_segs = 0;
> +	}
> +}
> +
> +static __rte_always_inline void
> +async_fill_des(struct dma_trans_desc *desc,
> +	struct iov_it *src, struct iov_it *dst)
> +{
> +	desc->src = src;
> +	desc->dst = dst;
> +}
> +
> +static __rte_always_inline int
> +async_mbuf_to_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
> +			struct rte_mbuf *m, struct buf_vector *buf_vec,
> +			uint16_t nr_vec, uint16_t num_buffers,
> +			struct iovec *src_iovec, struct iovec *dst_iovec,
> +			struct iov_it *src_it, struct iov_it *dst_it)
> +{
> +	uint32_t vec_idx = 0;
> +	uint32_t mbuf_offset, mbuf_avail;
> +	uint32_t buf_offset, buf_avail;
> +	uint64_t buf_addr, buf_iova, buf_len;
> +	uint32_t cpy_len, cpy_threshold;
> +	uint64_t hdr_addr;
> +	struct rte_mbuf *hdr_mbuf;
> +	struct batch_copy_elem *batch_copy = vq->batch_copy_elems;
> +	struct virtio_net_hdr_mrg_rxbuf tmp_hdr, *hdr = NULL;
> +	int error = 0;
> +
> +	uint32_t tlen = 0;
> +	int tvec_idx = 0;
> +	void *hpa;
> +
> +	if (unlikely(m == NULL)) {
> +		error = -1;
> +		goto out;
> +	}
> +
> +	cpy_threshold = vq->async_threshold;
> +
> +	buf_addr = buf_vec[vec_idx].buf_addr;
> +	buf_iova = buf_vec[vec_idx].buf_iova;
> +	buf_len = buf_vec[vec_idx].buf_len;
> +
> +	if (unlikely(buf_len < dev->vhost_hlen && nr_vec <= 1)) {
> +		error = -1;
> +		goto out;
> +	}
> +
> +	hdr_mbuf = m;
> +	hdr_addr = buf_addr;
> +	if (unlikely(buf_len < dev->vhost_hlen))
> +		hdr = &tmp_hdr;
> +	else
> +		hdr = (struct virtio_net_hdr_mrg_rxbuf *)(uintptr_t)hdr_addr;
> +
> +	VHOST_LOG_DATA(DEBUG, "(%d) RX: num merge buffers %d\n",
> +		dev->vid, num_buffers);
> +
> +	if (unlikely(buf_len < dev->vhost_hlen)) {
> +		buf_offset = dev->vhost_hlen - buf_len;
> +		vec_idx++;
> +		buf_addr = buf_vec[vec_idx].buf_addr;
> +		buf_iova = buf_vec[vec_idx].buf_iova;
> +		buf_len = buf_vec[vec_idx].buf_len;
> +		buf_avail = buf_len - buf_offset;
> +	} else {
> +		buf_offset = dev->vhost_hlen;
> +		buf_avail = buf_len - dev->vhost_hlen;
> +	}
> +
> +	mbuf_avail  = rte_pktmbuf_data_len(m);
> +	mbuf_offset = 0;
> +
> +	while (mbuf_avail != 0 || m->next != NULL) {
> +		/* done with current buf, get the next one */
> +		if (buf_avail == 0) {
> +			vec_idx++;
> +			if (unlikely(vec_idx >= nr_vec)) {
> +				error = -1;
> +				goto out;
> +			}
> +
> +			buf_addr = buf_vec[vec_idx].buf_addr;
> +			buf_iova = buf_vec[vec_idx].buf_iova;
> +			buf_len = buf_vec[vec_idx].buf_len;
> +
> +			buf_offset = 0;
> +			buf_avail  = buf_len;
> +		}
> +
> +		/* done with current mbuf, get the next one */
> +		if (mbuf_avail == 0) {
> +			m = m->next;
> +
> +			mbuf_offset = 0;
> +			mbuf_avail  = rte_pktmbuf_data_len(m);
> +		}
> +
> +		if (hdr_addr) {
> +			virtio_enqueue_offload(hdr_mbuf, &hdr->hdr);
> +			if (rxvq_is_mergeable(dev))
> +				ASSIGN_UNLESS_EQUAL(hdr->num_buffers,
> +						num_buffers);
> +
> +			if (unlikely(hdr == &tmp_hdr)) {
> +				copy_vnet_hdr_to_desc(dev, vq, buf_vec, hdr);
> +			} else {
> +				PRINT_PACKET(dev, (uintptr_t)hdr_addr,
> +						dev->vhost_hlen, 0);
> +				vhost_log_cache_write_iova(dev, vq,
> +						buf_vec[0].buf_iova,
> +						dev->vhost_hlen);
> +			}
> +
> +			hdr_addr = 0;
> +		}
> +
> +		cpy_len = RTE_MIN(buf_avail, mbuf_avail);
> +
> +		if (unlikely(cpy_len >= cpy_threshold)) {
> +			hpa = (void *)(uintptr_t)gpa_to_hpa(dev,
> +					buf_iova + buf_offset, cpy_len);
> +
> +			if (unlikely(!hpa)) {
> +				error = -1;
> +				goto out;
> +			}
> +
> +			async_fill_vec(src_iovec + tvec_idx,
> +				(void *)(uintptr_t)rte_pktmbuf_iova_offset(m,
> +						mbuf_offset), cpy_len);
> +
> +			async_fill_vec(dst_iovec + tvec_idx, hpa, cpy_len);
> +
> +			tlen += cpy_len;
> +			tvec_idx++;
> +		} else {
> +			if (unlikely(vq->batch_copy_nb_elems >= vq->size)) {
> +				rte_memcpy(
> +				(void *)((uintptr_t)(buf_addr + buf_offset)),
> +				rte_pktmbuf_mtod_offset(m, void *, mbuf_offset),
> +				cpy_len);
> +
> +				PRINT_PACKET(dev,
> +					(uintptr_t)(buf_addr + buf_offset),
> +					cpy_len, 0);
> +			} else {
> +				batch_copy[vq->batch_copy_nb_elems].dst =
> +				(void *)((uintptr_t)(buf_addr + buf_offset));
> +				batch_copy[vq->batch_copy_nb_elems].src =
> +				rte_pktmbuf_mtod_offset(m, void *, mbuf_offset);
> +				batch_copy[vq->batch_copy_nb_elems].log_addr =
> +					buf_iova + buf_offset;
> +				batch_copy[vq->batch_copy_nb_elems].len =
> +					cpy_len;
> +				vq->batch_copy_nb_elems++;
> +			}
> +		}
> +
> +		mbuf_avail  -= cpy_len;
> +		mbuf_offset += cpy_len;
> +		buf_avail  -= cpy_len;
> +		buf_offset += cpy_len;
> +	}
> +
> +out:
> +	async_fill_it(src_it, tlen, src_iovec, tvec_idx);
> +	async_fill_it(dst_it, tlen, dst_iovec, tvec_idx);
> +
> +	return error;
> +}
> +
>  static __rte_always_inline int
>  vhost_enqueue_single_packed(struct virtio_net *dev,
>  			    struct vhost_virtqueue *vq,
> @@ -1236,6 +1459,317 @@
>  	return virtio_dev_rx(dev, queue_id, pkts, count);
>  }
>  
> +static __rte_always_inline void
> +virtio_dev_rx_async_submit_split_err(struct virtio_net *dev,
> +	struct vhost_virtqueue *vq, uint16_t queue_id,
> +	uint16_t last_idx, uint16_t shadow_idx)
> +{
> +	while (vq->async_pkts_inflight_n) {
> +		int er = vq->async_ops.check_completed_copies(dev->vid,
> +			queue_id, 0, MAX_PKT_BURST);
> +
> +		if (er < 0) {
> +			vq->async_pkts_inflight_n = 0;
> +			break;
> +		}
> +
> +		vq->async_pkts_inflight_n -= er;
> +	}
> +
> +	vq->shadow_used_idx = shadow_idx;
> +	vq->last_avail_idx = last_idx;
> +}
> +
> +static __rte_noinline uint32_t
> +virtio_dev_rx_async_submit_split(struct virtio_net *dev,
> +	struct vhost_virtqueue *vq, uint16_t queue_id,
> +	struct rte_mbuf **pkts, uint32_t count)
> +{
> +	uint32_t pkt_idx = 0, pkt_burst_idx = 0;
> +	uint16_t num_buffers;
> +	struct buf_vector buf_vec[BUF_VECTOR_MAX];
> +	uint16_t avail_head, last_idx, shadow_idx;
> +
> +	struct iov_it *it_pool = vq->it_pool;
> +	struct iovec *vec_pool = vq->vec_pool;
> +	struct dma_trans_desc tdes[MAX_PKT_BURST];
> +	struct iovec *src_iovec = vec_pool;
> +	struct iovec *dst_iovec = vec_pool + (VHOST_MAX_ASYNC_VEC >> 1);
> +	struct iov_it *src_it = it_pool;
> +	struct iov_it *dst_it = it_pool + 1;
> +	uint16_t n_free_slot, slot_idx;
> +	int n_pkts = 0;
> +
> +	avail_head = *((volatile uint16_t *)&vq->avail->idx);
> +	last_idx = vq->last_avail_idx;
> +	shadow_idx = vq->shadow_used_idx;
> +
> +	/*
> +	 * The ordering between avail index and
> +	 * desc reads needs to be enforced.
> +	 */
> +	rte_smp_rmb();
> +
> +	rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]);
> +
> +	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
> +		uint32_t pkt_len = pkts[pkt_idx]->pkt_len + dev->vhost_hlen;
> +		uint16_t nr_vec = 0;
> +
> +		if (unlikely(reserve_avail_buf_split(dev, vq,
> +						pkt_len, buf_vec, &num_buffers,
> +						avail_head, &nr_vec) < 0)) {
> +			VHOST_LOG_DATA(DEBUG,
> +				"(%d) failed to get enough desc from vring\n",
> +				dev->vid);
> +			vq->shadow_used_idx -= num_buffers;
> +			break;
> +		}
> +
> +		VHOST_LOG_DATA(DEBUG, "(%d) current index %d | end index %d\n",
> +			dev->vid, vq->last_avail_idx,
> +			vq->last_avail_idx + num_buffers);
> +
> +		if (async_mbuf_to_desc(dev, vq, pkts[pkt_idx],
> +				buf_vec, nr_vec, num_buffers,
> +				src_iovec, dst_iovec, src_it, dst_it) < 0) {
> +			vq->shadow_used_idx -= num_buffers;
> +			break;
> +		}
> +
> +		slot_idx = (vq->async_pkts_idx + pkt_idx) & (vq->size - 1);
> +		if (src_it->count) {
> +			async_fill_des(&tdes[pkt_burst_idx], src_it, dst_it);
> +			pkt_burst_idx++;
> +			vq->async_pending_info[slot_idx] =
> +				num_buffers | (src_it->nr_segs << 16);
> +			src_iovec += src_it->nr_segs;
> +			dst_iovec += dst_it->nr_segs;
> +			src_it += 2;
> +			dst_it += 2;
> +		} else {
> +			vq->async_pending_info[slot_idx] = num_buffers;
> +			vq->async_pkts_inflight_n++;
> +		}
> +
> +		vq->last_avail_idx += num_buffers;
> +
> +		if (pkt_burst_idx >= VHOST_ASYNC_BATCH_THRESHOLD ||
> +				(pkt_idx == count - 1 && pkt_burst_idx)) {
> +			n_pkts = vq->async_ops.transfer_data(dev->vid,
> +					queue_id, tdes, 0, pkt_burst_idx);
> +			src_iovec = vec_pool;
> +			dst_iovec = vec_pool + (VHOST_MAX_ASYNC_VEC >> 1);
> +			src_it = it_pool;
> +			dst_it = it_pool + 1;
> +
> +			if (unlikely(n_pkts < (int)pkt_burst_idx)) {
> +				vq->async_pkts_inflight_n +=
> +					n_pkts > 0 ? n_pkts : 0;
> +				virtio_dev_rx_async_submit_split_err(dev,
> +					vq, queue_id, last_idx, shadow_idx);
> +				return 0;
> +			}
> +
> +			pkt_burst_idx = 0;
> +			vq->async_pkts_inflight_n += n_pkts;
> +		}
> +	}
> +
> +	if (pkt_burst_idx) {
> +		n_pkts = vq->async_ops.transfer_data(dev->vid,
> +				queue_id, tdes, 0, pkt_burst_idx);
> +		if (unlikely(n_pkts <= (int)pkt_burst_idx)) {
> +			vq->async_pkts_inflight_n += n_pkts > 0 ? n_pkts : 0;
> +			virtio_dev_rx_async_submit_split_err(dev, vq, queue_id,
> +			last_idx, shadow_idx);
> +			return 0;
> +		}
> +
> +		vq->async_pkts_inflight_n += n_pkts;
> +	}
> +
> +	do_data_copy_enqueue(dev, vq);
> +
> +	n_free_slot = vq->size - vq->async_pkts_idx;
> +	if (n_free_slot > pkt_idx) {
> +		rte_memcpy(&vq->async_pkts_pending[vq->async_pkts_idx],
> +			pkts, pkt_idx * sizeof(uintptr_t));
> +		vq->async_pkts_idx += pkt_idx;
> +	} else {
> +		rte_memcpy(&vq->async_pkts_pending[vq->async_pkts_idx],
> +			pkts, n_free_slot * sizeof(uintptr_t));
> +		rte_memcpy(&vq->async_pkts_pending[0],
> +			&pkts[n_free_slot],
> +			(pkt_idx - n_free_slot) * sizeof(uintptr_t));
> +		vq->async_pkts_idx = pkt_idx - n_free_slot;
> +	}
> +
> +	if (likely(vq->shadow_used_idx))
> +		async_flush_shadow_used_ring_split(dev, vq);
> +
> +	return pkt_idx;
> +}
> +
> +uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
> +		struct rte_mbuf **pkts, uint16_t count)
> +{
> +	struct virtio_net *dev = get_device(vid);
> +	struct vhost_virtqueue *vq;
> +	uint16_t n_pkts_cpl, n_pkts_put = 0, n_descs = 0;
> +	uint16_t start_idx, pkts_idx, vq_size;
> +	uint64_t *async_pending_info;
> +
> +	VHOST_LOG_DATA(DEBUG, "(%d) %s\n", dev->vid, __func__);
> +	if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->nr_vring))) {
> +		VHOST_LOG_DATA(ERR, "(%d) %s: invalid virtqueue idx %d.\n",
> +			dev->vid, __func__, queue_id);
> +		return 0;
> +	}
> +
> +	vq = dev->virtqueue[queue_id];
> +
> +	rte_spinlock_lock(&vq->access_lock);
> +
> +	pkts_idx = vq->async_pkts_idx;
> +	async_pending_info = vq->async_pending_info;
> +	vq_size = vq->size;
> +	start_idx = pkts_idx > vq->async_pkts_inflight_n ?
> +		pkts_idx - vq->async_pkts_inflight_n :
> +		(vq_size - vq->async_pkts_inflight_n + pkts_idx) &
> +		(vq_size - 1);
> +
> +	n_pkts_cpl =
> +		vq->async_ops.check_completed_copies(vid, queue_id, 0, count);
> +
> +	rte_smp_wmb();
> +
> +	while (likely(((start_idx + n_pkts_put) & (vq_size - 1)) != pkts_idx)) {
> +		uint64_t info = async_pending_info[
> +			(start_idx + n_pkts_put) & (vq_size - 1)];
> +		uint64_t n_segs;
> +		n_pkts_put++;
> +		n_descs += info & ASYNC_PENDING_INFO_N_MSK;
> +		n_segs = info >> ASYNC_PENDING_INFO_N_SFT;
> +
> +		if (n_segs) {
> +			if (!n_pkts_cpl || n_pkts_cpl < n_segs) {
> +				n_pkts_put--;
> +				n_descs -= info & ASYNC_PENDING_INFO_N_MSK;
> +				if (n_pkts_cpl) {
> +					async_pending_info[
> +						(start_idx + n_pkts_put) &
> +						(vq_size - 1)] =
> +					((n_segs - n_pkts_cpl) <<
> +					 ASYNC_PENDING_INFO_N_SFT) |
> +					(info & ASYNC_PENDING_INFO_N_MSK);
> +					n_pkts_cpl = 0;
> +				}
> +				break;
> +			}
> +			n_pkts_cpl -= n_segs;
> +		}
> +	}
> +
> +	if (n_pkts_put) {
> +		vq->async_pkts_inflight_n -= n_pkts_put;
> +		*(volatile uint16_t *)&vq->used->idx += n_descs;
> +
> +		vhost_vring_call_split(dev, vq);
> +	}
> +
> +	if (start_idx + n_pkts_put <= vq_size) {
> +		rte_memcpy(pkts, &vq->async_pkts_pending[start_idx],
> +			n_pkts_put * sizeof(uintptr_t));
> +	} else {
> +		rte_memcpy(pkts, &vq->async_pkts_pending[start_idx],
> +			(vq_size - start_idx) * sizeof(uintptr_t));
> +		rte_memcpy(&pkts[vq_size - start_idx], vq->async_pkts_pending,
> +			(n_pkts_put - vq_size + start_idx) * sizeof(uintptr_t));
> +	}
> +
> +	rte_spinlock_unlock(&vq->access_lock);
> +
> +	return n_pkts_put;
> +}
> +
> +static __rte_always_inline uint32_t
> +virtio_dev_rx_async_submit(struct virtio_net *dev, uint16_t queue_id,
> +	struct rte_mbuf **pkts, uint32_t count)
> +{
> +	struct vhost_virtqueue *vq;
> +	uint32_t nb_tx = 0;
> +	bool drawback = false;
> +
> +	VHOST_LOG_DATA(DEBUG, "(%d) %s\n", dev->vid, __func__);
> +	if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->nr_vring))) {
> +		VHOST_LOG_DATA(ERR, "(%d) %s: invalid virtqueue idx %d.\n",
> +			dev->vid, __func__, queue_id);
> +		return 0;
> +	}
> +
> +	vq = dev->virtqueue[queue_id];
> +
> +	rte_spinlock_lock(&vq->access_lock);
> +
> +	if (unlikely(vq->enabled == 0))
> +		goto out_access_unlock;
> +
> +	if (unlikely(!vq->async_registered)) {
> +		drawback = true;
fallback instead?
> +		goto out_access_unlock;
> +	}
> +
> +	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
> +		vhost_user_iotlb_rd_lock(vq);
> +
> +	if (unlikely(vq->access_ok == 0))
> +		if (unlikely(vring_translate(dev, vq) < 0))
> +			goto out;
> +
> +	count = RTE_MIN((uint32_t)MAX_PKT_BURST, count);
> +	if (count == 0)
> +		goto out;
> +
> +	/* TODO: packed queue not implemented */
> +	if (vq_is_packed(dev))
> +		nb_tx = 0;
> +	else
> +		nb_tx = virtio_dev_rx_async_submit_split(dev,
> +				vq, queue_id, pkts, count);
> +
> +out:
> +	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
> +		vhost_user_iotlb_rd_unlock(vq);
> +
> +out_access_unlock:
> +	rte_spinlock_unlock(&vq->access_lock);
> +
> +	if (drawback)
> +		return rte_vhost_enqueue_burst(dev->vid, queue_id, pkts, count);
> +
> +	return nb_tx;
> +}
> +
> +uint16_t
> +rte_vhost_submit_enqueue_burst(int vid, uint16_t queue_id,
> +		struct rte_mbuf **pkts, uint16_t count)
> +{
> +	struct virtio_net *dev = get_device(vid);
> +
> +	if (!dev)
> +		return 0;
> +
> +	if (unlikely(!(dev->flags & VIRTIO_DEV_BUILTIN_VIRTIO_NET))) {
> +		VHOST_LOG_DATA(ERR,
> +			"(%d) %s: built-in vhost net backend is disabled.\n",
> +			dev->vid, __func__);
> +		return 0;
> +	}
> +
> +	return virtio_dev_rx_async_submit(dev, queue_id, pkts, count);
> +}
> +
>  static inline bool
>  virtio_net_with_host_offload(struct virtio_net *dev)
>  {
>
Maxime Coquelin June 26, 2020, 2:46 p.m. UTC | #4
On 6/11/20 12:02 PM, patrick.fu@intel.com wrote:
> From: Patrick <patrick.fu@intel.com>
> 
> This patch implement async enqueue data path for split ring.
> 
> Signed-off-by: Patrick <patrick.fu@intel.com>
> ---
>  lib/librte_vhost/rte_vhost_async.h |  38 +++
>  lib/librte_vhost/virtio_net.c      | 538 ++++++++++++++++++++++++++++++++++++-
>  2 files changed, 574 insertions(+), 2 deletions(-)
> 
> diff --git a/lib/librte_vhost/rte_vhost_async.h b/lib/librte_vhost/rte_vhost_async.h
> index 82f2ebe..efcba0a 100644
> --- a/lib/librte_vhost/rte_vhost_async.h
> +++ b/lib/librte_vhost/rte_vhost_async.h
> @@ -131,4 +131,42 @@ int rte_vhost_async_channel_register(int vid, uint16_t queue_id,
>   */
>  int rte_vhost_async_channel_unregister(int vid, uint16_t queue_id);
>  
> +/**
> + * This function submit enqueue data to DMA. This function has no
> + * guranttee to the transfer completion upon return. Applications should
> + * poll transfer status by rte_vhost_poll_enqueue_completed()
> + *
> + * @param vid
> + *  id of vhost device to enqueue data
> + * @param queue_id
> + *  queue id to enqueue data
> + * @param pkts
> + *  array of packets to be enqueued
> + * @param count
> + *  packets num to be enqueued
> + * @return
> + *  num of packets enqueued
> + */
> +uint16_t rte_vhost_submit_enqueue_burst(int vid, uint16_t queue_id,
> +		struct rte_mbuf **pkts, uint16_t count);
> +
> +/**
> + * This function check DMA completion status for a specific vhost
> + * device queue. Packets which finish copying (enqueue) operation
> + * will be returned in an array.
> + *
> + * @param vid
> + *  id of vhost device to enqueue data
> + * @param queue_id
> + *  queue id to enqueue data
> + * @param pkts
> + *  blank array to get return packet pointer
> + * @param count
> + *  size of the packet array
> + * @return
> + *  num of packets returned
> + */
> +uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
> +		struct rte_mbuf **pkts, uint16_t count);
> +

These new APIs need to be tagged as experimental. We'll need a few
releases before considering them stable.

You need to add them to rte_vhost_version.map too.

>  #endif /* _RTE_VDPA_H_ */
You need to fix the comment here (/* _RTE_VHOST_ASYNC_H_ */)
Patrick Fu June 29, 2020, 1:25 a.m. UTC | #5
Hi Maxime,

> -----Original Message-----
> From: Maxime Coquelin <maxime.coquelin@redhat.com>
> Sent: Friday, June 26, 2020 10:46 PM
> To: Fu, Patrick <patrick.fu@intel.com>; dev@dpdk.org; Xia, Chenbo
> <chenbo.xia@intel.com>; Wang, Zhihong <zhihong.wang@intel.com>; Ye,
> Xiaolong <xiaolong.ye@intel.com>
> Cc: Jiang, Cheng1 <cheng1.jiang@intel.com>; Liang, Cunming
> <cunming.liang@intel.com>
> Subject: Re: [PATCH v1 2/2] vhost: introduce async enqueue for split ring
> 
> 
> 
> On 6/11/20 12:02 PM, patrick.fu@intel.com wrote:
> > From: Patrick <patrick.fu@intel.com>
> >
> > This patch implement async enqueue data path for split ring.
> >
> > Signed-off-by: Patrick <patrick.fu@intel.com>
> > ---
> >  lib/librte_vhost/rte_vhost_async.h |  38 +++
> >  lib/librte_vhost/virtio_net.c      | 538
> ++++++++++++++++++++++++++++++++++++-
> >  2 files changed, 574 insertions(+), 2 deletions(-)
> >
> > diff --git a/lib/librte_vhost/rte_vhost_async.h
> > b/lib/librte_vhost/rte_vhost_async.h
> > index 82f2ebe..efcba0a 100644
> > --- a/lib/librte_vhost/rte_vhost_async.h
> > +++ b/lib/librte_vhost/rte_vhost_async.h
> > @@ -131,4 +131,42 @@ int rte_vhost_async_channel_register(int vid,
> uint16_t queue_id,
> >   */
> >  int rte_vhost_async_channel_unregister(int vid, uint16_t queue_id);
> >
> > +/**
> > + * This function submit enqueue data to DMA. This function has no
> > + * guranttee to the transfer completion upon return. Applications
> > +should
> > + * poll transfer status by rte_vhost_poll_enqueue_completed()
> > + *
> > + * @param vid
> > + *  id of vhost device to enqueue data
> > + * @param queue_id
> > + *  queue id to enqueue data
> > + * @param pkts
> > + *  array of packets to be enqueued
> > + * @param count
> > + *  packets num to be enqueued
> > + * @return
> > + *  num of packets enqueued
> > + */
> > +uint16_t rte_vhost_submit_enqueue_burst(int vid, uint16_t queue_id,
> > +		struct rte_mbuf **pkts, uint16_t count);
> > +
> > +/**
> > + * This function check DMA completion status for a specific vhost
> > + * device queue. Packets which finish copying (enqueue) operation
> > + * will be returned in an array.
> > + *
> > + * @param vid
> > + *  id of vhost device to enqueue data
> > + * @param queue_id
> > + *  queue id to enqueue data
> > + * @param pkts
> > + *  blank array to get return packet pointer
> > + * @param count
> > + *  size of the packet array
> > + * @return
> > + *  num of packets returned
> > + */
> > +uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
> > +		struct rte_mbuf **pkts, uint16_t count);
> > +
> 
> These new APIs need to be tagged as experimental. We'll need a few releases
> before considering them stable.
> 
> You need to add them to rte_vhost_version.map too.
> 
> >  #endif /* _RTE_VDPA_H_ */
> You need to fix the comment here (/* _RTE_VHOST_ASYNC_H_ */)

I will update in the v2 version

Thanks,

Patrick
diff mbox series

Patch

diff --git a/lib/librte_vhost/rte_vhost_async.h b/lib/librte_vhost/rte_vhost_async.h
index 82f2ebe..efcba0a 100644
--- a/lib/librte_vhost/rte_vhost_async.h
+++ b/lib/librte_vhost/rte_vhost_async.h
@@ -131,4 +131,42 @@  int rte_vhost_async_channel_register(int vid, uint16_t queue_id,
  */
 int rte_vhost_async_channel_unregister(int vid, uint16_t queue_id);
 
+/**
+ * This function submit enqueue data to DMA. This function has no
+ * guranttee to the transfer completion upon return. Applications should
+ * poll transfer status by rte_vhost_poll_enqueue_completed()
+ *
+ * @param vid
+ *  id of vhost device to enqueue data
+ * @param queue_id
+ *  queue id to enqueue data
+ * @param pkts
+ *  array of packets to be enqueued
+ * @param count
+ *  packets num to be enqueued
+ * @return
+ *  num of packets enqueued
+ */
+uint16_t rte_vhost_submit_enqueue_burst(int vid, uint16_t queue_id,
+		struct rte_mbuf **pkts, uint16_t count);
+
+/**
+ * This function check DMA completion status for a specific vhost
+ * device queue. Packets which finish copying (enqueue) operation
+ * will be returned in an array.
+ *
+ * @param vid
+ *  id of vhost device to enqueue data
+ * @param queue_id
+ *  queue id to enqueue data
+ * @param pkts
+ *  blank array to get return packet pointer
+ * @param count
+ *  size of the packet array
+ * @return
+ *  num of packets returned
+ */
+uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
+		struct rte_mbuf **pkts, uint16_t count);
+
 #endif /* _RTE_VDPA_H_ */
diff --git a/lib/librte_vhost/virtio_net.c b/lib/librte_vhost/virtio_net.c
index 751c1f3..cf9f884 100644
--- a/lib/librte_vhost/virtio_net.c
+++ b/lib/librte_vhost/virtio_net.c
@@ -17,14 +17,15 @@ 
 #include <rte_arp.h>
 #include <rte_spinlock.h>
 #include <rte_malloc.h>
+#include <rte_vhost_async.h>
 
 #include "iotlb.h"
 #include "vhost.h"
 
-#define MAX_PKT_BURST 32
-
 #define MAX_BATCH_LEN 256
 
+#define VHOST_ASYNC_BATCH_THRESHOLD 8
+
 static  __rte_always_inline bool
 rxvq_is_mergeable(struct virtio_net *dev)
 {
@@ -117,6 +118,35 @@ 
 }
 
 static __rte_always_inline void
+async_flush_shadow_used_ring_split(struct virtio_net *dev,
+	struct vhost_virtqueue *vq)
+{
+	uint16_t used_idx = vq->last_used_idx & (vq->size - 1);
+
+	if (used_idx + vq->shadow_used_idx <= vq->size) {
+		do_flush_shadow_used_ring_split(dev, vq, used_idx, 0,
+					  vq->shadow_used_idx);
+	} else {
+		uint16_t size;
+
+		/* update used ring interval [used_idx, vq->size] */
+		size = vq->size - used_idx;
+		do_flush_shadow_used_ring_split(dev, vq, used_idx, 0, size);
+
+		/* update the left half used ring interval [0, left_size] */
+		do_flush_shadow_used_ring_split(dev, vq, 0, size,
+					  vq->shadow_used_idx - size);
+	}
+	vq->last_used_idx += vq->shadow_used_idx;
+
+	rte_smp_wmb();
+
+	vhost_log_cache_sync(dev, vq);
+
+	vq->shadow_used_idx = 0;
+}
+
+static __rte_always_inline void
 update_shadow_used_ring_split(struct vhost_virtqueue *vq,
 			 uint16_t desc_idx, uint32_t len)
 {
@@ -905,6 +935,199 @@ 
 	return error;
 }
 
+static __rte_always_inline void
+async_fill_vec(struct iovec *v, void *base, size_t len)
+{
+	v->iov_base = base;
+	v->iov_len = len;
+}
+
+static __rte_always_inline void
+async_fill_it(struct iov_it *it, size_t count,
+	struct iovec *vec, unsigned long nr_seg)
+{
+	it->offset = 0;
+	it->count = count;
+
+	if (count) {
+		it->iov = vec;
+		it->nr_segs = nr_seg;
+	} else {
+		it->iov = 0;
+		it->nr_segs = 0;
+	}
+}
+
+static __rte_always_inline void
+async_fill_des(struct dma_trans_desc *desc,
+	struct iov_it *src, struct iov_it *dst)
+{
+	desc->src = src;
+	desc->dst = dst;
+}
+
+static __rte_always_inline int
+async_mbuf_to_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
+			struct rte_mbuf *m, struct buf_vector *buf_vec,
+			uint16_t nr_vec, uint16_t num_buffers,
+			struct iovec *src_iovec, struct iovec *dst_iovec,
+			struct iov_it *src_it, struct iov_it *dst_it)
+{
+	uint32_t vec_idx = 0;
+	uint32_t mbuf_offset, mbuf_avail;
+	uint32_t buf_offset, buf_avail;
+	uint64_t buf_addr, buf_iova, buf_len;
+	uint32_t cpy_len, cpy_threshold;
+	uint64_t hdr_addr;
+	struct rte_mbuf *hdr_mbuf;
+	struct batch_copy_elem *batch_copy = vq->batch_copy_elems;
+	struct virtio_net_hdr_mrg_rxbuf tmp_hdr, *hdr = NULL;
+	int error = 0;
+
+	uint32_t tlen = 0;
+	int tvec_idx = 0;
+	void *hpa;
+
+	if (unlikely(m == NULL)) {
+		error = -1;
+		goto out;
+	}
+
+	cpy_threshold = vq->async_threshold;
+
+	buf_addr = buf_vec[vec_idx].buf_addr;
+	buf_iova = buf_vec[vec_idx].buf_iova;
+	buf_len = buf_vec[vec_idx].buf_len;
+
+	if (unlikely(buf_len < dev->vhost_hlen && nr_vec <= 1)) {
+		error = -1;
+		goto out;
+	}
+
+	hdr_mbuf = m;
+	hdr_addr = buf_addr;
+	if (unlikely(buf_len < dev->vhost_hlen))
+		hdr = &tmp_hdr;
+	else
+		hdr = (struct virtio_net_hdr_mrg_rxbuf *)(uintptr_t)hdr_addr;
+
+	VHOST_LOG_DATA(DEBUG, "(%d) RX: num merge buffers %d\n",
+		dev->vid, num_buffers);
+
+	if (unlikely(buf_len < dev->vhost_hlen)) {
+		buf_offset = dev->vhost_hlen - buf_len;
+		vec_idx++;
+		buf_addr = buf_vec[vec_idx].buf_addr;
+		buf_iova = buf_vec[vec_idx].buf_iova;
+		buf_len = buf_vec[vec_idx].buf_len;
+		buf_avail = buf_len - buf_offset;
+	} else {
+		buf_offset = dev->vhost_hlen;
+		buf_avail = buf_len - dev->vhost_hlen;
+	}
+
+	mbuf_avail  = rte_pktmbuf_data_len(m);
+	mbuf_offset = 0;
+
+	while (mbuf_avail != 0 || m->next != NULL) {
+		/* done with current buf, get the next one */
+		if (buf_avail == 0) {
+			vec_idx++;
+			if (unlikely(vec_idx >= nr_vec)) {
+				error = -1;
+				goto out;
+			}
+
+			buf_addr = buf_vec[vec_idx].buf_addr;
+			buf_iova = buf_vec[vec_idx].buf_iova;
+			buf_len = buf_vec[vec_idx].buf_len;
+
+			buf_offset = 0;
+			buf_avail  = buf_len;
+		}
+
+		/* done with current mbuf, get the next one */
+		if (mbuf_avail == 0) {
+			m = m->next;
+
+			mbuf_offset = 0;
+			mbuf_avail  = rte_pktmbuf_data_len(m);
+		}
+
+		if (hdr_addr) {
+			virtio_enqueue_offload(hdr_mbuf, &hdr->hdr);
+			if (rxvq_is_mergeable(dev))
+				ASSIGN_UNLESS_EQUAL(hdr->num_buffers,
+						num_buffers);
+
+			if (unlikely(hdr == &tmp_hdr)) {
+				copy_vnet_hdr_to_desc(dev, vq, buf_vec, hdr);
+			} else {
+				PRINT_PACKET(dev, (uintptr_t)hdr_addr,
+						dev->vhost_hlen, 0);
+				vhost_log_cache_write_iova(dev, vq,
+						buf_vec[0].buf_iova,
+						dev->vhost_hlen);
+			}
+
+			hdr_addr = 0;
+		}
+
+		cpy_len = RTE_MIN(buf_avail, mbuf_avail);
+
+		if (unlikely(cpy_len >= cpy_threshold)) {
+			hpa = (void *)(uintptr_t)gpa_to_hpa(dev,
+					buf_iova + buf_offset, cpy_len);
+
+			if (unlikely(!hpa)) {
+				error = -1;
+				goto out;
+			}
+
+			async_fill_vec(src_iovec + tvec_idx,
+				(void *)(uintptr_t)rte_pktmbuf_iova_offset(m,
+						mbuf_offset), cpy_len);
+
+			async_fill_vec(dst_iovec + tvec_idx, hpa, cpy_len);
+
+			tlen += cpy_len;
+			tvec_idx++;
+		} else {
+			if (unlikely(vq->batch_copy_nb_elems >= vq->size)) {
+				rte_memcpy(
+				(void *)((uintptr_t)(buf_addr + buf_offset)),
+				rte_pktmbuf_mtod_offset(m, void *, mbuf_offset),
+				cpy_len);
+
+				PRINT_PACKET(dev,
+					(uintptr_t)(buf_addr + buf_offset),
+					cpy_len, 0);
+			} else {
+				batch_copy[vq->batch_copy_nb_elems].dst =
+				(void *)((uintptr_t)(buf_addr + buf_offset));
+				batch_copy[vq->batch_copy_nb_elems].src =
+				rte_pktmbuf_mtod_offset(m, void *, mbuf_offset);
+				batch_copy[vq->batch_copy_nb_elems].log_addr =
+					buf_iova + buf_offset;
+				batch_copy[vq->batch_copy_nb_elems].len =
+					cpy_len;
+				vq->batch_copy_nb_elems++;
+			}
+		}
+
+		mbuf_avail  -= cpy_len;
+		mbuf_offset += cpy_len;
+		buf_avail  -= cpy_len;
+		buf_offset += cpy_len;
+	}
+
+out:
+	async_fill_it(src_it, tlen, src_iovec, tvec_idx);
+	async_fill_it(dst_it, tlen, dst_iovec, tvec_idx);
+
+	return error;
+}
+
 static __rte_always_inline int
 vhost_enqueue_single_packed(struct virtio_net *dev,
 			    struct vhost_virtqueue *vq,
@@ -1236,6 +1459,317 @@ 
 	return virtio_dev_rx(dev, queue_id, pkts, count);
 }
 
+static __rte_always_inline void
+virtio_dev_rx_async_submit_split_err(struct virtio_net *dev,
+	struct vhost_virtqueue *vq, uint16_t queue_id,
+	uint16_t last_idx, uint16_t shadow_idx)
+{
+	while (vq->async_pkts_inflight_n) {
+		int er = vq->async_ops.check_completed_copies(dev->vid,
+			queue_id, 0, MAX_PKT_BURST);
+
+		if (er < 0) {
+			vq->async_pkts_inflight_n = 0;
+			break;
+		}
+
+		vq->async_pkts_inflight_n -= er;
+	}
+
+	vq->shadow_used_idx = shadow_idx;
+	vq->last_avail_idx = last_idx;
+}
+
+static __rte_noinline uint32_t
+virtio_dev_rx_async_submit_split(struct virtio_net *dev,
+	struct vhost_virtqueue *vq, uint16_t queue_id,
+	struct rte_mbuf **pkts, uint32_t count)
+{
+	uint32_t pkt_idx = 0, pkt_burst_idx = 0;
+	uint16_t num_buffers;
+	struct buf_vector buf_vec[BUF_VECTOR_MAX];
+	uint16_t avail_head, last_idx, shadow_idx;
+
+	struct iov_it *it_pool = vq->it_pool;
+	struct iovec *vec_pool = vq->vec_pool;
+	struct dma_trans_desc tdes[MAX_PKT_BURST];
+	struct iovec *src_iovec = vec_pool;
+	struct iovec *dst_iovec = vec_pool + (VHOST_MAX_ASYNC_VEC >> 1);
+	struct iov_it *src_it = it_pool;
+	struct iov_it *dst_it = it_pool + 1;
+	uint16_t n_free_slot, slot_idx;
+	int n_pkts = 0;
+
+	avail_head = *((volatile uint16_t *)&vq->avail->idx);
+	last_idx = vq->last_avail_idx;
+	shadow_idx = vq->shadow_used_idx;
+
+	/*
+	 * The ordering between avail index and
+	 * desc reads needs to be enforced.
+	 */
+	rte_smp_rmb();
+
+	rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]);
+
+	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
+		uint32_t pkt_len = pkts[pkt_idx]->pkt_len + dev->vhost_hlen;
+		uint16_t nr_vec = 0;
+
+		if (unlikely(reserve_avail_buf_split(dev, vq,
+						pkt_len, buf_vec, &num_buffers,
+						avail_head, &nr_vec) < 0)) {
+			VHOST_LOG_DATA(DEBUG,
+				"(%d) failed to get enough desc from vring\n",
+				dev->vid);
+			vq->shadow_used_idx -= num_buffers;
+			break;
+		}
+
+		VHOST_LOG_DATA(DEBUG, "(%d) current index %d | end index %d\n",
+			dev->vid, vq->last_avail_idx,
+			vq->last_avail_idx + num_buffers);
+
+		if (async_mbuf_to_desc(dev, vq, pkts[pkt_idx],
+				buf_vec, nr_vec, num_buffers,
+				src_iovec, dst_iovec, src_it, dst_it) < 0) {
+			vq->shadow_used_idx -= num_buffers;
+			break;
+		}
+
+		slot_idx = (vq->async_pkts_idx + pkt_idx) & (vq->size - 1);
+		if (src_it->count) {
+			async_fill_des(&tdes[pkt_burst_idx], src_it, dst_it);
+			pkt_burst_idx++;
+			vq->async_pending_info[slot_idx] =
+				num_buffers | (src_it->nr_segs << 16);
+			src_iovec += src_it->nr_segs;
+			dst_iovec += dst_it->nr_segs;
+			src_it += 2;
+			dst_it += 2;
+		} else {
+			vq->async_pending_info[slot_idx] = num_buffers;
+			vq->async_pkts_inflight_n++;
+		}
+
+		vq->last_avail_idx += num_buffers;
+
+		if (pkt_burst_idx >= VHOST_ASYNC_BATCH_THRESHOLD ||
+				(pkt_idx == count - 1 && pkt_burst_idx)) {
+			n_pkts = vq->async_ops.transfer_data(dev->vid,
+					queue_id, tdes, 0, pkt_burst_idx);
+			src_iovec = vec_pool;
+			dst_iovec = vec_pool + (VHOST_MAX_ASYNC_VEC >> 1);
+			src_it = it_pool;
+			dst_it = it_pool + 1;
+
+			if (unlikely(n_pkts < (int)pkt_burst_idx)) {
+				vq->async_pkts_inflight_n +=
+					n_pkts > 0 ? n_pkts : 0;
+				virtio_dev_rx_async_submit_split_err(dev,
+					vq, queue_id, last_idx, shadow_idx);
+				return 0;
+			}
+
+			pkt_burst_idx = 0;
+			vq->async_pkts_inflight_n += n_pkts;
+		}
+	}
+
+	if (pkt_burst_idx) {
+		n_pkts = vq->async_ops.transfer_data(dev->vid,
+				queue_id, tdes, 0, pkt_burst_idx);
+		if (unlikely(n_pkts <= (int)pkt_burst_idx)) {
+			vq->async_pkts_inflight_n += n_pkts > 0 ? n_pkts : 0;
+			virtio_dev_rx_async_submit_split_err(dev, vq, queue_id,
+			last_idx, shadow_idx);
+			return 0;
+		}
+
+		vq->async_pkts_inflight_n += n_pkts;
+	}
+
+	do_data_copy_enqueue(dev, vq);
+
+	n_free_slot = vq->size - vq->async_pkts_idx;
+	if (n_free_slot > pkt_idx) {
+		rte_memcpy(&vq->async_pkts_pending[vq->async_pkts_idx],
+			pkts, pkt_idx * sizeof(uintptr_t));
+		vq->async_pkts_idx += pkt_idx;
+	} else {
+		rte_memcpy(&vq->async_pkts_pending[vq->async_pkts_idx],
+			pkts, n_free_slot * sizeof(uintptr_t));
+		rte_memcpy(&vq->async_pkts_pending[0],
+			&pkts[n_free_slot],
+			(pkt_idx - n_free_slot) * sizeof(uintptr_t));
+		vq->async_pkts_idx = pkt_idx - n_free_slot;
+	}
+
+	if (likely(vq->shadow_used_idx))
+		async_flush_shadow_used_ring_split(dev, vq);
+
+	return pkt_idx;
+}
+
+uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
+		struct rte_mbuf **pkts, uint16_t count)
+{
+	struct virtio_net *dev = get_device(vid);
+	struct vhost_virtqueue *vq;
+	uint16_t n_pkts_cpl, n_pkts_put = 0, n_descs = 0;
+	uint16_t start_idx, pkts_idx, vq_size;
+	uint64_t *async_pending_info;
+
+	VHOST_LOG_DATA(DEBUG, "(%d) %s\n", dev->vid, __func__);
+	if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->nr_vring))) {
+		VHOST_LOG_DATA(ERR, "(%d) %s: invalid virtqueue idx %d.\n",
+			dev->vid, __func__, queue_id);
+		return 0;
+	}
+
+	vq = dev->virtqueue[queue_id];
+
+	rte_spinlock_lock(&vq->access_lock);
+
+	pkts_idx = vq->async_pkts_idx;
+	async_pending_info = vq->async_pending_info;
+	vq_size = vq->size;
+	start_idx = pkts_idx > vq->async_pkts_inflight_n ?
+		pkts_idx - vq->async_pkts_inflight_n :
+		(vq_size - vq->async_pkts_inflight_n + pkts_idx) &
+		(vq_size - 1);
+
+	n_pkts_cpl =
+		vq->async_ops.check_completed_copies(vid, queue_id, 0, count);
+
+	rte_smp_wmb();
+
+	while (likely(((start_idx + n_pkts_put) & (vq_size - 1)) != pkts_idx)) {
+		uint64_t info = async_pending_info[
+			(start_idx + n_pkts_put) & (vq_size - 1)];
+		uint64_t n_segs;
+		n_pkts_put++;
+		n_descs += info & ASYNC_PENDING_INFO_N_MSK;
+		n_segs = info >> ASYNC_PENDING_INFO_N_SFT;
+
+		if (n_segs) {
+			if (!n_pkts_cpl || n_pkts_cpl < n_segs) {
+				n_pkts_put--;
+				n_descs -= info & ASYNC_PENDING_INFO_N_MSK;
+				if (n_pkts_cpl) {
+					async_pending_info[
+						(start_idx + n_pkts_put) &
+						(vq_size - 1)] =
+					((n_segs - n_pkts_cpl) <<
+					 ASYNC_PENDING_INFO_N_SFT) |
+					(info & ASYNC_PENDING_INFO_N_MSK);
+					n_pkts_cpl = 0;
+				}
+				break;
+			}
+			n_pkts_cpl -= n_segs;
+		}
+	}
+
+	if (n_pkts_put) {
+		vq->async_pkts_inflight_n -= n_pkts_put;
+		*(volatile uint16_t *)&vq->used->idx += n_descs;
+
+		vhost_vring_call_split(dev, vq);
+	}
+
+	if (start_idx + n_pkts_put <= vq_size) {
+		rte_memcpy(pkts, &vq->async_pkts_pending[start_idx],
+			n_pkts_put * sizeof(uintptr_t));
+	} else {
+		rte_memcpy(pkts, &vq->async_pkts_pending[start_idx],
+			(vq_size - start_idx) * sizeof(uintptr_t));
+		rte_memcpy(&pkts[vq_size - start_idx], vq->async_pkts_pending,
+			(n_pkts_put - vq_size + start_idx) * sizeof(uintptr_t));
+	}
+
+	rte_spinlock_unlock(&vq->access_lock);
+
+	return n_pkts_put;
+}
+
+static __rte_always_inline uint32_t
+virtio_dev_rx_async_submit(struct virtio_net *dev, uint16_t queue_id,
+	struct rte_mbuf **pkts, uint32_t count)
+{
+	struct vhost_virtqueue *vq;
+	uint32_t nb_tx = 0;
+	bool drawback = false;
+
+	VHOST_LOG_DATA(DEBUG, "(%d) %s\n", dev->vid, __func__);
+	if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->nr_vring))) {
+		VHOST_LOG_DATA(ERR, "(%d) %s: invalid virtqueue idx %d.\n",
+			dev->vid, __func__, queue_id);
+		return 0;
+	}
+
+	vq = dev->virtqueue[queue_id];
+
+	rte_spinlock_lock(&vq->access_lock);
+
+	if (unlikely(vq->enabled == 0))
+		goto out_access_unlock;
+
+	if (unlikely(!vq->async_registered)) {
+		drawback = true;
+		goto out_access_unlock;
+	}
+
+	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
+		vhost_user_iotlb_rd_lock(vq);
+
+	if (unlikely(vq->access_ok == 0))
+		if (unlikely(vring_translate(dev, vq) < 0))
+			goto out;
+
+	count = RTE_MIN((uint32_t)MAX_PKT_BURST, count);
+	if (count == 0)
+		goto out;
+
+	/* TODO: packed queue not implemented */
+	if (vq_is_packed(dev))
+		nb_tx = 0;
+	else
+		nb_tx = virtio_dev_rx_async_submit_split(dev,
+				vq, queue_id, pkts, count);
+
+out:
+	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
+		vhost_user_iotlb_rd_unlock(vq);
+
+out_access_unlock:
+	rte_spinlock_unlock(&vq->access_lock);
+
+	if (drawback)
+		return rte_vhost_enqueue_burst(dev->vid, queue_id, pkts, count);
+
+	return nb_tx;
+}
+
+uint16_t
+rte_vhost_submit_enqueue_burst(int vid, uint16_t queue_id,
+		struct rte_mbuf **pkts, uint16_t count)
+{
+	struct virtio_net *dev = get_device(vid);
+
+	if (!dev)
+		return 0;
+
+	if (unlikely(!(dev->flags & VIRTIO_DEV_BUILTIN_VIRTIO_NET))) {
+		VHOST_LOG_DATA(ERR,
+			"(%d) %s: built-in vhost net backend is disabled.\n",
+			dev->vid, __func__);
+		return 0;
+	}
+
+	return virtio_dev_rx_async_submit(dev, queue_id, pkts, count);
+}
+
 static inline bool
 virtio_net_with_host_offload(struct virtio_net *dev)
 {