[v5,1/4] vhost: abstract and reorganize async split ring code

Message ID 20210412113430.17587-2-Cheng1.jiang@intel.com (mailing list archive)
State Changes Requested, archived
Delegated to: Maxime Coquelin
Series: add support for packed ring in async vhost

Checks

Context         Check     Description
ci/checkpatch   success   coding style OK

Commit Message

Jiang, Cheng1 April 12, 2021, 11:34 a.m. UTC
  In order to improve code efficiency and readability when async packed
ring support is enabled. This patch abstract some functions like
shadow_ring_store and write_back_completed_descs_split. And improve
the efficiency of some pointer offset calculation.

Signed-off-by: Cheng Jiang <Cheng1.jiang@intel.com>
---
 lib/librte_vhost/virtio_net.c | 146 +++++++++++++++++++---------------
 1 file changed, 84 insertions(+), 62 deletions(-)
  

Comments

Hu, Jiayu April 13, 2021, 2:44 a.m. UTC | #1
Hi Cheng,

Some comments inline.

> -----Original Message-----
> From: Jiang, Cheng1 <cheng1.jiang@intel.com>
> Sent: Monday, April 12, 2021 7:34 PM
> To: maxime.coquelin@redhat.com; Xia, Chenbo <chenbo.xia@intel.com>
> Cc: dev@dpdk.org; Hu, Jiayu <jiayu.hu@intel.com>; Yang, YvonneX
> <yvonnex.yang@intel.com>; Wang, Yinan <yinan.wang@intel.com>; Liu,
> Yong <yong.liu@intel.com>; Jiang, Cheng1 <cheng1.jiang@intel.com>
> Subject: [PATCH v5 1/4] vhost: abstract and reorganize async split ring code
> 
> In order to improve code efficiency and readability when async packed
> ring support is enabled. This patch abstract some functions like
> shadow_ring_store and write_back_completed_descs_split. And improve
> the efficiency of some pointer offset calculation.

Need to improve the grammar of the commit log, as there is a typo and an
incomplete sentence.

> 
> Signed-off-by: Cheng Jiang <Cheng1.jiang@intel.com>
> ---
>  lib/librte_vhost/virtio_net.c | 146 +++++++++++++++++++---------------
>  1 file changed, 84 insertions(+), 62 deletions(-)
> 
> diff --git a/lib/librte_vhost/virtio_net.c b/lib/librte_vhost/virtio_net.c
> index ff3987860..c43ab0093 100644
> --- a/lib/librte_vhost/virtio_net.c
> +++ b/lib/librte_vhost/virtio_net.c
> @@ -1458,6 +1458,29 @@ virtio_dev_rx_async_get_info_idx(uint16_t
> pkts_idx,
>  		(vq_size - n_inflight + pkts_idx) & (vq_size - 1);
>  }
> 
> +static __rte_always_inline void
> +shadow_ring_store(struct vhost_virtqueue *vq,  void *shadow_ring, void
> *d_ring,
> +		uint16_t s_idx, uint16_t d_idx,
> +		uint16_t count, uint16_t elem_size)
> +{
> +	if (d_idx + count <= vq->size) {
> +		rte_memcpy((void *)((uintptr_t)d_ring + d_idx * elem_size),
> +			(void *)((uintptr_t)shadow_ring + s_idx * elem_size),
> +			count * elem_size);
> +	} else {
> +		uint16_t size = vq->size - d_idx;
> +
> +		rte_memcpy((void *)((uintptr_t)d_ring + d_idx * elem_size),
> +			(void *)((uintptr_t)shadow_ring + s_idx * elem_size),
> +			size * elem_size);
> +
> +		rte_memcpy((void *)((uintptr_t)d_ring),
> +			(void *)((uintptr_t)shadow_ring +
> +				(s_idx + size) * elem_size),
> +			(count - size) * elem_size);
> +	}
> +}
> +
>  static __rte_noinline uint32_t
>  virtio_dev_rx_async_submit_split(struct virtio_net *dev,
>  	struct vhost_virtqueue *vq, uint16_t queue_id,
> @@ -1478,6 +1501,7 @@ virtio_dev_rx_async_submit_split(struct virtio_net
> *dev,
>  	struct rte_vhost_iov_iter *dst_it = it_pool + 1;
>  	uint16_t slot_idx = 0;
>  	uint16_t segs_await = 0;
> +	uint16_t iovec_idx = 0, it_idx = 0;
>  	struct async_inflight_info *pkts_info = vq->async_pkts_info;
>  	uint32_t n_pkts = 0, pkt_err = 0;
>  	uint32_t num_async_pkts = 0, num_done_pkts = 0;
> @@ -1513,27 +1537,32 @@ virtio_dev_rx_async_submit_split(struct
> virtio_net *dev,
> 
>  		if (async_mbuf_to_desc(dev, vq, pkts[pkt_idx],
>  				buf_vec, nr_vec, num_buffers,
> -				src_iovec, dst_iovec, src_it, dst_it) < 0) {
> +				&src_iovec[iovec_idx],
> +				&dst_iovec[iovec_idx],
> +				&src_it[it_idx],
> +				&dst_it[it_idx]) < 0) {

When using indices, it's strange to get the src and dst iov_iter from dst_it and
src_it respectively, as they are not the start addresses of two separate iov_iter
arrays but have overlapping elements. IMO, there is no need to use src/dst_it,
as they can simply be indexed by it_pool[it_idx] and it_pool[it_idx + 1].
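
For illustration, the call site under that scheme might look like this (a
hypothetical sketch written against the patch context above, not code from
this series):

	if (async_mbuf_to_desc(dev, vq, pkts[pkt_idx],
			buf_vec, nr_vec, num_buffers,
			&src_iovec[iovec_idx], &dst_iovec[iovec_idx],
			/* even slot: source iov_iter, odd slot: destination iov_iter */
			&it_pool[it_idx], &it_pool[it_idx + 1]) < 0) {
		vq->shadow_used_idx -= num_buffers;
		break;
	}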

>  			vq->shadow_used_idx -= num_buffers;
>  			break;
>  		}
> 
>  		slot_idx = (vq->async_pkts_idx + num_async_pkts) &
>  			(vq->size - 1);
> -		if (src_it->count) {
> +		if (src_it[it_idx].count) {
>  			uint16_t from, to;
> 
> -			async_fill_desc(&tdes[pkt_burst_idx++], src_it, dst_it);
> +			async_fill_desc(&tdes[pkt_burst_idx++],
> +				&src_it[it_idx],
> +				&dst_it[it_idx]);
>  			pkts_info[slot_idx].descs = num_buffers;
>  			pkts_info[slot_idx].mbuf = pkts[pkt_idx];
>  			async_pkts_log[num_async_pkts].pkt_idx = pkt_idx;
>  			async_pkts_log[num_async_pkts++].last_avail_idx =
>  				vq->last_avail_idx;
> -			src_iovec += src_it->nr_segs;
> -			dst_iovec += dst_it->nr_segs;
> -			src_it += 2;
> -			dst_it += 2;
> -			segs_await += src_it->nr_segs;
> +
> +			iovec_idx += src_it[it_idx].nr_segs;
> +			it_idx += 2;
> +
> +			segs_await += src_it[it_idx].nr_segs;
> 
>  			/**
>  			 * recover shadow used ring and keep DMA-occupied
> @@ -1541,23 +1570,12 @@ virtio_dev_rx_async_submit_split(struct
> virtio_net *dev,
>  			 */
>  			from = vq->shadow_used_idx - num_buffers;
>  			to = vq->async_desc_idx & (vq->size - 1);
> -			if (num_buffers + to <= vq->size) {
> -				rte_memcpy(&vq->async_descs_split[to],
> -						&vq->shadow_used_split[from],
> -						num_buffers *
> -						sizeof(struct vring_used_elem));
> -			} else {
> -				int size = vq->size - to;
> -
> -				rte_memcpy(&vq->async_descs_split[to],
> -						&vq->shadow_used_split[from],
> -						size *
> -						sizeof(struct vring_used_elem));
> -				rte_memcpy(vq->async_descs_split,
> -						&vq->shadow_used_split[from +
> -						size], (num_buffers - size) *
> -					   sizeof(struct vring_used_elem));
> -			}
> +
> +			shadow_ring_store(vq, vq->shadow_used_split,
> +					vq->async_descs_split,
> +					from, to, num_buffers,
> +					sizeof(struct vring_used_elem));

This function is to store the DMA-occupied descs, but "shadow_ring_store" is not a good name
for it. In addition, I think there is no need to pass vq as a parameter. What you
need are the sizes of the shadow ring and the async desc ring.
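
As an example of that suggestion, the helper could take the ring size instead
of vq (a hypothetical shape with a placeholder name; the name is only settled
later in this thread):

static __rte_always_inline void
store_dma_desc_info(void *s_ring, void *d_ring, uint16_t ring_size,
		uint16_t s_idx, uint16_t d_idx, uint16_t count, uint16_t elem_size)
{
	if (d_idx + count <= ring_size) {
		rte_memcpy((void *)((uintptr_t)d_ring + d_idx * elem_size),
			(void *)((uintptr_t)s_ring + s_idx * elem_size),
			count * elem_size);
	} else {
		/* destination wraps: fill the ring tail, then restart from slot 0 */
		uint16_t size = ring_size - d_idx;

		rte_memcpy((void *)((uintptr_t)d_ring + d_idx * elem_size),
			(void *)((uintptr_t)s_ring + s_idx * elem_size),
			size * elem_size);
		rte_memcpy(d_ring,
			(void *)((uintptr_t)s_ring + (s_idx + size) * elem_size),
			(count - size) * elem_size);
	}
}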

Thanks,
Jiayu
> +
>  			vq->async_desc_idx += num_buffers;
>  			vq->shadow_used_idx -= num_buffers;
>  		} else
> @@ -1575,10 +1593,9 @@ virtio_dev_rx_async_submit_split(struct
> virtio_net *dev,
>  			BUF_VECTOR_MAX))) {
>  			n_pkts = vq->async_ops.transfer_data(dev->vid,
>  					queue_id, tdes, 0, pkt_burst_idx);
> -			src_iovec = vec_pool;
> -			dst_iovec = vec_pool + (VHOST_MAX_ASYNC_VEC >> 1);
> -			src_it = it_pool;
> -			dst_it = it_pool + 1;
> +			iovec_idx = 0;
> +			it_idx = 0;
> +
>  			segs_await = 0;
>  			vq->async_pkts_inflight_n += n_pkts;
> 
> @@ -1639,6 +1656,43 @@ virtio_dev_rx_async_submit_split(struct
> virtio_net *dev,
>  	return pkt_idx;
>  }
> 
> +static __rte_always_inline void
> +write_back_completed_descs_split(struct vhost_virtqueue *vq, uint16_t
> n_descs)
> +{
> +	uint16_t nr_left = n_descs;
> +	uint16_t nr_copy;
> +	uint16_t to, from;
> +
> +	do {
> +		from = vq->last_async_desc_idx & (vq->size - 1);
> +		nr_copy = nr_left + from <= vq->size ? nr_left :
> +			vq->size - from;
> +		to = vq->last_used_idx & (vq->size - 1);
> +
> +		if (to + nr_copy <= vq->size) {
> +			rte_memcpy(&vq->used->ring[to],
> +					&vq->async_descs_split[from],
> +					nr_copy *
> +					sizeof(struct vring_used_elem));
> +		} else {
> +			uint16_t size = vq->size - to;
> +
> +			rte_memcpy(&vq->used->ring[to],
> +					&vq->async_descs_split[from],
> +					size *
> +					sizeof(struct vring_used_elem));
> +			rte_memcpy(vq->used->ring,
> +					&vq->async_descs_split[from +
> +					size], (nr_copy - size) *
> +					sizeof(struct vring_used_elem));
> +		}
> +
> +		vq->last_async_desc_idx += nr_copy;
> +		vq->last_used_idx += nr_copy;
> +		nr_left -= nr_copy;
> +	} while (nr_left > 0);
> +}
> +
>  uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
>  		struct rte_mbuf **pkts, uint16_t count)
>  {
> @@ -1695,39 +1749,7 @@ uint16_t rte_vhost_poll_enqueue_completed(int
> vid, uint16_t queue_id,
>  	vq->async_pkts_inflight_n -= n_pkts_put;
> 
>  	if (likely(vq->enabled && vq->access_ok)) {
> -		uint16_t nr_left = n_descs;
> -		uint16_t nr_copy;
> -		uint16_t to;
> -
> -		/* write back completed descriptors to used ring */
> -		do {
> -			from = vq->last_async_desc_idx & (vq->size - 1);
> -			nr_copy = nr_left + from <= vq->size ? nr_left :
> -				vq->size - from;
> -			to = vq->last_used_idx & (vq->size - 1);
> -
> -			if (to + nr_copy <= vq->size) {
> -				rte_memcpy(&vq->used->ring[to],
> -						&vq->async_descs_split[from],
> -						nr_copy *
> -						sizeof(struct vring_used_elem));
> -			} else {
> -				uint16_t size = vq->size - to;
> -
> -				rte_memcpy(&vq->used->ring[to],
> -						&vq->async_descs_split[from],
> -						size *
> -						sizeof(struct vring_used_elem));
> -				rte_memcpy(vq->used->ring,
> -						&vq->async_descs_split[from +
> -						size], (nr_copy - size) *
> -						sizeof(struct vring_used_elem));
> -			}
> -
> -			vq->last_async_desc_idx += nr_copy;
> -			vq->last_used_idx += nr_copy;
> -			nr_left -= nr_copy;
> -		} while (nr_left > 0);
> +		write_back_completed_descs_split(vq, n_descs);
> 
>  		__atomic_add_fetch(&vq->used->idx, n_descs,
> __ATOMIC_RELEASE);
>  		vhost_vring_call_split(dev, vq);
> --
> 2.29.2
  
Jiang, Cheng1 April 13, 2021, 3:26 a.m. UTC | #2
Hi Jiayu,

> -----Original Message-----
> From: Hu, Jiayu <jiayu.hu@intel.com>
> Sent: Tuesday, April 13, 2021 10:44 AM
> To: Jiang, Cheng1 <cheng1.jiang@intel.com>; maxime.coquelin@redhat.com;
> Xia, Chenbo <chenbo.xia@intel.com>
> Cc: dev@dpdk.org; Yang, YvonneX <yvonnex.yang@intel.com>; Wang, Yinan
> <yinan.wang@intel.com>; Liu, Yong <yong.liu@intel.com>
> Subject: RE: [PATCH v5 1/4] vhost: abstract and reorganize async split ring
> code
> 
> Hi Cheng,
> 
> Some comments inline.
> 
> > -----Original Message-----
> > From: Jiang, Cheng1 <cheng1.jiang@intel.com>
> > Sent: Monday, April 12, 2021 7:34 PM
> > To: maxime.coquelin@redhat.com; Xia, Chenbo <chenbo.xia@intel.com>
> > Cc: dev@dpdk.org; Hu, Jiayu <jiayu.hu@intel.com>; Yang, YvonneX
> > <yvonnex.yang@intel.com>; Wang, Yinan <yinan.wang@intel.com>; Liu,
> > Yong <yong.liu@intel.com>; Jiang, Cheng1 <cheng1.jiang@intel.com>
> > Subject: [PATCH v5 1/4] vhost: abstract and reorganize async split
> > ring code
> >
> > In order to improve code efficiency and readability when async packed
> > ring support is enabled. This patch abstract some functions like
> > shadow_ring_store and write_back_completed_descs_split. And improve
> > the efficiency of some pointer offset calculation.
> 
> Need to improve the grammar of the commit log, as there is a typo and an
> incomplete sentence.
> 

Sure, I'll fix it in the next version.

> >
> > Signed-off-by: Cheng Jiang <Cheng1.jiang@intel.com>
> > ---
> >  lib/librte_vhost/virtio_net.c | 146
> > +++++++++++++++++++---------------
> >  1 file changed, 84 insertions(+), 62 deletions(-)
> >
> > diff --git a/lib/librte_vhost/virtio_net.c
> > b/lib/librte_vhost/virtio_net.c
> > index ff3987860..c43ab0093 100644
> > --- a/lib/librte_vhost/virtio_net.c
> > +++ b/lib/librte_vhost/virtio_net.c
> > @@ -1458,6 +1458,29 @@ virtio_dev_rx_async_get_info_idx(uint16_t pkts_idx,
> >  		(vq_size - n_inflight + pkts_idx) & (vq_size - 1);
> >  }
> >
> > +static __rte_always_inline void
> > +shadow_ring_store(struct vhost_virtqueue *vq,  void *shadow_ring, void *d_ring,
> > +		uint16_t s_idx, uint16_t d_idx,
> > +		uint16_t count, uint16_t elem_size)
> > +{
> > +	if (d_idx + count <= vq->size) {
> > +		rte_memcpy((void *)((uintptr_t)d_ring + d_idx * elem_size),
> > +			(void *)((uintptr_t)shadow_ring + s_idx * elem_size),
> > +			count * elem_size);
> > +	} else {
> > +		uint16_t size = vq->size - d_idx;
> > +
> > +		rte_memcpy((void *)((uintptr_t)d_ring + d_idx * elem_size),
> > +			(void *)((uintptr_t)shadow_ring + s_idx * elem_size),
> > +			size * elem_size);
> > +
> > +		rte_memcpy((void *)((uintptr_t)d_ring),
> > +			(void *)((uintptr_t)shadow_ring +
> > +				(s_idx + size) * elem_size),
> > +			(count - size) * elem_size);
> > +	}
> > +}
> > +
> >  static __rte_noinline uint32_t
> >  virtio_dev_rx_async_submit_split(struct virtio_net *dev,
> >  	struct vhost_virtqueue *vq, uint16_t queue_id,
> > @@ -1478,6 +1501,7 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
> >  	struct rte_vhost_iov_iter *dst_it = it_pool + 1;
> >  	uint16_t slot_idx = 0;
> >  	uint16_t segs_await = 0;
> > +	uint16_t iovec_idx = 0, it_idx = 0;
> >  	struct async_inflight_info *pkts_info = vq->async_pkts_info;
> >  	uint32_t n_pkts = 0, pkt_err = 0;
> >  	uint32_t num_async_pkts = 0, num_done_pkts = 0;
> > @@ -1513,27 +1537,32 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
> >
> >  		if (async_mbuf_to_desc(dev, vq, pkts[pkt_idx],
> >  				buf_vec, nr_vec, num_buffers,
> > -				src_iovec, dst_iovec, src_it, dst_it) < 0) {
> > +				&src_iovec[iovec_idx],
> > +				&dst_iovec[iovec_idx],
> > +				&src_it[it_idx],
> > +				&dst_it[it_idx]) < 0) {
> 
> When using indices, it's strange to get the src and dst iov_iter from dst_it and
> src_it respectively, as they are not the start addresses of two separate iov_iter
> arrays but have overlapping elements. IMO, there is no need to use src/dst_it,
> as they can simply be indexed by it_pool[it_idx] and it_pool[it_idx + 1].

Yes, I think it makes sense. I'll fix it in the next version.

> 
> >  			vq->shadow_used_idx -= num_buffers;
> >  			break;
> >  		}
> >
> >  		slot_idx = (vq->async_pkts_idx + num_async_pkts) &
> >  			(vq->size - 1);
> > -		if (src_it->count) {
> > +		if (src_it[it_idx].count) {
> >  			uint16_t from, to;
> >
> > -			async_fill_desc(&tdes[pkt_burst_idx++], src_it, dst_it);
> > +			async_fill_desc(&tdes[pkt_burst_idx++],
> > +				&src_it[it_idx],
> > +				&dst_it[it_idx]);
> >  			pkts_info[slot_idx].descs = num_buffers;
> >  			pkts_info[slot_idx].mbuf = pkts[pkt_idx];
> >  			async_pkts_log[num_async_pkts].pkt_idx = pkt_idx;
> >  			async_pkts_log[num_async_pkts++].last_avail_idx =
> >  				vq->last_avail_idx;
> > -			src_iovec += src_it->nr_segs;
> > -			dst_iovec += dst_it->nr_segs;
> > -			src_it += 2;
> > -			dst_it += 2;
> > -			segs_await += src_it->nr_segs;
> > +
> > +			iovec_idx += src_it[it_idx].nr_segs;
> > +			it_idx += 2;
> > +
> > +			segs_await += src_it[it_idx].nr_segs;
> >
> >  			/**
> >  			 * recover shadow used ring and keep DMA-occupied
> > @@ -1541,23 +1570,12 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
> >  			 */
> >  			from = vq->shadow_used_idx - num_buffers;
> >  			to = vq->async_desc_idx & (vq->size - 1);
> > -			if (num_buffers + to <= vq->size) {
> > -				rte_memcpy(&vq->async_descs_split[to],
> > -						&vq->shadow_used_split[from],
> > -						num_buffers *
> > -						sizeof(struct vring_used_elem));
> > -			} else {
> > -				int size = vq->size - to;
> > -
> > -				rte_memcpy(&vq->async_descs_split[to],
> > -						&vq->shadow_used_split[from],
> > -						size *
> > -						sizeof(struct vring_used_elem));
> > -				rte_memcpy(vq->async_descs_split,
> > -						&vq->shadow_used_split[from +
> > -						size], (num_buffers - size) *
> > -					   sizeof(struct vring_used_elem));
> > -			}
> > +
> > +			shadow_ring_store(vq, vq->shadow_used_split,
> > +					vq->async_descs_split,
> > +					from, to, num_buffers,
> > +					sizeof(struct vring_used_elem));
> 
> This function is to store the DMA-occupied descs, but "shadow_ring_store" is not
> a good name for it. In addition, I think there is no need to pass vq as a
> parameter. What you need are the sizes of the shadow ring and the async desc ring.

OK, I think we can use the name "store_dma_desc_info()".

Thanks a lot.
Cheng

> 
> Thanks,
> Jiayu
> > +
> >  			vq->async_desc_idx += num_buffers;
> >  			vq->shadow_used_idx -= num_buffers;
> >  		} else
> > @@ -1575,10 +1593,9 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
> >  			BUF_VECTOR_MAX))) {
> >  			n_pkts = vq->async_ops.transfer_data(dev->vid,
> >  					queue_id, tdes, 0, pkt_burst_idx);
> > -			src_iovec = vec_pool;
> > -			dst_iovec = vec_pool + (VHOST_MAX_ASYNC_VEC >> 1);
> > -			src_it = it_pool;
> > -			dst_it = it_pool + 1;
> > +			iovec_idx = 0;
> > +			it_idx = 0;
> > +
> >  			segs_await = 0;
> >  			vq->async_pkts_inflight_n += n_pkts;
> >
> > @@ -1639,6 +1656,43 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
> >  	return pkt_idx;
> >  }
> >
> > +static __rte_always_inline void
> > +write_back_completed_descs_split(struct vhost_virtqueue *vq, uint16_t n_descs)
> > +{
> > +	uint16_t nr_left = n_descs;
> > +	uint16_t nr_copy;
> > +	uint16_t to, from;
> > +
> > +	do {
> > +		from = vq->last_async_desc_idx & (vq->size - 1);
> > +		nr_copy = nr_left + from <= vq->size ? nr_left :
> > +			vq->size - from;
> > +		to = vq->last_used_idx & (vq->size - 1);
> > +
> > +		if (to + nr_copy <= vq->size) {
> > +			rte_memcpy(&vq->used->ring[to],
> > +					&vq->async_descs_split[from],
> > +					nr_copy *
> > +					sizeof(struct vring_used_elem));
> > +		} else {
> > +			uint16_t size = vq->size - to;
> > +
> > +			rte_memcpy(&vq->used->ring[to],
> > +					&vq->async_descs_split[from],
> > +					size *
> > +					sizeof(struct vring_used_elem));
> > +			rte_memcpy(vq->used->ring,
> > +					&vq->async_descs_split[from +
> > +					size], (nr_copy - size) *
> > +					sizeof(struct vring_used_elem));
> > +		}
> > +
> > +		vq->last_async_desc_idx += nr_copy;
> > +		vq->last_used_idx += nr_copy;
> > +		nr_left -= nr_copy;
> > +	} while (nr_left > 0);
> > +}
> > +
> >  uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
> >  		struct rte_mbuf **pkts, uint16_t count)
> >  {
> > @@ -1695,39 +1749,7 @@ uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
> >  	vq->async_pkts_inflight_n -= n_pkts_put;
> >
> >  	if (likely(vq->enabled && vq->access_ok)) {
> > -		uint16_t nr_left = n_descs;
> > -		uint16_t nr_copy;
> > -		uint16_t to;
> > -
> > -		/* write back completed descriptors to used ring */
> > -		do {
> > -			from = vq->last_async_desc_idx & (vq->size - 1);
> > -			nr_copy = nr_left + from <= vq->size ? nr_left :
> > -				vq->size - from;
> > -			to = vq->last_used_idx & (vq->size - 1);
> > -
> > -			if (to + nr_copy <= vq->size) {
> > -				rte_memcpy(&vq->used->ring[to],
> > -						&vq->async_descs_split[from],
> > -						nr_copy *
> > -						sizeof(struct vring_used_elem));
> > -			} else {
> > -				uint16_t size = vq->size - to;
> > -
> > -				rte_memcpy(&vq->used->ring[to],
> > -						&vq->async_descs_split[from],
> > -						size *
> > -						sizeof(struct vring_used_elem));
> > -				rte_memcpy(vq->used->ring,
> > -						&vq->async_descs_split[from +
> > -						size], (nr_copy - size) *
> > -						sizeof(struct vring_used_elem));
> > -			}
> > -
> > -			vq->last_async_desc_idx += nr_copy;
> > -			vq->last_used_idx += nr_copy;
> > -			nr_left -= nr_copy;
> > -		} while (nr_left > 0);
> > +		write_back_completed_descs_split(vq, n_descs);
> >
> >  		__atomic_add_fetch(&vq->used->idx, n_descs, __ATOMIC_RELEASE);
> >  		vhost_vring_call_split(dev, vq);
> > --
> > 2.29.2
>
  
Maxime Coquelin April 13, 2021, 7:11 a.m. UTC | #3
Hi Cheng,

On 4/12/21 1:34 PM, Cheng Jiang wrote:
> In order to improve code efficiency and readability when async packed
> ring support is enabled. This patch abstract some functions like
> shadow_ring_store and write_back_completed_descs_split. And improve
> the efficiency of some pointer offset calculation.
> 
> Signed-off-by: Cheng Jiang <Cheng1.jiang@intel.com>
> ---
>  lib/librte_vhost/virtio_net.c | 146 +++++++++++++++++++---------------
>  1 file changed, 84 insertions(+), 62 deletions(-)
> 
> diff --git a/lib/librte_vhost/virtio_net.c b/lib/librte_vhost/virtio_net.c
> index ff3987860..c43ab0093 100644
> --- a/lib/librte_vhost/virtio_net.c
> +++ b/lib/librte_vhost/virtio_net.c
> @@ -1458,6 +1458,29 @@ virtio_dev_rx_async_get_info_idx(uint16_t pkts_idx,
>  		(vq_size - n_inflight + pkts_idx) & (vq_size - 1);
>  }
>  
> +static __rte_always_inline void
> +shadow_ring_store(struct vhost_virtqueue *vq,  void *shadow_ring, void *d_ring,
> +		uint16_t s_idx, uint16_t d_idx,
> +		uint16_t count, uint16_t elem_size)
> +{
> +	if (d_idx + count <= vq->size) {
> +		rte_memcpy((void *)((uintptr_t)d_ring + d_idx * elem_size),
> +			(void *)((uintptr_t)shadow_ring + s_idx * elem_size),
> +			count * elem_size);
> +	} else {
> +		uint16_t size = vq->size - d_idx;
> +
> +		rte_memcpy((void *)((uintptr_t)d_ring + d_idx * elem_size),
> +			(void *)((uintptr_t)shadow_ring + s_idx * elem_size),
> +			size * elem_size);
> +
> +		rte_memcpy((void *)((uintptr_t)d_ring),
> +			(void *)((uintptr_t)shadow_ring +
> +				(s_idx + size) * elem_size),
> +			(count - size) * elem_size);
> +	}
> +}
> +
>  static __rte_noinline uint32_t
>  virtio_dev_rx_async_submit_split(struct virtio_net *dev,
>  	struct vhost_virtqueue *vq, uint16_t queue_id,
> @@ -1478,6 +1501,7 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
>  	struct rte_vhost_iov_iter *dst_it = it_pool + 1;
>  	uint16_t slot_idx = 0;
>  	uint16_t segs_await = 0;
> +	uint16_t iovec_idx = 0, it_idx = 0;
>  	struct async_inflight_info *pkts_info = vq->async_pkts_info;
>  	uint32_t n_pkts = 0, pkt_err = 0;
>  	uint32_t num_async_pkts = 0, num_done_pkts = 0;
> @@ -1513,27 +1537,32 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
>  
>  		if (async_mbuf_to_desc(dev, vq, pkts[pkt_idx],
>  				buf_vec, nr_vec, num_buffers,
> -				src_iovec, dst_iovec, src_it, dst_it) < 0) {
> +				&src_iovec[iovec_idx],
> +				&dst_iovec[iovec_idx],
> +				&src_it[it_idx],
> +				&dst_it[it_idx]) < 0) {
>  			vq->shadow_used_idx -= num_buffers;
>  			break;
>  		}
>  
>  		slot_idx = (vq->async_pkts_idx + num_async_pkts) &
>  			(vq->size - 1);
> -		if (src_it->count) {
> +		if (src_it[it_idx].count) {
>  			uint16_t from, to;
>  
> -			async_fill_desc(&tdes[pkt_burst_idx++], src_it, dst_it);
> +			async_fill_desc(&tdes[pkt_burst_idx++],
> +				&src_it[it_idx],
> +				&dst_it[it_idx]);
>  			pkts_info[slot_idx].descs = num_buffers;
>  			pkts_info[slot_idx].mbuf = pkts[pkt_idx];
>  			async_pkts_log[num_async_pkts].pkt_idx = pkt_idx;
>  			async_pkts_log[num_async_pkts++].last_avail_idx =
>  				vq->last_avail_idx;
> -			src_iovec += src_it->nr_segs;
> -			dst_iovec += dst_it->nr_segs;
> -			src_it += 2;
> -			dst_it += 2;
> -			segs_await += src_it->nr_segs;
> +
> +			iovec_idx += src_it[it_idx].nr_segs;
> +			it_idx += 2;
> +
> +			segs_await += src_it[it_idx].nr_segs;
>  
>  			/**
>  			 * recover shadow used ring and keep DMA-occupied
> @@ -1541,23 +1570,12 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
>  			 */
>  			from = vq->shadow_used_idx - num_buffers;
>  			to = vq->async_desc_idx & (vq->size - 1);
> -			if (num_buffers + to <= vq->size) {
> -				rte_memcpy(&vq->async_descs_split[to],
> -						&vq->shadow_used_split[from],
> -						num_buffers *
> -						sizeof(struct vring_used_elem));
> -			} else {
> -				int size = vq->size - to;
> -
> -				rte_memcpy(&vq->async_descs_split[to],
> -						&vq->shadow_used_split[from],
> -						size *
> -						sizeof(struct vring_used_elem));
> -				rte_memcpy(vq->async_descs_split,
> -						&vq->shadow_used_split[from +
> -						size], (num_buffers - size) *
> -					   sizeof(struct vring_used_elem));
> -			}
> +
> +			shadow_ring_store(vq, vq->shadow_used_split,
> +					vq->async_descs_split,
> +					from, to, num_buffers,
> +					sizeof(struct vring_used_elem));
> +

I'm not convinced by this rework.

I think it is good to create a dedicated function for this to simplify
this huge virtio_dev_rx_async_submit_split() function. But we should
have a dedicated version for split ring. Having a single function for
both split and packed rings does not improve readability, and is
unlikely to improve performance.
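
As a sketch of that direction, a split-ring-only variant could be typed
directly on struct vring_used_elem, which drops the elem_size parameter and
the uintptr_t casts (an illustration of the suggestion, assuming the same
wrap-around copy logic; not necessarily the code of the next revision):

static __rte_always_inline void
store_dma_desc_info_split(struct vring_used_elem *s_ring, struct vring_used_elem *d_ring,
		uint16_t ring_size, uint16_t s_idx, uint16_t d_idx, uint16_t count)
{
	if (d_idx + count <= ring_size) {
		rte_memcpy(&d_ring[d_idx], &s_ring[s_idx],
				count * sizeof(struct vring_used_elem));
	} else {
		/* destination wraps: copy up to the ring end, then from slot 0 */
		uint16_t size = ring_size - d_idx;

		rte_memcpy(&d_ring[d_idx], &s_ring[s_idx],
				size * sizeof(struct vring_used_elem));
		rte_memcpy(&d_ring[0], &s_ring[s_idx + size],
				(count - size) * sizeof(struct vring_used_elem));
	}
}

A packed-ring counterpart can then be typed on its own element structure,
keeping each datapath's copy routine simple and type-safe.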

>  			vq->async_desc_idx += num_buffers;
>  			vq->shadow_used_idx -= num_buffers;
>  		} else
> @@ -1575,10 +1593,9 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
>  			BUF_VECTOR_MAX))) {
>  			n_pkts = vq->async_ops.transfer_data(dev->vid,
>  					queue_id, tdes, 0, pkt_burst_idx);
> -			src_iovec = vec_pool;
> -			dst_iovec = vec_pool + (VHOST_MAX_ASYNC_VEC >> 1);
> -			src_it = it_pool;
> -			dst_it = it_pool + 1;
> +			iovec_idx = 0;
> +			it_idx = 0;
> +
>  			segs_await = 0;
>  			vq->async_pkts_inflight_n += n_pkts;
>  
> @@ -1639,6 +1656,43 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
>  	return pkt_idx;
>  }
>  
> +static __rte_always_inline void
> +write_back_completed_descs_split(struct vhost_virtqueue *vq, uint16_t n_descs)
> +{
> +	uint16_t nr_left = n_descs;
> +	uint16_t nr_copy;
> +	uint16_t to, from;
> +
> +	do {
> +		from = vq->last_async_desc_idx & (vq->size - 1);
> +		nr_copy = nr_left + from <= vq->size ? nr_left :
> +			vq->size - from;
> +		to = vq->last_used_idx & (vq->size - 1);
> +
> +		if (to + nr_copy <= vq->size) {
> +			rte_memcpy(&vq->used->ring[to],
> +					&vq->async_descs_split[from],
> +					nr_copy *
> +					sizeof(struct vring_used_elem));
> +		} else {
> +			uint16_t size = vq->size - to;
> +
> +			rte_memcpy(&vq->used->ring[to],
> +					&vq->async_descs_split[from],
> +					size *
> +					sizeof(struct vring_used_elem));
> +			rte_memcpy(vq->used->ring,
&vq->used->ring[0] for consistency
> +					&vq->async_descs_split[from +
> +					size], (nr_copy - size) *
> +					sizeof(struct vring_used_elem));

Lines can now be up to 100 chars.
Please take the opportunity to indent properly, so that parts of different
arguments don't end up on the same line. It will help readability.
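
For instance, under the 100-char limit the wrapping branch could keep each
argument whole, also using &vq->used->ring[0] as suggested above (one possible
layout, for illustration only):

		} else {
			uint16_t size = vq->size - to;

			rte_memcpy(&vq->used->ring[to], &vq->async_descs_split[from],
					size * sizeof(struct vring_used_elem));
			rte_memcpy(&vq->used->ring[0], &vq->async_descs_split[from + size],
					(nr_copy - size) * sizeof(struct vring_used_elem));
		}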

> +		}
> +
> +		vq->last_async_desc_idx += nr_copy;
> +		vq->last_used_idx += nr_copy;
> +		nr_left -= nr_copy;
> +	} while (nr_left > 0);
> +}
> +
>  uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
>  		struct rte_mbuf **pkts, uint16_t count)
>  {
> @@ -1695,39 +1749,7 @@ uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
>  	vq->async_pkts_inflight_n -= n_pkts_put;
>  
>  	if (likely(vq->enabled && vq->access_ok)) {
> -		uint16_t nr_left = n_descs;
> -		uint16_t nr_copy;
> -		uint16_t to;
> -
> -		/* write back completed descriptors to used ring */
> -		do {
> -			from = vq->last_async_desc_idx & (vq->size - 1);
> -			nr_copy = nr_left + from <= vq->size ? nr_left :
> -				vq->size - from;
> -			to = vq->last_used_idx & (vq->size - 1);
> -
> -			if (to + nr_copy <= vq->size) {
> -				rte_memcpy(&vq->used->ring[to],
> -						&vq->async_descs_split[from],
> -						nr_copy *
> -						sizeof(struct vring_used_elem));
> -			} else {
> -				uint16_t size = vq->size - to;
> -
> -				rte_memcpy(&vq->used->ring[to],
> -						&vq->async_descs_split[from],
> -						size *
> -						sizeof(struct vring_used_elem));
> -				rte_memcpy(vq->used->ring,
> -						&vq->async_descs_split[from +
> -						size], (nr_copy - size) *
> -						sizeof(struct vring_used_elem));
> -			}
> -
> -			vq->last_async_desc_idx += nr_copy;
> -			vq->last_used_idx += nr_copy;
> -			nr_left -= nr_copy;
> -		} while (nr_left > 0);
> +		write_back_completed_descs_split(vq, n_descs);
>  
>  		__atomic_add_fetch(&vq->used->idx, n_descs, __ATOMIC_RELEASE);
>  		vhost_vring_call_split(dev, vq);
>
  
Jiang, Cheng1 April 13, 2021, 9:06 a.m. UTC | #4
Hi Maxime,

> -----Original Message-----
> From: Maxime Coquelin <maxime.coquelin@redhat.com>
> Sent: Tuesday, April 13, 2021 3:12 PM
> To: Jiang, Cheng1 <cheng1.jiang@intel.com>; Xia, Chenbo
> <chenbo.xia@intel.com>
> Cc: dev@dpdk.org; Hu, Jiayu <jiayu.hu@intel.com>; Yang, YvonneX
> <yvonnex.yang@intel.com>; Wang, Yinan <yinan.wang@intel.com>; Liu,
> Yong <yong.liu@intel.com>
> Subject: Re: [PATCH v5 1/4] vhost: abstract and reorganize async split ring
> code
> 
> Hi Cheng,
> 
> On 4/12/21 1:34 PM, Cheng Jiang wrote:
> > In order to improve code efficiency and readability when async packed
> > ring support is enabled. This patch abstract some functions like
> > shadow_ring_store and write_back_completed_descs_split. And improve
> > the efficiency of some pointer offset calculation.
> >
> > Signed-off-by: Cheng Jiang <Cheng1.jiang@intel.com>
> > ---
> >  lib/librte_vhost/virtio_net.c | 146
> > +++++++++++++++++++---------------
> >  1 file changed, 84 insertions(+), 62 deletions(-)
> >
> > diff --git a/lib/librte_vhost/virtio_net.c
> > b/lib/librte_vhost/virtio_net.c index ff3987860..c43ab0093 100644
> > --- a/lib/librte_vhost/virtio_net.c
> > +++ b/lib/librte_vhost/virtio_net.c
> > @@ -1458,6 +1458,29 @@ virtio_dev_rx_async_get_info_idx(uint16_t
> pkts_idx,
> >  		(vq_size - n_inflight + pkts_idx) & (vq_size - 1);
> >  }
> >
> > +static __rte_always_inline void
> > +shadow_ring_store(struct vhost_virtqueue *vq,  void *shadow_ring, void
> *d_ring,
> > +		uint16_t s_idx, uint16_t d_idx,
> > +		uint16_t count, uint16_t elem_size)
> > +{
> > +	if (d_idx + count <= vq->size) {
> > +		rte_memcpy((void *)((uintptr_t)d_ring + d_idx * elem_size),
> > +			(void *)((uintptr_t)shadow_ring + s_idx * elem_size),
> > +			count * elem_size);
> > +	} else {
> > +		uint16_t size = vq->size - d_idx;
> > +
> > +		rte_memcpy((void *)((uintptr_t)d_ring + d_idx * elem_size),
> > +			(void *)((uintptr_t)shadow_ring + s_idx * elem_size),
> > +			size * elem_size);
> > +
> > +		rte_memcpy((void *)((uintptr_t)d_ring),
> > +			(void *)((uintptr_t)shadow_ring +
> > +				(s_idx + size) * elem_size),
> > +			(count - size) * elem_size);
> > +	}
> > +}
> > +
> >  static __rte_noinline uint32_t
> >  virtio_dev_rx_async_submit_split(struct virtio_net *dev,
> >  	struct vhost_virtqueue *vq, uint16_t queue_id,
> > @@ -1478,6 +1501,7 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
> >  	struct rte_vhost_iov_iter *dst_it = it_pool + 1;
> >  	uint16_t slot_idx = 0;
> >  	uint16_t segs_await = 0;
> > +	uint16_t iovec_idx = 0, it_idx = 0;
> >  	struct async_inflight_info *pkts_info = vq->async_pkts_info;
> >  	uint32_t n_pkts = 0, pkt_err = 0;
> >  	uint32_t num_async_pkts = 0, num_done_pkts = 0;
> > @@ -1513,27 +1537,32 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
> >
> >  		if (async_mbuf_to_desc(dev, vq, pkts[pkt_idx],
> >  				buf_vec, nr_vec, num_buffers,
> > -				src_iovec, dst_iovec, src_it, dst_it) < 0) {
> > +				&src_iovec[iovec_idx],
> > +				&dst_iovec[iovec_idx],
> > +				&src_it[it_idx],
> > +				&dst_it[it_idx]) < 0) {
> >  			vq->shadow_used_idx -= num_buffers;
> >  			break;
> >  		}
> >
> >  		slot_idx = (vq->async_pkts_idx + num_async_pkts) &
> >  			(vq->size - 1);
> > -		if (src_it->count) {
> > +		if (src_it[it_idx].count) {
> >  			uint16_t from, to;
> >
> > -			async_fill_desc(&tdes[pkt_burst_idx++], src_it, dst_it);
> > +			async_fill_desc(&tdes[pkt_burst_idx++],
> > +				&src_it[it_idx],
> > +				&dst_it[it_idx]);
> >  			pkts_info[slot_idx].descs = num_buffers;
> >  			pkts_info[slot_idx].mbuf = pkts[pkt_idx];
> >  			async_pkts_log[num_async_pkts].pkt_idx = pkt_idx;
> >  			async_pkts_log[num_async_pkts++].last_avail_idx =
> >  				vq->last_avail_idx;
> > -			src_iovec += src_it->nr_segs;
> > -			dst_iovec += dst_it->nr_segs;
> > -			src_it += 2;
> > -			dst_it += 2;
> > -			segs_await += src_it->nr_segs;
> > +
> > +			iovec_idx += src_it[it_idx].nr_segs;
> > +			it_idx += 2;
> > +
> > +			segs_await += src_it[it_idx].nr_segs;
> >
> >  			/**
> >  			 * recover shadow used ring and keep DMA-occupied
> > @@ -1541,23 +1570,12 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
> >  			 */
> >  			from = vq->shadow_used_idx - num_buffers;
> >  			to = vq->async_desc_idx & (vq->size - 1);
> > -			if (num_buffers + to <= vq->size) {
> > -				rte_memcpy(&vq->async_descs_split[to],
> > -						&vq->shadow_used_split[from],
> > -						num_buffers *
> > -						sizeof(struct vring_used_elem));
> > -			} else {
> > -				int size = vq->size - to;
> > -
> > -				rte_memcpy(&vq->async_descs_split[to],
> > -						&vq->shadow_used_split[from],
> > -						size *
> > -						sizeof(struct vring_used_elem));
> > -				rte_memcpy(vq->async_descs_split,
> > -						&vq->shadow_used_split[from +
> > -						size], (num_buffers - size) *
> > -					   sizeof(struct vring_used_elem));
> > -			}
> > +
> > +			shadow_ring_store(vq, vq->shadow_used_split,
> > +					vq->async_descs_split,
> > +					from, to, num_buffers,
> > +					sizeof(struct vring_used_elem));
> > +
> 
> I'm not convinced by this rework.
> 
> I think it is good to create a dedicated function for this to simplify this huge
> virtio_dev_rx_async_submit_split() function. But we should have a
> dedicated version for split ring. Having a single function for both split and
> packed rings does not improve readability, and is unlikely to improve performance.

Sure, I agree with you. I will use two separate functions for split and packed rings in the next version.

> 
> >  			vq->async_desc_idx += num_buffers;
> >  			vq->shadow_used_idx -= num_buffers;
> >  		} else
> > @@ -1575,10 +1593,9 @@ virtio_dev_rx_async_submit_split(struct
> virtio_net *dev,
> >  			BUF_VECTOR_MAX))) {
> >  			n_pkts = vq->async_ops.transfer_data(dev->vid,
> >  					queue_id, tdes, 0, pkt_burst_idx);
> > -			src_iovec = vec_pool;
> > -			dst_iovec = vec_pool + (VHOST_MAX_ASYNC_VEC >> 1);
> > -			src_it = it_pool;
> > -			dst_it = it_pool + 1;
> > +			iovec_idx = 0;
> > +			it_idx = 0;
> > +
> >  			segs_await = 0;
> >  			vq->async_pkts_inflight_n += n_pkts;
> >
> > @@ -1639,6 +1656,43 @@ virtio_dev_rx_async_submit_split(struct
> virtio_net *dev,
> >  	return pkt_idx;
> >  }
> >
> > +static __rte_always_inline void
> > +write_back_completed_descs_split(struct vhost_virtqueue *vq, uint16_t n_descs)
> > +{
> > +	uint16_t nr_left = n_descs;
> > +	uint16_t nr_copy;
> > +	uint16_t to, from;
> > +
> > +	do {
> > +		from = vq->last_async_desc_idx & (vq->size - 1);
> > +		nr_copy = nr_left + from <= vq->size ? nr_left :
> > +			vq->size - from;
> > +		to = vq->last_used_idx & (vq->size - 1);
> > +
> > +		if (to + nr_copy <= vq->size) {
> > +			rte_memcpy(&vq->used->ring[to],
> > +					&vq->async_descs_split[from],
> > +					nr_copy *
> > +					sizeof(struct vring_used_elem));
> > +		} else {
> > +			uint16_t size = vq->size - to;
> > +
> > +			rte_memcpy(&vq->used->ring[to],
> > +					&vq->async_descs_split[from],
> > +					size *
> > +					sizeof(struct vring_used_elem));
> > +			rte_memcpy(vq->used->ring,
> &vq->used->ring[0] for consistency

It will be fixed in the next version.

> > +					&vq->async_descs_split[from +
> > +					size], (nr_copy - size) *
> > +					sizeof(struct vring_used_elem));
> 
> Lines can now be up to 100 chars.
> Please take the opportunity to indent properly, so that parts of different
> arguments don't end up on the same line. It will help readability.

Ok, glad to know. I will fix them in the next version.

Thanks a lot.
Cheng

> 
> > +		}
> > +
> > +		vq->last_async_desc_idx += nr_copy;
> > +		vq->last_used_idx += nr_copy;
> > +		nr_left -= nr_copy;
> > +	} while (nr_left > 0);
> > +}
> > +
> >  uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
> >  		struct rte_mbuf **pkts, uint16_t count)
> >  {
> > @@ -1695,39 +1749,7 @@ uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
> >  	vq->async_pkts_inflight_n -= n_pkts_put;
> >
> >  	if (likely(vq->enabled && vq->access_ok)) {
> > -		uint16_t nr_left = n_descs;
> > -		uint16_t nr_copy;
> > -		uint16_t to;
> > -
> > -		/* write back completed descriptors to used ring */
> > -		do {
> > -			from = vq->last_async_desc_idx & (vq->size - 1);
> > -			nr_copy = nr_left + from <= vq->size ? nr_left :
> > -				vq->size - from;
> > -			to = vq->last_used_idx & (vq->size - 1);
> > -
> > -			if (to + nr_copy <= vq->size) {
> > -				rte_memcpy(&vq->used->ring[to],
> > -						&vq->async_descs_split[from],
> > -						nr_copy *
> > -						sizeof(struct vring_used_elem));
> > -			} else {
> > -				uint16_t size = vq->size - to;
> > -
> > -				rte_memcpy(&vq->used->ring[to],
> > -						&vq->async_descs_split[from],
> > -						size *
> > -						sizeof(struct vring_used_elem));
> > -				rte_memcpy(vq->used->ring,
> > -						&vq->async_descs_split[from +
> > -						size], (nr_copy - size) *
> > -						sizeof(struct vring_used_elem));
> > -			}
> > -
> > -			vq->last_async_desc_idx += nr_copy;
> > -			vq->last_used_idx += nr_copy;
> > -			nr_left -= nr_copy;
> > -		} while (nr_left > 0);
> > +		write_back_completed_descs_split(vq, n_descs);
> >
> >  		__atomic_add_fetch(&vq->used->idx, n_descs,
> __ATOMIC_RELEASE);
> >  		vhost_vring_call_split(dev, vq);
> >
  

Patch

diff --git a/lib/librte_vhost/virtio_net.c b/lib/librte_vhost/virtio_net.c
index ff3987860..c43ab0093 100644
--- a/lib/librte_vhost/virtio_net.c
+++ b/lib/librte_vhost/virtio_net.c
@@ -1458,6 +1458,29 @@  virtio_dev_rx_async_get_info_idx(uint16_t pkts_idx,
 		(vq_size - n_inflight + pkts_idx) & (vq_size - 1);
 }
 
+static __rte_always_inline void
+shadow_ring_store(struct vhost_virtqueue *vq,  void *shadow_ring, void *d_ring,
+		uint16_t s_idx, uint16_t d_idx,
+		uint16_t count, uint16_t elem_size)
+{
+	if (d_idx + count <= vq->size) {
+		rte_memcpy((void *)((uintptr_t)d_ring + d_idx * elem_size),
+			(void *)((uintptr_t)shadow_ring + s_idx * elem_size),
+			count * elem_size);
+	} else {
+		uint16_t size = vq->size - d_idx;
+
+		rte_memcpy((void *)((uintptr_t)d_ring + d_idx * elem_size),
+			(void *)((uintptr_t)shadow_ring + s_idx * elem_size),
+			size * elem_size);
+
+		rte_memcpy((void *)((uintptr_t)d_ring),
+			(void *)((uintptr_t)shadow_ring +
+				(s_idx + size) * elem_size),
+			(count - size) * elem_size);
+	}
+}
+
 static __rte_noinline uint32_t
 virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 	struct vhost_virtqueue *vq, uint16_t queue_id,
@@ -1478,6 +1501,7 @@  virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 	struct rte_vhost_iov_iter *dst_it = it_pool + 1;
 	uint16_t slot_idx = 0;
 	uint16_t segs_await = 0;
+	uint16_t iovec_idx = 0, it_idx = 0;
 	struct async_inflight_info *pkts_info = vq->async_pkts_info;
 	uint32_t n_pkts = 0, pkt_err = 0;
 	uint32_t num_async_pkts = 0, num_done_pkts = 0;
@@ -1513,27 +1537,32 @@  virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 
 		if (async_mbuf_to_desc(dev, vq, pkts[pkt_idx],
 				buf_vec, nr_vec, num_buffers,
-				src_iovec, dst_iovec, src_it, dst_it) < 0) {
+				&src_iovec[iovec_idx],
+				&dst_iovec[iovec_idx],
+				&src_it[it_idx],
+				&dst_it[it_idx]) < 0) {
 			vq->shadow_used_idx -= num_buffers;
 			break;
 		}
 
 		slot_idx = (vq->async_pkts_idx + num_async_pkts) &
 			(vq->size - 1);
-		if (src_it->count) {
+		if (src_it[it_idx].count) {
 			uint16_t from, to;
 
-			async_fill_desc(&tdes[pkt_burst_idx++], src_it, dst_it);
+			async_fill_desc(&tdes[pkt_burst_idx++],
+				&src_it[it_idx],
+				&dst_it[it_idx]);
 			pkts_info[slot_idx].descs = num_buffers;
 			pkts_info[slot_idx].mbuf = pkts[pkt_idx];
 			async_pkts_log[num_async_pkts].pkt_idx = pkt_idx;
 			async_pkts_log[num_async_pkts++].last_avail_idx =
 				vq->last_avail_idx;
-			src_iovec += src_it->nr_segs;
-			dst_iovec += dst_it->nr_segs;
-			src_it += 2;
-			dst_it += 2;
-			segs_await += src_it->nr_segs;
+
+			iovec_idx += src_it[it_idx].nr_segs;
+			it_idx += 2;
+
+			segs_await += src_it[it_idx].nr_segs;
 
 			/**
 			 * recover shadow used ring and keep DMA-occupied
@@ -1541,23 +1570,12 @@  virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 			 */
 			from = vq->shadow_used_idx - num_buffers;
 			to = vq->async_desc_idx & (vq->size - 1);
-			if (num_buffers + to <= vq->size) {
-				rte_memcpy(&vq->async_descs_split[to],
-						&vq->shadow_used_split[from],
-						num_buffers *
-						sizeof(struct vring_used_elem));
-			} else {
-				int size = vq->size - to;
-
-				rte_memcpy(&vq->async_descs_split[to],
-						&vq->shadow_used_split[from],
-						size *
-						sizeof(struct vring_used_elem));
-				rte_memcpy(vq->async_descs_split,
-						&vq->shadow_used_split[from +
-						size], (num_buffers - size) *
-					   sizeof(struct vring_used_elem));
-			}
+
+			shadow_ring_store(vq, vq->shadow_used_split,
+					vq->async_descs_split,
+					from, to, num_buffers,
+					sizeof(struct vring_used_elem));
+
 			vq->async_desc_idx += num_buffers;
 			vq->shadow_used_idx -= num_buffers;
 		} else
@@ -1575,10 +1593,9 @@  virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 			BUF_VECTOR_MAX))) {
 			n_pkts = vq->async_ops.transfer_data(dev->vid,
 					queue_id, tdes, 0, pkt_burst_idx);
-			src_iovec = vec_pool;
-			dst_iovec = vec_pool + (VHOST_MAX_ASYNC_VEC >> 1);
-			src_it = it_pool;
-			dst_it = it_pool + 1;
+			iovec_idx = 0;
+			it_idx = 0;
+
 			segs_await = 0;
 			vq->async_pkts_inflight_n += n_pkts;
 
@@ -1639,6 +1656,43 @@  virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 	return pkt_idx;
 }
 
+static __rte_always_inline void
+write_back_completed_descs_split(struct vhost_virtqueue *vq, uint16_t n_descs)
+{
+	uint16_t nr_left = n_descs;
+	uint16_t nr_copy;
+	uint16_t to, from;
+
+	do {
+		from = vq->last_async_desc_idx & (vq->size - 1);
+		nr_copy = nr_left + from <= vq->size ? nr_left :
+			vq->size - from;
+		to = vq->last_used_idx & (vq->size - 1);
+
+		if (to + nr_copy <= vq->size) {
+			rte_memcpy(&vq->used->ring[to],
+					&vq->async_descs_split[from],
+					nr_copy *
+					sizeof(struct vring_used_elem));
+		} else {
+			uint16_t size = vq->size - to;
+
+			rte_memcpy(&vq->used->ring[to],
+					&vq->async_descs_split[from],
+					size *
+					sizeof(struct vring_used_elem));
+			rte_memcpy(vq->used->ring,
+					&vq->async_descs_split[from +
+					size], (nr_copy - size) *
+					sizeof(struct vring_used_elem));
+		}
+
+		vq->last_async_desc_idx += nr_copy;
+		vq->last_used_idx += nr_copy;
+		nr_left -= nr_copy;
+	} while (nr_left > 0);
+}
+
 uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
 		struct rte_mbuf **pkts, uint16_t count)
 {
@@ -1695,39 +1749,7 @@  uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
 	vq->async_pkts_inflight_n -= n_pkts_put;
 
 	if (likely(vq->enabled && vq->access_ok)) {
-		uint16_t nr_left = n_descs;
-		uint16_t nr_copy;
-		uint16_t to;
-
-		/* write back completed descriptors to used ring */
-		do {
-			from = vq->last_async_desc_idx & (vq->size - 1);
-			nr_copy = nr_left + from <= vq->size ? nr_left :
-				vq->size - from;
-			to = vq->last_used_idx & (vq->size - 1);
-
-			if (to + nr_copy <= vq->size) {
-				rte_memcpy(&vq->used->ring[to],
-						&vq->async_descs_split[from],
-						nr_copy *
-						sizeof(struct vring_used_elem));
-			} else {
-				uint16_t size = vq->size - to;
-
-				rte_memcpy(&vq->used->ring[to],
-						&vq->async_descs_split[from],
-						size *
-						sizeof(struct vring_used_elem));
-				rte_memcpy(vq->used->ring,
-						&vq->async_descs_split[from +
-						size], (nr_copy - size) *
-						sizeof(struct vring_used_elem));
-			}
-
-			vq->last_async_desc_idx += nr_copy;
-			vq->last_used_idx += nr_copy;
-			nr_left -= nr_copy;
-		} while (nr_left > 0);
+		write_back_completed_descs_split(vq, n_descs);
 
 		__atomic_add_fetch(&vq->used->idx, n_descs, __ATOMIC_RELEASE);
 		vhost_vring_call_split(dev, vq);