diff mbox series

[v3] vhost: add support for packed ring in async vhost

Message ID 20210331140629.45066-1-Cheng1.jiang@intel.com (mailing list archive)
State Superseded
Delegated to: Maxime Coquelin
Headers show
Series [v3] vhost: add support for packed ring in async vhost | expand

Checks

Context Check Description
ci/intel-Testing success Testing PASS
ci/Intel-compilation success Compilation OK
ci/iol-testing success Testing PASS
ci/iol-mellanox-Performance success Performance Testing PASS
ci/iol-intel-Performance success Performance Testing PASS
ci/github-robot success github build: passed
ci/travis-robot success travis build: passed
ci/iol-abi-testing success Testing PASS
ci/checkpatch success coding style OK

Commit Message

Cheng Jiang March 31, 2021, 2:06 p.m. UTC
For now async vhost data path only supports split ring structure. In
order to make async vhost compatible with virtio 1.1 spec this patch
enables packed ring in async vhost data path.

Signed-off-by: Cheng Jiang <Cheng1.jiang@intel.com>
---
v3:
  * fix error handler for DMA-copy packet
  * remove variables that are no longer needed
v2:
  * fix wrong buffer index in rte_vhost_poll_enqueue_completed()
  * add async_buffers_packed memory free in vhost_free_async_mem()

 lib/librte_vhost/rte_vhost_async.h |   1 +
 lib/librte_vhost/vhost.c           |  24 +-
 lib/librte_vhost/vhost.h           |   7 +-
 lib/librte_vhost/virtio_net.c      | 463 +++++++++++++++++++++++++++--
 4 files changed, 457 insertions(+), 38 deletions(-)

--
2.29.2

Comments

Hu, Jiayu April 7, 2021, 6:26 a.m. UTC | #1
Hi Cheng,

Some comments are inline.

> -----Original Message-----
> From: Jiang, Cheng1 <cheng1.jiang@intel.com>
> Sent: Wednesday, March 31, 2021 10:06 PM
> To: maxime.coquelin@redhat.com; Xia, Chenbo <chenbo.xia@intel.com>
> Cc: dev@dpdk.org; Hu, Jiayu <jiayu.hu@intel.com>; Yang, YvonneX
> <yvonnex.yang@intel.com>; Wang, Yinan <yinan.wang@intel.com>; Jiang,
> Cheng1 <cheng1.jiang@intel.com>
> Subject: [PATCH v3] vhost: add support for packed ring in async vhost
> 
> For now async vhost data path only supports split ring structure. In
> order to make async vhost compatible with virtio 1.1 spec this patch
> enables packed ring in async vhost data path.
> 
> Signed-off-by: Cheng Jiang <Cheng1.jiang@intel.com>
> ---
> v3:
>   * fix error handler for DMA-copy packet
>   * remove variables that are no longer needed
> v2:
>   * fix wrong buffer index in rte_vhost_poll_enqueue_completed()
>   * add async_buffers_packed memory free in vhost_free_async_mem()
> 
>  lib/librte_vhost/rte_vhost_async.h |   1 +
>  lib/librte_vhost/vhost.c           |  24 +-
>  lib/librte_vhost/vhost.h           |   7 +-
>  lib/librte_vhost/virtio_net.c      | 463 +++++++++++++++++++++++++++--
>  4 files changed, 457 insertions(+), 38 deletions(-)
> 
> diff --git a/lib/librte_vhost/rte_vhost_async.h
> b/lib/librte_vhost/rte_vhost_async.h
> index c855ff875..6faa31f5a 100644
> --- a/lib/librte_vhost/rte_vhost_async.h
> +++ b/lib/librte_vhost/rte_vhost_async.h
> @@ -89,6 +89,7 @@ struct rte_vhost_async_channel_ops {
>  struct async_inflight_info {
>  	struct rte_mbuf *mbuf;
>  	uint16_t descs; /* num of descs inflight */
> +	uint16_t nr_buffers; /* num of buffers inflight for packed ring */
>  };
> 
>  /**
> diff --git a/lib/librte_vhost/vhost.c b/lib/librte_vhost/vhost.c
> index 52ab93d1e..51b44d6f2 100644
> --- a/lib/librte_vhost/vhost.c
> +++ b/lib/librte_vhost/vhost.c
> @@ -330,15 +330,20 @@ vhost_free_async_mem(struct vhost_virtqueue
> *vq)
>  {
>  	if (vq->async_pkts_info)
>  		rte_free(vq->async_pkts_info);
> -	if (vq->async_descs_split)
> +	if (vq->async_buffers_packed) {
> +		rte_free(vq->async_buffers_packed);
> +		vq->async_buffers_packed = NULL;
> +	} else {
>  		rte_free(vq->async_descs_split);
> +		vq->async_descs_split = NULL;
> +	}
> +
>  	if (vq->it_pool)
>  		rte_free(vq->it_pool);
>  	if (vq->vec_pool)
>  		rte_free(vq->vec_pool);
> 
>  	vq->async_pkts_info = NULL;
> -	vq->async_descs_split = NULL;
>  	vq->it_pool = NULL;
>  	vq->vec_pool = NULL;
>  }
> @@ -1603,9 +1608,9 @@ int rte_vhost_async_channel_register(int vid,
> uint16_t queue_id,
>  		return -1;
> 
>  	/* packed queue is not supported */
> -	if (unlikely(vq_is_packed(dev) || !f.async_inorder)) {
> +	if (unlikely(!f.async_inorder)) {
>  		VHOST_LOG_CONFIG(ERR,
> -			"async copy is not supported on packed queue or
> non-inorder mode "
> +			"async copy is not supported on non-inorder mode "
>  			"(vid %d, qid: %d)\n", vid, queue_id);
>  		return -1;
>  	}
> @@ -1643,10 +1648,17 @@ int rte_vhost_async_channel_register(int vid,
> uint16_t queue_id,
>  	vq->vec_pool = rte_malloc_socket(NULL,
>  			VHOST_MAX_ASYNC_VEC * sizeof(struct iovec),
>  			RTE_CACHE_LINE_SIZE, node);
> -	vq->async_descs_split = rte_malloc_socket(NULL,
> +	if (vq_is_packed(dev)) {
> +		vq->async_buffers_packed = rte_malloc_socket(NULL,
> +			vq->size * sizeof(struct vring_used_elem_packed),
> +			RTE_CACHE_LINE_SIZE, node);
> +	} else {
> +		vq->async_descs_split = rte_malloc_socket(NULL,
>  			vq->size * sizeof(struct vring_used_elem),
>  			RTE_CACHE_LINE_SIZE, node);
> -	if (!vq->async_descs_split || !vq->async_pkts_info ||
> +	}
> +
> +	if (!vq->async_pkts_info ||

Need to check if malloc fails for async_buffers_packed.

>  		!vq->it_pool || !vq->vec_pool) {
>  		vhost_free_async_mem(vq);
>  		VHOST_LOG_CONFIG(ERR,
> diff --git a/lib/librte_vhost/vhost.h b/lib/librte_vhost/vhost.h
> index 658f6fc28..d6324fbf8 100644
> --- a/lib/librte_vhost/vhost.h
> +++ b/lib/librte_vhost/vhost.h
> @@ -206,9 +206,14 @@ struct vhost_virtqueue {
>  	uint16_t	async_pkts_idx;
>  	uint16_t	async_pkts_inflight_n;
>  	uint16_t	async_last_pkts_n;
> -	struct vring_used_elem  *async_descs_split;
> +	union {
> +		struct vring_used_elem  *async_descs_split;
> +		struct vring_used_elem_packed *async_buffers_packed;
> +	};
>  	uint16_t async_desc_idx;
> +	uint16_t async_packed_buffer_idx;
>  	uint16_t last_async_desc_idx;
> +	uint16_t last_async_buffer_idx;
> 
>  	/* vq async features */
>  	bool		async_inorder;
> diff --git a/lib/librte_vhost/virtio_net.c b/lib/librte_vhost/virtio_net.c
> index 583bf379c..fa2dfde02 100644
> --- a/lib/librte_vhost/virtio_net.c
> +++ b/lib/librte_vhost/virtio_net.c
> @@ -363,8 +363,7 @@
> vhost_shadow_dequeue_single_packed_inorder(struct vhost_virtqueue *vq,
>  }
> 
>  static __rte_always_inline void
> -vhost_shadow_enqueue_single_packed(struct virtio_net *dev,
> -				   struct vhost_virtqueue *vq,
> +vhost_shadow_enqueue_packed(struct vhost_virtqueue *vq,
>  				   uint32_t len[],
>  				   uint16_t id[],
>  				   uint16_t count[],
> @@ -382,6 +381,17 @@ vhost_shadow_enqueue_single_packed(struct
> virtio_net *dev,
>  		vq->shadow_aligned_idx += count[i];
>  		vq->shadow_used_idx++;
>  	}
> +}
> +
> +static __rte_always_inline void
> +vhost_shadow_enqueue_single_packed(struct virtio_net *dev,
> +				   struct vhost_virtqueue *vq,
> +				   uint32_t len[],
> +				   uint16_t id[],
> +				   uint16_t count[],
> +				   uint16_t num_buffers)
> +{
> +	vhost_shadow_enqueue_packed(vq, len, id, count, num_buffers);
> 
>  	if (vq->shadow_aligned_idx >= PACKED_BATCH_SIZE) {
>  		do_data_copy_enqueue(dev, vq);
> @@ -1452,6 +1462,73 @@ virtio_dev_rx_async_get_info_idx(uint16_t
> pkts_idx,
>  		(vq_size - n_inflight + pkts_idx) & (vq_size - 1);
>  }
> 
> +static __rte_always_inline void
> +vhost_update_used_packed(struct virtio_net *dev,
> +				  struct vhost_virtqueue *vq,
> +				  struct vring_used_elem_packed
> *shadow_ring,
> +				  uint16_t count)
> +{
> +	if (count == 0)
> +		return;
> +	int i;
> +	uint16_t used_idx = vq->last_used_idx;
> +	uint16_t head_idx = vq->last_used_idx;
> +	uint16_t head_flags = 0;
> +
> +	/* Split loop in two to save memory barriers */
> +	for (i = 0; i < count; i++) {
> +		vq->desc_packed[used_idx].id = shadow_ring[i].id;
> +		vq->desc_packed[used_idx].len = shadow_ring[i].len;
> +
> +		used_idx += shadow_ring[i].count;
> +		if (used_idx >= vq->size)
> +			used_idx -= vq->size;
> +	}
> +
> +	/* The ordering for storing desc flags needs to be enforced. */
> +	rte_atomic_thread_fence(__ATOMIC_RELEASE);
> +
> +	for (i = 0; i < count; i++) {
> +		uint16_t flags;
> +
> +		if (vq->shadow_used_packed[i].len)
> +			flags = VRING_DESC_F_WRITE;
> +		else
> +			flags = 0;
> +
> +		if (vq->used_wrap_counter) {
> +			flags |= VRING_DESC_F_USED;
> +			flags |= VRING_DESC_F_AVAIL;
> +		} else {
> +			flags &= ~VRING_DESC_F_USED;
> +			flags &= ~VRING_DESC_F_AVAIL;
> +		}
> +
> +		if (i > 0) {
> +			vq->desc_packed[vq->last_used_idx].flags = flags;
> +
> +			vhost_log_cache_used_vring(dev, vq,
> +					vq->last_used_idx *
> +					sizeof(struct vring_packed_desc),
> +					sizeof(struct vring_packed_desc));
> +		} else {
> +			head_idx = vq->last_used_idx;
> +			head_flags = flags;
> +		}
> +
> +		vq_inc_last_used_packed(vq, shadow_ring[i].count);
> +	}
> +
> +	vq->desc_packed[head_idx].flags = head_flags;
> +
> +	vhost_log_cache_used_vring(dev, vq,
> +				head_idx *
> +				sizeof(struct vring_packed_desc),
> +				sizeof(struct vring_packed_desc));
> +
> +	vhost_log_cache_sync(dev, vq);

Async enqueue of packed ring has no support of live migration.
The above code is not needed.

> +}
> +
>  static __rte_noinline uint32_t
>  virtio_dev_rx_async_submit_split(struct virtio_net *dev,
>  	struct vhost_virtqueue *vq, uint16_t queue_id,
> @@ -1633,12 +1710,292 @@ virtio_dev_rx_async_submit_split(struct
> virtio_net *dev,
>  	return pkt_idx;
>  }
> 
> +static __rte_always_inline int
> +vhost_enqueue_async_single_packed(struct virtio_net *dev,
> +			    struct vhost_virtqueue *vq,
> +			    struct rte_mbuf *pkt,
> +			    struct buf_vector *buf_vec,
> +			    uint16_t *nr_descs,
> +			    uint16_t *nr_buffers,
> +			    struct iovec *src_iovec, struct iovec *dst_iovec,
> +			    struct rte_vhost_iov_iter *src_it,
> +			    struct rte_vhost_iov_iter *dst_it)
> +{
> +	uint16_t nr_vec = 0;
> +	uint16_t avail_idx = vq->last_avail_idx;
> +	uint16_t max_tries, tries = 0;
> +	uint16_t buf_id = 0;
> +	uint32_t len = 0;
> +	uint16_t desc_count;
> +	uint32_t size = pkt->pkt_len + sizeof(struct
> virtio_net_hdr_mrg_rxbuf);
> +	uint32_t buffer_len[vq->size];
> +	uint16_t buffer_buf_id[vq->size];
> +	uint16_t buffer_desc_count[vq->size];
> +	*nr_buffers = 0;
> +
> +	if (rxvq_is_mergeable(dev))
> +		max_tries = vq->size - 1;
> +	else
> +		max_tries = 1;
> +
> +	while (size > 0) {
> +		/*
> +		 * if we tried all available ring items, and still
> +		 * can't get enough buf, it means something abnormal
> +		 * happened.
> +		 */
> +		if (unlikely(++tries > max_tries))
> +			return -1;
> +
> +		if (unlikely(fill_vec_buf_packed(dev, vq,
> +						avail_idx, &desc_count,
> +						buf_vec, &nr_vec,
> +						&buf_id, &len,
> +						VHOST_ACCESS_RW) < 0))
> +			return -1;
> +
> +		len = RTE_MIN(len, size);
> +		size -= len;
> +
> +		buffer_len[*nr_buffers] = len;
> +		buffer_buf_id[*nr_buffers] = buf_id;
> +		buffer_desc_count[*nr_buffers] = desc_count;
> +		*nr_buffers += 1;
> +
> +		*nr_descs += desc_count;
> +		avail_idx += desc_count;
> +		if (avail_idx >= vq->size)
> +			avail_idx -= vq->size;
> +	}
> +
> +	if (async_mbuf_to_desc(dev, vq, pkt, buf_vec, nr_vec, *nr_buffers,
> +		src_iovec, dst_iovec, src_it, dst_it) < 0)
> +		return -1;
> +
> +	vhost_shadow_enqueue_packed(vq, buffer_len, buffer_buf_id,
> +					   buffer_desc_count, *nr_buffers);
> +
> +	return 0;
> +}
> +
> +static __rte_always_inline int16_t
> +virtio_dev_rx_async_single_packed(struct virtio_net *dev,
> +			    struct vhost_virtqueue *vq,
> +			    struct rte_mbuf *pkt,
> +			    uint16_t *nr_descs, uint16_t *nr_buffers,
> +			    struct iovec *src_iovec, struct iovec *dst_iovec,
> +			    struct rte_vhost_iov_iter *src_it,
> +			    struct rte_vhost_iov_iter *dst_it)
> +{
> +	struct buf_vector buf_vec[BUF_VECTOR_MAX];
> +	*nr_descs = 0;
> +	*nr_buffers = 0;
> +
> +	if (unlikely(vhost_enqueue_async_single_packed(dev, vq, pkt,
> buf_vec,
> +						 nr_descs,
> +						 nr_buffers,
> +						 src_iovec, dst_iovec,
> +						 src_it, dst_it) < 0)) {
> +		VHOST_LOG_DATA(DEBUG,
> +				"(%d) failed to get enough desc from vring\n",
> +				dev->vid);
> +		return -1;
> +	}
> +
> +	VHOST_LOG_DATA(DEBUG, "(%d) current index %d | end
> index %d\n",
> +			dev->vid, vq->last_avail_idx,
> +			vq->last_avail_idx + *nr_descs);
> +
> +	return 0;
> +}
> +
> +static __rte_noinline uint32_t
> +virtio_dev_rx_async_submit_packed(struct virtio_net *dev,
> +	struct vhost_virtqueue *vq, uint16_t queue_id,
> +	struct rte_mbuf **pkts, uint32_t count,
> +	struct rte_mbuf **comp_pkts, uint32_t *comp_count)
> +{
> +	uint32_t pkt_idx = 0, pkt_burst_idx = 0;
> +	uint16_t num_buffers;
> +	uint16_t num_desc;
> +
> +	struct rte_vhost_iov_iter *it_pool = vq->it_pool;
> +	struct iovec *vec_pool = vq->vec_pool;
> +	struct rte_vhost_async_desc tdes[MAX_PKT_BURST];
> +	struct iovec *src_iovec = vec_pool;
> +	struct iovec *dst_iovec = vec_pool + (VHOST_MAX_ASYNC_VEC >> 1);
> +	struct rte_vhost_iov_iter *src_it = it_pool;
> +	struct rte_vhost_iov_iter *dst_it = it_pool + 1;
> +	uint16_t slot_idx = 0;
> +	uint16_t segs_await = 0;
> +	struct async_inflight_info *pkts_info = vq->async_pkts_info;
> +	uint32_t n_pkts = 0, pkt_err = 0;
> +	uint32_t num_async_pkts = 0, num_done_pkts = 0;
> +
> +	rte_prefetch0(&vq->desc[vq->last_avail_idx & (vq->size - 1)]);
> +
> +	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
> +		if (unlikely(virtio_dev_rx_async_single_packed(dev, vq,
> +						pkts[pkt_idx],
> +						&num_desc, &num_buffers,
> +						src_iovec, dst_iovec,
> +						src_it, dst_it) < 0)) {
> +			break;
> +		}
> +
> +		VHOST_LOG_DATA(DEBUG, "(%d) current index %d | end
> index %d\n",
> +			dev->vid, vq->last_avail_idx,
> +			vq->last_avail_idx + num_desc);
> +
> +		slot_idx = (vq->async_pkts_idx + num_async_pkts) &
> +			(vq->size - 1);
> +		if (src_it->count) {
> +			uint16_t from, to;
> +
> +			async_fill_desc(&tdes[pkt_burst_idx++], src_it, dst_it);
> +			pkts_info[slot_idx].descs = num_desc;
> +			pkts_info[slot_idx].nr_buffers = num_buffers;
> +			pkts_info[slot_idx].mbuf = pkts[pkt_idx];
> +			num_async_pkts++;
> +			src_iovec += src_it->nr_segs;
> +			dst_iovec += dst_it->nr_segs;
> +			src_it += 2;
> +			dst_it += 2;
> +			segs_await += src_it->nr_segs;
> +
> +			/**
> +			 * recover shadow used ring and keep DMA-occupied
> +			 * descriptors.
> +			 */
> +			from = vq->shadow_used_idx - num_buffers;
> +			to = vq->async_packed_buffer_idx & (vq->size - 1);
> +			if (num_buffers + to <= vq->size) {
> +				rte_memcpy(&vq->async_buffers_packed[to],
> +					&vq->shadow_used_packed[from],
> +					num_buffers *
> +					sizeof(struct
> vring_used_elem_packed));
> +			} else {
> +				int size = vq->size - to;
> +
> +				rte_memcpy(&vq->async_buffers_packed[to],
> +					&vq->shadow_used_packed[from],
> +					size *
> +					sizeof(struct
> vring_used_elem_packed));
> +				rte_memcpy(vq->async_buffers_packed,
> +					&vq->shadow_used_packed[from +
> +					size], (num_buffers - size) *
> +					sizeof(struct
> vring_used_elem_packed));
> +			}
> +			vq->async_packed_buffer_idx += num_buffers;
> +			vq->shadow_used_idx -= num_buffers;
> +		} else
> +			comp_pkts[num_done_pkts++] = pkts[pkt_idx];
> +
> +		vq_inc_last_avail_packed(vq, num_desc);
> +
> +		/*
> +		 * conditions to trigger async device transfer:
> +		 * - buffered packet number reaches transfer threshold
> +		 * - unused async iov number is less than max vhost vector
> +		 */
> +		if (unlikely(pkt_burst_idx >=
> VHOST_ASYNC_BATCH_THRESHOLD ||
> +			((VHOST_MAX_ASYNC_VEC >> 1) - segs_await <
> +			BUF_VECTOR_MAX))) {
> +			n_pkts = vq->async_ops.transfer_data(dev->vid,
> +					queue_id, tdes, 0, pkt_burst_idx);
> +			src_iovec = vec_pool;
> +			dst_iovec = vec_pool + (VHOST_MAX_ASYNC_VEC >>
> 1);
> +			src_it = it_pool;
> +			dst_it = it_pool + 1;
> +			segs_await = 0;
> +			vq->async_pkts_inflight_n += n_pkts;
> +
> +			if (unlikely(n_pkts < pkt_burst_idx)) {
> +				/*
> +				 * log error packets number here and do
> actual
> +				 * error processing when applications poll
> +				 * completion
> +				 */
> +				pkt_err = pkt_burst_idx - n_pkts;
> +				pkt_burst_idx = 0;
> +				pkt_idx++;
> +				break;
> +			}
> +
> +			pkt_burst_idx = 0;
> +		}
> +	}
> +
> +	if (pkt_burst_idx) {
> +		n_pkts = vq->async_ops.transfer_data(dev->vid,
> +				queue_id, tdes, 0, pkt_burst_idx);
> +		vq->async_pkts_inflight_n += n_pkts;
> +
> +		if (unlikely(n_pkts < pkt_burst_idx))
> +			pkt_err = pkt_burst_idx - n_pkts;
> +	}
> +
> +	do_data_copy_enqueue(dev, vq);
> +
> +	if (unlikely(pkt_err)) {
> +		uint16_t buffers_err = 0;
> +		uint16_t async_buffer_idx;
> +		uint16_t i;
> +
> +		num_async_pkts -= pkt_err;
> +		pkt_idx -= pkt_err;
> +		/* calculate the sum of buffers of DMA-error packets. */
> +		while (pkt_err-- > 0) {
> +			buffers_err +=
> +				pkts_info[slot_idx & (vq->size - 1)].nr_buffers;
> +			slot_idx--;
> +		}
> +
> +		vq->async_packed_buffer_idx -= buffers_err;
> +		async_buffer_idx = vq->async_packed_buffer_idx;
> +		/* set 0 to the length of descriptors of DMA-error packets */
> +		for (i = 0; i < buffers_err; i++) {
> +			vq->async_buffers_packed[(async_buffer_idx + i)
> +						& (vq->size - 1)].len = 0;
> +		}
> +		/* write back DMA-error descriptors to used ring */
> +		do {
> +			uint16_t from = async_buffer_idx & (vq->size - 1);
> +			uint16_t to = (from + buffers_err) & (vq->size - 1);
> +
> +			if (to > from) {
> +				vhost_update_used_packed(dev, vq,
> +					vq->async_buffers_packed + from,
> +					to - from);
> +				buffers_err = 0;
> +			} else {
> +				vhost_update_used_packed(dev, vq,
> +					vq->async_buffers_packed + from,
> +					vq->size - from);
> +				buffers_err -= vq->size - from;
> +			}
> +		} while (buffers_err > 0);
> +		vhost_vring_call_packed(dev, vq);

Why notify front-end here?

> +		num_done_pkts = pkt_idx - num_async_pkts;
> +	}
> +
> +	vq->async_pkts_idx += num_async_pkts;
> +	*comp_count = num_done_pkts;
> +
> +	if (likely(vq->shadow_used_idx)) {
> +		vhost_flush_enqueue_shadow_packed(dev, vq);
> +		vhost_vring_call_packed(dev, vq);
> +	}
> +
> +	return pkt_idx;
> +}

virtio_dev_rx_async_submit_packed is too long and it has several parts are
similar with split ring. I think you need to abstract common parts into inline
functions to make the code easier to read.

> +
>  uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
>  		struct rte_mbuf **pkts, uint16_t count)
>  {
>  	struct virtio_net *dev = get_device(vid);
>  	struct vhost_virtqueue *vq;
> -	uint16_t n_pkts_cpl = 0, n_pkts_put = 0, n_descs = 0;
> +	uint16_t n_pkts_cpl = 0, n_pkts_put = 0, n_descs = 0, n_buffers = 0;
>  	uint16_t start_idx, pkts_idx, vq_size;
>  	struct async_inflight_info *pkts_info;
>  	uint16_t from, i;
> @@ -1680,53 +2037,96 @@ uint16_t
> rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
>  		goto done;
>  	}
> 
> -	for (i = 0; i < n_pkts_put; i++) {
> -		from = (start_idx + i) & (vq_size - 1);
> -		n_descs += pkts_info[from].descs;
> -		pkts[i] = pkts_info[from].mbuf;
> +	if (vq_is_packed(dev)) {
> +		for (i = 0; i < n_pkts_put; i++) {
> +			from = (start_idx + i) & (vq_size - 1);
> +			n_buffers += pkts_info[from].nr_buffers;
> +			pkts[i] = pkts_info[from].mbuf;
> +		}
> +	} else {
> +		for (i = 0; i < n_pkts_put; i++) {
> +			from = (start_idx + i) & (vq_size - 1);
> +			n_descs += pkts_info[from].descs;
> +			pkts[i] = pkts_info[from].mbuf;
> +		}
>  	}
> +
>  	vq->async_last_pkts_n = n_pkts_cpl - n_pkts_put;
>  	vq->async_pkts_inflight_n -= n_pkts_put;
> 
>  	if (likely(vq->enabled && vq->access_ok)) {
> -		uint16_t nr_left = n_descs;
>  		uint16_t nr_copy;
>  		uint16_t to;
> 
>  		/* write back completed descriptors to used ring */
> -		do {
> -			from = vq->last_async_desc_idx & (vq->size - 1);
> -			nr_copy = nr_left + from <= vq->size ? nr_left :
> -				vq->size - from;
> -			to = vq->last_used_idx & (vq->size - 1);
> -
> -			if (to + nr_copy <= vq->size) {
> -				rte_memcpy(&vq->used->ring[to],
> +		if (vq_is_packed(dev)) {
> +			uint16_t nr_left = n_buffers;
> +			uint16_t to;
> +			do {
> +				from = vq->last_async_buffer_idx &
> +								(vq->size - 1);
> +				to = (from + nr_left) & (vq->size - 1);
> +
> +				if (to > from) {
> +					vhost_update_used_packed(dev, vq,
> +						vq->async_buffers_packed +
> from,
> +						to - from);
> +					vq->last_async_buffer_idx += nr_left;
> +					nr_left = 0;
> +				} else {
> +					vhost_update_used_packed(dev, vq,
> +						vq->async_buffers_packed +
> from,
> +						vq->size - from);
> +					vq->last_async_buffer_idx +=
> +								vq->size -
> from;
> +					nr_left -= vq->size - from;
> +				}
> +			} while (nr_left > 0);
> +			vhost_vring_call_packed(dev, vq);
> +		} else {
> +			uint16_t nr_left = n_descs;
> +			do {
> +				from = vq->last_async_desc_idx & (vq->size -
> 1);
> +				nr_copy = nr_left + from <= vq->size ? nr_left :
> +					vq->size - from;
> +				to = vq->last_used_idx & (vq->size - 1);
> +
> +				if (to + nr_copy <= vq->size) {
> +					rte_memcpy(&vq->used->ring[to],
>  						&vq-
> >async_descs_split[from],
>  						nr_copy *
>  						sizeof(struct
> vring_used_elem));
> -			} else {
> -				uint16_t size = vq->size - to;
> +				} else {
> +					uint16_t size = vq->size - to;
> 
> -				rte_memcpy(&vq->used->ring[to],
> +					rte_memcpy(&vq->used->ring[to],
>  						&vq-
> >async_descs_split[from],
>  						size *
>  						sizeof(struct
> vring_used_elem));
> -				rte_memcpy(vq->used->ring,
> +					rte_memcpy(vq->used->ring,
>  						&vq->async_descs_split[from
> +
>  						size], (nr_copy - size) *
>  						sizeof(struct
> vring_used_elem));
> -			}
> +				}
> +
> +				vq->last_async_desc_idx += nr_copy;
> +				vq->last_used_idx += nr_copy;
> +				nr_left -= nr_copy;
> +			} while (nr_left > 0);
> +
> +			__atomic_add_fetch(&vq->used->idx, n_descs,
> +					__ATOMIC_RELEASE);
> +			vhost_vring_call_split(dev, vq);
> +		}
> 
> -			vq->last_async_desc_idx += nr_copy;
> -			vq->last_used_idx += nr_copy;
> -			nr_left -= nr_copy;
> -		} while (nr_left > 0);
> 
> -		__atomic_add_fetch(&vq->used->idx, n_descs,
> __ATOMIC_RELEASE);
> -		vhost_vring_call_split(dev, vq);
> -	} else
> -		vq->last_async_desc_idx += n_descs;
> +
> +	} else {
> +		if (vq_is_packed(dev))
> +			vq->last_async_buffer_idx += n_buffers;
> +		else
> +			vq->last_async_desc_idx += n_descs;
> +	}

rte_vhost_poll_enqueue_completed is too long and not easy to read. Save suggestion
as above.

Thanks,
Jiayu

> 
>  done:
>  	rte_spinlock_unlock(&vq->access_lock);
> @@ -1767,9 +2167,10 @@ virtio_dev_rx_async_submit(struct virtio_net
> *dev, uint16_t queue_id,
>  	if (count == 0)
>  		goto out;
> 
> -	/* TODO: packed queue not implemented */
>  	if (vq_is_packed(dev))
> -		nb_tx = 0;
> +		nb_tx = virtio_dev_rx_async_submit_packed(dev,
> +				vq, queue_id, pkts, count, comp_pkts,
> +				comp_count);
>  	else
>  		nb_tx = virtio_dev_rx_async_submit_split(dev,
>  				vq, queue_id, pkts, count, comp_pkts,
> --
> 2.29.2
Cheng Jiang April 8, 2021, 12:01 p.m. UTC | #2
Hi Jiayu,

> -----Original Message-----
> From: Hu, Jiayu <jiayu.hu@intel.com>
> Sent: Wednesday, April 7, 2021 2:27 PM
> To: Jiang, Cheng1 <cheng1.jiang@intel.com>; maxime.coquelin@redhat.com;
> Xia, Chenbo <chenbo.xia@intel.com>
> Cc: dev@dpdk.org; Yang, YvonneX <yvonnex.yang@intel.com>; Wang, Yinan
> <yinan.wang@intel.com>
> Subject: RE: [PATCH v3] vhost: add support for packed ring in async vhost
> 
> Hi Cheng,
> 
> Some comments are inline.
> 
> > -----Original Message-----
> > From: Jiang, Cheng1 <cheng1.jiang@intel.com>
> > Sent: Wednesday, March 31, 2021 10:06 PM
> > To: maxime.coquelin@redhat.com; Xia, Chenbo <chenbo.xia@intel.com>
> > Cc: dev@dpdk.org; Hu, Jiayu <jiayu.hu@intel.com>; Yang, YvonneX
> > <yvonnex.yang@intel.com>; Wang, Yinan <yinan.wang@intel.com>; Jiang,
> > Cheng1 <cheng1.jiang@intel.com>
> > Subject: [PATCH v3] vhost: add support for packed ring in async vhost
> >
> > For now async vhost data path only supports split ring structure. In
> > order to make async vhost compatible with virtio 1.1 spec this patch
> > enables packed ring in async vhost data path.
> >
> > Signed-off-by: Cheng Jiang <Cheng1.jiang@intel.com>
> > ---
> > v3:
> >   * fix error handler for DMA-copy packet
> >   * remove variables that are no longer needed
> > v2:
> >   * fix wrong buffer index in rte_vhost_poll_enqueue_completed()
> >   * add async_buffers_packed memory free in vhost_free_async_mem()
> >
> >  lib/librte_vhost/rte_vhost_async.h |   1 +
> >  lib/librte_vhost/vhost.c           |  24 +-
> >  lib/librte_vhost/vhost.h           |   7 +-
> >  lib/librte_vhost/virtio_net.c      | 463 +++++++++++++++++++++++++++--
> >  4 files changed, 457 insertions(+), 38 deletions(-)
> >
> > diff --git a/lib/librte_vhost/rte_vhost_async.h
> > b/lib/librte_vhost/rte_vhost_async.h
> > index c855ff875..6faa31f5a 100644
> > --- a/lib/librte_vhost/rte_vhost_async.h
> > +++ b/lib/librte_vhost/rte_vhost_async.h
> > @@ -89,6 +89,7 @@ struct rte_vhost_async_channel_ops {  struct
> > async_inflight_info {  struct rte_mbuf *mbuf;  uint16_t descs; /* num
> > of descs inflight */
> > +uint16_t nr_buffers; /* num of buffers inflight for packed ring */
> >  };
> >
> >  /**
> > diff --git a/lib/librte_vhost/vhost.c b/lib/librte_vhost/vhost.c index
> > 52ab93d1e..51b44d6f2 100644
> > --- a/lib/librte_vhost/vhost.c
> > +++ b/lib/librte_vhost/vhost.c
> > @@ -330,15 +330,20 @@ vhost_free_async_mem(struct vhost_virtqueue
> > *vq)
> >  {
> >  if (vq->async_pkts_info)
> >  rte_free(vq->async_pkts_info);
> > -if (vq->async_descs_split)
> > +if (vq->async_buffers_packed) {
> > +rte_free(vq->async_buffers_packed);
> > +vq->async_buffers_packed = NULL;
> > +} else {
> >  rte_free(vq->async_descs_split);
> > +vq->async_descs_split = NULL;
> > +}
> > +
> >  if (vq->it_pool)
> >  rte_free(vq->it_pool);
> >  if (vq->vec_pool)
> >  rte_free(vq->vec_pool);
> >
> >  vq->async_pkts_info = NULL;
> > -vq->async_descs_split = NULL;
> >  vq->it_pool = NULL;
> >  vq->vec_pool = NULL;
> >  }
> > @@ -1603,9 +1608,9 @@ int rte_vhost_async_channel_register(int vid,
> > uint16_t queue_id,  return -1;
> >
> >  /* packed queue is not supported */
> > -if (unlikely(vq_is_packed(dev) || !f.async_inorder)) {
> > +if (unlikely(!f.async_inorder)) {
> >  VHOST_LOG_CONFIG(ERR,
> > -"async copy is not supported on packed queue or non-inorder mode "
> > +"async copy is not supported on non-inorder mode "
> >  "(vid %d, qid: %d)\n", vid, queue_id);  return -1;  } @@ -1643,10
> > +1648,17 @@ int rte_vhost_async_channel_register(int vid, uint16_t
> > queue_id,  vq->vec_pool = rte_malloc_socket(NULL,
> VHOST_MAX_ASYNC_VEC
> > * sizeof(struct iovec),  RTE_CACHE_LINE_SIZE, node);
> > -vq->async_descs_split = rte_malloc_socket(NULL,
> > +if (vq_is_packed(dev)) {
> > +vq->async_buffers_packed = rte_malloc_socket(NULL, size *
> > +vq->sizeof(struct vring_used_elem_packed),
> > +RTE_CACHE_LINE_SIZE, node);
> > +} else {
> > +vq->async_descs_split = rte_malloc_socket(NULL,
> >  vq->size * sizeof(struct vring_used_elem),  RTE_CACHE_LINE_SIZE,
> > node); -if (!vq->async_descs_split || !vq->async_pkts_info ||
> > +}
> > +
> > +if (!vq->async_pkts_info ||
> 
> Need to check if malloc fails for async_buffers_packed.

Sure, It will be fixed in the next version.

> 
> >  !vq->it_pool || !vq->vec_pool) {
> >  vhost_free_async_mem(vq);
> >  VHOST_LOG_CONFIG(ERR,
> > diff --git a/lib/librte_vhost/vhost.h b/lib/librte_vhost/vhost.h index
> > 658f6fc28..d6324fbf8 100644
> > --- a/lib/librte_vhost/vhost.h
> > +++ b/lib/librte_vhost/vhost.h
> > @@ -206,9 +206,14 @@ struct vhost_virtqueue {  uint16_tasync_pkts_idx;
> > uint16_tasync_pkts_inflight_n;  uint16_tasync_last_pkts_n; -struct
> > vring_used_elem  *async_descs_split;
> > +union {
> > +struct vring_used_elem  *async_descs_split; struct
> > +vring_used_elem_packed *async_buffers_packed; };
> >  uint16_t async_desc_idx;
> > +uint16_t async_packed_buffer_idx;
> >  uint16_t last_async_desc_idx;
> > +uint16_t last_async_buffer_idx;
> >
> >  /* vq async features */
> >  boolasync_inorder;
> > diff --git a/lib/librte_vhost/virtio_net.c
> > b/lib/librte_vhost/virtio_net.c index 583bf379c..fa2dfde02 100644
> > --- a/lib/librte_vhost/virtio_net.c
> > +++ b/lib/librte_vhost/virtio_net.c
> > @@ -363,8 +363,7 @@
> > vhost_shadow_dequeue_single_packed_inorder(struct vhost_virtqueue
> *vq,
> > }
> >
> >  static __rte_always_inline void
> > -vhost_shadow_enqueue_single_packed(struct virtio_net *dev,
> > -   struct vhost_virtqueue *vq,
> > +vhost_shadow_enqueue_packed(struct vhost_virtqueue *vq,
> >     uint32_t len[],
> >     uint16_t id[],
> >     uint16_t count[],
> > @@ -382,6 +381,17 @@ vhost_shadow_enqueue_single_packed(struct
> > virtio_net *dev,
> >  vq->shadow_aligned_idx += count[i];
> >  vq->shadow_used_idx++;
> >  }
> > +}
> > +
> > +static __rte_always_inline void
> > +vhost_shadow_enqueue_single_packed(struct virtio_net *dev,
> > +   struct vhost_virtqueue *vq,
> > +   uint32_t len[],
> > +   uint16_t id[],
> > +   uint16_t count[],
> > +   uint16_t num_buffers)
> > +{
> > +vhost_shadow_enqueue_packed(vq, len, id, count, num_buffers);
> >
> >  if (vq->shadow_aligned_idx >= PACKED_BATCH_SIZE) {
> > do_data_copy_enqueue(dev, vq); @@ -1452,6 +1462,73 @@
> > virtio_dev_rx_async_get_info_idx(uint16_t
> > pkts_idx,
> >  (vq_size - n_inflight + pkts_idx) & (vq_size - 1);  }
> >
> > +static __rte_always_inline void
> > +vhost_update_used_packed(struct virtio_net *dev,
> > +  struct vhost_virtqueue *vq,
> > +  struct vring_used_elem_packed
> > *shadow_ring,
> > +  uint16_t count)
> > +{
> > +if (count == 0)
> > +return;
> > +int i;
> > +uint16_t used_idx = vq->last_used_idx; uint16_t head_idx =
> > +vq->last_used_idx; uint16_t head_flags = 0;
> > +
> > +/* Split loop in two to save memory barriers */ for (i = 0; i <
> > +count; i++) {
> > +vq->desc_packed[used_idx].id = shadow_ring[i].id;
> > +vq->desc_packed[used_idx].len = shadow_ring[i].len;
> > +
> > +used_idx += shadow_ring[i].count;
> > +if (used_idx >= vq->size)
> > +used_idx -= vq->size;
> > +}
> > +
> > +/* The ordering for storing desc flags needs to be enforced. */
> > +rte_atomic_thread_fence(__ATOMIC_RELEASE);
> > +
> > +for (i = 0; i < count; i++) {
> > +uint16_t flags;
> > +
> > +if (vq->shadow_used_packed[i].len)
> > +flags = VRING_DESC_F_WRITE;
> > +else
> > +flags = 0;
> > +
> > +if (vq->used_wrap_counter) {
> > +flags |= VRING_DESC_F_USED;
> > +flags |= VRING_DESC_F_AVAIL;
> > +} else {
> > +flags &= ~VRING_DESC_F_USED;
> > +flags &= ~VRING_DESC_F_AVAIL;
> > +}
> > +
> > +if (i > 0) {
> > +vq->desc_packed[vq->last_used_idx].flags = flags;
> > +
> > +vhost_log_cache_used_vring(dev, vq,
> > +vq->last_used_idx *
> > +sizeof(struct vring_packed_desc),
> > +sizeof(struct vring_packed_desc));
> > +} else {
> > +head_idx = vq->last_used_idx;
> > +head_flags = flags;
> > +}
> > +
> > +vq_inc_last_used_packed(vq, shadow_ring[i].count); }
> > +
> > +vq->desc_packed[head_idx].flags = head_flags;
> > +
> > +vhost_log_cache_used_vring(dev, vq,
> > +head_idx *
> > +sizeof(struct vring_packed_desc),
> > +sizeof(struct vring_packed_desc));
> > +
> > +vhost_log_cache_sync(dev, vq);
> 
> Async enqueue of packed ring has no support of live migration.
> The above code is not needed.

It will be removed.

> 
> > +}
> > +
> >  static __rte_noinline uint32_t
> >  virtio_dev_rx_async_submit_split(struct virtio_net *dev,  struct
> > vhost_virtqueue *vq, uint16_t queue_id, @@ -1633,12 +1710,292 @@
> > virtio_dev_rx_async_submit_split(struct
> > virtio_net *dev,
> >  return pkt_idx;
> >  }
> >
> > +static __rte_always_inline int
> > +vhost_enqueue_async_single_packed(struct virtio_net *dev,
> > +    struct vhost_virtqueue *vq,
> > +    struct rte_mbuf *pkt,
> > +    struct buf_vector *buf_vec,
> > +    uint16_t *nr_descs,
> > +    uint16_t *nr_buffers,
> > +    struct iovec *src_iovec, struct iovec *dst_iovec,
> > +    struct rte_vhost_iov_iter *src_it,
> > +    struct rte_vhost_iov_iter *dst_it) { uint16_t nr_vec = 0;
> > +uint16_t avail_idx = vq->last_avail_idx; uint16_t max_tries, tries =
> > +0; uint16_t buf_id = 0; uint32_t len = 0; uint16_t desc_count;
> > +uint32_t size = pkt->pkt_len + sizeof(struct
> > virtio_net_hdr_mrg_rxbuf);
> > +uint32_t buffer_len[vq->size];
> > +uint16_t buffer_buf_id[vq->size];
> > +uint16_t buffer_desc_count[vq->size]; *nr_buffers = 0;
> > +
> > +if (rxvq_is_mergeable(dev))
> > +max_tries = vq->size - 1;
> > +else
> > +max_tries = 1;
> > +
> > +while (size > 0) {
> > +/*
> > + * if we tried all available ring items, and still
> > + * can't get enough buf, it means something abnormal
> > + * happened.
> > + */
> > +if (unlikely(++tries > max_tries))
> > +return -1;
> > +
> > +if (unlikely(fill_vec_buf_packed(dev, vq, avail_idx, &desc_count,
> > +buf_vec, &nr_vec, &buf_id, &len,
> > +VHOST_ACCESS_RW) < 0))
> > +return -1;
> > +
> > +len = RTE_MIN(len, size);
> > +size -= len;
> > +
> > +buffer_len[*nr_buffers] = len;
> > +buffer_buf_id[*nr_buffers] = buf_id;
> > +buffer_desc_count[*nr_buffers] = desc_count; *nr_buffers += 1;
> > +
> > +*nr_descs += desc_count;
> > +avail_idx += desc_count;
> > +if (avail_idx >= vq->size)
> > +avail_idx -= vq->size;
> > +}
> > +
> > +if (async_mbuf_to_desc(dev, vq, pkt, buf_vec, nr_vec, *nr_buffers,
> > +src_iovec, dst_iovec, src_it, dst_it) < 0) return -1;
> > +
> > +vhost_shadow_enqueue_packed(vq, buffer_len, buffer_buf_id,
> > +   buffer_desc_count, *nr_buffers);
> > +
> > +return 0;
> > +}
> > +
> > +static __rte_always_inline int16_t
> > +virtio_dev_rx_async_single_packed(struct virtio_net *dev,
> > +    struct vhost_virtqueue *vq,
> > +    struct rte_mbuf *pkt,
> > +    uint16_t *nr_descs, uint16_t *nr_buffers,
> > +    struct iovec *src_iovec, struct iovec *dst_iovec,
> > +    struct rte_vhost_iov_iter *src_it,
> > +    struct rte_vhost_iov_iter *dst_it) { struct buf_vector
> > +buf_vec[BUF_VECTOR_MAX]; *nr_descs = 0; *nr_buffers = 0;
> > +
> > +if (unlikely(vhost_enqueue_async_single_packed(dev, vq, pkt,
> > buf_vec,
> > + nr_descs,
> > + nr_buffers,
> > + src_iovec, dst_iovec,
> > + src_it, dst_it) < 0)) {
> > +VHOST_LOG_DATA(DEBUG,
> > +"(%d) failed to get enough desc from vring\n",
> > +dev->vid);
> > +return -1;
> > +}
> > +
> > +VHOST_LOG_DATA(DEBUG, "(%d) current index %d | end
> > index %d\n",
> > +dev->vid, vq->last_avail_idx,
> > +vq->last_avail_idx + *nr_descs);
> > +
> > +return 0;
> > +}
> > +
> > +static __rte_noinline uint32_t
> > +virtio_dev_rx_async_submit_packed(struct virtio_net *dev, struct
> > +vhost_virtqueue *vq, uint16_t queue_id, struct rte_mbuf **pkts,
> > +uint32_t count, struct rte_mbuf **comp_pkts, uint32_t *comp_count) {
> > +uint32_t pkt_idx = 0, pkt_burst_idx = 0; uint16_t num_buffers;
> > +uint16_t num_desc;
> > +
> > +struct rte_vhost_iov_iter *it_pool = vq->it_pool; struct iovec
> > +*vec_pool = vq->vec_pool; struct rte_vhost_async_desc
> > +tdes[MAX_PKT_BURST]; struct iovec *src_iovec = vec_pool; struct iovec
> > +*dst_iovec = vec_pool + (VHOST_MAX_ASYNC_VEC >> 1); struct
> > +rte_vhost_iov_iter *src_it = it_pool; struct rte_vhost_iov_iter
> > +*dst_it = it_pool + 1; uint16_t slot_idx = 0; uint16_t segs_await =
> > +0; struct async_inflight_info *pkts_info = vq->async_pkts_info;
> > +uint32_t n_pkts = 0, pkt_err = 0; uint32_t num_async_pkts = 0,
> > +num_done_pkts = 0;
> > +
> > +rte_prefetch0(&vq->desc[vq->last_avail_idx & (vq->size - 1)]);
> > +
> > +for (pkt_idx = 0; pkt_idx < count; pkt_idx++) { if
> > +(unlikely(virtio_dev_rx_async_single_packed(dev, vq, pkts[pkt_idx],
> > +&num_desc, &num_buffers, src_iovec, dst_iovec, src_it, dst_it) < 0))
> > +{ break; }
> > +
> > +VHOST_LOG_DATA(DEBUG, "(%d) current index %d | end
> > index %d\n",
> > +dev->vid, vq->last_avail_idx,
> > +vq->last_avail_idx + num_desc);
> > +
> > +slot_idx = (vq->async_pkts_idx + num_async_pkts) & (vq->size - 1); if
> > +(src_it->count) { uint16_t from, to;
> > +
> > +async_fill_desc(&tdes[pkt_burst_idx++], src_it, dst_it);
> > +pkts_info[slot_idx].descs = num_desc; pkts_info[slot_idx].nr_buffers
> > += num_buffers; pkts_info[slot_idx].mbuf = pkts[pkt_idx];
> > +num_async_pkts++; src_iovec += src_it->nr_segs; dst_iovec +=
> > +dst_it->nr_segs; src_it += 2; dst_it += 2; segs_await +=
> > +src_it->nr_segs;
> > +
> > +/**
> > + * recover shadow used ring and keep DMA-occupied
> > + * descriptors.
> > + */
> > +from = vq->shadow_used_idx - num_buffers;
> > +to = vq->async_packed_buffer_idx & (vq->size - 1);
> > +if (num_buffers + to <= vq->size) {
> > +rte_memcpy(&vq->async_buffers_packed[to],
> > +&vq->shadow_used_packed[from],
> > +num_buffers *
> > +sizeof(struct
> > vring_used_elem_packed));
> > +} else {
> > +int size = vq->size - to;
> > +
> > +rte_memcpy(&vq->async_buffers_packed[to],
> > +&vq->shadow_used_packed[from],
> > +size *
> > +sizeof(struct
> > vring_used_elem_packed));
> > +rte_memcpy(vq->async_buffers_packed,
> > +&vq->shadow_used_packed[from +
> > +size], (num_buffers - size) *
> > +sizeof(struct
> > vring_used_elem_packed));
> > +}
> > +vq->async_packed_buffer_idx += num_buffers;
> > +vq->shadow_used_idx -= num_buffers;
> > +} else
> > +comp_pkts[num_done_pkts++] = pkts[pkt_idx];
> > +
> > +vq_inc_last_avail_packed(vq, num_desc);
> > +
> > +/*
> > + * conditions to trigger async device transfer:
> > + * - buffered packet number reaches transfer threshold
> > + * - unused async iov number is less than max vhost vector
> > + */
> > +if (unlikely(pkt_burst_idx >=
> > VHOST_ASYNC_BATCH_THRESHOLD ||
> > +((VHOST_MAX_ASYNC_VEC >> 1) - segs_await <
> > +BUF_VECTOR_MAX))) {
> > +n_pkts = vq->async_ops.transfer_data(dev->vid,
> > +queue_id, tdes, 0, pkt_burst_idx);
> > +src_iovec = vec_pool;
> > +dst_iovec = vec_pool + (VHOST_MAX_ASYNC_VEC >>
> > 1);
> > +src_it = it_pool;
> > +dst_it = it_pool + 1;
> > +segs_await = 0;
> > +vq->async_pkts_inflight_n += n_pkts;
> > +
> > +if (unlikely(n_pkts < pkt_burst_idx)) {
> > +/*
> > + * log error packets number here and do
> > actual
> > + * error processing when applications poll
> > + * completion
> > + */
> > +pkt_err = pkt_burst_idx - n_pkts;
> > +pkt_burst_idx = 0;
> > +pkt_idx++;
> > +break;
> > +}
> > +
> > +pkt_burst_idx = 0;
> > +}
> > +}
> > +
> > +if (pkt_burst_idx) {
> > +n_pkts = vq->async_ops.transfer_data(dev->vid,
> > +queue_id, tdes, 0, pkt_burst_idx);
> > +vq->async_pkts_inflight_n += n_pkts;
> > +
> > +if (unlikely(n_pkts < pkt_burst_idx))
> > +pkt_err = pkt_burst_idx - n_pkts;
> > +}
> > +
> > +do_data_copy_enqueue(dev, vq);
> > +
> > +if (unlikely(pkt_err)) {
> > +uint16_t buffers_err = 0;
> > +uint16_t async_buffer_idx;
> > +uint16_t i;
> > +
> > +num_async_pkts -= pkt_err;
> > +pkt_idx -= pkt_err;
> > +/* calculate the sum of buffers of DMA-error packets. */
> > +while (pkt_err-- > 0) {
> > +buffers_err +=
> > +pkts_info[slot_idx & (vq->size - 1)].nr_buffers;
> > +slot_idx--;
> > +}
> > +
> > +vq->async_packed_buffer_idx -= buffers_err;
> > +async_buffer_idx = vq->async_packed_buffer_idx;
> > +/* set 0 to the length of descriptors of DMA-error packets */
> > +for (i = 0; i < buffers_err; i++) {
> > +vq->async_buffers_packed[(async_buffer_idx + i)
> > +& (vq->size - 1)].len = 0;
> > +}
> > +/* write back DMA-error descriptors to used ring */
> > +do {
> > +uint16_t from = async_buffer_idx & (vq->size - 1);
> > +uint16_t to = (from + buffers_err) & (vq->size - 1);
> > +
> > +if (to > from) {
> > +vhost_update_used_packed(dev, vq,
> > +vq->async_buffers_packed + from,
> > +to - from);
> > +buffers_err = 0;
> > +} else {
> > +vhost_update_used_packed(dev, vq,
> > +vq->async_buffers_packed + from,
> > +vq->size - from);
> > +buffers_err -= vq->size - from;
> > +}
> > +} while (buffers_err > 0);
> > +vhost_vring_call_packed(dev, vq);
> 
> Why notify front-end here?

The error handling method will be changed in the next version, so this notification will be removed.

> 
> > +num_done_pkts = pkt_idx - num_async_pkts;
> > +}
> > +
> > +vq->async_pkts_idx += num_async_pkts;
> > +*comp_count = num_done_pkts;
> > +
> > +if (likely(vq->shadow_used_idx)) {
> > +vhost_flush_enqueue_shadow_packed(dev, vq);
> > +vhost_vring_call_packed(dev, vq);
> > +}
> > +
> > +return pkt_idx;
> > +}
> 
> virtio_dev_rx_async_submit_packed is too long and it has several parts are
> similar with split ring. I think you need to abstract common parts into inline
> functions to make the code easier to read.

I'm not sure which parts can be easily processed into functions. Maybe we can have a discussion offline. 

> 
> > +
> >  uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
> >  struct rte_mbuf **pkts, uint16_t count)
> >  {
> >  struct virtio_net *dev = get_device(vid);
> >  struct vhost_virtqueue *vq;
> > -uint16_t n_pkts_cpl = 0, n_pkts_put = 0, n_descs = 0;
> > +uint16_t n_pkts_cpl = 0, n_pkts_put = 0, n_descs = 0, n_buffers = 0;
> >  uint16_t start_idx, pkts_idx, vq_size;
> >  struct async_inflight_info *pkts_info;
> >  uint16_t from, i;
> > @@ -1680,53 +2037,96 @@ uint16_t
> > rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
> >  goto done;
> >  }
> >
> > -for (i = 0; i < n_pkts_put; i++) {
> > -from = (start_idx + i) & (vq_size - 1);
> > -n_descs += pkts_info[from].descs;
> > -pkts[i] = pkts_info[from].mbuf;
> > +if (vq_is_packed(dev)) {
> > +for (i = 0; i < n_pkts_put; i++) {
> > +from = (start_idx + i) & (vq_size - 1);
> > +n_buffers += pkts_info[from].nr_buffers;
> > +pkts[i] = pkts_info[from].mbuf;
> > +}
> > +} else {
> > +for (i = 0; i < n_pkts_put; i++) {
> > +from = (start_idx + i) & (vq_size - 1);
> > +n_descs += pkts_info[from].descs;
> > +pkts[i] = pkts_info[from].mbuf;
> > +}
> >  }
> > +
> >  vq->async_last_pkts_n = n_pkts_cpl - n_pkts_put;
> >  vq->async_pkts_inflight_n -= n_pkts_put;
> >
> >  if (likely(vq->enabled && vq->access_ok)) {
> > -uint16_t nr_left = n_descs;
> >  uint16_t nr_copy;
> >  uint16_t to;
> >
> >  /* write back completed descriptors to used ring */
> > -do {
> > -from = vq->last_async_desc_idx & (vq->size - 1);
> > -nr_copy = nr_left + from <= vq->size ? nr_left :
> > -vq->size - from;
> > -to = vq->last_used_idx & (vq->size - 1);
> > -
> > -if (to + nr_copy <= vq->size) {
> > -rte_memcpy(&vq->used->ring[to],
> > +if (vq_is_packed(dev)) {
> > +uint16_t nr_left = n_buffers;
> > +uint16_t to;
> > +do {
> > +from = vq->last_async_buffer_idx &
> > +(vq->size - 1);
> > +to = (from + nr_left) & (vq->size - 1);
> > +
> > +if (to > from) {
> > +vhost_update_used_packed(dev, vq,
> > +vq->async_buffers_packed +
> > from,
> > +to - from);
> > +vq->last_async_buffer_idx += nr_left;
> > +nr_left = 0;
> > +} else {
> > +vhost_update_used_packed(dev, vq,
> > +vq->async_buffers_packed +
> > from,
> > +vq->size - from);
> > +vq->last_async_buffer_idx +=
> > +vq->size -
> > from;
> > +nr_left -= vq->size - from;
> > +}
> > +} while (nr_left > 0);
> > +vhost_vring_call_packed(dev, vq);
> > +} else {
> > +uint16_t nr_left = n_descs;
> > +do {
> > +from = vq->last_async_desc_idx & (vq->size -
> > 1);
> > +nr_copy = nr_left + from <= vq->size ? nr_left :
> > +vq->size - from;
> > +to = vq->last_used_idx & (vq->size - 1);
> > +
> > +if (to + nr_copy <= vq->size) {
> > +rte_memcpy(&vq->used->ring[to],
> >  &vq-
> > >async_descs_split[from],
> >  nr_copy *
> >  sizeof(struct
> > vring_used_elem));
> > -} else {
> > -uint16_t size = vq->size - to;
> > +} else {
> > +uint16_t size = vq->size - to;
> >
> > -rte_memcpy(&vq->used->ring[to],
> > +rte_memcpy(&vq->used->ring[to],
> >  &vq-
> > >async_descs_split[from],
> >  size *
> >  sizeof(struct
> > vring_used_elem));
> > -rte_memcpy(vq->used->ring,
> > +rte_memcpy(vq->used->ring,
> >  &vq->async_descs_split[from
> > +
> >  size], (nr_copy - size) *
> >  sizeof(struct
> > vring_used_elem));
> > -}
> > +}
> > +
> > +vq->last_async_desc_idx += nr_copy;
> > +vq->last_used_idx += nr_copy;
> > +nr_left -= nr_copy;
> > +} while (nr_left > 0);
> > +
> > +__atomic_add_fetch(&vq->used->idx, n_descs,
> > +__ATOMIC_RELEASE);
> > +vhost_vring_call_split(dev, vq);
> > +}
> >
> > -vq->last_async_desc_idx += nr_copy;
> > -vq->last_used_idx += nr_copy;
> > -nr_left -= nr_copy;
> > -} while (nr_left > 0);
> >
> > -__atomic_add_fetch(&vq->used->idx, n_descs,
> > __ATOMIC_RELEASE);
> > -vhost_vring_call_split(dev, vq);
> > -} else
> > -vq->last_async_desc_idx += n_descs;
> > +
> > +} else {
> > +if (vq_is_packed(dev))
> > +vq->last_async_buffer_idx += n_buffers;
> > +else
> > +vq->last_async_desc_idx += n_descs;
> > +}
> 
> rte_vhost_poll_enqueue_completed is too long and not easy to read. Save
> suggestion
> as above.
> 

I can try to process some code into functions, but I'm not sure if this is necessary, I will discuss it with you later.

Thanks,
Cheng

> Thanks,
> Jiayu
> 
> >
> >  done:
> >  rte_spinlock_unlock(&vq->access_lock);
> > @@ -1767,9 +2167,10 @@ virtio_dev_rx_async_submit(struct virtio_net
> > *dev, uint16_t queue_id,
> >  if (count == 0)
> >  goto out;
> >
> > -/* TODO: packed queue not implemented */
> >  if (vq_is_packed(dev))
> > -nb_tx = 0;
> > +nb_tx = virtio_dev_rx_async_submit_packed(dev,
> > +vq, queue_id, pkts, count, comp_pkts,
> > +comp_count);
> >  else
> >  nb_tx = virtio_dev_rx_async_submit_split(dev,
> >  vq, queue_id, pkts, count, comp_pkts,
> > --
> > 2.29.2
>
diff mbox series

Patch

diff --git a/lib/librte_vhost/rte_vhost_async.h b/lib/librte_vhost/rte_vhost_async.h
index c855ff875..6faa31f5a 100644
--- a/lib/librte_vhost/rte_vhost_async.h
+++ b/lib/librte_vhost/rte_vhost_async.h
@@ -89,6 +89,7 @@  struct rte_vhost_async_channel_ops {
 struct async_inflight_info {
 	struct rte_mbuf *mbuf;
 	uint16_t descs; /* num of descs inflight */
+	uint16_t nr_buffers; /* num of buffers inflight for packed ring */
 };

 /**
diff --git a/lib/librte_vhost/vhost.c b/lib/librte_vhost/vhost.c
index 52ab93d1e..51b44d6f2 100644
--- a/lib/librte_vhost/vhost.c
+++ b/lib/librte_vhost/vhost.c
@@ -330,15 +330,20 @@  vhost_free_async_mem(struct vhost_virtqueue *vq)
 {
 	if (vq->async_pkts_info)
 		rte_free(vq->async_pkts_info);
-	if (vq->async_descs_split)
+	if (vq->async_buffers_packed) {
+		rte_free(vq->async_buffers_packed);
+		vq->async_buffers_packed = NULL;
+	} else {
 		rte_free(vq->async_descs_split);
+		vq->async_descs_split = NULL;
+	}
+
 	if (vq->it_pool)
 		rte_free(vq->it_pool);
 	if (vq->vec_pool)
 		rte_free(vq->vec_pool);

 	vq->async_pkts_info = NULL;
-	vq->async_descs_split = NULL;
 	vq->it_pool = NULL;
 	vq->vec_pool = NULL;
 }
@@ -1603,9 +1608,9 @@  int rte_vhost_async_channel_register(int vid, uint16_t queue_id,
 		return -1;

 	/* packed queue is not supported */
-	if (unlikely(vq_is_packed(dev) || !f.async_inorder)) {
+	if (unlikely(!f.async_inorder)) {
 		VHOST_LOG_CONFIG(ERR,
-			"async copy is not supported on packed queue or non-inorder mode "
+			"async copy is not supported on non-inorder mode "
 			"(vid %d, qid: %d)\n", vid, queue_id);
 		return -1;
 	}
@@ -1643,10 +1648,17 @@  int rte_vhost_async_channel_register(int vid, uint16_t queue_id,
 	vq->vec_pool = rte_malloc_socket(NULL,
 			VHOST_MAX_ASYNC_VEC * sizeof(struct iovec),
 			RTE_CACHE_LINE_SIZE, node);
-	vq->async_descs_split = rte_malloc_socket(NULL,
+	if (vq_is_packed(dev)) {
+		vq->async_buffers_packed = rte_malloc_socket(NULL,
+			vq->size * sizeof(struct vring_used_elem_packed),
+			RTE_CACHE_LINE_SIZE, node);
+	} else {
+		vq->async_descs_split = rte_malloc_socket(NULL,
 			vq->size * sizeof(struct vring_used_elem),
 			RTE_CACHE_LINE_SIZE, node);
-	if (!vq->async_descs_split || !vq->async_pkts_info ||
+	}
+
+	if (!vq->async_pkts_info ||
 		!vq->it_pool || !vq->vec_pool) {
 		vhost_free_async_mem(vq);
 		VHOST_LOG_CONFIG(ERR,
diff --git a/lib/librte_vhost/vhost.h b/lib/librte_vhost/vhost.h
index 658f6fc28..d6324fbf8 100644
--- a/lib/librte_vhost/vhost.h
+++ b/lib/librte_vhost/vhost.h
@@ -206,9 +206,14 @@  struct vhost_virtqueue {
 	uint16_t	async_pkts_idx;
 	uint16_t	async_pkts_inflight_n;
 	uint16_t	async_last_pkts_n;
-	struct vring_used_elem  *async_descs_split;
+	union {
+		struct vring_used_elem  *async_descs_split;
+		struct vring_used_elem_packed *async_buffers_packed;
+	};
 	uint16_t async_desc_idx;
+	uint16_t async_packed_buffer_idx;
 	uint16_t last_async_desc_idx;
+	uint16_t last_async_buffer_idx;

 	/* vq async features */
 	bool		async_inorder;
diff --git a/lib/librte_vhost/virtio_net.c b/lib/librte_vhost/virtio_net.c
index 583bf379c..fa2dfde02 100644
--- a/lib/librte_vhost/virtio_net.c
+++ b/lib/librte_vhost/virtio_net.c
@@ -363,8 +363,7 @@  vhost_shadow_dequeue_single_packed_inorder(struct vhost_virtqueue *vq,
 }

 static __rte_always_inline void
-vhost_shadow_enqueue_single_packed(struct virtio_net *dev,
-				   struct vhost_virtqueue *vq,
+vhost_shadow_enqueue_packed(struct vhost_virtqueue *vq,
 				   uint32_t len[],
 				   uint16_t id[],
 				   uint16_t count[],
@@ -382,6 +381,17 @@  vhost_shadow_enqueue_single_packed(struct virtio_net *dev,
 		vq->shadow_aligned_idx += count[i];
 		vq->shadow_used_idx++;
 	}
+}
+
+static __rte_always_inline void
+vhost_shadow_enqueue_single_packed(struct virtio_net *dev,
+				   struct vhost_virtqueue *vq,
+				   uint32_t len[],
+				   uint16_t id[],
+				   uint16_t count[],
+				   uint16_t num_buffers)
+{
+	vhost_shadow_enqueue_packed(vq, len, id, count, num_buffers);

 	if (vq->shadow_aligned_idx >= PACKED_BATCH_SIZE) {
 		do_data_copy_enqueue(dev, vq);
@@ -1452,6 +1462,73 @@  virtio_dev_rx_async_get_info_idx(uint16_t pkts_idx,
 		(vq_size - n_inflight + pkts_idx) & (vq_size - 1);
 }

+static __rte_always_inline void
+vhost_update_used_packed(struct virtio_net *dev,
+				  struct vhost_virtqueue *vq,
+				  struct vring_used_elem_packed *shadow_ring,
+				  uint16_t count)
+{
+	if (count == 0)
+		return;
+	int i;
+	uint16_t used_idx = vq->last_used_idx;
+	uint16_t head_idx = vq->last_used_idx;
+	uint16_t head_flags = 0;
+
+	/* Split loop in two to save memory barriers */
+	for (i = 0; i < count; i++) {
+		vq->desc_packed[used_idx].id = shadow_ring[i].id;
+		vq->desc_packed[used_idx].len = shadow_ring[i].len;
+
+		used_idx += shadow_ring[i].count;
+		if (used_idx >= vq->size)
+			used_idx -= vq->size;
+	}
+
+	/* The ordering for storing desc flags needs to be enforced. */
+	rte_atomic_thread_fence(__ATOMIC_RELEASE);
+
+	for (i = 0; i < count; i++) {
+		uint16_t flags;
+
+		if (vq->shadow_used_packed[i].len)
+			flags = VRING_DESC_F_WRITE;
+		else
+			flags = 0;
+
+		if (vq->used_wrap_counter) {
+			flags |= VRING_DESC_F_USED;
+			flags |= VRING_DESC_F_AVAIL;
+		} else {
+			flags &= ~VRING_DESC_F_USED;
+			flags &= ~VRING_DESC_F_AVAIL;
+		}
+
+		if (i > 0) {
+			vq->desc_packed[vq->last_used_idx].flags = flags;
+
+			vhost_log_cache_used_vring(dev, vq,
+					vq->last_used_idx *
+					sizeof(struct vring_packed_desc),
+					sizeof(struct vring_packed_desc));
+		} else {
+			head_idx = vq->last_used_idx;
+			head_flags = flags;
+		}
+
+		vq_inc_last_used_packed(vq, shadow_ring[i].count);
+	}
+
+	vq->desc_packed[head_idx].flags = head_flags;
+
+	vhost_log_cache_used_vring(dev, vq,
+				head_idx *
+				sizeof(struct vring_packed_desc),
+				sizeof(struct vring_packed_desc));
+
+	vhost_log_cache_sync(dev, vq);
+}
+
 static __rte_noinline uint32_t
 virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 	struct vhost_virtqueue *vq, uint16_t queue_id,
@@ -1633,12 +1710,292 @@  virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 	return pkt_idx;
 }

+static __rte_always_inline int
+vhost_enqueue_async_single_packed(struct virtio_net *dev,
+			    struct vhost_virtqueue *vq,
+			    struct rte_mbuf *pkt,
+			    struct buf_vector *buf_vec,
+			    uint16_t *nr_descs,
+			    uint16_t *nr_buffers,
+			    struct iovec *src_iovec, struct iovec *dst_iovec,
+			    struct rte_vhost_iov_iter *src_it,
+			    struct rte_vhost_iov_iter *dst_it)
+{
+	uint16_t nr_vec = 0;
+	uint16_t avail_idx = vq->last_avail_idx;
+	uint16_t max_tries, tries = 0;
+	uint16_t buf_id = 0;
+	uint32_t len = 0;
+	uint16_t desc_count;
+	uint32_t size = pkt->pkt_len + sizeof(struct virtio_net_hdr_mrg_rxbuf);
+	uint32_t buffer_len[vq->size];
+	uint16_t buffer_buf_id[vq->size];
+	uint16_t buffer_desc_count[vq->size];
+	*nr_buffers = 0;
+
+	if (rxvq_is_mergeable(dev))
+		max_tries = vq->size - 1;
+	else
+		max_tries = 1;
+
+	while (size > 0) {
+		/*
+		 * if we tried all available ring items, and still
+		 * can't get enough buf, it means something abnormal
+		 * happened.
+		 */
+		if (unlikely(++tries > max_tries))
+			return -1;
+
+		if (unlikely(fill_vec_buf_packed(dev, vq,
+						avail_idx, &desc_count,
+						buf_vec, &nr_vec,
+						&buf_id, &len,
+						VHOST_ACCESS_RW) < 0))
+			return -1;
+
+		len = RTE_MIN(len, size);
+		size -= len;
+
+		buffer_len[*nr_buffers] = len;
+		buffer_buf_id[*nr_buffers] = buf_id;
+		buffer_desc_count[*nr_buffers] = desc_count;
+		*nr_buffers += 1;
+
+		*nr_descs += desc_count;
+		avail_idx += desc_count;
+		if (avail_idx >= vq->size)
+			avail_idx -= vq->size;
+	}
+
+	if (async_mbuf_to_desc(dev, vq, pkt, buf_vec, nr_vec, *nr_buffers,
+		src_iovec, dst_iovec, src_it, dst_it) < 0)
+		return -1;
+
+	vhost_shadow_enqueue_packed(vq, buffer_len, buffer_buf_id,
+					   buffer_desc_count, *nr_buffers);
+
+	return 0;
+}
+
+static __rte_always_inline int16_t
+virtio_dev_rx_async_single_packed(struct virtio_net *dev,
+			    struct vhost_virtqueue *vq,
+			    struct rte_mbuf *pkt,
+			    uint16_t *nr_descs, uint16_t *nr_buffers,
+			    struct iovec *src_iovec, struct iovec *dst_iovec,
+			    struct rte_vhost_iov_iter *src_it,
+			    struct rte_vhost_iov_iter *dst_it)
+{
+	struct buf_vector buf_vec[BUF_VECTOR_MAX];
+	*nr_descs = 0;
+	*nr_buffers = 0;
+
+	if (unlikely(vhost_enqueue_async_single_packed(dev, vq, pkt, buf_vec,
+						 nr_descs,
+						 nr_buffers,
+						 src_iovec, dst_iovec,
+						 src_it, dst_it) < 0)) {
+		VHOST_LOG_DATA(DEBUG,
+				"(%d) failed to get enough desc from vring\n",
+				dev->vid);
+		return -1;
+	}
+
+	VHOST_LOG_DATA(DEBUG, "(%d) current index %d | end index %d\n",
+			dev->vid, vq->last_avail_idx,
+			vq->last_avail_idx + *nr_descs);
+
+	return 0;
+}
+
+static __rte_noinline uint32_t
+virtio_dev_rx_async_submit_packed(struct virtio_net *dev,
+	struct vhost_virtqueue *vq, uint16_t queue_id,
+	struct rte_mbuf **pkts, uint32_t count,
+	struct rte_mbuf **comp_pkts, uint32_t *comp_count)
+{
+	uint32_t pkt_idx = 0, pkt_burst_idx = 0;
+	uint16_t num_buffers;
+	uint16_t num_desc;
+
+	struct rte_vhost_iov_iter *it_pool = vq->it_pool;
+	struct iovec *vec_pool = vq->vec_pool;
+	struct rte_vhost_async_desc tdes[MAX_PKT_BURST];
+	struct iovec *src_iovec = vec_pool;
+	struct iovec *dst_iovec = vec_pool + (VHOST_MAX_ASYNC_VEC >> 1);
+	struct rte_vhost_iov_iter *src_it = it_pool;
+	struct rte_vhost_iov_iter *dst_it = it_pool + 1;
+	uint16_t slot_idx = 0;
+	uint16_t segs_await = 0;
+	struct async_inflight_info *pkts_info = vq->async_pkts_info;
+	uint32_t n_pkts = 0, pkt_err = 0;
+	uint32_t num_async_pkts = 0, num_done_pkts = 0;
+
+	rte_prefetch0(&vq->desc[vq->last_avail_idx & (vq->size - 1)]);
+
+	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
+		if (unlikely(virtio_dev_rx_async_single_packed(dev, vq,
+						pkts[pkt_idx],
+						&num_desc, &num_buffers,
+						src_iovec, dst_iovec,
+						src_it, dst_it) < 0)) {
+			break;
+		}
+
+		VHOST_LOG_DATA(DEBUG, "(%d) current index %d | end index %d\n",
+			dev->vid, vq->last_avail_idx,
+			vq->last_avail_idx + num_desc);
+
+		slot_idx = (vq->async_pkts_idx + num_async_pkts) &
+			(vq->size - 1);
+		if (src_it->count) {
+			uint16_t from, to;
+
+			async_fill_desc(&tdes[pkt_burst_idx++], src_it, dst_it);
+			pkts_info[slot_idx].descs = num_desc;
+			pkts_info[slot_idx].nr_buffers = num_buffers;
+			pkts_info[slot_idx].mbuf = pkts[pkt_idx];
+			num_async_pkts++;
+			src_iovec += src_it->nr_segs;
+			dst_iovec += dst_it->nr_segs;
+			src_it += 2;
+			dst_it += 2;
+			segs_await += src_it->nr_segs;
+
+			/**
+			 * recover shadow used ring and keep DMA-occupied
+			 * descriptors.
+			 */
+			from = vq->shadow_used_idx - num_buffers;
+			to = vq->async_packed_buffer_idx & (vq->size - 1);
+			if (num_buffers + to <= vq->size) {
+				rte_memcpy(&vq->async_buffers_packed[to],
+					&vq->shadow_used_packed[from],
+					num_buffers *
+					sizeof(struct vring_used_elem_packed));
+			} else {
+				int size = vq->size - to;
+
+				rte_memcpy(&vq->async_buffers_packed[to],
+					&vq->shadow_used_packed[from],
+					size *
+					sizeof(struct vring_used_elem_packed));
+				rte_memcpy(vq->async_buffers_packed,
+					&vq->shadow_used_packed[from +
+					size], (num_buffers - size) *
+					sizeof(struct vring_used_elem_packed));
+			}
+			vq->async_packed_buffer_idx += num_buffers;
+			vq->shadow_used_idx -= num_buffers;
+		} else
+			comp_pkts[num_done_pkts++] = pkts[pkt_idx];
+
+		vq_inc_last_avail_packed(vq, num_desc);
+
+		/*
+		 * conditions to trigger async device transfer:
+		 * - buffered packet number reaches transfer threshold
+		 * - unused async iov number is less than max vhost vector
+		 */
+		if (unlikely(pkt_burst_idx >= VHOST_ASYNC_BATCH_THRESHOLD ||
+			((VHOST_MAX_ASYNC_VEC >> 1) - segs_await <
+			BUF_VECTOR_MAX))) {
+			n_pkts = vq->async_ops.transfer_data(dev->vid,
+					queue_id, tdes, 0, pkt_burst_idx);
+			src_iovec = vec_pool;
+			dst_iovec = vec_pool + (VHOST_MAX_ASYNC_VEC >> 1);
+			src_it = it_pool;
+			dst_it = it_pool + 1;
+			segs_await = 0;
+			vq->async_pkts_inflight_n += n_pkts;
+
+			if (unlikely(n_pkts < pkt_burst_idx)) {
+				/*
+				 * log error packets number here and do actual
+				 * error processing when applications poll
+				 * completion
+				 */
+				pkt_err = pkt_burst_idx - n_pkts;
+				pkt_burst_idx = 0;
+				pkt_idx++;
+				break;
+			}
+
+			pkt_burst_idx = 0;
+		}
+	}
+
+	if (pkt_burst_idx) {
+		n_pkts = vq->async_ops.transfer_data(dev->vid,
+				queue_id, tdes, 0, pkt_burst_idx);
+		vq->async_pkts_inflight_n += n_pkts;
+
+		if (unlikely(n_pkts < pkt_burst_idx))
+			pkt_err = pkt_burst_idx - n_pkts;
+	}
+
+	do_data_copy_enqueue(dev, vq);
+
+	if (unlikely(pkt_err)) {
+		uint16_t buffers_err = 0;
+		uint16_t async_buffer_idx;
+		uint16_t i;
+
+		num_async_pkts -= pkt_err;
+		pkt_idx -= pkt_err;
+		/* calculate the sum of buffers of DMA-error packets. */
+		while (pkt_err-- > 0) {
+			buffers_err +=
+				pkts_info[slot_idx & (vq->size - 1)].nr_buffers;
+			slot_idx--;
+		}
+
+		vq->async_packed_buffer_idx -= buffers_err;
+		async_buffer_idx = vq->async_packed_buffer_idx;
+		/* set 0 to the length of descriptors of DMA-error packets */
+		for (i = 0; i < buffers_err; i++) {
+			vq->async_buffers_packed[(async_buffer_idx + i)
+						& (vq->size - 1)].len = 0;
+		}
+		/* write back DMA-error descriptors to used ring */
+		do {
+			uint16_t from = async_buffer_idx & (vq->size - 1);
+			uint16_t to = (from + buffers_err) & (vq->size - 1);
+
+			if (to > from) {
+				vhost_update_used_packed(dev, vq,
+					vq->async_buffers_packed + from,
+					to - from);
+				buffers_err = 0;
+			} else {
+				vhost_update_used_packed(dev, vq,
+					vq->async_buffers_packed + from,
+					vq->size - from);
+				buffers_err -= vq->size - from;
+			}
+		} while (buffers_err > 0);
+		vhost_vring_call_packed(dev, vq);
+		num_done_pkts = pkt_idx - num_async_pkts;
+	}
+
+	vq->async_pkts_idx += num_async_pkts;
+	*comp_count = num_done_pkts;
+
+	if (likely(vq->shadow_used_idx)) {
+		vhost_flush_enqueue_shadow_packed(dev, vq);
+		vhost_vring_call_packed(dev, vq);
+	}
+
+	return pkt_idx;
+}
+
 uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
 		struct rte_mbuf **pkts, uint16_t count)
 {
 	struct virtio_net *dev = get_device(vid);
 	struct vhost_virtqueue *vq;
-	uint16_t n_pkts_cpl = 0, n_pkts_put = 0, n_descs = 0;
+	uint16_t n_pkts_cpl = 0, n_pkts_put = 0, n_descs = 0, n_buffers = 0;
 	uint16_t start_idx, pkts_idx, vq_size;
 	struct async_inflight_info *pkts_info;
 	uint16_t from, i;
@@ -1680,53 +2037,96 @@  uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
 		goto done;
 	}

-	for (i = 0; i < n_pkts_put; i++) {
-		from = (start_idx + i) & (vq_size - 1);
-		n_descs += pkts_info[from].descs;
-		pkts[i] = pkts_info[from].mbuf;
+	if (vq_is_packed(dev)) {
+		for (i = 0; i < n_pkts_put; i++) {
+			from = (start_idx + i) & (vq_size - 1);
+			n_buffers += pkts_info[from].nr_buffers;
+			pkts[i] = pkts_info[from].mbuf;
+		}
+	} else {
+		for (i = 0; i < n_pkts_put; i++) {
+			from = (start_idx + i) & (vq_size - 1);
+			n_descs += pkts_info[from].descs;
+			pkts[i] = pkts_info[from].mbuf;
+		}
 	}
+
 	vq->async_last_pkts_n = n_pkts_cpl - n_pkts_put;
 	vq->async_pkts_inflight_n -= n_pkts_put;

 	if (likely(vq->enabled && vq->access_ok)) {
-		uint16_t nr_left = n_descs;
 		uint16_t nr_copy;
 		uint16_t to;

 		/* write back completed descriptors to used ring */
-		do {
-			from = vq->last_async_desc_idx & (vq->size - 1);
-			nr_copy = nr_left + from <= vq->size ? nr_left :
-				vq->size - from;
-			to = vq->last_used_idx & (vq->size - 1);
-
-			if (to + nr_copy <= vq->size) {
-				rte_memcpy(&vq->used->ring[to],
+		if (vq_is_packed(dev)) {
+			uint16_t nr_left = n_buffers;
+			uint16_t to;
+			do {
+				from = vq->last_async_buffer_idx &
+								(vq->size - 1);
+				to = (from + nr_left) & (vq->size - 1);
+
+				if (to > from) {
+					vhost_update_used_packed(dev, vq,
+						vq->async_buffers_packed + from,
+						to - from);
+					vq->last_async_buffer_idx += nr_left;
+					nr_left = 0;
+				} else {
+					vhost_update_used_packed(dev, vq,
+						vq->async_buffers_packed + from,
+						vq->size - from);
+					vq->last_async_buffer_idx +=
+								vq->size - from;
+					nr_left -= vq->size - from;
+				}
+			} while (nr_left > 0);
+			vhost_vring_call_packed(dev, vq);
+		} else {
+			uint16_t nr_left = n_descs;
+			do {
+				from = vq->last_async_desc_idx & (vq->size - 1);
+				nr_copy = nr_left + from <= vq->size ? nr_left :
+					vq->size - from;
+				to = vq->last_used_idx & (vq->size - 1);
+
+				if (to + nr_copy <= vq->size) {
+					rte_memcpy(&vq->used->ring[to],
 						&vq->async_descs_split[from],
 						nr_copy *
 						sizeof(struct vring_used_elem));
-			} else {
-				uint16_t size = vq->size - to;
+				} else {
+					uint16_t size = vq->size - to;

-				rte_memcpy(&vq->used->ring[to],
+					rte_memcpy(&vq->used->ring[to],
 						&vq->async_descs_split[from],
 						size *
 						sizeof(struct vring_used_elem));
-				rte_memcpy(vq->used->ring,
+					rte_memcpy(vq->used->ring,
 						&vq->async_descs_split[from +
 						size], (nr_copy - size) *
 						sizeof(struct vring_used_elem));
-			}
+				}
+
+				vq->last_async_desc_idx += nr_copy;
+				vq->last_used_idx += nr_copy;
+				nr_left -= nr_copy;
+			} while (nr_left > 0);
+
+			__atomic_add_fetch(&vq->used->idx, n_descs,
+					__ATOMIC_RELEASE);
+			vhost_vring_call_split(dev, vq);
+		}

-			vq->last_async_desc_idx += nr_copy;
-			vq->last_used_idx += nr_copy;
-			nr_left -= nr_copy;
-		} while (nr_left > 0);

-		__atomic_add_fetch(&vq->used->idx, n_descs, __ATOMIC_RELEASE);
-		vhost_vring_call_split(dev, vq);
-	} else
-		vq->last_async_desc_idx += n_descs;
+
+	} else {
+		if (vq_is_packed(dev))
+			vq->last_async_buffer_idx += n_buffers;
+		else
+			vq->last_async_desc_idx += n_descs;
+	}

 done:
 	rte_spinlock_unlock(&vq->access_lock);
@@ -1767,9 +2167,10 @@  virtio_dev_rx_async_submit(struct virtio_net *dev, uint16_t queue_id,
 	if (count == 0)
 		goto out;

-	/* TODO: packed queue not implemented */
 	if (vq_is_packed(dev))
-		nb_tx = 0;
+		nb_tx = virtio_dev_rx_async_submit_packed(dev,
+				vq, queue_id, pkts, count, comp_pkts,
+				comp_count);
 	else
 		nb_tx = virtio_dev_rx_async_submit_split(dev,
 				vq, queue_id, pkts, count, comp_pkts,