diff mbox series

[v5,2/4] vhost: add support for packed ring in async vhost

Message ID 20210412113430.17587-3-Cheng1.jiang@intel.com (mailing list archive)
State Changes Requested, archived
Delegated to: Maxime Coquelin
Headers show
Series add support for packed ring in async vhost | expand

Checks

Context Check Description
ci/checkpatch success coding style OK

Commit Message

Jiang, Cheng1 April 12, 2021, 11:34 a.m. UTC
For now async vhost data path only supports split ring structure. In
order to make async vhost compatible with virtio 1.1 spec this patch
enables packed ring in async vhost data path.

Signed-off-by: Cheng Jiang <Cheng1.jiang@intel.com>
---
 lib/librte_vhost/rte_vhost_async.h |   1 +
 lib/librte_vhost/vhost.c           |  27 +-
 lib/librte_vhost/vhost.h           |   7 +-
 lib/librte_vhost/virtio_net.c      | 438 +++++++++++++++++++++++++++--
 4 files changed, 448 insertions(+), 25 deletions(-)

Comments

Maxime Coquelin April 13, 2021, 8:36 a.m. UTC | #1
On 4/12/21 1:34 PM, Cheng Jiang wrote:
> For now async vhost data path only supports split ring structure. In
> order to make async vhost compatible with virtio 1.1 spec this patch
> enables packed ring in async vhost data path.
> 
> Signed-off-by: Cheng Jiang <Cheng1.jiang@intel.com>
> ---
>  lib/librte_vhost/rte_vhost_async.h |   1 +
>  lib/librte_vhost/vhost.c           |  27 +-
>  lib/librte_vhost/vhost.h           |   7 +-
>  lib/librte_vhost/virtio_net.c      | 438 +++++++++++++++++++++++++++--
>  4 files changed, 448 insertions(+), 25 deletions(-)
> 
> diff --git a/lib/librte_vhost/rte_vhost_async.h b/lib/librte_vhost/rte_vhost_async.h
> index c855ff875..6faa31f5a 100644
> --- a/lib/librte_vhost/rte_vhost_async.h
> +++ b/lib/librte_vhost/rte_vhost_async.h
> @@ -89,6 +89,7 @@ struct rte_vhost_async_channel_ops {
>  struct async_inflight_info {
>  	struct rte_mbuf *mbuf;
>  	uint16_t descs; /* num of descs inflight */
> +	uint16_t nr_buffers; /* num of buffers inflight for packed ring */
>  };
>  
>  /**
> diff --git a/lib/librte_vhost/vhost.c b/lib/librte_vhost/vhost.c
> index a70fe01d8..8c9935c0f 100644
> --- a/lib/librte_vhost/vhost.c
> +++ b/lib/librte_vhost/vhost.c
> @@ -342,15 +342,21 @@ vhost_free_async_mem(struct vhost_virtqueue *vq)
>  {
>  	if (vq->async_pkts_info)
>  		rte_free(vq->async_pkts_info);
> -	if (vq->async_descs_split)
> +	if (vq->async_buffers_packed) {
> +		rte_free(vq->async_buffers_packed);
> +		vq->async_buffers_packed = NULL;
> +	}
> +	if (vq->async_descs_split) {

You can remove the check, rte_free is safe with NULL pointers.
You can do the same for the other ones in this function.

>  		rte_free(vq->async_descs_split);
> +		vq->async_descs_split = NULL;
> +	}
> +
>  	if (vq->it_pool)
>  		rte_free(vq->it_pool);
>  	if (vq->vec_pool)
>  		rte_free(vq->vec_pool);
>  
>  	vq->async_pkts_info = NULL;
> -	vq->async_descs_split = NULL;
>  	vq->it_pool = NULL;
>  	vq->vec_pool = NULL;
>  }
> @@ -1627,9 +1633,9 @@ int rte_vhost_async_channel_register(int vid, uint16_t queue_id,
>  		return -1;
>  
>  	/* packed queue is not supported */
> -	if (unlikely(vq_is_packed(dev) || !f.async_inorder)) {
> +	if (unlikely(!f.async_inorder)) {
>  		VHOST_LOG_CONFIG(ERR,
> -			"async copy is not supported on packed queue or non-inorder mode "
> +			"async copy is not supported on non-inorder mode "
>  			"(vid %d, qid: %d)\n", vid, queue_id);
>  		return -1;
>  	}
> @@ -1667,11 +1673,18 @@ int rte_vhost_async_channel_register(int vid, uint16_t queue_id,
>  	vq->vec_pool = rte_malloc_socket(NULL,
>  			VHOST_MAX_ASYNC_VEC * sizeof(struct iovec),
>  			RTE_CACHE_LINE_SIZE, node);
> -	vq->async_descs_split = rte_malloc_socket(NULL,
> +	if (vq_is_packed(dev)) {
> +		vq->async_buffers_packed = rte_malloc_socket(NULL,
> +			vq->size * sizeof(struct vring_used_elem_packed),
> +			RTE_CACHE_LINE_SIZE, node);
> +	} else {
> +		vq->async_descs_split = rte_malloc_socket(NULL,
>  			vq->size * sizeof(struct vring_used_elem),
>  			RTE_CACHE_LINE_SIZE, node);
> -	if (!vq->async_descs_split || !vq->async_pkts_info ||
> -		!vq->it_pool || !vq->vec_pool) {
> +	}
> +
> +	if (!vq->async_buffers_packed || !vq->async_descs_split ||
> +		!vq->async_pkts_info || !vq->it_pool || !vq->vec_pool) {
>  		vhost_free_async_mem(vq);
>  		VHOST_LOG_CONFIG(ERR,
>  				"async register failed: cannot allocate memory for vq data "
> diff --git a/lib/librte_vhost/vhost.h b/lib/librte_vhost/vhost.h
> index f628714c2..fe131ae8f 100644
> --- a/lib/librte_vhost/vhost.h
> +++ b/lib/librte_vhost/vhost.h
> @@ -201,9 +201,14 @@ struct vhost_virtqueue {
>  	uint16_t	async_pkts_idx;
>  	uint16_t	async_pkts_inflight_n;
>  	uint16_t	async_last_pkts_n;
> -	struct vring_used_elem  *async_descs_split;
> +	union {
> +		struct vring_used_elem  *async_descs_split;
> +		struct vring_used_elem_packed *async_buffers_packed;
> +	};
>  	uint16_t async_desc_idx;
> +	uint16_t async_packed_buffer_idx;

Don't dupplicate variable names, async_desc_idx can be reused for packed
ring. Also, they are representing the same thing, why use desc in one
case and buffer in the other?

>  	uint16_t last_async_desc_idx;
> +	uint16_t last_async_buffer_idx;

Same remark here.

>  	/* vq async features */
>  	bool		async_inorder;
> diff --git a/lib/librte_vhost/virtio_net.c b/lib/librte_vhost/virtio_net.c
> index c43ab0093..410be9678 100644
> --- a/lib/librte_vhost/virtio_net.c
> +++ b/lib/librte_vhost/virtio_net.c
> @@ -363,14 +363,14 @@ vhost_shadow_dequeue_single_packed_inorder(struct vhost_virtqueue *vq,
>  }
>  
>  static __rte_always_inline void
> -vhost_shadow_enqueue_single_packed(struct virtio_net *dev,
> -				   struct vhost_virtqueue *vq,
> -				   uint32_t len[],
> -				   uint16_t id[],
> -				   uint16_t count[],
> +vhost_shadow_enqueue_packed(struct vhost_virtqueue *vq,
> +				   uint32_t *len,
> +				   uint16_t *id,
> +				   uint16_t *count,
>  				   uint16_t num_buffers)
>  {
>  	uint16_t i;
> +
>  	for (i = 0; i < num_buffers; i++) {
>  		/* enqueue shadow flush action aligned with batch num */
>  		if (!vq->shadow_used_idx)
> @@ -382,6 +382,17 @@ vhost_shadow_enqueue_single_packed(struct virtio_net *dev,
>  		vq->shadow_aligned_idx += count[i];
>  		vq->shadow_used_idx++;
>  	}
> +}
> +
> +static __rte_always_inline void
> +vhost_shadow_enqueue_single_packed(struct virtio_net *dev,
> +				   struct vhost_virtqueue *vq,
> +				   uint32_t *len,
> +				   uint16_t *id,
> +				   uint16_t *count,
> +				   uint16_t num_buffers)
> +{
> +	vhost_shadow_enqueue_packed(vq, len, id, count, num_buffers);
>  
>  	if (vq->shadow_aligned_idx >= PACKED_BATCH_SIZE) {
>  		do_data_copy_enqueue(dev, vq);
> @@ -1481,6 +1492,62 @@ shadow_ring_store(struct vhost_virtqueue *vq,  void *shadow_ring, void *d_ring,
>  	}
>  }
>  
> +static __rte_always_inline void
> +vhost_update_used_packed(struct vhost_virtqueue *vq,
> +			struct vring_used_elem_packed *shadow_ring,
> +			uint16_t count)
> +{
> +	if (count == 0)
> +		return;

Move this after the variables declaration.

> +
> +	int i;
> +	uint16_t used_idx = vq->last_used_idx;
> +	uint16_t head_idx = vq->last_used_idx;
> +	uint16_t head_flags = 0;
> +
> +	/* Split loop in two to save memory barriers */
> +	for (i = 0; i < count; i++) {
> +		vq->desc_packed[used_idx].id = shadow_ring[i].id;
> +		vq->desc_packed[used_idx].len = shadow_ring[i].len;
> +
> +		used_idx += shadow_ring[i].count;
> +		if (used_idx >= vq->size)
> +			used_idx -= vq->size;
> +	}
> +
> +	/* The ordering for storing desc flags needs to be enforced. */
> +	rte_atomic_thread_fence(__ATOMIC_RELEASE);
> +
> +	for (i = 0; i < count; i++) {
> +		uint16_t flags;
> +
> +		if (vq->shadow_used_packed[i].len)
> +			flags = VRING_DESC_F_WRITE;
> +		else
> +			flags = 0;
> +
> +		if (vq->used_wrap_counter) {
> +			flags |= VRING_DESC_F_USED;
> +			flags |= VRING_DESC_F_AVAIL;
> +		} else {
> +			flags &= ~VRING_DESC_F_USED;
> +			flags &= ~VRING_DESC_F_AVAIL;
> +		}
> +
> +		if (i > 0) {
> +			vq->desc_packed[vq->last_used_idx].flags = flags;
> +
> +		} else {
> +			head_idx = vq->last_used_idx;
> +			head_flags = flags;
> +		}
> +
> +		vq_inc_last_used_packed(vq, shadow_ring[i].count);
> +	}
> +
> +	vq->desc_packed[head_idx].flags = head_flags;
> +}
> +
>  static __rte_noinline uint32_t
>  virtio_dev_rx_async_submit_split(struct virtio_net *dev,
>  	struct vhost_virtqueue *vq, uint16_t queue_id,
> @@ -1656,6 +1723,294 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
>  	return pkt_idx;
>  }
>  
> +static __rte_always_inline int
> +vhost_enqueue_async_single_packed(struct virtio_net *dev,
> +			    struct vhost_virtqueue *vq,
> +			    struct rte_mbuf *pkt,
> +			    struct buf_vector *buf_vec,
> +			    uint16_t *nr_descs,
> +			    uint16_t *nr_buffers,
> +			    struct vring_packed_desc *async_descs,
> +			    struct iovec *src_iovec, struct iovec *dst_iovec,
> +			    struct rte_vhost_iov_iter *src_it,
> +			    struct rte_vhost_iov_iter *dst_it)
> +{
> +	uint16_t nr_vec = 0;
> +	uint16_t avail_idx = vq->last_avail_idx;
> +	uint16_t max_tries, tries = 0;
> +	uint16_t buf_id = 0;
> +	uint32_t len = 0;
> +	uint16_t desc_count = 0;
> +	uint32_t size = pkt->pkt_len + sizeof(struct virtio_net_hdr_mrg_rxbuf);
> +	uint32_t buffer_len[vq->size];
> +	uint16_t buffer_buf_id[vq->size];
> +	uint16_t buffer_desc_count[vq->size];
> +	*nr_buffers = 0;
> +
> +	if (rxvq_is_mergeable(dev))
> +		max_tries = vq->size - 1;
> +	else
> +		max_tries = 1;
> +
> +	while (size > 0) {
> +		/*
> +		 * if we tried all available ring items, and still
> +		 * can't get enough buf, it means something abnormal
> +		 * happened.
> +		 */
> +		if (unlikely(++tries > max_tries))
> +			return -1;
> +
> +		if (unlikely(fill_vec_buf_packed(dev, vq,
> +						avail_idx, &desc_count,
> +						buf_vec, &nr_vec,
> +						&buf_id, &len,
> +						VHOST_ACCESS_RW) < 0))
> +			return -1;
> +
> +		len = RTE_MIN(len, size);
> +		size -= len;
> +
> +		buffer_len[*nr_buffers] = len;
> +		buffer_buf_id[*nr_buffers] = buf_id;
> +		buffer_desc_count[*nr_buffers] = desc_count;
> +		*nr_buffers += 1;
> +
> +		*nr_descs += desc_count;
> +		avail_idx += desc_count;
> +		if (avail_idx >= vq->size)
> +			avail_idx -= vq->size;
> +	}
> +
> +	if (async_mbuf_to_desc(dev, vq, pkt, buf_vec, nr_vec, *nr_buffers,
> +		src_iovec, dst_iovec, src_it, dst_it) < 0)
> +		return -1;
> +	/* store descriptors for DMA */
> +	if (avail_idx >= *nr_descs)
> +		rte_memcpy(async_descs,
> +			&vq->desc_packed[vq->last_avail_idx],
> +			*nr_descs * sizeof(struct vring_packed_desc));

Please add brackets for the 'if' since there are for the 'else'.

> +	else {
> +		uint16_t nr_copy = vq->size - vq->last_avail_idx;
> +		rte_memcpy(async_descs,
> +			&vq->desc_packed[vq->last_avail_idx],
> +			nr_copy * sizeof(struct vring_packed_desc));
> +		rte_memcpy(async_descs + nr_copy,
> +			vq->desc_packed, (*nr_descs - nr_copy) *
> +			sizeof(struct vring_packed_desc));
> +	}
> +
> +	vhost_shadow_enqueue_packed(vq, buffer_len, buffer_buf_id,
> +					   buffer_desc_count, *nr_buffers);
> +
> +	return 0;
> +}
> +
> +static __rte_always_inline int16_t
> +virtio_dev_rx_async_single_packed(struct virtio_net *dev,
> +			    struct vhost_virtqueue *vq,
> +			    struct rte_mbuf *pkt,
> +			    uint16_t *nr_descs, uint16_t *nr_buffers,
> +			    struct vring_packed_desc *async_descs,
> +			    struct iovec *src_iovec, struct iovec *dst_iovec,
> +			    struct rte_vhost_iov_iter *src_it,
> +			    struct rte_vhost_iov_iter *dst_it)
> +{
> +	struct buf_vector buf_vec[BUF_VECTOR_MAX];
> +	*nr_descs = 0;
> +	*nr_buffers = 0;
> +
> +	if (unlikely(vhost_enqueue_async_single_packed(dev, vq, pkt, buf_vec,
> +						 nr_descs,
> +						 nr_buffers,
> +						 async_descs,
> +						 src_iovec, dst_iovec,
> +						 src_it, dst_it) < 0)) {
> +		VHOST_LOG_DATA(DEBUG,
> +				"(%d) failed to get enough desc from vring\n",
> +				dev->vid);
> +		return -1;
> +	}
> +
> +	VHOST_LOG_DATA(DEBUG, "(%d) current index %d | end index %d\n",
> +			dev->vid, vq->last_avail_idx,
> +			vq->last_avail_idx + *nr_descs);
> +
> +	return 0;
> +}
> +
> +static __rte_noinline uint32_t
> +virtio_dev_rx_async_submit_packed(struct virtio_net *dev,
> +	struct vhost_virtqueue *vq, uint16_t queue_id,
> +	struct rte_mbuf **pkts, uint32_t count,
> +	struct rte_mbuf **comp_pkts, uint32_t *comp_count)
> +{
> +	uint32_t pkt_idx = 0, pkt_burst_idx = 0;
> +	uint16_t async_descs_idx = 0;
> +	uint16_t num_buffers;
> +	uint16_t num_desc;
> +
> +	struct rte_vhost_iov_iter *it_pool = vq->it_pool;
> +	struct iovec *vec_pool = vq->vec_pool;
> +	struct rte_vhost_async_desc tdes[MAX_PKT_BURST];
> +	struct iovec *src_iovec = vec_pool;
> +	struct iovec *dst_iovec = vec_pool + (VHOST_MAX_ASYNC_VEC >> 1);
> +	struct rte_vhost_iov_iter *src_it = it_pool;
> +	struct rte_vhost_iov_iter *dst_it = it_pool + 1;
> +	uint16_t slot_idx = 0;
> +	uint16_t segs_await = 0;
> +	uint16_t iovec_idx = 0, it_idx = 0;
> +	struct async_inflight_info *pkts_info = vq->async_pkts_info;
> +	uint32_t n_pkts = 0, pkt_err = 0;
> +	uint32_t num_async_pkts = 0, num_done_pkts = 0;
> +	struct vring_packed_desc async_descs[vq->size];
> +
> +	rte_prefetch0(&vq->desc_packed[vq->last_avail_idx & (vq->size - 1)]);

The size of the ring is not necessarily a power of two with packed ring.

> +
> +	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
> +		if (unlikely(virtio_dev_rx_async_single_packed(dev, vq,
> +						pkts[pkt_idx],
> +						&num_desc, &num_buffers,
> +						&async_descs[async_descs_idx],
> +						&src_iovec[iovec_idx],
> +						&dst_iovec[iovec_idx],
> +						&src_it[it_idx],
> +						&dst_it[it_idx]) < 0))
> +			break;
> +
> +		VHOST_LOG_DATA(DEBUG, "(%d) current index %d | end index %d\n",
> +			dev->vid, vq->last_avail_idx,
> +			vq->last_avail_idx + num_desc);
> +
> +		slot_idx = (vq->async_pkts_idx + num_async_pkts) &
> +			(vq->size - 1);

Same here.

> +		if (src_it[it_idx].count) {
> +			uint16_t from, to;
> +
> +			async_descs_idx += num_desc;
> +			async_fill_desc(&tdes[pkt_burst_idx++], &src_it[it_idx],
> +					&dst_it[it_idx]);
> +			pkts_info[slot_idx].descs = num_desc;
> +			pkts_info[slot_idx].nr_buffers = num_buffers;
> +			pkts_info[slot_idx].mbuf = pkts[pkt_idx];
> +			num_async_pkts++;
> +			iovec_idx += src_it[it_idx].nr_segs;
> +			it_idx += 2;
> +
> +			segs_await += src_it[it_idx].nr_segs;
> +
> +			/**
> +			 * recover shadow used ring and keep DMA-occupied
> +			 * descriptors.
> +			 */
> +			from = vq->shadow_used_idx - num_buffers;
> +			to = vq->async_packed_buffer_idx & (vq->size - 1);
> +			shadow_ring_store(vq, vq->shadow_used_packed,
> +					vq->async_buffers_packed,
> +					from, to, num_buffers,
> +					sizeof(struct vring_used_elem_packed));
> +
> +			vq->async_packed_buffer_idx += num_buffers;
> +			vq->shadow_used_idx -= num_buffers;
> +		} else

Brackets needed.

> +			comp_pkts[num_done_pkts++] = pkts[pkt_idx];
> +
> +		vq_inc_last_avail_packed(vq, num_desc);
> +
> +		/*
> +		 * conditions to trigger async device transfer:
> +		 * - buffered packet number reaches transfer threshold
> +		 * - unused async iov number is less than max vhost vector
> +		 */
> +		if (unlikely(pkt_burst_idx >= VHOST_ASYNC_BATCH_THRESHOLD ||
> +			((VHOST_MAX_ASYNC_VEC >> 1) - segs_await <
> +			BUF_VECTOR_MAX))) {
> +			n_pkts = vq->async_ops.transfer_data(dev->vid,
> +					queue_id, tdes, 0, pkt_burst_idx);
> +			iovec_idx = 0;
> +			it_idx = 0;
> +			segs_await = 0;
> +			vq->async_pkts_inflight_n += n_pkts;
> +
> +			if (unlikely(n_pkts < pkt_burst_idx)) {
> +				/*
> +				 * log error packets number here and do actual
> +				 * error processing when applications poll
> +				 * completion
> +				 */
> +				pkt_err = pkt_burst_idx - n_pkts;
> +				pkt_burst_idx = 0;
> +				pkt_idx++;
> +				break;
> +			}
> +
> +			pkt_burst_idx = 0;
> +		}
> +	}
> +
> +	if (pkt_burst_idx) {
> +		n_pkts = vq->async_ops.transfer_data(dev->vid,
> +				queue_id, tdes, 0, pkt_burst_idx);
> +		vq->async_pkts_inflight_n += n_pkts;
> +
> +		if (unlikely(n_pkts < pkt_burst_idx))
> +			pkt_err = pkt_burst_idx - n_pkts;
> +	}
> +
> +	do_data_copy_enqueue(dev, vq);
> +
> +	if (unlikely(pkt_err)) {
> +		uint16_t descs_err = 0;
> +		uint16_t buffers_err = 0;
> +
> +		num_async_pkts -= pkt_err;
> +		pkt_idx -= pkt_err;
> +	/* calculate the sum of buffers and descs of DMA-error packets. */
> +		while (pkt_err-- > 0) {
> +			descs_err +=
> +				pkts_info[slot_idx & (vq->size - 1)].descs;

The size of the ring is not necessarily a power of two with packed ring.

> +			buffers_err +=
> +				pkts_info[slot_idx & (vq->size - 1)].nr_buffers;

Ditto.

> +			slot_idx--;
> +		}
> +
> +		vq->async_packed_buffer_idx -= buffers_err;
> +
> +		if (vq->last_avail_idx >= descs_err) {
> +			vq->last_avail_idx -= descs_err;
> +
> +			rte_memcpy(&vq->desc_packed[vq->last_avail_idx],
> +				&async_descs[async_descs_idx - descs_err],
> +				descs_err * sizeof(struct vring_packed_desc));
> +		} else {
> +			uint16_t nr_copy;
> +
> +			vq->last_avail_idx = vq->last_avail_idx + vq->size
> +						- descs_err;
> +			nr_copy = vq->size - vq->last_avail_idx;
> +			rte_memcpy(&vq->desc_packed[vq->last_avail_idx],
> +				&async_descs[async_descs_idx - descs_err],
> +				nr_copy * sizeof(struct vring_packed_desc));
> +			descs_err -= nr_copy;
> +			rte_memcpy(vq->desc_packed,
> +				&async_descs[async_descs_idx - descs_err],
> +				descs_err * sizeof(struct vring_packed_desc));
> +			vq->avail_wrap_counter ^= 1;
> +		}
> +
> +		num_done_pkts = pkt_idx - num_async_pkts;
> +	}

This error handling could be moved in a dedicated function.

> +	vq->async_pkts_idx += num_async_pkts;
> +	*comp_count = num_done_pkts;
> +
> +	if (likely(vq->shadow_used_idx)) {
> +		vhost_flush_enqueue_shadow_packed(dev, vq);
> +		vhost_vring_call_packed(dev, vq);
> +	}
> +
> +	return pkt_idx;
> +}

Above function is very big and complex, it should be possible to split
it in several ones to make it maintainable.

> +
>  static __rte_always_inline void
>  write_back_completed_descs_split(struct vhost_virtqueue *vq, uint16_t n_descs)
>  {
> @@ -1693,12 +2048,40 @@ write_back_completed_descs_split(struct vhost_virtqueue *vq, uint16_t n_descs)
>  	} while (nr_left > 0);
>  }
>  
> +static __rte_always_inline void
> +write_back_completed_descs_packed(struct vhost_virtqueue *vq,
> +				uint16_t n_buffers)
> +{
> +	uint16_t nr_left = n_buffers;
> +	uint16_t from, to;
> +
> +	do {
> +		from = vq->last_async_buffer_idx &
> +						(vq->size - 1);
> +		to = (from + nr_left) & (vq->size - 1);

The size of the ring is not necessarily a power of two with packed ring.

> +		if (to > from) {
> +			vhost_update_used_packed(vq,
> +				vq->async_buffers_packed + from,
> +				to - from);
> +			vq->last_async_buffer_idx += nr_left;
> +			nr_left = 0;
> +		} else {
> +			vhost_update_used_packed(vq,
> +				vq->async_buffers_packed + from,
> +				vq->size - from);
> +			vq->last_async_buffer_idx +=
> +						vq->size - from;
> +			nr_left -= vq->size - from;
> +		}
> +	} while (nr_left > 0);
> +}
> +
>  uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
>  		struct rte_mbuf **pkts, uint16_t count)
>  {
>  	struct virtio_net *dev = get_device(vid);
>  	struct vhost_virtqueue *vq;
> -	uint16_t n_pkts_cpl = 0, n_pkts_put = 0, n_descs = 0;
> +	uint16_t n_pkts_cpl = 0, n_pkts_put = 0, n_descs = 0, n_buffers = 0;
>  	uint16_t start_idx, pkts_idx, vq_size;
>  	struct async_inflight_info *pkts_info;
>  	uint16_t from, i;
> @@ -1740,21 +2123,41 @@ uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
>  		goto done;
>  	}
>  
> -	for (i = 0; i < n_pkts_put; i++) {
> -		from = (start_idx + i) & (vq_size - 1);
> -		n_descs += pkts_info[from].descs;
> -		pkts[i] = pkts_info[from].mbuf;
> +	if (vq_is_packed(dev)) {
> +		for (i = 0; i < n_pkts_put; i++) {
> +			from = (start_idx + i) & (vq_size - 1);

Unlike split ring, packed ring size is not necessarily a power of 2.

> +			n_buffers += pkts_info[from].nr_buffers;
> +			pkts[i] = pkts_info[from].mbuf;
> +		}
> +	} else {
> +		for (i = 0; i < n_pkts_put; i++) {
> +			from = (start_idx + i) & (vq_size - 1);
> +			n_descs += pkts_info[from].descs;
> +			pkts[i] = pkts_info[from].mbuf;
> +		}
>  	}
> +
>  	vq->async_last_pkts_n = n_pkts_cpl - n_pkts_put;
>  	vq->async_pkts_inflight_n -= n_pkts_put;
>  
>  	if (likely(vq->enabled && vq->access_ok)) {
> -		write_back_completed_descs_split(vq, n_descs);
> +		if (vq_is_packed(dev)) {
> +			write_back_completed_descs_packed(vq, n_buffers);
>  
> -		__atomic_add_fetch(&vq->used->idx, n_descs, __ATOMIC_RELEASE);
> -		vhost_vring_call_split(dev, vq);
> -	} else
> -		vq->last_async_desc_idx += n_descs;
> +			vhost_vring_call_packed(dev, vq);
> +		} else {
> +			write_back_completed_descs_split(vq, n_descs);
> +
> +			__atomic_add_fetch(&vq->used->idx, n_descs,
> +					__ATOMIC_RELEASE);
> +			vhost_vring_call_split(dev, vq);
> +		}
> +	} else {
> +		if (vq_is_packed(dev))
> +			vq->last_async_buffer_idx += n_buffers;
> +		else
> +			vq->last_async_desc_idx += n_descs;
> +	}
>  
>  done:
>  	rte_spinlock_unlock(&vq->access_lock);
> @@ -1795,9 +2198,10 @@ virtio_dev_rx_async_submit(struct virtio_net *dev, uint16_t queue_id,
>  	if (count == 0)
>  		goto out;
>  
> -	/* TODO: packed queue not implemented */
>  	if (vq_is_packed(dev))
> -		nb_tx = 0;
> +		nb_tx = virtio_dev_rx_async_submit_packed(dev,
> +				vq, queue_id, pkts, count, comp_pkts,
> +				comp_count);
>  	else
>  		nb_tx = virtio_dev_rx_async_submit_split(dev,
>  				vq, queue_id, pkts, count, comp_pkts,
>
Jiang, Cheng1 April 13, 2021, 11:48 a.m. UTC | #2
Hi Maxime,

> -----Original Message-----
> From: Maxime Coquelin <maxime.coquelin@redhat.com>
> Sent: Tuesday, April 13, 2021 4:37 PM
> To: Jiang, Cheng1 <cheng1.jiang@intel.com>; Xia, Chenbo
> <chenbo.xia@intel.com>
> Cc: dev@dpdk.org; Hu, Jiayu <jiayu.hu@intel.com>; Yang, YvonneX
> <yvonnex.yang@intel.com>; Wang, Yinan <yinan.wang@intel.com>; Liu,
> Yong <yong.liu@intel.com>
> Subject: Re: [PATCH v5 2/4] vhost: add support for packed ring in async vhost
> 
> 
> 
> On 4/12/21 1:34 PM, Cheng Jiang wrote:
> > For now async vhost data path only supports split ring structure. In
> > order to make async vhost compatible with virtio 1.1 spec this patch
> > enables packed ring in async vhost data path.
> >
> > Signed-off-by: Cheng Jiang <Cheng1.jiang@intel.com>
> > ---
> >  lib/librte_vhost/rte_vhost_async.h |   1 +
> >  lib/librte_vhost/vhost.c           |  27 +-
> >  lib/librte_vhost/vhost.h           |   7 +-
> >  lib/librte_vhost/virtio_net.c      | 438 +++++++++++++++++++++++++++--
> >  4 files changed, 448 insertions(+), 25 deletions(-)
> >
> > diff --git a/lib/librte_vhost/rte_vhost_async.h
> > b/lib/librte_vhost/rte_vhost_async.h
> > index c855ff875..6faa31f5a 100644
> > --- a/lib/librte_vhost/rte_vhost_async.h
> > +++ b/lib/librte_vhost/rte_vhost_async.h
> > @@ -89,6 +89,7 @@ struct rte_vhost_async_channel_ops {  struct
> > async_inflight_info {
> >  	struct rte_mbuf *mbuf;
> >  	uint16_t descs; /* num of descs inflight */
> > +	uint16_t nr_buffers; /* num of buffers inflight for packed ring */
> >  };
> >
> >  /**
> > diff --git a/lib/librte_vhost/vhost.c b/lib/librte_vhost/vhost.c index
> > a70fe01d8..8c9935c0f 100644
> > --- a/lib/librte_vhost/vhost.c
> > +++ b/lib/librte_vhost/vhost.c
> > @@ -342,15 +342,21 @@ vhost_free_async_mem(struct vhost_virtqueue
> *vq)
> > {
> >  	if (vq->async_pkts_info)
> >  		rte_free(vq->async_pkts_info);
> > -	if (vq->async_descs_split)
> > +	if (vq->async_buffers_packed) {
> > +		rte_free(vq->async_buffers_packed);
> > +		vq->async_buffers_packed = NULL;
> > +	}
> > +	if (vq->async_descs_split) {
> 
> You can remove the check, rte_free is safe with NULL pointers.
> You can do the same for the other ones in this function.

OK, it will be fixed.

> 
> >  		rte_free(vq->async_descs_split);
> > +		vq->async_descs_split = NULL;
> > +	}
> > +
> >  	if (vq->it_pool)
> >  		rte_free(vq->it_pool);
> >  	if (vq->vec_pool)
> >  		rte_free(vq->vec_pool);
> >
> >  	vq->async_pkts_info = NULL;
> > -	vq->async_descs_split = NULL;
> >  	vq->it_pool = NULL;
> >  	vq->vec_pool = NULL;
> >  }
> > @@ -1627,9 +1633,9 @@ int rte_vhost_async_channel_register(int vid,
> uint16_t queue_id,
> >  		return -1;
> >
> >  	/* packed queue is not supported */
> > -	if (unlikely(vq_is_packed(dev) || !f.async_inorder)) {
> > +	if (unlikely(!f.async_inorder)) {
> >  		VHOST_LOG_CONFIG(ERR,
> > -			"async copy is not supported on packed queue or
> non-inorder mode "
> > +			"async copy is not supported on non-inorder mode "
> >  			"(vid %d, qid: %d)\n", vid, queue_id);
> >  		return -1;
> >  	}
> > @@ -1667,11 +1673,18 @@ int rte_vhost_async_channel_register(int vid,
> uint16_t queue_id,
> >  	vq->vec_pool = rte_malloc_socket(NULL,
> >  			VHOST_MAX_ASYNC_VEC * sizeof(struct iovec),
> >  			RTE_CACHE_LINE_SIZE, node);
> > -	vq->async_descs_split = rte_malloc_socket(NULL,
> > +	if (vq_is_packed(dev)) {
> > +		vq->async_buffers_packed = rte_malloc_socket(NULL,
> > +			vq->size * sizeof(struct vring_used_elem_packed),
> > +			RTE_CACHE_LINE_SIZE, node);
> > +	} else {
> > +		vq->async_descs_split = rte_malloc_socket(NULL,
> >  			vq->size * sizeof(struct vring_used_elem),
> >  			RTE_CACHE_LINE_SIZE, node);
> > -	if (!vq->async_descs_split || !vq->async_pkts_info ||
> > -		!vq->it_pool || !vq->vec_pool) {
> > +	}
> > +
> > +	if (!vq->async_buffers_packed || !vq->async_descs_split ||
> > +		!vq->async_pkts_info || !vq->it_pool || !vq->vec_pool) {
> >  		vhost_free_async_mem(vq);
> >  		VHOST_LOG_CONFIG(ERR,
> >  				"async register failed: cannot allocate
> memory for vq data "
> > diff --git a/lib/librte_vhost/vhost.h b/lib/librte_vhost/vhost.h index
> > f628714c2..fe131ae8f 100644
> > --- a/lib/librte_vhost/vhost.h
> > +++ b/lib/librte_vhost/vhost.h
> > @@ -201,9 +201,14 @@ struct vhost_virtqueue {
> >  	uint16_t	async_pkts_idx;
> >  	uint16_t	async_pkts_inflight_n;
> >  	uint16_t	async_last_pkts_n;
> > -	struct vring_used_elem  *async_descs_split;
> > +	union {
> > +		struct vring_used_elem  *async_descs_split;
> > +		struct vring_used_elem_packed *async_buffers_packed;
> > +	};
> >  	uint16_t async_desc_idx;
> > +	uint16_t async_packed_buffer_idx;
> 
> Don't dupplicate variable names, async_desc_idx can be reused for packed
> ring. Also, they are representing the same thing, why use desc in one case
> and buffer in the other?

The main reason is that the unit of the packed used ring is buffer, which can contain many desc.
I think using desc_idx will cause ambiguity, but if you think that I should reuse the desc_idx, I have no problem with that.

> 
> >  	uint16_t last_async_desc_idx;
> > +	uint16_t last_async_buffer_idx;
> 
> Same remark here.
> 
> >  	/* vq async features */
> >  	bool		async_inorder;
> > diff --git a/lib/librte_vhost/virtio_net.c
> > b/lib/librte_vhost/virtio_net.c index c43ab0093..410be9678 100644
> > --- a/lib/librte_vhost/virtio_net.c
> > +++ b/lib/librte_vhost/virtio_net.c
> > @@ -363,14 +363,14 @@
> > vhost_shadow_dequeue_single_packed_inorder(struct vhost_virtqueue
> *vq,
> > }
> >
> >  static __rte_always_inline void
> > -vhost_shadow_enqueue_single_packed(struct virtio_net *dev,
> > -				   struct vhost_virtqueue *vq,
> > -				   uint32_t len[],
> > -				   uint16_t id[],
> > -				   uint16_t count[],
> > +vhost_shadow_enqueue_packed(struct vhost_virtqueue *vq,
> > +				   uint32_t *len,
> > +				   uint16_t *id,
> > +				   uint16_t *count,
> >  				   uint16_t num_buffers)
> >  {
> >  	uint16_t i;
> > +
> >  	for (i = 0; i < num_buffers; i++) {
> >  		/* enqueue shadow flush action aligned with batch num */
> >  		if (!vq->shadow_used_idx)
> > @@ -382,6 +382,17 @@ vhost_shadow_enqueue_single_packed(struct
> virtio_net *dev,
> >  		vq->shadow_aligned_idx += count[i];
> >  		vq->shadow_used_idx++;
> >  	}
> > +}
> > +
> > +static __rte_always_inline void
> > +vhost_shadow_enqueue_single_packed(struct virtio_net *dev,
> > +				   struct vhost_virtqueue *vq,
> > +				   uint32_t *len,
> > +				   uint16_t *id,
> > +				   uint16_t *count,
> > +				   uint16_t num_buffers)
> > +{
> > +	vhost_shadow_enqueue_packed(vq, len, id, count, num_buffers);
> >
> >  	if (vq->shadow_aligned_idx >= PACKED_BATCH_SIZE) {
> >  		do_data_copy_enqueue(dev, vq);
> > @@ -1481,6 +1492,62 @@ shadow_ring_store(struct vhost_virtqueue *vq,
> void *shadow_ring, void *d_ring,
> >  	}
> >  }
> >
> > +static __rte_always_inline void
> > +vhost_update_used_packed(struct vhost_virtqueue *vq,
> > +			struct vring_used_elem_packed *shadow_ring,
> > +			uint16_t count)
> > +{
> > +	if (count == 0)
> > +		return;
> 
> Move this after the variables declaration.

Sure.

> 
> > +
> > +	int i;
> > +	uint16_t used_idx = vq->last_used_idx;
> > +	uint16_t head_idx = vq->last_used_idx;
> > +	uint16_t head_flags = 0;
> > +
> > +	/* Split loop in two to save memory barriers */
> > +	for (i = 0; i < count; i++) {
> > +		vq->desc_packed[used_idx].id = shadow_ring[i].id;
> > +		vq->desc_packed[used_idx].len = shadow_ring[i].len;
> > +
> > +		used_idx += shadow_ring[i].count;
> > +		if (used_idx >= vq->size)
> > +			used_idx -= vq->size;
> > +	}
> > +
> > +	/* The ordering for storing desc flags needs to be enforced. */
> > +	rte_atomic_thread_fence(__ATOMIC_RELEASE);
> > +
> > +	for (i = 0; i < count; i++) {
> > +		uint16_t flags;
> > +
> > +		if (vq->shadow_used_packed[i].len)
> > +			flags = VRING_DESC_F_WRITE;
> > +		else
> > +			flags = 0;
> > +
> > +		if (vq->used_wrap_counter) {
> > +			flags |= VRING_DESC_F_USED;
> > +			flags |= VRING_DESC_F_AVAIL;
> > +		} else {
> > +			flags &= ~VRING_DESC_F_USED;
> > +			flags &= ~VRING_DESC_F_AVAIL;
> > +		}
> > +
> > +		if (i > 0) {
> > +			vq->desc_packed[vq->last_used_idx].flags = flags;
> > +
> > +		} else {
> > +			head_idx = vq->last_used_idx;
> > +			head_flags = flags;
> > +		}
> > +
> > +		vq_inc_last_used_packed(vq, shadow_ring[i].count);
> > +	}
> > +
> > +	vq->desc_packed[head_idx].flags = head_flags; }
> > +
> >  static __rte_noinline uint32_t
> >  virtio_dev_rx_async_submit_split(struct virtio_net *dev,
> >  	struct vhost_virtqueue *vq, uint16_t queue_id, @@ -1656,6
> +1723,294
> > @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
> >  	return pkt_idx;
> >  }
> >
> > +static __rte_always_inline int
> > +vhost_enqueue_async_single_packed(struct virtio_net *dev,
> > +			    struct vhost_virtqueue *vq,
> > +			    struct rte_mbuf *pkt,
> > +			    struct buf_vector *buf_vec,
> > +			    uint16_t *nr_descs,
> > +			    uint16_t *nr_buffers,
> > +			    struct vring_packed_desc *async_descs,
> > +			    struct iovec *src_iovec, struct iovec *dst_iovec,
> > +			    struct rte_vhost_iov_iter *src_it,
> > +			    struct rte_vhost_iov_iter *dst_it) {
> > +	uint16_t nr_vec = 0;
> > +	uint16_t avail_idx = vq->last_avail_idx;
> > +	uint16_t max_tries, tries = 0;
> > +	uint16_t buf_id = 0;
> > +	uint32_t len = 0;
> > +	uint16_t desc_count = 0;
> > +	uint32_t size = pkt->pkt_len + sizeof(struct
> virtio_net_hdr_mrg_rxbuf);
> > +	uint32_t buffer_len[vq->size];
> > +	uint16_t buffer_buf_id[vq->size];
> > +	uint16_t buffer_desc_count[vq->size];
> > +	*nr_buffers = 0;
> > +
> > +	if (rxvq_is_mergeable(dev))
> > +		max_tries = vq->size - 1;
> > +	else
> > +		max_tries = 1;
> > +
> > +	while (size > 0) {
> > +		/*
> > +		 * if we tried all available ring items, and still
> > +		 * can't get enough buf, it means something abnormal
> > +		 * happened.
> > +		 */
> > +		if (unlikely(++tries > max_tries))
> > +			return -1;
> > +
> > +		if (unlikely(fill_vec_buf_packed(dev, vq,
> > +						avail_idx, &desc_count,
> > +						buf_vec, &nr_vec,
> > +						&buf_id, &len,
> > +						VHOST_ACCESS_RW) < 0))
> > +			return -1;
> > +
> > +		len = RTE_MIN(len, size);
> > +		size -= len;
> > +
> > +		buffer_len[*nr_buffers] = len;
> > +		buffer_buf_id[*nr_buffers] = buf_id;
> > +		buffer_desc_count[*nr_buffers] = desc_count;
> > +		*nr_buffers += 1;
> > +
> > +		*nr_descs += desc_count;
> > +		avail_idx += desc_count;
> > +		if (avail_idx >= vq->size)
> > +			avail_idx -= vq->size;
> > +	}
> > +
> > +	if (async_mbuf_to_desc(dev, vq, pkt, buf_vec, nr_vec, *nr_buffers,
> > +		src_iovec, dst_iovec, src_it, dst_it) < 0)
> > +		return -1;
> > +	/* store descriptors for DMA */
> > +	if (avail_idx >= *nr_descs)
> > +		rte_memcpy(async_descs,
> > +			&vq->desc_packed[vq->last_avail_idx],
> > +			*nr_descs * sizeof(struct vring_packed_desc));
> 
> Please add brackets for the 'if' since there are for the 'else'.

Sure, sorry for that.

> 
> > +	else {
> > +		uint16_t nr_copy = vq->size - vq->last_avail_idx;
> > +		rte_memcpy(async_descs,
> > +			&vq->desc_packed[vq->last_avail_idx],
> > +			nr_copy * sizeof(struct vring_packed_desc));
> > +		rte_memcpy(async_descs + nr_copy,
> > +			vq->desc_packed, (*nr_descs - nr_copy) *
> > +			sizeof(struct vring_packed_desc));
> > +	}
> > +
> > +	vhost_shadow_enqueue_packed(vq, buffer_len, buffer_buf_id,
> > +					   buffer_desc_count, *nr_buffers);
> > +
> > +	return 0;
> > +}
> > +
> > +static __rte_always_inline int16_t
> > +virtio_dev_rx_async_single_packed(struct virtio_net *dev,
> > +			    struct vhost_virtqueue *vq,
> > +			    struct rte_mbuf *pkt,
> > +			    uint16_t *nr_descs, uint16_t *nr_buffers,
> > +			    struct vring_packed_desc *async_descs,
> > +			    struct iovec *src_iovec, struct iovec *dst_iovec,
> > +			    struct rte_vhost_iov_iter *src_it,
> > +			    struct rte_vhost_iov_iter *dst_it) {
> > +	struct buf_vector buf_vec[BUF_VECTOR_MAX];
> > +	*nr_descs = 0;
> > +	*nr_buffers = 0;
> > +
> > +	if (unlikely(vhost_enqueue_async_single_packed(dev, vq, pkt,
> buf_vec,
> > +						 nr_descs,
> > +						 nr_buffers,
> > +						 async_descs,
> > +						 src_iovec, dst_iovec,
> > +						 src_it, dst_it) < 0)) {
> > +		VHOST_LOG_DATA(DEBUG,
> > +				"(%d) failed to get enough desc from vring\n",
> > +				dev->vid);
> > +		return -1;
> > +	}
> > +
> > +	VHOST_LOG_DATA(DEBUG, "(%d) current index %d | end
> index %d\n",
> > +			dev->vid, vq->last_avail_idx,
> > +			vq->last_avail_idx + *nr_descs);
> > +
> > +	return 0;
> > +}
> > +
> > +static __rte_noinline uint32_t
> > +virtio_dev_rx_async_submit_packed(struct virtio_net *dev,
> > +	struct vhost_virtqueue *vq, uint16_t queue_id,
> > +	struct rte_mbuf **pkts, uint32_t count,
> > +	struct rte_mbuf **comp_pkts, uint32_t *comp_count) {
> > +	uint32_t pkt_idx = 0, pkt_burst_idx = 0;
> > +	uint16_t async_descs_idx = 0;
> > +	uint16_t num_buffers;
> > +	uint16_t num_desc;
> > +
> > +	struct rte_vhost_iov_iter *it_pool = vq->it_pool;
> > +	struct iovec *vec_pool = vq->vec_pool;
> > +	struct rte_vhost_async_desc tdes[MAX_PKT_BURST];
> > +	struct iovec *src_iovec = vec_pool;
> > +	struct iovec *dst_iovec = vec_pool + (VHOST_MAX_ASYNC_VEC >>
> 1);
> > +	struct rte_vhost_iov_iter *src_it = it_pool;
> > +	struct rte_vhost_iov_iter *dst_it = it_pool + 1;
> > +	uint16_t slot_idx = 0;
> > +	uint16_t segs_await = 0;
> > +	uint16_t iovec_idx = 0, it_idx = 0;
> > +	struct async_inflight_info *pkts_info = vq->async_pkts_info;
> > +	uint32_t n_pkts = 0, pkt_err = 0;
> > +	uint32_t num_async_pkts = 0, num_done_pkts = 0;
> > +	struct vring_packed_desc async_descs[vq->size];
> > +
> > +	rte_prefetch0(&vq->desc_packed[vq->last_avail_idx & (vq->size -
> > +1)]);
> 
> The size of the ring is not necessarily a power of two with packed ring.

For the size of the ring is not necessarily a power of two,
so maybe I can use codes like 
Indx % vq->size  ?
I'm not sure if it's a good way to do that.

> 
> > +
> > +	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
> > +		if (unlikely(virtio_dev_rx_async_single_packed(dev, vq,
> > +						pkts[pkt_idx],
> > +						&num_desc, &num_buffers,
> > +
> 	&async_descs[async_descs_idx],
> > +						&src_iovec[iovec_idx],
> > +						&dst_iovec[iovec_idx],
> > +						&src_it[it_idx],
> > +						&dst_it[it_idx]) < 0))
> > +			break;
> > +
> > +		VHOST_LOG_DATA(DEBUG, "(%d) current index %d | end
> index %d\n",
> > +			dev->vid, vq->last_avail_idx,
> > +			vq->last_avail_idx + num_desc);
> > +
> > +		slot_idx = (vq->async_pkts_idx + num_async_pkts) &
> > +			(vq->size - 1);
> 
> Same here.

Sure.

> 
> > +		if (src_it[it_idx].count) {
> > +			uint16_t from, to;
> > +
> > +			async_descs_idx += num_desc;
> > +			async_fill_desc(&tdes[pkt_burst_idx++],
> &src_it[it_idx],
> > +					&dst_it[it_idx]);
> > +			pkts_info[slot_idx].descs = num_desc;
> > +			pkts_info[slot_idx].nr_buffers = num_buffers;
> > +			pkts_info[slot_idx].mbuf = pkts[pkt_idx];
> > +			num_async_pkts++;
> > +			iovec_idx += src_it[it_idx].nr_segs;
> > +			it_idx += 2;
> > +
> > +			segs_await += src_it[it_idx].nr_segs;
> > +
> > +			/**
> > +			 * recover shadow used ring and keep DMA-occupied
> > +			 * descriptors.
> > +			 */
> > +			from = vq->shadow_used_idx - num_buffers;
> > +			to = vq->async_packed_buffer_idx & (vq->size - 1);
> > +			shadow_ring_store(vq, vq->shadow_used_packed,
> > +					vq->async_buffers_packed,
> > +					from, to, num_buffers,
> > +					sizeof(struct
> vring_used_elem_packed));
> > +
> > +			vq->async_packed_buffer_idx += num_buffers;
> > +			vq->shadow_used_idx -= num_buffers;
> > +		} else
> 
> Brackets needed.

Sure.

> 
> > +			comp_pkts[num_done_pkts++] = pkts[pkt_idx];
> > +
> > +		vq_inc_last_avail_packed(vq, num_desc);
> > +
> > +		/*
> > +		 * conditions to trigger async device transfer:
> > +		 * - buffered packet number reaches transfer threshold
> > +		 * - unused async iov number is less than max vhost vector
> > +		 */
> > +		if (unlikely(pkt_burst_idx >=
> VHOST_ASYNC_BATCH_THRESHOLD ||
> > +			((VHOST_MAX_ASYNC_VEC >> 1) - segs_await <
> > +			BUF_VECTOR_MAX))) {
> > +			n_pkts = vq->async_ops.transfer_data(dev->vid,
> > +					queue_id, tdes, 0, pkt_burst_idx);
> > +			iovec_idx = 0;
> > +			it_idx = 0;
> > +			segs_await = 0;
> > +			vq->async_pkts_inflight_n += n_pkts;
> > +
> > +			if (unlikely(n_pkts < pkt_burst_idx)) {
> > +				/*
> > +				 * log error packets number here and do
> actual
> > +				 * error processing when applications poll
> > +				 * completion
> > +				 */
> > +				pkt_err = pkt_burst_idx - n_pkts;
> > +				pkt_burst_idx = 0;
> > +				pkt_idx++;
> > +				break;
> > +			}
> > +
> > +			pkt_burst_idx = 0;
> > +		}
> > +	}
> > +
> > +	if (pkt_burst_idx) {
> > +		n_pkts = vq->async_ops.transfer_data(dev->vid,
> > +				queue_id, tdes, 0, pkt_burst_idx);
> > +		vq->async_pkts_inflight_n += n_pkts;
> > +
> > +		if (unlikely(n_pkts < pkt_burst_idx))
> > +			pkt_err = pkt_burst_idx - n_pkts;
> > +	}
> > +
> > +	do_data_copy_enqueue(dev, vq);
> > +
> > +	if (unlikely(pkt_err)) {
> > +		uint16_t descs_err = 0;
> > +		uint16_t buffers_err = 0;
> > +
> > +		num_async_pkts -= pkt_err;
> > +		pkt_idx -= pkt_err;
> > +	/* calculate the sum of buffers and descs of DMA-error packets. */
> > +		while (pkt_err-- > 0) {
> > +			descs_err +=
> > +				pkts_info[slot_idx & (vq->size - 1)].descs;
> 
> The size of the ring is not necessarily a power of two with packed ring.

Will be fixed.

> 
> > +			buffers_err +=
> > +				pkts_info[slot_idx & (vq->size -
> 1)].nr_buffers;
> 
> Ditto.

Will be fixed.

> 
> > +			slot_idx--;
> > +		}
> > +
> > +		vq->async_packed_buffer_idx -= buffers_err;
> > +
> > +		if (vq->last_avail_idx >= descs_err) {
> > +			vq->last_avail_idx -= descs_err;
> > +
> > +			rte_memcpy(&vq->desc_packed[vq->last_avail_idx],
> > +				&async_descs[async_descs_idx - descs_err],
> > +				descs_err * sizeof(struct
> vring_packed_desc));
> > +		} else {
> > +			uint16_t nr_copy;
> > +
> > +			vq->last_avail_idx = vq->last_avail_idx + vq->size
> > +						- descs_err;
> > +			nr_copy = vq->size - vq->last_avail_idx;
> > +			rte_memcpy(&vq->desc_packed[vq->last_avail_idx],
> > +				&async_descs[async_descs_idx - descs_err],
> > +				nr_copy * sizeof(struct vring_packed_desc));
> > +			descs_err -= nr_copy;
> > +			rte_memcpy(vq->desc_packed,
> > +				&async_descs[async_descs_idx - descs_err],
> > +				descs_err * sizeof(struct
> vring_packed_desc));
> > +			vq->avail_wrap_counter ^= 1;
> > +		}
> > +
> > +		num_done_pkts = pkt_idx - num_async_pkts;
> > +	}
> 
> This error handling could be moved in a dedicated function.

Sure, will fix it in the next version.

> 
> > +	vq->async_pkts_idx += num_async_pkts;
> > +	*comp_count = num_done_pkts;
> > +
> > +	if (likely(vq->shadow_used_idx)) {
> > +		vhost_flush_enqueue_shadow_packed(dev, vq);
> > +		vhost_vring_call_packed(dev, vq);
> > +	}
> > +
> > +	return pkt_idx;
> > +}
> 
> Above function is very big and complex, it should be possible to split it in
> several ones to make it maintainable.

I think move the error handling code will make it smaller.

Thanks.
Cheng

> 
> > +
> >  static __rte_always_inline void
> >  write_back_completed_descs_split(struct vhost_virtqueue *vq, uint16_t
> > n_descs)  { @@ -1693,12 +2048,40 @@
> > write_back_completed_descs_split(struct vhost_virtqueue *vq, uint16_t
> n_descs)
> >  	} while (nr_left > 0);
> >  }
> >
> > +static __rte_always_inline void
> > +write_back_completed_descs_packed(struct vhost_virtqueue *vq,
> > +				uint16_t n_buffers)
> > +{
> > +	uint16_t nr_left = n_buffers;
> > +	uint16_t from, to;
> > +
> > +	do {
> > +		from = vq->last_async_buffer_idx &
> > +						(vq->size - 1);
> > +		to = (from + nr_left) & (vq->size - 1);
> 
> The size of the ring is not necessarily a power of two with packed ring.

Sure.

> 
> > +		if (to > from) {
> > +			vhost_update_used_packed(vq,
> > +				vq->async_buffers_packed + from,
> > +				to - from);
> > +			vq->last_async_buffer_idx += nr_left;
> > +			nr_left = 0;
> > +		} else {
> > +			vhost_update_used_packed(vq,
> > +				vq->async_buffers_packed + from,
> > +				vq->size - from);
> > +			vq->last_async_buffer_idx +=
> > +						vq->size - from;
> > +			nr_left -= vq->size - from;
> > +		}
> > +	} while (nr_left > 0);
> > +}
> > +
> >  uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
> >  		struct rte_mbuf **pkts, uint16_t count)  {
> >  	struct virtio_net *dev = get_device(vid);
> >  	struct vhost_virtqueue *vq;
> > -	uint16_t n_pkts_cpl = 0, n_pkts_put = 0, n_descs = 0;
> > +	uint16_t n_pkts_cpl = 0, n_pkts_put = 0, n_descs = 0, n_buffers = 0;
> >  	uint16_t start_idx, pkts_idx, vq_size;
> >  	struct async_inflight_info *pkts_info;
> >  	uint16_t from, i;
> > @@ -1740,21 +2123,41 @@ uint16_t
> rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
> >  		goto done;
> >  	}
> >
> > -	for (i = 0; i < n_pkts_put; i++) {
> > -		from = (start_idx + i) & (vq_size - 1);
> > -		n_descs += pkts_info[from].descs;
> > -		pkts[i] = pkts_info[from].mbuf;
> > +	if (vq_is_packed(dev)) {
> > +		for (i = 0; i < n_pkts_put; i++) {
> > +			from = (start_idx + i) & (vq_size - 1);
> 
> Unlike split ring, packed ring size is not necessarily a power of 2.

Sure.
Thanks.

> 
> > +			n_buffers += pkts_info[from].nr_buffers;
> > +			pkts[i] = pkts_info[from].mbuf;
> > +		}
> > +	} else {
> > +		for (i = 0; i < n_pkts_put; i++) {
> > +			from = (start_idx + i) & (vq_size - 1);
> > +			n_descs += pkts_info[from].descs;
> > +			pkts[i] = pkts_info[from].mbuf;
> > +		}
> >  	}
> > +
> >  	vq->async_last_pkts_n = n_pkts_cpl - n_pkts_put;
> >  	vq->async_pkts_inflight_n -= n_pkts_put;
> >
> >  	if (likely(vq->enabled && vq->access_ok)) {
> > -		write_back_completed_descs_split(vq, n_descs);
> > +		if (vq_is_packed(dev)) {
> > +			write_back_completed_descs_packed(vq,
> n_buffers);
> >
> > -		__atomic_add_fetch(&vq->used->idx, n_descs,
> __ATOMIC_RELEASE);
> > -		vhost_vring_call_split(dev, vq);
> > -	} else
> > -		vq->last_async_desc_idx += n_descs;
> > +			vhost_vring_call_packed(dev, vq);
> > +		} else {
> > +			write_back_completed_descs_split(vq, n_descs);
> > +
> > +			__atomic_add_fetch(&vq->used->idx, n_descs,
> > +					__ATOMIC_RELEASE);
> > +			vhost_vring_call_split(dev, vq);
> > +		}
> > +	} else {
> > +		if (vq_is_packed(dev))
> > +			vq->last_async_buffer_idx += n_buffers;
> > +		else
> > +			vq->last_async_desc_idx += n_descs;
> > +	}
> >
> >  done:
> >  	rte_spinlock_unlock(&vq->access_lock);
> > @@ -1795,9 +2198,10 @@ virtio_dev_rx_async_submit(struct virtio_net
> *dev, uint16_t queue_id,
> >  	if (count == 0)
> >  		goto out;
> >
> > -	/* TODO: packed queue not implemented */
> >  	if (vq_is_packed(dev))
> > -		nb_tx = 0;
> > +		nb_tx = virtio_dev_rx_async_submit_packed(dev,
> > +				vq, queue_id, pkts, count, comp_pkts,
> > +				comp_count);
> >  	else
> >  		nb_tx = virtio_dev_rx_async_submit_split(dev,
> >  				vq, queue_id, pkts, count, comp_pkts,
> >
Maxime Coquelin April 13, 2021, 1:08 p.m. UTC | #3
On 4/13/21 1:48 PM, Jiang, Cheng1 wrote:
> Hi Maxime,
> 
>> -----Original Message-----
>> From: Maxime Coquelin <maxime.coquelin@redhat.com>
>> Sent: Tuesday, April 13, 2021 4:37 PM
>> To: Jiang, Cheng1 <cheng1.jiang@intel.com>; Xia, Chenbo
>> <chenbo.xia@intel.com>
>> Cc: dev@dpdk.org; Hu, Jiayu <jiayu.hu@intel.com>; Yang, YvonneX
>> <yvonnex.yang@intel.com>; Wang, Yinan <yinan.wang@intel.com>; Liu,
>> Yong <yong.liu@intel.com>
>> Subject: Re: [PATCH v5 2/4] vhost: add support for packed ring in async vhost
>>
>>
>>
>> On 4/12/21 1:34 PM, Cheng Jiang wrote:
>>> For now async vhost data path only supports split ring structure. In
>>> order to make async vhost compatible with virtio 1.1 spec this patch
>>> enables packed ring in async vhost data path.
>>>
>>> Signed-off-by: Cheng Jiang <Cheng1.jiang@intel.com>
>>> ---
>>>  lib/librte_vhost/rte_vhost_async.h |   1 +
>>>  lib/librte_vhost/vhost.c           |  27 +-
>>>  lib/librte_vhost/vhost.h           |   7 +-
>>>  lib/librte_vhost/virtio_net.c      | 438 +++++++++++++++++++++++++++--
>>>  4 files changed, 448 insertions(+), 25 deletions(-)
>>>
>>> diff --git a/lib/librte_vhost/rte_vhost_async.h
>>> b/lib/librte_vhost/rte_vhost_async.h
>>> index c855ff875..6faa31f5a 100644
>>> --- a/lib/librte_vhost/rte_vhost_async.h
>>> +++ b/lib/librte_vhost/rte_vhost_async.h
>>> @@ -89,6 +89,7 @@ struct rte_vhost_async_channel_ops {  struct
>>> async_inflight_info {
>>>  	struct rte_mbuf *mbuf;
>>>  	uint16_t descs; /* num of descs inflight */
>>> +	uint16_t nr_buffers; /* num of buffers inflight for packed ring */
>>>  };
>>>
>>>  /**
>>> diff --git a/lib/librte_vhost/vhost.c b/lib/librte_vhost/vhost.c index
>>> a70fe01d8..8c9935c0f 100644
>>> --- a/lib/librte_vhost/vhost.c
>>> +++ b/lib/librte_vhost/vhost.c
>>> @@ -342,15 +342,21 @@ vhost_free_async_mem(struct vhost_virtqueue
>> *vq)
>>> {
>>>  	if (vq->async_pkts_info)
>>>  		rte_free(vq->async_pkts_info);
>>> -	if (vq->async_descs_split)
>>> +	if (vq->async_buffers_packed) {
>>> +		rte_free(vq->async_buffers_packed);
>>> +		vq->async_buffers_packed = NULL;
>>> +	}
>>> +	if (vq->async_descs_split) {
>>
>> You can remove the check, rte_free is safe with NULL pointers.
>> You can do the same for the other ones in this function.
> 
> OK, it will be fixed.
> 
>>
>>>  		rte_free(vq->async_descs_split);
>>> +		vq->async_descs_split = NULL;
>>> +	}
>>> +
>>>  	if (vq->it_pool)
>>>  		rte_free(vq->it_pool);
>>>  	if (vq->vec_pool)
>>>  		rte_free(vq->vec_pool);
>>>
>>>  	vq->async_pkts_info = NULL;
>>> -	vq->async_descs_split = NULL;
>>>  	vq->it_pool = NULL;
>>>  	vq->vec_pool = NULL;
>>>  }
>>> @@ -1627,9 +1633,9 @@ int rte_vhost_async_channel_register(int vid,
>> uint16_t queue_id,
>>>  		return -1;
>>>
>>>  	/* packed queue is not supported */
>>> -	if (unlikely(vq_is_packed(dev) || !f.async_inorder)) {
>>> +	if (unlikely(!f.async_inorder)) {
>>>  		VHOST_LOG_CONFIG(ERR,
>>> -			"async copy is not supported on packed queue or
>> non-inorder mode "
>>> +			"async copy is not supported on non-inorder mode "
>>>  			"(vid %d, qid: %d)\n", vid, queue_id);
>>>  		return -1;
>>>  	}
>>> @@ -1667,11 +1673,18 @@ int rte_vhost_async_channel_register(int vid,
>> uint16_t queue_id,
>>>  	vq->vec_pool = rte_malloc_socket(NULL,
>>>  			VHOST_MAX_ASYNC_VEC * sizeof(struct iovec),
>>>  			RTE_CACHE_LINE_SIZE, node);
>>> -	vq->async_descs_split = rte_malloc_socket(NULL,
>>> +	if (vq_is_packed(dev)) {
>>> +		vq->async_buffers_packed = rte_malloc_socket(NULL,
>>> +			vq->size * sizeof(struct vring_used_elem_packed),
>>> +			RTE_CACHE_LINE_SIZE, node);
>>> +	} else {
>>> +		vq->async_descs_split = rte_malloc_socket(NULL,
>>>  			vq->size * sizeof(struct vring_used_elem),
>>>  			RTE_CACHE_LINE_SIZE, node);
>>> -	if (!vq->async_descs_split || !vq->async_pkts_info ||
>>> -		!vq->it_pool || !vq->vec_pool) {
>>> +	}
>>> +
>>> +	if (!vq->async_buffers_packed || !vq->async_descs_split ||
>>> +		!vq->async_pkts_info || !vq->it_pool || !vq->vec_pool) {
>>>  		vhost_free_async_mem(vq);
>>>  		VHOST_LOG_CONFIG(ERR,
>>>  				"async register failed: cannot allocate
>> memory for vq data "
>>> diff --git a/lib/librte_vhost/vhost.h b/lib/librte_vhost/vhost.h index
>>> f628714c2..fe131ae8f 100644
>>> --- a/lib/librte_vhost/vhost.h
>>> +++ b/lib/librte_vhost/vhost.h
>>> @@ -201,9 +201,14 @@ struct vhost_virtqueue {
>>>  	uint16_t	async_pkts_idx;
>>>  	uint16_t	async_pkts_inflight_n;
>>>  	uint16_t	async_last_pkts_n;
>>> -	struct vring_used_elem  *async_descs_split;
>>> +	union {
>>> +		struct vring_used_elem  *async_descs_split;
>>> +		struct vring_used_elem_packed *async_buffers_packed;
>>> +	};
>>>  	uint16_t async_desc_idx;
>>> +	uint16_t async_packed_buffer_idx;
>>
>> Don't dupplicate variable names, async_desc_idx can be reused for packed
>> ring. Also, they are representing the same thing, why use desc in one case
>> and buffer in the other?
> 
> The main reason is that the unit of the packed used ring is buffer, which can contain many desc.
> I think using desc_idx will cause ambiguity, but if you think that I should reuse the desc_idx, I have no problem with that.

OK, in this case please use a union not to waste memory.

>>
>>>  	uint16_t last_async_desc_idx;
>>> +	uint16_t last_async_buffer_idx;
>>
>> Same remark here.
>>
>>>  	/* vq async features */
>>>  	bool		async_inorder;
>>> diff --git a/lib/librte_vhost/virtio_net.c
>>> b/lib/librte_vhost/virtio_net.c index c43ab0093..410be9678 100644
>>> --- a/lib/librte_vhost/virtio_net.c
>>> +++ b/lib/librte_vhost/virtio_net.c
>>> @@ -363,14 +363,14 @@
>>> vhost_shadow_dequeue_single_packed_inorder(struct vhost_virtqueue
>> *vq,
>>> }
>>>
>>>  static __rte_always_inline void
>>> -vhost_shadow_enqueue_single_packed(struct virtio_net *dev,
>>> -				   struct vhost_virtqueue *vq,
>>> -				   uint32_t len[],
>>> -				   uint16_t id[],
>>> -				   uint16_t count[],
>>> +vhost_shadow_enqueue_packed(struct vhost_virtqueue *vq,
>>> +				   uint32_t *len,
>>> +				   uint16_t *id,
>>> +				   uint16_t *count,
>>>  				   uint16_t num_buffers)
>>>  {
>>>  	uint16_t i;
>>> +
>>>  	for (i = 0; i < num_buffers; i++) {
>>>  		/* enqueue shadow flush action aligned with batch num */
>>>  		if (!vq->shadow_used_idx)
>>> @@ -382,6 +382,17 @@ vhost_shadow_enqueue_single_packed(struct
>> virtio_net *dev,
>>>  		vq->shadow_aligned_idx += count[i];
>>>  		vq->shadow_used_idx++;
>>>  	}
>>> +}
>>> +
>>> +static __rte_always_inline void
>>> +vhost_shadow_enqueue_single_packed(struct virtio_net *dev,
>>> +				   struct vhost_virtqueue *vq,
>>> +				   uint32_t *len,
>>> +				   uint16_t *id,
>>> +				   uint16_t *count,
>>> +				   uint16_t num_buffers)
>>> +{
>>> +	vhost_shadow_enqueue_packed(vq, len, id, count, num_buffers);
>>>
>>>  	if (vq->shadow_aligned_idx >= PACKED_BATCH_SIZE) {
>>>  		do_data_copy_enqueue(dev, vq);
>>> @@ -1481,6 +1492,62 @@ shadow_ring_store(struct vhost_virtqueue *vq,
>> void *shadow_ring, void *d_ring,
>>>  	}
>>>  }
>>>
>>> +static __rte_always_inline void
>>> +vhost_update_used_packed(struct vhost_virtqueue *vq,
>>> +			struct vring_used_elem_packed *shadow_ring,
>>> +			uint16_t count)
>>> +{
>>> +	if (count == 0)
>>> +		return;
>>
>> Move this after the variables declaration.
> 
> Sure.
> 
>>
>>> +
>>> +	int i;
>>> +	uint16_t used_idx = vq->last_used_idx;
>>> +	uint16_t head_idx = vq->last_used_idx;
>>> +	uint16_t head_flags = 0;
>>> +
>>> +	/* Split loop in two to save memory barriers */
>>> +	for (i = 0; i < count; i++) {
>>> +		vq->desc_packed[used_idx].id = shadow_ring[i].id;
>>> +		vq->desc_packed[used_idx].len = shadow_ring[i].len;
>>> +
>>> +		used_idx += shadow_ring[i].count;
>>> +		if (used_idx >= vq->size)
>>> +			used_idx -= vq->size;
>>> +	}
>>> +
>>> +	/* The ordering for storing desc flags needs to be enforced. */
>>> +	rte_atomic_thread_fence(__ATOMIC_RELEASE);
>>> +
>>> +	for (i = 0; i < count; i++) {
>>> +		uint16_t flags;
>>> +
>>> +		if (vq->shadow_used_packed[i].len)
>>> +			flags = VRING_DESC_F_WRITE;
>>> +		else
>>> +			flags = 0;
>>> +
>>> +		if (vq->used_wrap_counter) {
>>> +			flags |= VRING_DESC_F_USED;
>>> +			flags |= VRING_DESC_F_AVAIL;
>>> +		} else {
>>> +			flags &= ~VRING_DESC_F_USED;
>>> +			flags &= ~VRING_DESC_F_AVAIL;
>>> +		}
>>> +
>>> +		if (i > 0) {
>>> +			vq->desc_packed[vq->last_used_idx].flags = flags;
>>> +
>>> +		} else {
>>> +			head_idx = vq->last_used_idx;
>>> +			head_flags = flags;
>>> +		}
>>> +
>>> +		vq_inc_last_used_packed(vq, shadow_ring[i].count);
>>> +	}
>>> +
>>> +	vq->desc_packed[head_idx].flags = head_flags; }
>>> +
>>>  static __rte_noinline uint32_t
>>>  virtio_dev_rx_async_submit_split(struct virtio_net *dev,
>>>  	struct vhost_virtqueue *vq, uint16_t queue_id, @@ -1656,6
>> +1723,294
>>> @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
>>>  	return pkt_idx;
>>>  }
>>>
>>> +static __rte_always_inline int
>>> +vhost_enqueue_async_single_packed(struct virtio_net *dev,
>>> +			    struct vhost_virtqueue *vq,
>>> +			    struct rte_mbuf *pkt,
>>> +			    struct buf_vector *buf_vec,
>>> +			    uint16_t *nr_descs,
>>> +			    uint16_t *nr_buffers,
>>> +			    struct vring_packed_desc *async_descs,
>>> +			    struct iovec *src_iovec, struct iovec *dst_iovec,
>>> +			    struct rte_vhost_iov_iter *src_it,
>>> +			    struct rte_vhost_iov_iter *dst_it) {
>>> +	uint16_t nr_vec = 0;
>>> +	uint16_t avail_idx = vq->last_avail_idx;
>>> +	uint16_t max_tries, tries = 0;
>>> +	uint16_t buf_id = 0;
>>> +	uint32_t len = 0;
>>> +	uint16_t desc_count = 0;
>>> +	uint32_t size = pkt->pkt_len + sizeof(struct
>> virtio_net_hdr_mrg_rxbuf);
>>> +	uint32_t buffer_len[vq->size];
>>> +	uint16_t buffer_buf_id[vq->size];
>>> +	uint16_t buffer_desc_count[vq->size];
>>> +	*nr_buffers = 0;
>>> +
>>> +	if (rxvq_is_mergeable(dev))
>>> +		max_tries = vq->size - 1;
>>> +	else
>>> +		max_tries = 1;
>>> +
>>> +	while (size > 0) {
>>> +		/*
>>> +		 * if we tried all available ring items, and still
>>> +		 * can't get enough buf, it means something abnormal
>>> +		 * happened.
>>> +		 */
>>> +		if (unlikely(++tries > max_tries))
>>> +			return -1;
>>> +
>>> +		if (unlikely(fill_vec_buf_packed(dev, vq,
>>> +						avail_idx, &desc_count,
>>> +						buf_vec, &nr_vec,
>>> +						&buf_id, &len,
>>> +						VHOST_ACCESS_RW) < 0))
>>> +			return -1;
>>> +
>>> +		len = RTE_MIN(len, size);
>>> +		size -= len;
>>> +
>>> +		buffer_len[*nr_buffers] = len;
>>> +		buffer_buf_id[*nr_buffers] = buf_id;
>>> +		buffer_desc_count[*nr_buffers] = desc_count;
>>> +		*nr_buffers += 1;
>>> +
>>> +		*nr_descs += desc_count;
>>> +		avail_idx += desc_count;
>>> +		if (avail_idx >= vq->size)
>>> +			avail_idx -= vq->size;
>>> +	}
>>> +
>>> +	if (async_mbuf_to_desc(dev, vq, pkt, buf_vec, nr_vec, *nr_buffers,
>>> +		src_iovec, dst_iovec, src_it, dst_it) < 0)
>>> +		return -1;
>>> +	/* store descriptors for DMA */
>>> +	if (avail_idx >= *nr_descs)
>>> +		rte_memcpy(async_descs,
>>> +			&vq->desc_packed[vq->last_avail_idx],
>>> +			*nr_descs * sizeof(struct vring_packed_desc));
>>
>> Please add brackets for the 'if' since there are for the 'else'.
> 
> Sure, sorry for that.
> 
>>
>>> +	else {
>>> +		uint16_t nr_copy = vq->size - vq->last_avail_idx;
>>> +		rte_memcpy(async_descs,
>>> +			&vq->desc_packed[vq->last_avail_idx],
>>> +			nr_copy * sizeof(struct vring_packed_desc));
>>> +		rte_memcpy(async_descs + nr_copy,
>>> +			vq->desc_packed, (*nr_descs - nr_copy) *
>>> +			sizeof(struct vring_packed_desc));
>>> +	}
>>> +
>>> +	vhost_shadow_enqueue_packed(vq, buffer_len, buffer_buf_id,
>>> +					   buffer_desc_count, *nr_buffers);
>>> +
>>> +	return 0;
>>> +}
>>> +
>>> +static __rte_always_inline int16_t
>>> +virtio_dev_rx_async_single_packed(struct virtio_net *dev,
>>> +			    struct vhost_virtqueue *vq,
>>> +			    struct rte_mbuf *pkt,
>>> +			    uint16_t *nr_descs, uint16_t *nr_buffers,
>>> +			    struct vring_packed_desc *async_descs,
>>> +			    struct iovec *src_iovec, struct iovec *dst_iovec,
>>> +			    struct rte_vhost_iov_iter *src_it,
>>> +			    struct rte_vhost_iov_iter *dst_it) {
>>> +	struct buf_vector buf_vec[BUF_VECTOR_MAX];
>>> +	*nr_descs = 0;
>>> +	*nr_buffers = 0;
>>> +
>>> +	if (unlikely(vhost_enqueue_async_single_packed(dev, vq, pkt,
>> buf_vec,
>>> +						 nr_descs,
>>> +						 nr_buffers,
>>> +						 async_descs,
>>> +						 src_iovec, dst_iovec,
>>> +						 src_it, dst_it) < 0)) {
>>> +		VHOST_LOG_DATA(DEBUG,
>>> +				"(%d) failed to get enough desc from vring\n",
>>> +				dev->vid);
>>> +		return -1;
>>> +	}
>>> +
>>> +	VHOST_LOG_DATA(DEBUG, "(%d) current index %d | end
>> index %d\n",
>>> +			dev->vid, vq->last_avail_idx,
>>> +			vq->last_avail_idx + *nr_descs);
>>> +
>>> +	return 0;
>>> +}
>>> +
>>> +static __rte_noinline uint32_t
>>> +virtio_dev_rx_async_submit_packed(struct virtio_net *dev,
>>> +	struct vhost_virtqueue *vq, uint16_t queue_id,
>>> +	struct rte_mbuf **pkts, uint32_t count,
>>> +	struct rte_mbuf **comp_pkts, uint32_t *comp_count) {
>>> +	uint32_t pkt_idx = 0, pkt_burst_idx = 0;
>>> +	uint16_t async_descs_idx = 0;
>>> +	uint16_t num_buffers;
>>> +	uint16_t num_desc;
>>> +
>>> +	struct rte_vhost_iov_iter *it_pool = vq->it_pool;
>>> +	struct iovec *vec_pool = vq->vec_pool;
>>> +	struct rte_vhost_async_desc tdes[MAX_PKT_BURST];
>>> +	struct iovec *src_iovec = vec_pool;
>>> +	struct iovec *dst_iovec = vec_pool + (VHOST_MAX_ASYNC_VEC >>
>> 1);
>>> +	struct rte_vhost_iov_iter *src_it = it_pool;
>>> +	struct rte_vhost_iov_iter *dst_it = it_pool + 1;
>>> +	uint16_t slot_idx = 0;
>>> +	uint16_t segs_await = 0;
>>> +	uint16_t iovec_idx = 0, it_idx = 0;
>>> +	struct async_inflight_info *pkts_info = vq->async_pkts_info;
>>> +	uint32_t n_pkts = 0, pkt_err = 0;
>>> +	uint32_t num_async_pkts = 0, num_done_pkts = 0;
>>> +	struct vring_packed_desc async_descs[vq->size];
>>> +
>>> +	rte_prefetch0(&vq->desc_packed[vq->last_avail_idx & (vq->size -
>>> +1)]);
>>
>> The size of the ring is not necessarily a power of two with packed ring.
> 
> For the size of the ring is not necessarily a power of two,
> so maybe I can use codes like 
> Indx % vq->size  ?
> I'm not sure if it's a good way to do that.

In this case it is OK.

>>
>>> +
>>> +	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
>>> +		if (unlikely(virtio_dev_rx_async_single_packed(dev, vq,
>>> +						pkts[pkt_idx],
>>> +						&num_desc, &num_buffers,
>>> +
>> 	&async_descs[async_descs_idx],
>>> +						&src_iovec[iovec_idx],
>>> +						&dst_iovec[iovec_idx],
>>> +						&src_it[it_idx],
>>> +						&dst_it[it_idx]) < 0))
>>> +			break;
>>> +
>>> +		VHOST_LOG_DATA(DEBUG, "(%d) current index %d | end
>> index %d\n",
>>> +			dev->vid, vq->last_avail_idx,
>>> +			vq->last_avail_idx + num_desc);
>>> +
>>> +		slot_idx = (vq->async_pkts_idx + num_async_pkts) &
>>> +			(vq->size - 1);
>>
>> Same here.
> 
> Sure.
> 
>>
>>> +		if (src_it[it_idx].count) {
>>> +			uint16_t from, to;
>>> +
>>> +			async_descs_idx += num_desc;
>>> +			async_fill_desc(&tdes[pkt_burst_idx++],
>> &src_it[it_idx],
>>> +					&dst_it[it_idx]);
>>> +			pkts_info[slot_idx].descs = num_desc;
>>> +			pkts_info[slot_idx].nr_buffers = num_buffers;
>>> +			pkts_info[slot_idx].mbuf = pkts[pkt_idx];
>>> +			num_async_pkts++;
>>> +			iovec_idx += src_it[it_idx].nr_segs;
>>> +			it_idx += 2;
>>> +
>>> +			segs_await += src_it[it_idx].nr_segs;
>>> +
>>> +			/**
>>> +			 * recover shadow used ring and keep DMA-occupied
>>> +			 * descriptors.
>>> +			 */
>>> +			from = vq->shadow_used_idx - num_buffers;
>>> +			to = vq->async_packed_buffer_idx & (vq->size - 1);
>>> +			shadow_ring_store(vq, vq->shadow_used_packed,
>>> +					vq->async_buffers_packed,
>>> +					from, to, num_buffers,
>>> +					sizeof(struct
>> vring_used_elem_packed));
>>> +
>>> +			vq->async_packed_buffer_idx += num_buffers;
>>> +			vq->shadow_used_idx -= num_buffers;
>>> +		} else
>>
>> Brackets needed.
> 
> Sure.
> 
>>
>>> +			comp_pkts[num_done_pkts++] = pkts[pkt_idx];
>>> +
>>> +		vq_inc_last_avail_packed(vq, num_desc);
>>> +
>>> +		/*
>>> +		 * conditions to trigger async device transfer:
>>> +		 * - buffered packet number reaches transfer threshold
>>> +		 * - unused async iov number is less than max vhost vector
>>> +		 */
>>> +		if (unlikely(pkt_burst_idx >=
>> VHOST_ASYNC_BATCH_THRESHOLD ||
>>> +			((VHOST_MAX_ASYNC_VEC >> 1) - segs_await <
>>> +			BUF_VECTOR_MAX))) {
>>> +			n_pkts = vq->async_ops.transfer_data(dev->vid,
>>> +					queue_id, tdes, 0, pkt_burst_idx);
>>> +			iovec_idx = 0;
>>> +			it_idx = 0;
>>> +			segs_await = 0;
>>> +			vq->async_pkts_inflight_n += n_pkts;
>>> +
>>> +			if (unlikely(n_pkts < pkt_burst_idx)) {
>>> +				/*
>>> +				 * log error packets number here and do
>> actual
>>> +				 * error processing when applications poll
>>> +				 * completion
>>> +				 */
>>> +				pkt_err = pkt_burst_idx - n_pkts;
>>> +				pkt_burst_idx = 0;
>>> +				pkt_idx++;
>>> +				break;
>>> +			}
>>> +
>>> +			pkt_burst_idx = 0;
>>> +		}
>>> +	}
>>> +
>>> +	if (pkt_burst_idx) {
>>> +		n_pkts = vq->async_ops.transfer_data(dev->vid,
>>> +				queue_id, tdes, 0, pkt_burst_idx);
>>> +		vq->async_pkts_inflight_n += n_pkts;
>>> +
>>> +		if (unlikely(n_pkts < pkt_burst_idx))
>>> +			pkt_err = pkt_burst_idx - n_pkts;
>>> +	}
>>> +
>>> +	do_data_copy_enqueue(dev, vq);
>>> +
>>> +	if (unlikely(pkt_err)) {
>>> +		uint16_t descs_err = 0;
>>> +		uint16_t buffers_err = 0;
>>> +
>>> +		num_async_pkts -= pkt_err;
>>> +		pkt_idx -= pkt_err;
>>> +	/* calculate the sum of buffers and descs of DMA-error packets. */
>>> +		while (pkt_err-- > 0) {
>>> +			descs_err +=
>>> +				pkts_info[slot_idx & (vq->size - 1)].descs;
>>
>> The size of the ring is not necessarily a power of two with packed ring.
> 
> Will be fixed.
> 
>>
>>> +			buffers_err +=
>>> +				pkts_info[slot_idx & (vq->size -
>> 1)].nr_buffers;
>>
>> Ditto.
> 
> Will be fixed.
> 
>>
>>> +			slot_idx--;
>>> +		}
>>> +
>>> +		vq->async_packed_buffer_idx -= buffers_err;
>>> +
>>> +		if (vq->last_avail_idx >= descs_err) {
>>> +			vq->last_avail_idx -= descs_err;
>>> +
>>> +			rte_memcpy(&vq->desc_packed[vq->last_avail_idx],
>>> +				&async_descs[async_descs_idx - descs_err],
>>> +				descs_err * sizeof(struct
>> vring_packed_desc));
>>> +		} else {
>>> +			uint16_t nr_copy;
>>> +
>>> +			vq->last_avail_idx = vq->last_avail_idx + vq->size
>>> +						- descs_err;
>>> +			nr_copy = vq->size - vq->last_avail_idx;
>>> +			rte_memcpy(&vq->desc_packed[vq->last_avail_idx],
>>> +				&async_descs[async_descs_idx - descs_err],
>>> +				nr_copy * sizeof(struct vring_packed_desc));
>>> +			descs_err -= nr_copy;
>>> +			rte_memcpy(vq->desc_packed,
>>> +				&async_descs[async_descs_idx - descs_err],
>>> +				descs_err * sizeof(struct
>> vring_packed_desc));
>>> +			vq->avail_wrap_counter ^= 1;
>>> +		}
>>> +
>>> +		num_done_pkts = pkt_idx - num_async_pkts;
>>> +	}
>>
>> This error handling could be moved in a dedicated function.
> 
> Sure, will fix it in the next version.
> 
>>
>>> +	vq->async_pkts_idx += num_async_pkts;
>>> +	*comp_count = num_done_pkts;
>>> +
>>> +	if (likely(vq->shadow_used_idx)) {
>>> +		vhost_flush_enqueue_shadow_packed(dev, vq);
>>> +		vhost_vring_call_packed(dev, vq);
>>> +	}
>>> +
>>> +	return pkt_idx;
>>> +}
>>
>> Above function is very big and complex, it should be possible to split it in
>> several ones to make it maintainable.
> 
> I think move the error handling code will make it smaller.
> 
> Thanks.
> Cheng
> 
>>
>>> +
>>>  static __rte_always_inline void
>>>  write_back_completed_descs_split(struct vhost_virtqueue *vq, uint16_t
>>> n_descs)  { @@ -1693,12 +2048,40 @@
>>> write_back_completed_descs_split(struct vhost_virtqueue *vq, uint16_t
>> n_descs)
>>>  	} while (nr_left > 0);
>>>  }
>>>
>>> +static __rte_always_inline void
>>> +write_back_completed_descs_packed(struct vhost_virtqueue *vq,
>>> +				uint16_t n_buffers)
>>> +{
>>> +	uint16_t nr_left = n_buffers;
>>> +	uint16_t from, to;
>>> +
>>> +	do {
>>> +		from = vq->last_async_buffer_idx &
>>> +						(vq->size - 1);
>>> +		to = (from + nr_left) & (vq->size - 1);
>>
>> The size of the ring is not necessarily a power of two with packed ring.
> 
> Sure.
> 
>>
>>> +		if (to > from) {
>>> +			vhost_update_used_packed(vq,
>>> +				vq->async_buffers_packed + from,
>>> +				to - from);
>>> +			vq->last_async_buffer_idx += nr_left;
>>> +			nr_left = 0;
>>> +		} else {
>>> +			vhost_update_used_packed(vq,
>>> +				vq->async_buffers_packed + from,
>>> +				vq->size - from);
>>> +			vq->last_async_buffer_idx +=
>>> +						vq->size - from;
>>> +			nr_left -= vq->size - from;
>>> +		}
>>> +	} while (nr_left > 0);
>>> +}
>>> +
>>>  uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
>>>  		struct rte_mbuf **pkts, uint16_t count)  {
>>>  	struct virtio_net *dev = get_device(vid);
>>>  	struct vhost_virtqueue *vq;
>>> -	uint16_t n_pkts_cpl = 0, n_pkts_put = 0, n_descs = 0;
>>> +	uint16_t n_pkts_cpl = 0, n_pkts_put = 0, n_descs = 0, n_buffers = 0;
>>>  	uint16_t start_idx, pkts_idx, vq_size;
>>>  	struct async_inflight_info *pkts_info;
>>>  	uint16_t from, i;
>>> @@ -1740,21 +2123,41 @@ uint16_t
>> rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
>>>  		goto done;
>>>  	}
>>>
>>> -	for (i = 0; i < n_pkts_put; i++) {
>>> -		from = (start_idx + i) & (vq_size - 1);
>>> -		n_descs += pkts_info[from].descs;
>>> -		pkts[i] = pkts_info[from].mbuf;
>>> +	if (vq_is_packed(dev)) {
>>> +		for (i = 0; i < n_pkts_put; i++) {
>>> +			from = (start_idx + i) & (vq_size - 1);
>>
>> Unlike split ring, packed ring size is not necessarily a power of 2.
> 
> Sure.
> Thanks.
> 
>>
>>> +			n_buffers += pkts_info[from].nr_buffers;
>>> +			pkts[i] = pkts_info[from].mbuf;
>>> +		}
>>> +	} else {
>>> +		for (i = 0; i < n_pkts_put; i++) {
>>> +			from = (start_idx + i) & (vq_size - 1);
>>> +			n_descs += pkts_info[from].descs;
>>> +			pkts[i] = pkts_info[from].mbuf;
>>> +		}
>>>  	}
>>> +
>>>  	vq->async_last_pkts_n = n_pkts_cpl - n_pkts_put;
>>>  	vq->async_pkts_inflight_n -= n_pkts_put;
>>>
>>>  	if (likely(vq->enabled && vq->access_ok)) {
>>> -		write_back_completed_descs_split(vq, n_descs);
>>> +		if (vq_is_packed(dev)) {
>>> +			write_back_completed_descs_packed(vq,
>> n_buffers);
>>>
>>> -		__atomic_add_fetch(&vq->used->idx, n_descs,
>> __ATOMIC_RELEASE);
>>> -		vhost_vring_call_split(dev, vq);
>>> -	} else
>>> -		vq->last_async_desc_idx += n_descs;
>>> +			vhost_vring_call_packed(dev, vq);
>>> +		} else {
>>> +			write_back_completed_descs_split(vq, n_descs);
>>> +
>>> +			__atomic_add_fetch(&vq->used->idx, n_descs,
>>> +					__ATOMIC_RELEASE);
>>> +			vhost_vring_call_split(dev, vq);
>>> +		}
>>> +	} else {
>>> +		if (vq_is_packed(dev))
>>> +			vq->last_async_buffer_idx += n_buffers;
>>> +		else
>>> +			vq->last_async_desc_idx += n_descs;
>>> +	}
>>>
>>>  done:
>>>  	rte_spinlock_unlock(&vq->access_lock);
>>> @@ -1795,9 +2198,10 @@ virtio_dev_rx_async_submit(struct virtio_net
>> *dev, uint16_t queue_id,
>>>  	if (count == 0)
>>>  		goto out;
>>>
>>> -	/* TODO: packed queue not implemented */
>>>  	if (vq_is_packed(dev))
>>> -		nb_tx = 0;
>>> +		nb_tx = virtio_dev_rx_async_submit_packed(dev,
>>> +				vq, queue_id, pkts, count, comp_pkts,
>>> +				comp_count);
>>>  	else
>>>  		nb_tx = virtio_dev_rx_async_submit_split(dev,
>>>  				vq, queue_id, pkts, count, comp_pkts,
>>>
>
Jiang, Cheng1 April 13, 2021, 1:50 p.m. UTC | #4
> -----Original Message-----
> From: Maxime Coquelin <maxime.coquelin@redhat.com>
> Sent: Tuesday, April 13, 2021 9:08 PM
> To: Jiang, Cheng1 <cheng1.jiang@intel.com>; Xia, Chenbo
> <chenbo.xia@intel.com>
> Cc: dev@dpdk.org; Hu, Jiayu <jiayu.hu@intel.com>; Yang, YvonneX
> <yvonnex.yang@intel.com>; Wang, Yinan <yinan.wang@intel.com>; Liu,
> Yong <yong.liu@intel.com>
> Subject: Re: [PATCH v5 2/4] vhost: add support for packed ring in async vhost
> 
> 
> 
> On 4/13/21 1:48 PM, Jiang, Cheng1 wrote:
> > Hi Maxime,
> >
> >> -----Original Message-----
> >> From: Maxime Coquelin <maxime.coquelin@redhat.com>
> >> Sent: Tuesday, April 13, 2021 4:37 PM
> >> To: Jiang, Cheng1 <cheng1.jiang@intel.com>; Xia, Chenbo
> >> <chenbo.xia@intel.com>
> >> Cc: dev@dpdk.org; Hu, Jiayu <jiayu.hu@intel.com>; Yang, YvonneX
> >> <yvonnex.yang@intel.com>; Wang, Yinan <yinan.wang@intel.com>; Liu,
> >> Yong <yong.liu@intel.com>
> >> Subject: Re: [PATCH v5 2/4] vhost: add support for packed ring in
> >> async vhost
> >>
> >>
> >>
> >> On 4/12/21 1:34 PM, Cheng Jiang wrote:
> >>> For now async vhost data path only supports split ring structure. In
> >>> order to make async vhost compatible with virtio 1.1 spec this patch
> >>> enables packed ring in async vhost data path.
> >>>
> >>> Signed-off-by: Cheng Jiang <Cheng1.jiang@intel.com>
> >>> ---
> >>>  lib/librte_vhost/rte_vhost_async.h |   1 +
> >>>  lib/librte_vhost/vhost.c           |  27 +-
> >>>  lib/librte_vhost/vhost.h           |   7 +-
> >>>  lib/librte_vhost/virtio_net.c      | 438
> +++++++++++++++++++++++++++--
> >>>  4 files changed, 448 insertions(+), 25 deletions(-)
> >>>
> >>> diff --git a/lib/librte_vhost/rte_vhost_async.h
> >>> b/lib/librte_vhost/rte_vhost_async.h
> >>> index c855ff875..6faa31f5a 100644
> >>> --- a/lib/librte_vhost/rte_vhost_async.h
> >>> +++ b/lib/librte_vhost/rte_vhost_async.h
> >>> @@ -89,6 +89,7 @@ struct rte_vhost_async_channel_ops {  struct
> >>> async_inflight_info {
> >>>  	struct rte_mbuf *mbuf;
> >>>  	uint16_t descs; /* num of descs inflight */
> >>> +	uint16_t nr_buffers; /* num of buffers inflight for packed ring */
> >>>  };
> >>>
> >>>  /**
> >>> diff --git a/lib/librte_vhost/vhost.c b/lib/librte_vhost/vhost.c
> >>> index a70fe01d8..8c9935c0f 100644
> >>> --- a/lib/librte_vhost/vhost.c
> >>> +++ b/lib/librte_vhost/vhost.c
> >>> @@ -342,15 +342,21 @@ vhost_free_async_mem(struct
> vhost_virtqueue
> >> *vq)
> >>> {
> >>>  	if (vq->async_pkts_info)
> >>>  		rte_free(vq->async_pkts_info);
> >>> -	if (vq->async_descs_split)
> >>> +	if (vq->async_buffers_packed) {
> >>> +		rte_free(vq->async_buffers_packed);
> >>> +		vq->async_buffers_packed = NULL;
> >>> +	}
> >>> +	if (vq->async_descs_split) {
> >>
> >> You can remove the check, rte_free is safe with NULL pointers.
> >> You can do the same for the other ones in this function.
> >
> > OK, it will be fixed.
> >
> >>
> >>>  		rte_free(vq->async_descs_split);
> >>> +		vq->async_descs_split = NULL;
> >>> +	}
> >>> +
> >>>  	if (vq->it_pool)
> >>>  		rte_free(vq->it_pool);
> >>>  	if (vq->vec_pool)
> >>>  		rte_free(vq->vec_pool);
> >>>
> >>>  	vq->async_pkts_info = NULL;
> >>> -	vq->async_descs_split = NULL;
> >>>  	vq->it_pool = NULL;
> >>>  	vq->vec_pool = NULL;
> >>>  }
> >>> @@ -1627,9 +1633,9 @@ int rte_vhost_async_channel_register(int vid,
> >> uint16_t queue_id,
> >>>  		return -1;
> >>>
> >>>  	/* packed queue is not supported */
> >>> -	if (unlikely(vq_is_packed(dev) || !f.async_inorder)) {
> >>> +	if (unlikely(!f.async_inorder)) {
> >>>  		VHOST_LOG_CONFIG(ERR,
> >>> -			"async copy is not supported on packed queue or
> >> non-inorder mode "
> >>> +			"async copy is not supported on non-inorder mode "
> >>>  			"(vid %d, qid: %d)\n", vid, queue_id);
> >>>  		return -1;
> >>>  	}
> >>> @@ -1667,11 +1673,18 @@ int rte_vhost_async_channel_register(int
> >>> vid,
> >> uint16_t queue_id,
> >>>  	vq->vec_pool = rte_malloc_socket(NULL,
> >>>  			VHOST_MAX_ASYNC_VEC * sizeof(struct iovec),
> >>>  			RTE_CACHE_LINE_SIZE, node);
> >>> -	vq->async_descs_split = rte_malloc_socket(NULL,
> >>> +	if (vq_is_packed(dev)) {
> >>> +		vq->async_buffers_packed = rte_malloc_socket(NULL,
> >>> +			vq->size * sizeof(struct vring_used_elem_packed),
> >>> +			RTE_CACHE_LINE_SIZE, node);
> >>> +	} else {
> >>> +		vq->async_descs_split = rte_malloc_socket(NULL,
> >>>  			vq->size * sizeof(struct vring_used_elem),
> >>>  			RTE_CACHE_LINE_SIZE, node);
> >>> -	if (!vq->async_descs_split || !vq->async_pkts_info ||
> >>> -		!vq->it_pool || !vq->vec_pool) {
> >>> +	}
> >>> +
> >>> +	if (!vq->async_buffers_packed || !vq->async_descs_split ||
> >>> +		!vq->async_pkts_info || !vq->it_pool || !vq->vec_pool) {
> >>>  		vhost_free_async_mem(vq);
> >>>  		VHOST_LOG_CONFIG(ERR,
> >>>  				"async register failed: cannot allocate
> >> memory for vq data "
> >>> diff --git a/lib/librte_vhost/vhost.h b/lib/librte_vhost/vhost.h
> >>> index f628714c2..fe131ae8f 100644
> >>> --- a/lib/librte_vhost/vhost.h
> >>> +++ b/lib/librte_vhost/vhost.h
> >>> @@ -201,9 +201,14 @@ struct vhost_virtqueue {
> >>>  	uint16_t	async_pkts_idx;
> >>>  	uint16_t	async_pkts_inflight_n;
> >>>  	uint16_t	async_last_pkts_n;
> >>> -	struct vring_used_elem  *async_descs_split;
> >>> +	union {
> >>> +		struct vring_used_elem  *async_descs_split;
> >>> +		struct vring_used_elem_packed *async_buffers_packed;
> >>> +	};
> >>>  	uint16_t async_desc_idx;
> >>> +	uint16_t async_packed_buffer_idx;
> >>
> >> Don't dupplicate variable names, async_desc_idx can be reused for
> >> packed ring. Also, they are representing the same thing, why use desc
> >> in one case and buffer in the other?
> >
> > The main reason is that the unit of the packed used ring is buffer, which can
> contain many desc.
> > I think using desc_idx will cause ambiguity, but if you think that I should
> reuse the desc_idx, I have no problem with that.
> 
> OK, in this case please use a union not to waste memory.

Sure, I'll fix it.

> 
> >>
> >>>  	uint16_t last_async_desc_idx;
> >>> +	uint16_t last_async_buffer_idx;
> >>
> >> Same remark here.
> >>
> >>>  	/* vq async features */
> >>>  	bool		async_inorder;
> >>> diff --git a/lib/librte_vhost/virtio_net.c
> >>> b/lib/librte_vhost/virtio_net.c index c43ab0093..410be9678 100644
> >>> --- a/lib/librte_vhost/virtio_net.c
> >>> +++ b/lib/librte_vhost/virtio_net.c
> >>> @@ -363,14 +363,14 @@
> >>> vhost_shadow_dequeue_single_packed_inorder(struct
> vhost_virtqueue
> >> *vq,
> >>> }
> >>>
> >>>  static __rte_always_inline void
> >>> -vhost_shadow_enqueue_single_packed(struct virtio_net *dev,
> >>> -				   struct vhost_virtqueue *vq,
> >>> -				   uint32_t len[],
> >>> -				   uint16_t id[],
> >>> -				   uint16_t count[],
> >>> +vhost_shadow_enqueue_packed(struct vhost_virtqueue *vq,
> >>> +				   uint32_t *len,
> >>> +				   uint16_t *id,
> >>> +				   uint16_t *count,
> >>>  				   uint16_t num_buffers)
> >>>  {
> >>>  	uint16_t i;
> >>> +
> >>>  	for (i = 0; i < num_buffers; i++) {
> >>>  		/* enqueue shadow flush action aligned with batch num */
> >>>  		if (!vq->shadow_used_idx)
> >>> @@ -382,6 +382,17 @@ vhost_shadow_enqueue_single_packed(struct
> >> virtio_net *dev,
> >>>  		vq->shadow_aligned_idx += count[i];
> >>>  		vq->shadow_used_idx++;
> >>>  	}
> >>> +}
> >>> +
> >>> +static __rte_always_inline void
> >>> +vhost_shadow_enqueue_single_packed(struct virtio_net *dev,
> >>> +				   struct vhost_virtqueue *vq,
> >>> +				   uint32_t *len,
> >>> +				   uint16_t *id,
> >>> +				   uint16_t *count,
> >>> +				   uint16_t num_buffers)
> >>> +{
> >>> +	vhost_shadow_enqueue_packed(vq, len, id, count, num_buffers);
> >>>
> >>>  	if (vq->shadow_aligned_idx >= PACKED_BATCH_SIZE) {
> >>>  		do_data_copy_enqueue(dev, vq);
> >>> @@ -1481,6 +1492,62 @@ shadow_ring_store(struct vhost_virtqueue
> *vq,
> >> void *shadow_ring, void *d_ring,
> >>>  	}
> >>>  }
> >>>
> >>> +static __rte_always_inline void
> >>> +vhost_update_used_packed(struct vhost_virtqueue *vq,
> >>> +			struct vring_used_elem_packed *shadow_ring,
> >>> +			uint16_t count)
> >>> +{
> >>> +	if (count == 0)
> >>> +		return;
> >>
> >> Move this after the variables declaration.
> >
> > Sure.
> >
> >>
> >>> +
> >>> +	int i;
> >>> +	uint16_t used_idx = vq->last_used_idx;
> >>> +	uint16_t head_idx = vq->last_used_idx;
> >>> +	uint16_t head_flags = 0;
> >>> +
> >>> +	/* Split loop in two to save memory barriers */
> >>> +	for (i = 0; i < count; i++) {
> >>> +		vq->desc_packed[used_idx].id = shadow_ring[i].id;
> >>> +		vq->desc_packed[used_idx].len = shadow_ring[i].len;
> >>> +
> >>> +		used_idx += shadow_ring[i].count;
> >>> +		if (used_idx >= vq->size)
> >>> +			used_idx -= vq->size;
> >>> +	}
> >>> +
> >>> +	/* The ordering for storing desc flags needs to be enforced. */
> >>> +	rte_atomic_thread_fence(__ATOMIC_RELEASE);
> >>> +
> >>> +	for (i = 0; i < count; i++) {
> >>> +		uint16_t flags;
> >>> +
> >>> +		if (vq->shadow_used_packed[i].len)
> >>> +			flags = VRING_DESC_F_WRITE;
> >>> +		else
> >>> +			flags = 0;
> >>> +
> >>> +		if (vq->used_wrap_counter) {
> >>> +			flags |= VRING_DESC_F_USED;
> >>> +			flags |= VRING_DESC_F_AVAIL;
> >>> +		} else {
> >>> +			flags &= ~VRING_DESC_F_USED;
> >>> +			flags &= ~VRING_DESC_F_AVAIL;
> >>> +		}
> >>> +
> >>> +		if (i > 0) {
> >>> +			vq->desc_packed[vq->last_used_idx].flags = flags;
> >>> +
> >>> +		} else {
> >>> +			head_idx = vq->last_used_idx;
> >>> +			head_flags = flags;
> >>> +		}
> >>> +
> >>> +		vq_inc_last_used_packed(vq, shadow_ring[i].count);
> >>> +	}
> >>> +
> >>> +	vq->desc_packed[head_idx].flags = head_flags; }
> >>> +
> >>>  static __rte_noinline uint32_t
> >>>  virtio_dev_rx_async_submit_split(struct virtio_net *dev,
> >>>  	struct vhost_virtqueue *vq, uint16_t queue_id, @@ -1656,6
> >> +1723,294
> >>> @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
> >>>  	return pkt_idx;
> >>>  }
> >>>
> >>> +static __rte_always_inline int
> >>> +vhost_enqueue_async_single_packed(struct virtio_net *dev,
> >>> +			    struct vhost_virtqueue *vq,
> >>> +			    struct rte_mbuf *pkt,
> >>> +			    struct buf_vector *buf_vec,
> >>> +			    uint16_t *nr_descs,
> >>> +			    uint16_t *nr_buffers,
> >>> +			    struct vring_packed_desc *async_descs,
> >>> +			    struct iovec *src_iovec, struct iovec *dst_iovec,
> >>> +			    struct rte_vhost_iov_iter *src_it,
> >>> +			    struct rte_vhost_iov_iter *dst_it) {
> >>> +	uint16_t nr_vec = 0;
> >>> +	uint16_t avail_idx = vq->last_avail_idx;
> >>> +	uint16_t max_tries, tries = 0;
> >>> +	uint16_t buf_id = 0;
> >>> +	uint32_t len = 0;
> >>> +	uint16_t desc_count = 0;
> >>> +	uint32_t size = pkt->pkt_len + sizeof(struct
> >> virtio_net_hdr_mrg_rxbuf);
> >>> +	uint32_t buffer_len[vq->size];
> >>> +	uint16_t buffer_buf_id[vq->size];
> >>> +	uint16_t buffer_desc_count[vq->size];
> >>> +	*nr_buffers = 0;
> >>> +
> >>> +	if (rxvq_is_mergeable(dev))
> >>> +		max_tries = vq->size - 1;
> >>> +	else
> >>> +		max_tries = 1;
> >>> +
> >>> +	while (size > 0) {
> >>> +		/*
> >>> +		 * if we tried all available ring items, and still
> >>> +		 * can't get enough buf, it means something abnormal
> >>> +		 * happened.
> >>> +		 */
> >>> +		if (unlikely(++tries > max_tries))
> >>> +			return -1;
> >>> +
> >>> +		if (unlikely(fill_vec_buf_packed(dev, vq,
> >>> +						avail_idx, &desc_count,
> >>> +						buf_vec, &nr_vec,
> >>> +						&buf_id, &len,
> >>> +						VHOST_ACCESS_RW) < 0))
> >>> +			return -1;
> >>> +
> >>> +		len = RTE_MIN(len, size);
> >>> +		size -= len;
> >>> +
> >>> +		buffer_len[*nr_buffers] = len;
> >>> +		buffer_buf_id[*nr_buffers] = buf_id;
> >>> +		buffer_desc_count[*nr_buffers] = desc_count;
> >>> +		*nr_buffers += 1;
> >>> +
> >>> +		*nr_descs += desc_count;
> >>> +		avail_idx += desc_count;
> >>> +		if (avail_idx >= vq->size)
> >>> +			avail_idx -= vq->size;
> >>> +	}
> >>> +
> >>> +	if (async_mbuf_to_desc(dev, vq, pkt, buf_vec, nr_vec, *nr_buffers,
> >>> +		src_iovec, dst_iovec, src_it, dst_it) < 0)
> >>> +		return -1;
> >>> +	/* store descriptors for DMA */
> >>> +	if (avail_idx >= *nr_descs)
> >>> +		rte_memcpy(async_descs,
> >>> +			&vq->desc_packed[vq->last_avail_idx],
> >>> +			*nr_descs * sizeof(struct vring_packed_desc));
> >>
> >> Please add brackets for the 'if' since there are for the 'else'.
> >
> > Sure, sorry for that.
> >
> >>
> >>> +	else {
> >>> +		uint16_t nr_copy = vq->size - vq->last_avail_idx;
> >>> +		rte_memcpy(async_descs,
> >>> +			&vq->desc_packed[vq->last_avail_idx],
> >>> +			nr_copy * sizeof(struct vring_packed_desc));
> >>> +		rte_memcpy(async_descs + nr_copy,
> >>> +			vq->desc_packed, (*nr_descs - nr_copy) *
> >>> +			sizeof(struct vring_packed_desc));
> >>> +	}
> >>> +
> >>> +	vhost_shadow_enqueue_packed(vq, buffer_len, buffer_buf_id,
> >>> +					   buffer_desc_count, *nr_buffers);
> >>> +
> >>> +	return 0;
> >>> +}
> >>> +
> >>> +static __rte_always_inline int16_t
> >>> +virtio_dev_rx_async_single_packed(struct virtio_net *dev,
> >>> +			    struct vhost_virtqueue *vq,
> >>> +			    struct rte_mbuf *pkt,
> >>> +			    uint16_t *nr_descs, uint16_t *nr_buffers,
> >>> +			    struct vring_packed_desc *async_descs,
> >>> +			    struct iovec *src_iovec, struct iovec *dst_iovec,
> >>> +			    struct rte_vhost_iov_iter *src_it,
> >>> +			    struct rte_vhost_iov_iter *dst_it) {
> >>> +	struct buf_vector buf_vec[BUF_VECTOR_MAX];
> >>> +	*nr_descs = 0;
> >>> +	*nr_buffers = 0;
> >>> +
> >>> +	if (unlikely(vhost_enqueue_async_single_packed(dev, vq, pkt,
> >> buf_vec,
> >>> +						 nr_descs,
> >>> +						 nr_buffers,
> >>> +						 async_descs,
> >>> +						 src_iovec, dst_iovec,
> >>> +						 src_it, dst_it) < 0)) {
> >>> +		VHOST_LOG_DATA(DEBUG,
> >>> +				"(%d) failed to get enough desc from vring\n",
> >>> +				dev->vid);
> >>> +		return -1;
> >>> +	}
> >>> +
> >>> +	VHOST_LOG_DATA(DEBUG, "(%d) current index %d | end
> >> index %d\n",
> >>> +			dev->vid, vq->last_avail_idx,
> >>> +			vq->last_avail_idx + *nr_descs);
> >>> +
> >>> +	return 0;
> >>> +}
> >>> +
> >>> +static __rte_noinline uint32_t
> >>> +virtio_dev_rx_async_submit_packed(struct virtio_net *dev,
> >>> +	struct vhost_virtqueue *vq, uint16_t queue_id,
> >>> +	struct rte_mbuf **pkts, uint32_t count,
> >>> +	struct rte_mbuf **comp_pkts, uint32_t *comp_count) {
> >>> +	uint32_t pkt_idx = 0, pkt_burst_idx = 0;
> >>> +	uint16_t async_descs_idx = 0;
> >>> +	uint16_t num_buffers;
> >>> +	uint16_t num_desc;
> >>> +
> >>> +	struct rte_vhost_iov_iter *it_pool = vq->it_pool;
> >>> +	struct iovec *vec_pool = vq->vec_pool;
> >>> +	struct rte_vhost_async_desc tdes[MAX_PKT_BURST];
> >>> +	struct iovec *src_iovec = vec_pool;
> >>> +	struct iovec *dst_iovec = vec_pool + (VHOST_MAX_ASYNC_VEC >>
> >> 1);
> >>> +	struct rte_vhost_iov_iter *src_it = it_pool;
> >>> +	struct rte_vhost_iov_iter *dst_it = it_pool + 1;
> >>> +	uint16_t slot_idx = 0;
> >>> +	uint16_t segs_await = 0;
> >>> +	uint16_t iovec_idx = 0, it_idx = 0;
> >>> +	struct async_inflight_info *pkts_info = vq->async_pkts_info;
> >>> +	uint32_t n_pkts = 0, pkt_err = 0;
> >>> +	uint32_t num_async_pkts = 0, num_done_pkts = 0;
> >>> +	struct vring_packed_desc async_descs[vq->size];
> >>> +
> >>> +	rte_prefetch0(&vq->desc_packed[vq->last_avail_idx & (vq->size -
> >>> +1)]);
> >>
> >> The size of the ring is not necessarily a power of two with packed ring.
> >
> > For the size of the ring is not necessarily a power of two, so maybe I
> > can use codes like Indx % vq->size  ?
> > I'm not sure if it's a good way to do that.
> 
> In this case it is OK.

OK. I'll fix them in the next version.

Thanks a lot.
Cheng

> 
> >>
> >>> +
> >>> +	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
> >>> +		if (unlikely(virtio_dev_rx_async_single_packed(dev, vq,
> >>> +						pkts[pkt_idx],
> >>> +						&num_desc, &num_buffers,
> >>> +
> >> 	&async_descs[async_descs_idx],
> >>> +						&src_iovec[iovec_idx],
> >>> +						&dst_iovec[iovec_idx],
> >>> +						&src_it[it_idx],
> >>> +						&dst_it[it_idx]) < 0))
> >>> +			break;
> >>> +
> >>> +		VHOST_LOG_DATA(DEBUG, "(%d) current index %d | end
> >> index %d\n",
> >>> +			dev->vid, vq->last_avail_idx,
> >>> +			vq->last_avail_idx + num_desc);
> >>> +
> >>> +		slot_idx = (vq->async_pkts_idx + num_async_pkts) &
> >>> +			(vq->size - 1);
> >>
> >> Same here.
> >
> > Sure.
> >
> >>
> >>> +		if (src_it[it_idx].count) {
> >>> +			uint16_t from, to;
> >>> +
> >>> +			async_descs_idx += num_desc;
> >>> +			async_fill_desc(&tdes[pkt_burst_idx++],
> >> &src_it[it_idx],
> >>> +					&dst_it[it_idx]);
> >>> +			pkts_info[slot_idx].descs = num_desc;
> >>> +			pkts_info[slot_idx].nr_buffers = num_buffers;
> >>> +			pkts_info[slot_idx].mbuf = pkts[pkt_idx];
> >>> +			num_async_pkts++;
> >>> +			iovec_idx += src_it[it_idx].nr_segs;
> >>> +			it_idx += 2;
> >>> +
> >>> +			segs_await += src_it[it_idx].nr_segs;
> >>> +
> >>> +			/**
> >>> +			 * recover shadow used ring and keep DMA-occupied
> >>> +			 * descriptors.
> >>> +			 */
> >>> +			from = vq->shadow_used_idx - num_buffers;
> >>> +			to = vq->async_packed_buffer_idx & (vq->size - 1);
> >>> +			shadow_ring_store(vq, vq->shadow_used_packed,
> >>> +					vq->async_buffers_packed,
> >>> +					from, to, num_buffers,
> >>> +					sizeof(struct
> >> vring_used_elem_packed));
> >>> +
> >>> +			vq->async_packed_buffer_idx += num_buffers;
> >>> +			vq->shadow_used_idx -= num_buffers;
> >>> +		} else
> >>
> >> Brackets needed.
> >
> > Sure.
> >
> >>
> >>> +			comp_pkts[num_done_pkts++] = pkts[pkt_idx];
> >>> +
> >>> +		vq_inc_last_avail_packed(vq, num_desc);
> >>> +
> >>> +		/*
> >>> +		 * conditions to trigger async device transfer:
> >>> +		 * - buffered packet number reaches transfer threshold
> >>> +		 * - unused async iov number is less than max vhost vector
> >>> +		 */
> >>> +		if (unlikely(pkt_burst_idx >=
> >> VHOST_ASYNC_BATCH_THRESHOLD ||
> >>> +			((VHOST_MAX_ASYNC_VEC >> 1) - segs_await <
> >>> +			BUF_VECTOR_MAX))) {
> >>> +			n_pkts = vq->async_ops.transfer_data(dev->vid,
> >>> +					queue_id, tdes, 0, pkt_burst_idx);
> >>> +			iovec_idx = 0;
> >>> +			it_idx = 0;
> >>> +			segs_await = 0;
> >>> +			vq->async_pkts_inflight_n += n_pkts;
> >>> +
> >>> +			if (unlikely(n_pkts < pkt_burst_idx)) {
> >>> +				/*
> >>> +				 * log error packets number here and do
> >> actual
> >>> +				 * error processing when applications poll
> >>> +				 * completion
> >>> +				 */
> >>> +				pkt_err = pkt_burst_idx - n_pkts;
> >>> +				pkt_burst_idx = 0;
> >>> +				pkt_idx++;
> >>> +				break;
> >>> +			}
> >>> +
> >>> +			pkt_burst_idx = 0;
> >>> +		}
> >>> +	}
> >>> +
> >>> +	if (pkt_burst_idx) {
> >>> +		n_pkts = vq->async_ops.transfer_data(dev->vid,
> >>> +				queue_id, tdes, 0, pkt_burst_idx);
> >>> +		vq->async_pkts_inflight_n += n_pkts;
> >>> +
> >>> +		if (unlikely(n_pkts < pkt_burst_idx))
> >>> +			pkt_err = pkt_burst_idx - n_pkts;
> >>> +	}
> >>> +
> >>> +	do_data_copy_enqueue(dev, vq);
> >>> +
> >>> +	if (unlikely(pkt_err)) {
> >>> +		uint16_t descs_err = 0;
> >>> +		uint16_t buffers_err = 0;
> >>> +
> >>> +		num_async_pkts -= pkt_err;
> >>> +		pkt_idx -= pkt_err;
> >>> +	/* calculate the sum of buffers and descs of DMA-error packets. */
> >>> +		while (pkt_err-- > 0) {
> >>> +			descs_err +=
> >>> +				pkts_info[slot_idx & (vq->size - 1)].descs;
> >>
> >> The size of the ring is not necessarily a power of two with packed ring.
> >
> > Will be fixed.
> >
> >>
> >>> +			buffers_err +=
> >>> +				pkts_info[slot_idx & (vq->size -
> >> 1)].nr_buffers;
> >>
> >> Ditto.
> >
> > Will be fixed.
> >
> >>
> >>> +			slot_idx--;
> >>> +		}
> >>> +
> >>> +		vq->async_packed_buffer_idx -= buffers_err;
> >>> +
> >>> +		if (vq->last_avail_idx >= descs_err) {
> >>> +			vq->last_avail_idx -= descs_err;
> >>> +
> >>> +			rte_memcpy(&vq->desc_packed[vq->last_avail_idx],
> >>> +				&async_descs[async_descs_idx - descs_err],
> >>> +				descs_err * sizeof(struct
> >> vring_packed_desc));
> >>> +		} else {
> >>> +			uint16_t nr_copy;
> >>> +
> >>> +			vq->last_avail_idx = vq->last_avail_idx + vq->size
> >>> +						- descs_err;
> >>> +			nr_copy = vq->size - vq->last_avail_idx;
> >>> +			rte_memcpy(&vq->desc_packed[vq->last_avail_idx],
> >>> +				&async_descs[async_descs_idx - descs_err],
> >>> +				nr_copy * sizeof(struct vring_packed_desc));
> >>> +			descs_err -= nr_copy;
> >>> +			rte_memcpy(vq->desc_packed,
> >>> +				&async_descs[async_descs_idx - descs_err],
> >>> +				descs_err * sizeof(struct
> >> vring_packed_desc));
> >>> +			vq->avail_wrap_counter ^= 1;
> >>> +		}
> >>> +
> >>> +		num_done_pkts = pkt_idx - num_async_pkts;
> >>> +	}
> >>
> >> This error handling could be moved in a dedicated function.
> >
> > Sure, will fix it in the next version.
> >
> >>
> >>> +	vq->async_pkts_idx += num_async_pkts;
> >>> +	*comp_count = num_done_pkts;
> >>> +
> >>> +	if (likely(vq->shadow_used_idx)) {
> >>> +		vhost_flush_enqueue_shadow_packed(dev, vq);
> >>> +		vhost_vring_call_packed(dev, vq);
> >>> +	}
> >>> +
> >>> +	return pkt_idx;
> >>> +}
> >>
> >> Above function is very big and complex, it should be possible to
> >> split it in several ones to make it maintainable.
> >
> > I think move the error handling code will make it smaller.
> >
> > Thanks.
> > Cheng
> >
> >>
> >>> +
> >>>  static __rte_always_inline void
> >>>  write_back_completed_descs_split(struct vhost_virtqueue *vq,
> >>> uint16_t
> >>> n_descs)  { @@ -1693,12 +2048,40 @@
> >>> write_back_completed_descs_split(struct vhost_virtqueue *vq,
> >>> uint16_t
> >> n_descs)
> >>>  	} while (nr_left > 0);
> >>>  }
> >>>
> >>> +static __rte_always_inline void
> >>> +write_back_completed_descs_packed(struct vhost_virtqueue *vq,
> >>> +				uint16_t n_buffers)
> >>> +{
> >>> +	uint16_t nr_left = n_buffers;
> >>> +	uint16_t from, to;
> >>> +
> >>> +	do {
> >>> +		from = vq->last_async_buffer_idx &
> >>> +						(vq->size - 1);
> >>> +		to = (from + nr_left) & (vq->size - 1);
> >>
> >> The size of the ring is not necessarily a power of two with packed ring.
> >
> > Sure.
> >
> >>
> >>> +		if (to > from) {
> >>> +			vhost_update_used_packed(vq,
> >>> +				vq->async_buffers_packed + from,
> >>> +				to - from);
> >>> +			vq->last_async_buffer_idx += nr_left;
> >>> +			nr_left = 0;
> >>> +		} else {
> >>> +			vhost_update_used_packed(vq,
> >>> +				vq->async_buffers_packed + from,
> >>> +				vq->size - from);
> >>> +			vq->last_async_buffer_idx +=
> >>> +						vq->size - from;
> >>> +			nr_left -= vq->size - from;
> >>> +		}
> >>> +	} while (nr_left > 0);
> >>> +}
> >>> +
> >>>  uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t
> queue_id,
> >>>  		struct rte_mbuf **pkts, uint16_t count)  {
> >>>  	struct virtio_net *dev = get_device(vid);
> >>>  	struct vhost_virtqueue *vq;
> >>> -	uint16_t n_pkts_cpl = 0, n_pkts_put = 0, n_descs = 0;
> >>> +	uint16_t n_pkts_cpl = 0, n_pkts_put = 0, n_descs = 0, n_buffers =
> >>> +0;
> >>>  	uint16_t start_idx, pkts_idx, vq_size;
> >>>  	struct async_inflight_info *pkts_info;
> >>>  	uint16_t from, i;
> >>> @@ -1740,21 +2123,41 @@ uint16_t
> >> rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
> >>>  		goto done;
> >>>  	}
> >>>
> >>> -	for (i = 0; i < n_pkts_put; i++) {
> >>> -		from = (start_idx + i) & (vq_size - 1);
> >>> -		n_descs += pkts_info[from].descs;
> >>> -		pkts[i] = pkts_info[from].mbuf;
> >>> +	if (vq_is_packed(dev)) {
> >>> +		for (i = 0; i < n_pkts_put; i++) {
> >>> +			from = (start_idx + i) & (vq_size - 1);
> >>
> >> Unlike split ring, packed ring size is not necessarily a power of 2.
> >
> > Sure.
> > Thanks.
> >
> >>
> >>> +			n_buffers += pkts_info[from].nr_buffers;
> >>> +			pkts[i] = pkts_info[from].mbuf;
> >>> +		}
> >>> +	} else {
> >>> +		for (i = 0; i < n_pkts_put; i++) {
> >>> +			from = (start_idx + i) & (vq_size - 1);
> >>> +			n_descs += pkts_info[from].descs;
> >>> +			pkts[i] = pkts_info[from].mbuf;
> >>> +		}
> >>>  	}
> >>> +
> >>>  	vq->async_last_pkts_n = n_pkts_cpl - n_pkts_put;
> >>>  	vq->async_pkts_inflight_n -= n_pkts_put;
> >>>
> >>>  	if (likely(vq->enabled && vq->access_ok)) {
> >>> -		write_back_completed_descs_split(vq, n_descs);
> >>> +		if (vq_is_packed(dev)) {
> >>> +			write_back_completed_descs_packed(vq,
> >> n_buffers);
> >>>
> >>> -		__atomic_add_fetch(&vq->used->idx, n_descs,
> >> __ATOMIC_RELEASE);
> >>> -		vhost_vring_call_split(dev, vq);
> >>> -	} else
> >>> -		vq->last_async_desc_idx += n_descs;
> >>> +			vhost_vring_call_packed(dev, vq);
> >>> +		} else {
> >>> +			write_back_completed_descs_split(vq, n_descs);
> >>> +
> >>> +			__atomic_add_fetch(&vq->used->idx, n_descs,
> >>> +					__ATOMIC_RELEASE);
> >>> +			vhost_vring_call_split(dev, vq);
> >>> +		}
> >>> +	} else {
> >>> +		if (vq_is_packed(dev))
> >>> +			vq->last_async_buffer_idx += n_buffers;
> >>> +		else
> >>> +			vq->last_async_desc_idx += n_descs;
> >>> +	}
> >>>
> >>>  done:
> >>>  	rte_spinlock_unlock(&vq->access_lock);
> >>> @@ -1795,9 +2198,10 @@ virtio_dev_rx_async_submit(struct virtio_net
> >> *dev, uint16_t queue_id,
> >>>  	if (count == 0)
> >>>  		goto out;
> >>>
> >>> -	/* TODO: packed queue not implemented */
> >>>  	if (vq_is_packed(dev))
> >>> -		nb_tx = 0;
> >>> +		nb_tx = virtio_dev_rx_async_submit_packed(dev,
> >>> +				vq, queue_id, pkts, count, comp_pkts,
> >>> +				comp_count);
> >>>  	else
> >>>  		nb_tx = virtio_dev_rx_async_submit_split(dev,
> >>>  				vq, queue_id, pkts, count, comp_pkts,
> >>>
> >
diff mbox series

Patch

diff --git a/lib/librte_vhost/rte_vhost_async.h b/lib/librte_vhost/rte_vhost_async.h
index c855ff875..6faa31f5a 100644
--- a/lib/librte_vhost/rte_vhost_async.h
+++ b/lib/librte_vhost/rte_vhost_async.h
@@ -89,6 +89,7 @@  struct rte_vhost_async_channel_ops {
 struct async_inflight_info {
 	struct rte_mbuf *mbuf;
 	uint16_t descs; /* num of descs inflight */
+	uint16_t nr_buffers; /* num of buffers inflight for packed ring */
 };
 
 /**
diff --git a/lib/librte_vhost/vhost.c b/lib/librte_vhost/vhost.c
index a70fe01d8..8c9935c0f 100644
--- a/lib/librte_vhost/vhost.c
+++ b/lib/librte_vhost/vhost.c
@@ -342,15 +342,21 @@  vhost_free_async_mem(struct vhost_virtqueue *vq)
 {
 	if (vq->async_pkts_info)
 		rte_free(vq->async_pkts_info);
-	if (vq->async_descs_split)
+	if (vq->async_buffers_packed) {
+		rte_free(vq->async_buffers_packed);
+		vq->async_buffers_packed = NULL;
+	}
+	if (vq->async_descs_split) {
 		rte_free(vq->async_descs_split);
+		vq->async_descs_split = NULL;
+	}
+
 	if (vq->it_pool)
 		rte_free(vq->it_pool);
 	if (vq->vec_pool)
 		rte_free(vq->vec_pool);
 
 	vq->async_pkts_info = NULL;
-	vq->async_descs_split = NULL;
 	vq->it_pool = NULL;
 	vq->vec_pool = NULL;
 }
@@ -1627,9 +1633,9 @@  int rte_vhost_async_channel_register(int vid, uint16_t queue_id,
 		return -1;
 
 	/* packed queue is not supported */
-	if (unlikely(vq_is_packed(dev) || !f.async_inorder)) {
+	if (unlikely(!f.async_inorder)) {
 		VHOST_LOG_CONFIG(ERR,
-			"async copy is not supported on packed queue or non-inorder mode "
+			"async copy is not supported on non-inorder mode "
 			"(vid %d, qid: %d)\n", vid, queue_id);
 		return -1;
 	}
@@ -1667,11 +1673,18 @@  int rte_vhost_async_channel_register(int vid, uint16_t queue_id,
 	vq->vec_pool = rte_malloc_socket(NULL,
 			VHOST_MAX_ASYNC_VEC * sizeof(struct iovec),
 			RTE_CACHE_LINE_SIZE, node);
-	vq->async_descs_split = rte_malloc_socket(NULL,
+	if (vq_is_packed(dev)) {
+		vq->async_buffers_packed = rte_malloc_socket(NULL,
+			vq->size * sizeof(struct vring_used_elem_packed),
+			RTE_CACHE_LINE_SIZE, node);
+	} else {
+		vq->async_descs_split = rte_malloc_socket(NULL,
 			vq->size * sizeof(struct vring_used_elem),
 			RTE_CACHE_LINE_SIZE, node);
-	if (!vq->async_descs_split || !vq->async_pkts_info ||
-		!vq->it_pool || !vq->vec_pool) {
+	}
+
+	if (!vq->async_buffers_packed || !vq->async_descs_split ||
+		!vq->async_pkts_info || !vq->it_pool || !vq->vec_pool) {
 		vhost_free_async_mem(vq);
 		VHOST_LOG_CONFIG(ERR,
 				"async register failed: cannot allocate memory for vq data "
diff --git a/lib/librte_vhost/vhost.h b/lib/librte_vhost/vhost.h
index f628714c2..fe131ae8f 100644
--- a/lib/librte_vhost/vhost.h
+++ b/lib/librte_vhost/vhost.h
@@ -201,9 +201,14 @@  struct vhost_virtqueue {
 	uint16_t	async_pkts_idx;
 	uint16_t	async_pkts_inflight_n;
 	uint16_t	async_last_pkts_n;
-	struct vring_used_elem  *async_descs_split;
+	union {
+		struct vring_used_elem  *async_descs_split;
+		struct vring_used_elem_packed *async_buffers_packed;
+	};
 	uint16_t async_desc_idx;
+	uint16_t async_packed_buffer_idx;
 	uint16_t last_async_desc_idx;
+	uint16_t last_async_buffer_idx;
 
 	/* vq async features */
 	bool		async_inorder;
diff --git a/lib/librte_vhost/virtio_net.c b/lib/librte_vhost/virtio_net.c
index c43ab0093..410be9678 100644
--- a/lib/librte_vhost/virtio_net.c
+++ b/lib/librte_vhost/virtio_net.c
@@ -363,14 +363,14 @@  vhost_shadow_dequeue_single_packed_inorder(struct vhost_virtqueue *vq,
 }
 
 static __rte_always_inline void
-vhost_shadow_enqueue_single_packed(struct virtio_net *dev,
-				   struct vhost_virtqueue *vq,
-				   uint32_t len[],
-				   uint16_t id[],
-				   uint16_t count[],
+vhost_shadow_enqueue_packed(struct vhost_virtqueue *vq,
+				   uint32_t *len,
+				   uint16_t *id,
+				   uint16_t *count,
 				   uint16_t num_buffers)
 {
 	uint16_t i;
+
 	for (i = 0; i < num_buffers; i++) {
 		/* enqueue shadow flush action aligned with batch num */
 		if (!vq->shadow_used_idx)
@@ -382,6 +382,17 @@  vhost_shadow_enqueue_single_packed(struct virtio_net *dev,
 		vq->shadow_aligned_idx += count[i];
 		vq->shadow_used_idx++;
 	}
+}
+
+static __rte_always_inline void
+vhost_shadow_enqueue_single_packed(struct virtio_net *dev,
+				   struct vhost_virtqueue *vq,
+				   uint32_t *len,
+				   uint16_t *id,
+				   uint16_t *count,
+				   uint16_t num_buffers)
+{
+	vhost_shadow_enqueue_packed(vq, len, id, count, num_buffers);
 
 	if (vq->shadow_aligned_idx >= PACKED_BATCH_SIZE) {
 		do_data_copy_enqueue(dev, vq);
@@ -1481,6 +1492,62 @@  shadow_ring_store(struct vhost_virtqueue *vq,  void *shadow_ring, void *d_ring,
 	}
 }
 
+static __rte_always_inline void
+vhost_update_used_packed(struct vhost_virtqueue *vq,
+			struct vring_used_elem_packed *shadow_ring,
+			uint16_t count)
+{
+	if (count == 0)
+		return;
+
+	int i;
+	uint16_t used_idx = vq->last_used_idx;
+	uint16_t head_idx = vq->last_used_idx;
+	uint16_t head_flags = 0;
+
+	/* Split loop in two to save memory barriers */
+	for (i = 0; i < count; i++) {
+		vq->desc_packed[used_idx].id = shadow_ring[i].id;
+		vq->desc_packed[used_idx].len = shadow_ring[i].len;
+
+		used_idx += shadow_ring[i].count;
+		if (used_idx >= vq->size)
+			used_idx -= vq->size;
+	}
+
+	/* The ordering for storing desc flags needs to be enforced. */
+	rte_atomic_thread_fence(__ATOMIC_RELEASE);
+
+	for (i = 0; i < count; i++) {
+		uint16_t flags;
+
+		if (vq->shadow_used_packed[i].len)
+			flags = VRING_DESC_F_WRITE;
+		else
+			flags = 0;
+
+		if (vq->used_wrap_counter) {
+			flags |= VRING_DESC_F_USED;
+			flags |= VRING_DESC_F_AVAIL;
+		} else {
+			flags &= ~VRING_DESC_F_USED;
+			flags &= ~VRING_DESC_F_AVAIL;
+		}
+
+		if (i > 0) {
+			vq->desc_packed[vq->last_used_idx].flags = flags;
+
+		} else {
+			head_idx = vq->last_used_idx;
+			head_flags = flags;
+		}
+
+		vq_inc_last_used_packed(vq, shadow_ring[i].count);
+	}
+
+	vq->desc_packed[head_idx].flags = head_flags;
+}
+
 static __rte_noinline uint32_t
 virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 	struct vhost_virtqueue *vq, uint16_t queue_id,
@@ -1656,6 +1723,294 @@  virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 	return pkt_idx;
 }
 
+static __rte_always_inline int
+vhost_enqueue_async_single_packed(struct virtio_net *dev,
+			    struct vhost_virtqueue *vq,
+			    struct rte_mbuf *pkt,
+			    struct buf_vector *buf_vec,
+			    uint16_t *nr_descs,
+			    uint16_t *nr_buffers,
+			    struct vring_packed_desc *async_descs,
+			    struct iovec *src_iovec, struct iovec *dst_iovec,
+			    struct rte_vhost_iov_iter *src_it,
+			    struct rte_vhost_iov_iter *dst_it)
+{
+	uint16_t nr_vec = 0;
+	uint16_t avail_idx = vq->last_avail_idx;
+	uint16_t max_tries, tries = 0;
+	uint16_t buf_id = 0;
+	uint32_t len = 0;
+	uint16_t desc_count = 0;
+	uint32_t size = pkt->pkt_len + sizeof(struct virtio_net_hdr_mrg_rxbuf);
+	uint32_t buffer_len[vq->size];
+	uint16_t buffer_buf_id[vq->size];
+	uint16_t buffer_desc_count[vq->size];
+	*nr_buffers = 0;
+
+	if (rxvq_is_mergeable(dev))
+		max_tries = vq->size - 1;
+	else
+		max_tries = 1;
+
+	while (size > 0) {
+		/*
+		 * if we tried all available ring items, and still
+		 * can't get enough buf, it means something abnormal
+		 * happened.
+		 */
+		if (unlikely(++tries > max_tries))
+			return -1;
+
+		if (unlikely(fill_vec_buf_packed(dev, vq,
+						avail_idx, &desc_count,
+						buf_vec, &nr_vec,
+						&buf_id, &len,
+						VHOST_ACCESS_RW) < 0))
+			return -1;
+
+		len = RTE_MIN(len, size);
+		size -= len;
+
+		buffer_len[*nr_buffers] = len;
+		buffer_buf_id[*nr_buffers] = buf_id;
+		buffer_desc_count[*nr_buffers] = desc_count;
+		*nr_buffers += 1;
+
+		*nr_descs += desc_count;
+		avail_idx += desc_count;
+		if (avail_idx >= vq->size)
+			avail_idx -= vq->size;
+	}
+
+	if (async_mbuf_to_desc(dev, vq, pkt, buf_vec, nr_vec, *nr_buffers,
+		src_iovec, dst_iovec, src_it, dst_it) < 0)
+		return -1;
+	/* store descriptors for DMA */
+	if (avail_idx >= *nr_descs)
+		rte_memcpy(async_descs,
+			&vq->desc_packed[vq->last_avail_idx],
+			*nr_descs * sizeof(struct vring_packed_desc));
+	else {
+		uint16_t nr_copy = vq->size - vq->last_avail_idx;
+		rte_memcpy(async_descs,
+			&vq->desc_packed[vq->last_avail_idx],
+			nr_copy * sizeof(struct vring_packed_desc));
+		rte_memcpy(async_descs + nr_copy,
+			vq->desc_packed, (*nr_descs - nr_copy) *
+			sizeof(struct vring_packed_desc));
+	}
+
+	vhost_shadow_enqueue_packed(vq, buffer_len, buffer_buf_id,
+					   buffer_desc_count, *nr_buffers);
+
+	return 0;
+}
+
+static __rte_always_inline int16_t
+virtio_dev_rx_async_single_packed(struct virtio_net *dev,
+			    struct vhost_virtqueue *vq,
+			    struct rte_mbuf *pkt,
+			    uint16_t *nr_descs, uint16_t *nr_buffers,
+			    struct vring_packed_desc *async_descs,
+			    struct iovec *src_iovec, struct iovec *dst_iovec,
+			    struct rte_vhost_iov_iter *src_it,
+			    struct rte_vhost_iov_iter *dst_it)
+{
+	struct buf_vector buf_vec[BUF_VECTOR_MAX];
+	*nr_descs = 0;
+	*nr_buffers = 0;
+
+	if (unlikely(vhost_enqueue_async_single_packed(dev, vq, pkt, buf_vec,
+						 nr_descs,
+						 nr_buffers,
+						 async_descs,
+						 src_iovec, dst_iovec,
+						 src_it, dst_it) < 0)) {
+		VHOST_LOG_DATA(DEBUG,
+				"(%d) failed to get enough desc from vring\n",
+				dev->vid);
+		return -1;
+	}
+
+	VHOST_LOG_DATA(DEBUG, "(%d) current index %d | end index %d\n",
+			dev->vid, vq->last_avail_idx,
+			vq->last_avail_idx + *nr_descs);
+
+	return 0;
+}
+
+static __rte_noinline uint32_t
+virtio_dev_rx_async_submit_packed(struct virtio_net *dev,
+	struct vhost_virtqueue *vq, uint16_t queue_id,
+	struct rte_mbuf **pkts, uint32_t count,
+	struct rte_mbuf **comp_pkts, uint32_t *comp_count)
+{
+	uint32_t pkt_idx = 0, pkt_burst_idx = 0;
+	uint16_t async_descs_idx = 0;
+	uint16_t num_buffers;
+	uint16_t num_desc;
+
+	struct rte_vhost_iov_iter *it_pool = vq->it_pool;
+	struct iovec *vec_pool = vq->vec_pool;
+	struct rte_vhost_async_desc tdes[MAX_PKT_BURST];
+	struct iovec *src_iovec = vec_pool;
+	struct iovec *dst_iovec = vec_pool + (VHOST_MAX_ASYNC_VEC >> 1);
+	struct rte_vhost_iov_iter *src_it = it_pool;
+	struct rte_vhost_iov_iter *dst_it = it_pool + 1;
+	uint16_t slot_idx = 0;
+	uint16_t segs_await = 0;
+	uint16_t iovec_idx = 0, it_idx = 0;
+	struct async_inflight_info *pkts_info = vq->async_pkts_info;
+	uint32_t n_pkts = 0, pkt_err = 0;
+	uint32_t num_async_pkts = 0, num_done_pkts = 0;
+	struct vring_packed_desc async_descs[vq->size];
+
+	rte_prefetch0(&vq->desc_packed[vq->last_avail_idx & (vq->size - 1)]);
+
+	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
+		if (unlikely(virtio_dev_rx_async_single_packed(dev, vq,
+						pkts[pkt_idx],
+						&num_desc, &num_buffers,
+						&async_descs[async_descs_idx],
+						&src_iovec[iovec_idx],
+						&dst_iovec[iovec_idx],
+						&src_it[it_idx],
+						&dst_it[it_idx]) < 0))
+			break;
+
+		VHOST_LOG_DATA(DEBUG, "(%d) current index %d | end index %d\n",
+			dev->vid, vq->last_avail_idx,
+			vq->last_avail_idx + num_desc);
+
+		slot_idx = (vq->async_pkts_idx + num_async_pkts) &
+			(vq->size - 1);
+		if (src_it[it_idx].count) {
+			uint16_t from, to;
+
+			async_descs_idx += num_desc;
+			async_fill_desc(&tdes[pkt_burst_idx++], &src_it[it_idx],
+					&dst_it[it_idx]);
+			pkts_info[slot_idx].descs = num_desc;
+			pkts_info[slot_idx].nr_buffers = num_buffers;
+			pkts_info[slot_idx].mbuf = pkts[pkt_idx];
+			num_async_pkts++;
+			iovec_idx += src_it[it_idx].nr_segs;
+			it_idx += 2;
+
+			segs_await += src_it[it_idx].nr_segs;
+
+			/**
+			 * recover shadow used ring and keep DMA-occupied
+			 * descriptors.
+			 */
+			from = vq->shadow_used_idx - num_buffers;
+			to = vq->async_packed_buffer_idx & (vq->size - 1);
+			shadow_ring_store(vq, vq->shadow_used_packed,
+					vq->async_buffers_packed,
+					from, to, num_buffers,
+					sizeof(struct vring_used_elem_packed));
+
+			vq->async_packed_buffer_idx += num_buffers;
+			vq->shadow_used_idx -= num_buffers;
+		} else
+			comp_pkts[num_done_pkts++] = pkts[pkt_idx];
+
+		vq_inc_last_avail_packed(vq, num_desc);
+
+		/*
+		 * conditions to trigger async device transfer:
+		 * - buffered packet number reaches transfer threshold
+		 * - unused async iov number is less than max vhost vector
+		 */
+		if (unlikely(pkt_burst_idx >= VHOST_ASYNC_BATCH_THRESHOLD ||
+			((VHOST_MAX_ASYNC_VEC >> 1) - segs_await <
+			BUF_VECTOR_MAX))) {
+			n_pkts = vq->async_ops.transfer_data(dev->vid,
+					queue_id, tdes, 0, pkt_burst_idx);
+			iovec_idx = 0;
+			it_idx = 0;
+			segs_await = 0;
+			vq->async_pkts_inflight_n += n_pkts;
+
+			if (unlikely(n_pkts < pkt_burst_idx)) {
+				/*
+				 * log error packets number here and do actual
+				 * error processing when applications poll
+				 * completion
+				 */
+				pkt_err = pkt_burst_idx - n_pkts;
+				pkt_burst_idx = 0;
+				pkt_idx++;
+				break;
+			}
+
+			pkt_burst_idx = 0;
+		}
+	}
+
+	if (pkt_burst_idx) {
+		n_pkts = vq->async_ops.transfer_data(dev->vid,
+				queue_id, tdes, 0, pkt_burst_idx);
+		vq->async_pkts_inflight_n += n_pkts;
+
+		if (unlikely(n_pkts < pkt_burst_idx))
+			pkt_err = pkt_burst_idx - n_pkts;
+	}
+
+	do_data_copy_enqueue(dev, vq);
+
+	if (unlikely(pkt_err)) {
+		uint16_t descs_err = 0;
+		uint16_t buffers_err = 0;
+
+		num_async_pkts -= pkt_err;
+		pkt_idx -= pkt_err;
+	/* calculate the sum of buffers and descs of DMA-error packets. */
+		while (pkt_err-- > 0) {
+			descs_err +=
+				pkts_info[slot_idx & (vq->size - 1)].descs;
+			buffers_err +=
+				pkts_info[slot_idx & (vq->size - 1)].nr_buffers;
+			slot_idx--;
+		}
+
+		vq->async_packed_buffer_idx -= buffers_err;
+
+		if (vq->last_avail_idx >= descs_err) {
+			vq->last_avail_idx -= descs_err;
+
+			rte_memcpy(&vq->desc_packed[vq->last_avail_idx],
+				&async_descs[async_descs_idx - descs_err],
+				descs_err * sizeof(struct vring_packed_desc));
+		} else {
+			uint16_t nr_copy;
+
+			vq->last_avail_idx = vq->last_avail_idx + vq->size
+						- descs_err;
+			nr_copy = vq->size - vq->last_avail_idx;
+			rte_memcpy(&vq->desc_packed[vq->last_avail_idx],
+				&async_descs[async_descs_idx - descs_err],
+				nr_copy * sizeof(struct vring_packed_desc));
+			descs_err -= nr_copy;
+			rte_memcpy(vq->desc_packed,
+				&async_descs[async_descs_idx - descs_err],
+				descs_err * sizeof(struct vring_packed_desc));
+			vq->avail_wrap_counter ^= 1;
+		}
+
+		num_done_pkts = pkt_idx - num_async_pkts;
+	}
+	vq->async_pkts_idx += num_async_pkts;
+	*comp_count = num_done_pkts;
+
+	if (likely(vq->shadow_used_idx)) {
+		vhost_flush_enqueue_shadow_packed(dev, vq);
+		vhost_vring_call_packed(dev, vq);
+	}
+
+	return pkt_idx;
+}
+
 static __rte_always_inline void
 write_back_completed_descs_split(struct vhost_virtqueue *vq, uint16_t n_descs)
 {
@@ -1693,12 +2048,40 @@  write_back_completed_descs_split(struct vhost_virtqueue *vq, uint16_t n_descs)
 	} while (nr_left > 0);
 }
 
+static __rte_always_inline void
+write_back_completed_descs_packed(struct vhost_virtqueue *vq,
+				uint16_t n_buffers)
+{
+	uint16_t nr_left = n_buffers;
+	uint16_t from, to;
+
+	do {
+		from = vq->last_async_buffer_idx &
+						(vq->size - 1);
+		to = (from + nr_left) & (vq->size - 1);
+		if (to > from) {
+			vhost_update_used_packed(vq,
+				vq->async_buffers_packed + from,
+				to - from);
+			vq->last_async_buffer_idx += nr_left;
+			nr_left = 0;
+		} else {
+			vhost_update_used_packed(vq,
+				vq->async_buffers_packed + from,
+				vq->size - from);
+			vq->last_async_buffer_idx +=
+						vq->size - from;
+			nr_left -= vq->size - from;
+		}
+	} while (nr_left > 0);
+}
+
 uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
 		struct rte_mbuf **pkts, uint16_t count)
 {
 	struct virtio_net *dev = get_device(vid);
 	struct vhost_virtqueue *vq;
-	uint16_t n_pkts_cpl = 0, n_pkts_put = 0, n_descs = 0;
+	uint16_t n_pkts_cpl = 0, n_pkts_put = 0, n_descs = 0, n_buffers = 0;
 	uint16_t start_idx, pkts_idx, vq_size;
 	struct async_inflight_info *pkts_info;
 	uint16_t from, i;
@@ -1740,21 +2123,41 @@  uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
 		goto done;
 	}
 
-	for (i = 0; i < n_pkts_put; i++) {
-		from = (start_idx + i) & (vq_size - 1);
-		n_descs += pkts_info[from].descs;
-		pkts[i] = pkts_info[from].mbuf;
+	if (vq_is_packed(dev)) {
+		for (i = 0; i < n_pkts_put; i++) {
+			from = (start_idx + i) & (vq_size - 1);
+			n_buffers += pkts_info[from].nr_buffers;
+			pkts[i] = pkts_info[from].mbuf;
+		}
+	} else {
+		for (i = 0; i < n_pkts_put; i++) {
+			from = (start_idx + i) & (vq_size - 1);
+			n_descs += pkts_info[from].descs;
+			pkts[i] = pkts_info[from].mbuf;
+		}
 	}
+
 	vq->async_last_pkts_n = n_pkts_cpl - n_pkts_put;
 	vq->async_pkts_inflight_n -= n_pkts_put;
 
 	if (likely(vq->enabled && vq->access_ok)) {
-		write_back_completed_descs_split(vq, n_descs);
+		if (vq_is_packed(dev)) {
+			write_back_completed_descs_packed(vq, n_buffers);
 
-		__atomic_add_fetch(&vq->used->idx, n_descs, __ATOMIC_RELEASE);
-		vhost_vring_call_split(dev, vq);
-	} else
-		vq->last_async_desc_idx += n_descs;
+			vhost_vring_call_packed(dev, vq);
+		} else {
+			write_back_completed_descs_split(vq, n_descs);
+
+			__atomic_add_fetch(&vq->used->idx, n_descs,
+					__ATOMIC_RELEASE);
+			vhost_vring_call_split(dev, vq);
+		}
+	} else {
+		if (vq_is_packed(dev))
+			vq->last_async_buffer_idx += n_buffers;
+		else
+			vq->last_async_desc_idx += n_descs;
+	}
 
 done:
 	rte_spinlock_unlock(&vq->access_lock);
@@ -1795,9 +2198,10 @@  virtio_dev_rx_async_submit(struct virtio_net *dev, uint16_t queue_id,
 	if (count == 0)
 		goto out;
 
-	/* TODO: packed queue not implemented */
 	if (vq_is_packed(dev))
-		nb_tx = 0;
+		nb_tx = virtio_dev_rx_async_submit_packed(dev,
+				vq, queue_id, pkts, count, comp_pkts,
+				comp_count);
 	else
 		nb_tx = virtio_dev_rx_async_submit_split(dev,
 				vq, queue_id, pkts, count, comp_pkts,