[v2,03/16] vhost: add burst enqueue function for packed ring

Message ID 20190919163643.24130-4-yong.liu@intel.com (mailing list archive)
State Superseded, archived
Series vhost packed ring performance optimization

Checks

Context Check Description
ci/checkpatch success coding style OK
ci/Intel-compilation fail Compilation issues

Commit Message

Marvin Liu Sept. 19, 2019, 4:36 p.m. UTC
  The burst enqueue function first checks whether the descriptors are cache
aligned, and it also checks the other prerequisites up front. The burst
enqueue function does not support chained mbufs; the single packet enqueue
function will handle those.

Signed-off-by: Marvin Liu <yong.liu@intel.com>
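
A rough sketch of how the two paths described above are expected to fit together (the caller shown here is hypothetical, as are the count and pkts variables; the actual wiring of virtio_dev_rx_burst_packed and virtio_dev_rx_single_packed lands later in the series, and the single path is assumed to return a negative value on failure):

	uint32_t pkt_idx = 0;

	while (pkt_idx < count) {
		/* try a full cache-line burst first */
		if (count - pkt_idx >= PACKED_DESCS_BURST &&
		    !virtio_dev_rx_burst_packed(dev, vq, &pkts[pkt_idx])) {
			pkt_idx += PACKED_DESCS_BURST;
			continue;
		}

		/* fall back to the single-packet path, which also handles
		 * chained mbufs
		 */
		if (virtio_dev_rx_single_packed(dev, vq, pkts[pkt_idx]) < 0)
			break;

		pkt_idx++;
	}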
  

Comments

Tiwei Bie Sept. 23, 2019, 5:46 a.m. UTC | #1
On Fri, Sep 20, 2019 at 12:36:30AM +0800, Marvin Liu wrote:
> Burst enqueue function will first check whether descriptors are cache
> aligned. It will also check prerequisites in the beginning. Burst
> enqueue function not support chained mbufs, single packet enqueue
> function will handle it.
> 
> Signed-off-by: Marvin Liu <yong.liu@intel.com>
> 
> diff --git a/lib/librte_vhost/vhost.h b/lib/librte_vhost/vhost.h
> index 5074226f0..67889c80a 100644
> --- a/lib/librte_vhost/vhost.h
> +++ b/lib/librte_vhost/vhost.h
> @@ -39,6 +39,9 @@
>  
>  #define VHOST_LOG_CACHE_NR 32
>  
> +#define PACKED_DESCS_BURST (RTE_CACHE_LINE_SIZE / \
> +			    sizeof(struct vring_packed_desc))
> +
>  #ifdef SUPPORT_GCC_UNROLL_PRAGMA
>  #define PRAGMA_PARAM "GCC unroll 4"
>  #endif
> @@ -57,6 +60,8 @@
>  #define UNROLL_PRAGMA(param) do {} while(0);
>  #endif
>  
> +#define PACKED_BURST_MASK (PACKED_DESCS_BURST - 1)

It's better to have consistent names, e.g. something like:

PACKED_BATCH_SIZE
PACKED_BATCH_MASK

instead of

PACKED_DESCS_BURST
PACKED_BURST_MASK

Besides, please also put the above two defines together.
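
For illustration, a sketch of that suggestion with the two defines kept next to each other (the batch names are just an example, assuming the same cache-line derivation as the current define):

#define PACKED_BATCH_SIZE (RTE_CACHE_LINE_SIZE / \
			   sizeof(struct vring_packed_desc))
#define PACKED_BATCH_MASK (PACKED_BATCH_SIZE - 1)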


> +
>  /**
>   * Structure contains buffer address, length and descriptor index
>   * from vring to do scatter RX.
> diff --git a/lib/librte_vhost/virtio_net.c b/lib/librte_vhost/virtio_net.c
> index 2b5c47145..c664b27c5 100644
> --- a/lib/librte_vhost/virtio_net.c
> +++ b/lib/librte_vhost/virtio_net.c
> @@ -895,6 +895,84 @@ virtio_dev_rx_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
>  	return pkt_idx;
>  }
>  
> +static __rte_unused __rte_always_inline int
> +virtio_dev_rx_burst_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
> +	 struct rte_mbuf **pkts)
> +{
> +	bool wrap_counter = vq->avail_wrap_counter;
> +	struct vring_packed_desc *descs = vq->desc_packed;
> +	uint16_t avail_idx = vq->last_avail_idx;
> +
> +	uint64_t desc_addrs[PACKED_DESCS_BURST];
> +	struct virtio_net_hdr_mrg_rxbuf *hdrs[PACKED_DESCS_BURST];
> +	uint32_t buf_offset = dev->vhost_hlen;
> +	uint64_t lens[PACKED_DESCS_BURST];
> +
> +	uint16_t i;
> +
> +	if (unlikely(avail_idx & PACKED_BURST_MASK))
> +		return -1;

We also need to check (avail_idx + PACKED_DESCS_BURST) <= vq->size
before accessing descs[avail_idx + i] in the following code.
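
A minimal sketch of the combined checks, keeping the early-return style the function already uses:

	if (unlikely(avail_idx & PACKED_BURST_MASK))
		return -1;

	/* the burst must not run past the end of the descriptor ring */
	if (unlikely((avail_idx + PACKED_DESCS_BURST) > vq->size))
		return -1;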


> +
> +	UNROLL_PRAGMA(PRAGMA_PARAM)
> +	for (i = 0; i < PACKED_DESCS_BURST; i++) {
> +		if (unlikely(pkts[i]->next != NULL))
> +			return -1;
> +		if (unlikely(!desc_is_avail(&descs[avail_idx + i],
> +					    wrap_counter)))
> +			return -1;
> +	}
> +
> +	rte_smp_rmb();
> +
> +	UNROLL_PRAGMA(PRAGMA_PARAM)
> +	for (i = 0; i < PACKED_DESCS_BURST; i++)
> +		lens[i] = descs[avail_idx + i].len;
> +
> +	UNROLL_PRAGMA(PRAGMA_PARAM)
> +	for (i = 0; i < PACKED_DESCS_BURST; i++) {
> +		if (unlikely(pkts[i]->pkt_len > (lens[i] - buf_offset)))
> +			return -1;
> +	}
> +
> +	UNROLL_PRAGMA(PRAGMA_PARAM)
> +	for (i = 0; i < PACKED_DESCS_BURST; i++)
> +		desc_addrs[i] = vhost_iova_to_vva(dev, vq,
> +						 descs[avail_idx + i].addr,
> +						 &lens[i],
> +						 VHOST_ACCESS_RW);
> +	UNROLL_PRAGMA(PRAGMA_PARAM)
> +	for (i = 0; i < PACKED_DESCS_BURST; i++) {
> +		if (unlikely(lens[i] != descs[avail_idx + i].len))
> +			return -1;
> +	}
> +
> +	UNROLL_PRAGMA(PRAGMA_PARAM)
> +	for (i = 0; i < PACKED_DESCS_BURST; i++) {
> +		rte_prefetch0((void *)(uintptr_t)desc_addrs[i]);
> +		hdrs[i] = (struct virtio_net_hdr_mrg_rxbuf *)desc_addrs[i];
> +		lens[i] = pkts[i]->pkt_len + dev->vhost_hlen;
> +	}
> +
> +	UNROLL_PRAGMA(PRAGMA_PARAM)
> +	for (i = 0; i < PACKED_DESCS_BURST; i++)
> +		virtio_enqueue_offload(pkts[i], &hdrs[i]->hdr);
> +
> +	vq->last_avail_idx += PACKED_DESCS_BURST;
> +	if (vq->last_avail_idx >= vq->size) {
> +		vq->last_avail_idx -= vq->size;
> +		vq->avail_wrap_counter ^= 1;
> +	}
> +
> +	UNROLL_PRAGMA(PRAGMA_PARAM)
> +	for (i = 0; i < PACKED_DESCS_BURST; i++) {
> +		rte_memcpy((void *)(uintptr_t)(desc_addrs[i] + buf_offset),
> +			   rte_pktmbuf_mtod_offset(pkts[i], void *, 0),
> +			   pkts[i]->pkt_len);
> +	}
> +
> +	return 0;
> +}
> +
>  static __rte_unused int16_t
>  virtio_dev_rx_single_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
>  	struct rte_mbuf *pkt)
> -- 
> 2.17.1
>
  
Gavin Hu Sept. 23, 2019, 11:08 a.m. UTC | #2
Hi Marvin,

Is it possible to vectorize the processing? 
Other comments inline:
/Gavin
> -----Original Message-----
> From: dev <dev-bounces@dpdk.org> On Behalf Of Marvin Liu
> Sent: Friday, September 20, 2019 12:37 AM
> To: maxime.coquelin@redhat.com; tiwei.bie@intel.com;
> zhihong.wang@intel.com
> Cc: dev@dpdk.org; Marvin Liu <yong.liu@intel.com>
> Subject: [dpdk-dev] [PATCH v2 03/16] vhost: add burst enqueue function for
> packed ring
> 
> Burst enqueue function will first check whether descriptors are cache
> aligned. It will also check prerequisites in the beginning. Burst
> enqueue function not support chained mbufs, single packet enqueue
> function will handle it.
> 
> Signed-off-by: Marvin Liu <yong.liu@intel.com>
> 
> diff --git a/lib/librte_vhost/vhost.h b/lib/librte_vhost/vhost.h
> index 5074226f0..67889c80a 100644
> --- a/lib/librte_vhost/vhost.h
> +++ b/lib/librte_vhost/vhost.h
> @@ -39,6 +39,9 @@
> 
>  #define VHOST_LOG_CACHE_NR 32
> 
> +#define PACKED_DESCS_BURST (RTE_CACHE_LINE_SIZE / \
> +			    sizeof(struct vring_packed_desc))
> +
>  #ifdef SUPPORT_GCC_UNROLL_PRAGMA
>  #define PRAGMA_PARAM "GCC unroll 4"
>  #endif
> @@ -57,6 +60,8 @@
>  #define UNROLL_PRAGMA(param) do {} while(0);
>  #endif
> 
> +#define PACKED_BURST_MASK (PACKED_DESCS_BURST - 1)
> +
>  /**
>   * Structure contains buffer address, length and descriptor index
>   * from vring to do scatter RX.
> diff --git a/lib/librte_vhost/virtio_net.c b/lib/librte_vhost/virtio_net.c
> index 2b5c47145..c664b27c5 100644
> --- a/lib/librte_vhost/virtio_net.c
> +++ b/lib/librte_vhost/virtio_net.c
> @@ -895,6 +895,84 @@ virtio_dev_rx_split(struct virtio_net *dev, struct
> vhost_virtqueue *vq,
>  	return pkt_idx;
>  }
> 
> +static __rte_unused __rte_always_inline int
I remember "__rte_always_inline" should come first, on its own separate line, otherwise you will get a style issue.
/Gavin
> +virtio_dev_rx_burst_packed(struct virtio_net *dev, struct vhost_virtqueue
> *vq,
> +	 struct rte_mbuf **pkts)
> +{
> +	bool wrap_counter = vq->avail_wrap_counter;
> +	struct vring_packed_desc *descs = vq->desc_packed;
> +	uint16_t avail_idx = vq->last_avail_idx;
> +
> +	uint64_t desc_addrs[PACKED_DESCS_BURST];
> +	struct virtio_net_hdr_mrg_rxbuf *hdrs[PACKED_DESCS_BURST];
> +	uint32_t buf_offset = dev->vhost_hlen;
> +	uint64_t lens[PACKED_DESCS_BURST];
> +
> +	uint16_t i;
> +
> +	if (unlikely(avail_idx & PACKED_BURST_MASK))
> +		return -1;
> +
> +	UNROLL_PRAGMA(PRAGMA_PARAM)
> +	for (i = 0; i < PACKED_DESCS_BURST; i++) {
> +		if (unlikely(pkts[i]->next != NULL))
> +			return -1;
> +		if (unlikely(!desc_is_avail(&descs[avail_idx + i],
> +					    wrap_counter)))
> +			return -1;
> +	}
> +
> +	rte_smp_rmb();
> +
> +	UNROLL_PRAGMA(PRAGMA_PARAM)
> +	for (i = 0; i < PACKED_DESCS_BURST; i++)
> +		lens[i] = descs[avail_idx + i].len;
Looks like the code is a strong candidate for vectorization. 
> +
> +	UNROLL_PRAGMA(PRAGMA_PARAM)
> +	for (i = 0; i < PACKED_DESCS_BURST; i++) {
> +		if (unlikely(pkts[i]->pkt_len > (lens[i] - buf_offset)))
> +			return -1;
> +	}
> +
> +	UNROLL_PRAGMA(PRAGMA_PARAM)
> +	for (i = 0; i < PACKED_DESCS_BURST; i++)
> +		desc_addrs[i] = vhost_iova_to_vva(dev, vq,
> +						 descs[avail_idx + i].addr,
> +						 &lens[i],
> +						 VHOST_ACCESS_RW);
> +	UNROLL_PRAGMA(PRAGMA_PARAM)
> +	for (i = 0; i < PACKED_DESCS_BURST; i++) {
> +		if (unlikely(lens[i] != descs[avail_idx + i].len))
> +			return -1;
> +	}
> +
> +	UNROLL_PRAGMA(PRAGMA_PARAM)
> +	for (i = 0; i < PACKED_DESCS_BURST; i++) {
> +		rte_prefetch0((void *)(uintptr_t)desc_addrs[i]);
> +		hdrs[i] = (struct virtio_net_hdr_mrg_rxbuf *)desc_addrs[i];
> +		lens[i] = pkts[i]->pkt_len + dev->vhost_hlen;
> +	}
> +
> +	UNROLL_PRAGMA(PRAGMA_PARAM)
> +	for (i = 0; i < PACKED_DESCS_BURST; i++)
> +		virtio_enqueue_offload(pkts[i], &hdrs[i]->hdr);
> +
A store barrier is missing here; last_avail_idx may be observed before the above enqueue completes on weakly ordered architectures.
For x86, a compiler barrier is also required.

> +	vq->last_avail_idx += PACKED_DESCS_BURST;
> +	if (vq->last_avail_idx >= vq->size) {
> +		vq->last_avail_idx -= vq->size;
> +		vq->avail_wrap_counter ^= 1;
> +	}
> +
> +	UNROLL_PRAGMA(PRAGMA_PARAM)
> +	for (i = 0; i < PACKED_DESCS_BURST; i++) {
> +		rte_memcpy((void *)(uintptr_t)(desc_addrs[i] + buf_offset),
> +			   rte_pktmbuf_mtod_offset(pkts[i], void *, 0),
> +			   pkts[i]->pkt_len);
> +	}
> +
> +	return 0;
> +}
> +
>  static __rte_unused int16_t
>  virtio_dev_rx_single_packed(struct virtio_net *dev, struct vhost_virtqueue
> *vq,
>  	struct rte_mbuf *pkt)
> --
> 2.17.1
  
Marvin Liu Sept. 24, 2019, 3:30 a.m. UTC | #3
Thanks, Gavin. My comments are inline.

> -----Original Message-----
> From: Gavin Hu (Arm Technology China) [mailto:Gavin.Hu@arm.com]
> Sent: Monday, September 23, 2019 7:09 PM
> To: Liu, Yong <yong.liu@intel.com>; maxime.coquelin@redhat.com; Bie, Tiwei
> <tiwei.bie@intel.com>; Wang, Zhihong <zhihong.wang@intel.com>
> Cc: dev@dpdk.org; nd <nd@arm.com>
> Subject: RE: [dpdk-dev] [PATCH v2 03/16] vhost: add burst enqueue function
> for packed ring
> 
> Hi Marvin,
> 
> Is it possible to vectorize the processing?
> Other comments inline:
> /Gavin

Gavin,
According to our experiments, vectorizing only some parts of the [ed]nqueue functions does not benefit performance.
Functions like vhost_iova_to_vva and virtio_enqueue_offload can't easily be vectorized, as they are full of conditional branches.
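The fixed-size loops rely on compiler unrolling instead; the UNROLL_PRAGMA macro is expected to expand through C99 _Pragma(), roughly as sketched below (GCC branch only; the other compilers get their own PRAGMA_PARAM strings in vhost.h):

#ifdef SUPPORT_GCC_UNROLL_PRAGMA
#define PRAGMA_PARAM "GCC unroll 4"
#define UNROLL_PRAGMA(param) _Pragma(param)
#else
#define UNROLL_PRAGMA(param) do {} while (0)
#endif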

Thanks,
Marvin

> > -----Original Message-----
> > From: dev <dev-bounces@dpdk.org> On Behalf Of Marvin Liu
> > Sent: Friday, September 20, 2019 12:37 AM
> > To: maxime.coquelin@redhat.com; tiwei.bie@intel.com;
> > zhihong.wang@intel.com
> > Cc: dev@dpdk.org; Marvin Liu <yong.liu@intel.com>
> > Subject: [dpdk-dev] [PATCH v2 03/16] vhost: add burst enqueue function
> for
> > packed ring
> >
> > Burst enqueue function will first check whether descriptors are cache
> > aligned. It will also check prerequisites in the beginning. Burst
> > enqueue function not support chained mbufs, single packet enqueue
> > function will handle it.
> >
> > Signed-off-by: Marvin Liu <yong.liu@intel.com>
> >
> > diff --git a/lib/librte_vhost/vhost.h b/lib/librte_vhost/vhost.h
> > index 5074226f0..67889c80a 100644
> > --- a/lib/librte_vhost/vhost.h
> > +++ b/lib/librte_vhost/vhost.h
> > @@ -39,6 +39,9 @@
> >
> >  #define VHOST_LOG_CACHE_NR 32
> >
> > +#define PACKED_DESCS_BURST (RTE_CACHE_LINE_SIZE / \
> > +			    sizeof(struct vring_packed_desc))
> > +
> >  #ifdef SUPPORT_GCC_UNROLL_PRAGMA
> >  #define PRAGMA_PARAM "GCC unroll 4"
> >  #endif
> > @@ -57,6 +60,8 @@
> >  #define UNROLL_PRAGMA(param) do {} while(0);
> >  #endif
> >
> > +#define PACKED_BURST_MASK (PACKED_DESCS_BURST - 1)
> > +
> >  /**
> >   * Structure contains buffer address, length and descriptor index
> >   * from vring to do scatter RX.
> > diff --git a/lib/librte_vhost/virtio_net.c
> b/lib/librte_vhost/virtio_net.c
> > index 2b5c47145..c664b27c5 100644
> > --- a/lib/librte_vhost/virtio_net.c
> > +++ b/lib/librte_vhost/virtio_net.c
> > @@ -895,6 +895,84 @@ virtio_dev_rx_split(struct virtio_net *dev, struct
> > vhost_virtqueue *vq,
> >  	return pkt_idx;
> >  }
> >
> > +static __rte_unused __rte_always_inline int
> I remember "__rte_always_inline" should start at the first and separate
> line, otherwise you will get a style issue.
> /Gavin
> > +virtio_dev_rx_burst_packed(struct virtio_net *dev, struct
> vhost_virtqueue
> > *vq,
> > +	 struct rte_mbuf **pkts)
> > +{
> > +	bool wrap_counter = vq->avail_wrap_counter;
> > +	struct vring_packed_desc *descs = vq->desc_packed;
> > +	uint16_t avail_idx = vq->last_avail_idx;
> > +
> > +	uint64_t desc_addrs[PACKED_DESCS_BURST];
> > +	struct virtio_net_hdr_mrg_rxbuf *hdrs[PACKED_DESCS_BURST];
> > +	uint32_t buf_offset = dev->vhost_hlen;
> > +	uint64_t lens[PACKED_DESCS_BURST];
> > +
> > +	uint16_t i;
> > +
> > +	if (unlikely(avail_idx & PACKED_BURST_MASK))
> > +		return -1;
> > +
> > +	UNROLL_PRAGMA(PRAGMA_PARAM)
> > +	for (i = 0; i < PACKED_DESCS_BURST; i++) {
> > +		if (unlikely(pkts[i]->next != NULL))
> > +			return -1;
> > +		if (unlikely(!desc_is_avail(&descs[avail_idx + i],
> > +					    wrap_counter)))
> > +			return -1;
> > +	}
> > +
> > +	rte_smp_rmb();
> > +
> > +	UNROLL_PRAGMA(PRAGMA_PARAM)
> > +	for (i = 0; i < PACKED_DESCS_BURST; i++)
> > +		lens[i] = descs[avail_idx + i].len;
> Looks like the code is a strong candidate for vectorization.
> > +
> > +	UNROLL_PRAGMA(PRAGMA_PARAM)
> > +	for (i = 0; i < PACKED_DESCS_BURST; i++) {
> > +		if (unlikely(pkts[i]->pkt_len > (lens[i] - buf_offset)))
> > +			return -1;
> > +	}
> > +
> > +	UNROLL_PRAGMA(PRAGMA_PARAM)
> > +	for (i = 0; i < PACKED_DESCS_BURST; i++)
> > +		desc_addrs[i] = vhost_iova_to_vva(dev, vq,
> > +						 descs[avail_idx + i].addr,
> > +						 &lens[i],
> > +						 VHOST_ACCESS_RW);
> > +	UNROLL_PRAGMA(PRAGMA_PARAM)
> > +	for (i = 0; i < PACKED_DESCS_BURST; i++) {
> > +		if (unlikely(lens[i] != descs[avail_idx + i].len))
> > +			return -1;
> > +	}
> > +
> > +	UNROLL_PRAGMA(PRAGMA_PARAM)
> > +	for (i = 0; i < PACKED_DESCS_BURST; i++) {
> > +		rte_prefetch0((void *)(uintptr_t)desc_addrs[i]);
> > +		hdrs[i] = (struct virtio_net_hdr_mrg_rxbuf *)desc_addrs[i];
> > +		lens[i] = pkts[i]->pkt_len + dev->vhost_hlen;
> > +	}
> > +
> > +	UNROLL_PRAGMA(PRAGMA_PARAM)
> > +	for (i = 0; i < PACKED_DESCS_BURST; i++)
> > +		virtio_enqueue_offload(pkts[i], &hdrs[i]->hdr);
> > +
> A store barrier here is missing, last_avail_idx may be observed before the
> above enqueue completion on weak memory order architectures.
> For x86, a compiler barrier is also required.
> 
Thanks a lot for pointing this out. I guess you mean that a barrier is needed between the memcpy and the enqueue.
last_avail_idx is just a local variable; no barrier is needed to protect it.

> > +	vq->last_avail_idx += PACKED_DESCS_BURST;
> > +	if (vq->last_avail_idx >= vq->size) {
> > +		vq->last_avail_idx -= vq->size;
> > +		vq->avail_wrap_counter ^= 1;
> > +	}
> > +
> > +	UNROLL_PRAGMA(PRAGMA_PARAM)
> > +	for (i = 0; i < PACKED_DESCS_BURST; i++) {
> > +		rte_memcpy((void *)(uintptr_t)(desc_addrs[i] + buf_offset),
> > +			   rte_pktmbuf_mtod_offset(pkts[i], void *, 0),
> > +			   pkts[i]->pkt_len);
> > +	}
> > +
> > +	return 0;
> > +}
> > +
> >  static __rte_unused int16_t
> >  virtio_dev_rx_single_packed(struct virtio_net *dev, struct
> vhost_virtqueue
> > *vq,
> >  	struct rte_mbuf *pkt)
> > --
> > 2.17.1
  
Gavin Hu Sept. 25, 2019, 9:29 a.m. UTC | #4
Hi Marvin,

> -----Original Message-----
> From: Liu, Yong <yong.liu@intel.com>
> Sent: Tuesday, September 24, 2019 11:31 AM
> To: Gavin Hu (Arm Technology China) <Gavin.Hu@arm.com>;
> maxime.coquelin@redhat.com; Bie, Tiwei <tiwei.bie@intel.com>; Wang,
> Zhihong <zhihong.wang@intel.com>
> Cc: dev@dpdk.org; nd <nd@arm.com>
> Subject: RE: [dpdk-dev] [PATCH v2 03/16] vhost: add burst enqueue function
> for packed ring
> 
> Thanks, Gavin. My comments are inline.
> 
> > -----Original Message-----
> > From: Gavin Hu (Arm Technology China) [mailto:Gavin.Hu@arm.com]
> > Sent: Monday, September 23, 2019 7:09 PM
> > To: Liu, Yong <yong.liu@intel.com>; maxime.coquelin@redhat.com; Bie,
> Tiwei
> > <tiwei.bie@intel.com>; Wang, Zhihong <zhihong.wang@intel.com>
> > Cc: dev@dpdk.org; nd <nd@arm.com>
> > Subject: RE: [dpdk-dev] [PATCH v2 03/16] vhost: add burst enqueue
> function
> > for packed ring
> >
> > Hi Marvin,
> >
> > Is it possible to vectorize the processing?
> > Other comments inline:
> > /Gavin
> 
> Gavin,
> According to our experiment, only vectorize some parts in [ed]nqueue
> function can't benefit performance.
> Functions like vhost_iova_to_vva and virtio_enqueue_offload can't be
> easily vectorized as they are full of judgment conditions.
> 
> Thanks,
> Marvin
> 
> > > -----Original Message-----
> > > From: dev <dev-bounces@dpdk.org> On Behalf Of Marvin Liu
> > > Sent: Friday, September 20, 2019 12:37 AM
> > > To: maxime.coquelin@redhat.com; tiwei.bie@intel.com;
> > > zhihong.wang@intel.com
> > > Cc: dev@dpdk.org; Marvin Liu <yong.liu@intel.com>
> > > Subject: [dpdk-dev] [PATCH v2 03/16] vhost: add burst enqueue function
> > for
> > > packed ring
> > >
> > > Burst enqueue function will first check whether descriptors are cache
> > > aligned. It will also check prerequisites in the beginning. Burst
> > > enqueue function not support chained mbufs, single packet enqueue
> > > function will handle it.
> > >
> > > Signed-off-by: Marvin Liu <yong.liu@intel.com>
> > >
> > > diff --git a/lib/librte_vhost/vhost.h b/lib/librte_vhost/vhost.h
> > > index 5074226f0..67889c80a 100644
> > > --- a/lib/librte_vhost/vhost.h
> > > +++ b/lib/librte_vhost/vhost.h
> > > @@ -39,6 +39,9 @@
> > >
> > >  #define VHOST_LOG_CACHE_NR 32
> > >
> > > +#define PACKED_DESCS_BURST (RTE_CACHE_LINE_SIZE / \
> > > +			    sizeof(struct vring_packed_desc))
> > > +
> > >  #ifdef SUPPORT_GCC_UNROLL_PRAGMA
> > >  #define PRAGMA_PARAM "GCC unroll 4"
> > >  #endif
> > > @@ -57,6 +60,8 @@
> > >  #define UNROLL_PRAGMA(param) do {} while(0);
> > >  #endif
> > >
> > > +#define PACKED_BURST_MASK (PACKED_DESCS_BURST - 1)
> > > +
> > >  /**
> > >   * Structure contains buffer address, length and descriptor index
> > >   * from vring to do scatter RX.
> > > diff --git a/lib/librte_vhost/virtio_net.c
> > b/lib/librte_vhost/virtio_net.c
> > > index 2b5c47145..c664b27c5 100644
> > > --- a/lib/librte_vhost/virtio_net.c
> > > +++ b/lib/librte_vhost/virtio_net.c
> > > @@ -895,6 +895,84 @@ virtio_dev_rx_split(struct virtio_net *dev,
> struct
> > > vhost_virtqueue *vq,
> > >  	return pkt_idx;
> > >  }
> > >
> > > +static __rte_unused __rte_always_inline int
> > I remember "__rte_always_inline" should start at the first and separate
> > line, otherwise you will get a style issue.
> > /Gavin
> > > +virtio_dev_rx_burst_packed(struct virtio_net *dev, struct
> > vhost_virtqueue
> > > *vq,
> > > +	 struct rte_mbuf **pkts)
> > > +{
> > > +	bool wrap_counter = vq->avail_wrap_counter;
> > > +	struct vring_packed_desc *descs = vq->desc_packed;
> > > +	uint16_t avail_idx = vq->last_avail_idx;
> > > +
> > > +	uint64_t desc_addrs[PACKED_DESCS_BURST];
> > > +	struct virtio_net_hdr_mrg_rxbuf *hdrs[PACKED_DESCS_BURST];
> > > +	uint32_t buf_offset = dev->vhost_hlen;
> > > +	uint64_t lens[PACKED_DESCS_BURST];
> > > +
> > > +	uint16_t i;
> > > +
> > > +	if (unlikely(avail_idx & PACKED_BURST_MASK))
> > > +		return -1;
> > > +
> > > +	UNROLL_PRAGMA(PRAGMA_PARAM)
> > > +	for (i = 0; i < PACKED_DESCS_BURST; i++) {
> > > +		if (unlikely(pkts[i]->next != NULL))
> > > +			return -1;
> > > +		if (unlikely(!desc_is_avail(&descs[avail_idx + i],
> > > +					    wrap_counter)))
> > > +			return -1;
> > > +	}
> > > +
> > > +	rte_smp_rmb();
> > > +
> > > +	UNROLL_PRAGMA(PRAGMA_PARAM)
> > > +	for (i = 0; i < PACKED_DESCS_BURST; i++)
> > > +		lens[i] = descs[avail_idx + i].len;
> > Looks like the code is a strong candidate for vectorization.
> > > +
> > > +	UNROLL_PRAGMA(PRAGMA_PARAM)
> > > +	for (i = 0; i < PACKED_DESCS_BURST; i++) {
> > > +		if (unlikely(pkts[i]->pkt_len > (lens[i] - buf_offset)))
> > > +			return -1;
> > > +	}
> > > +
> > > +	UNROLL_PRAGMA(PRAGMA_PARAM)
> > > +	for (i = 0; i < PACKED_DESCS_BURST; i++)
> > > +		desc_addrs[i] = vhost_iova_to_vva(dev, vq,
> > > +						 descs[avail_idx + i].addr,
> > > +						 &lens[i],
> > > +						 VHOST_ACCESS_RW);
> > > +	UNROLL_PRAGMA(PRAGMA_PARAM)
> > > +	for (i = 0; i < PACKED_DESCS_BURST; i++) {
> > > +		if (unlikely(lens[i] != descs[avail_idx + i].len))
> > > +			return -1;
> > > +	}
> > > +
> > > +	UNROLL_PRAGMA(PRAGMA_PARAM)
> > > +	for (i = 0; i < PACKED_DESCS_BURST; i++) {
> > > +		rte_prefetch0((void *)(uintptr_t)desc_addrs[i]);
> > > +		hdrs[i] = (struct virtio_net_hdr_mrg_rxbuf *)desc_addrs[i];
> > > +		lens[i] = pkts[i]->pkt_len + dev->vhost_hlen;
> > > +	}
> > > +
> > > +	UNROLL_PRAGMA(PRAGMA_PARAM)
> > > +	for (i = 0; i < PACKED_DESCS_BURST; i++)
> > > +		virtio_enqueue_offload(pkts[i], &hdrs[i]->hdr);
> > > +
> > A store barrier here is missing, last_avail_idx may be observed before the
> > above enqueue completion on weak memory order architectures.
> > For x86, a compiler barrier is also required.
> >
> Thanks a lot for point out. I guess your mention is that need to add barrier
> between memcpy and enqueue.
> last_avail_idx is just local variable, no barrier is need to protect it.

Sorry, I was wrong. Yes, last_avail_idx is a local variable (or we may call it metadata).
Copying the headers and payload does not need to be ordered; we just need to ensure all of this happens before updating the idx, which is the single synchronization point.
In short, no barriers are required here.
/Gavin
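
For reference, the usual packed ring publication pattern looks roughly like this (a sketch only; head_idx and head_flags stand in for whatever the flush helper in this series tracks, and the flag write is not part of virtio_dev_rx_burst_packed itself):

	/* 1. fill the buffers: virtio-net headers plus packet payload */
	/* 2. order those stores before exposing the descriptors */
	rte_smp_wmb();
	/* 3. single synchronization point: flip the head descriptor flags
	 *    so the other side sees the filled descriptors
	 */
	vq->desc_packed[head_idx].flags = head_flags;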
> 
> > > +	vq->last_avail_idx += PACKED_DESCS_BURST;
> > > +	if (vq->last_avail_idx >= vq->size) {
> > > +		vq->last_avail_idx -= vq->size;
> > > +		vq->avail_wrap_counter ^= 1;
> > > +	}
> > > +
> > > +	UNROLL_PRAGMA(PRAGMA_PARAM)
> > > +	for (i = 0; i < PACKED_DESCS_BURST; i++) {
> > > +		rte_memcpy((void *)(uintptr_t)(desc_addrs[i] + buf_offset),
> > > +			   rte_pktmbuf_mtod_offset(pkts[i], void *, 0),
> > > +			   pkts[i]->pkt_len);
> > > +	}
> > > +
> > > +	return 0;
> > > +}
> > > +
> > >  static __rte_unused int16_t
> > >  virtio_dev_rx_single_packed(struct virtio_net *dev, struct
> > vhost_virtqueue
> > > *vq,
> > >  	struct rte_mbuf *pkt)
> > > --
> > > 2.17.1
  
Marvin Liu Sept. 25, 2019, 9:36 a.m. UTC | #5
> -----Original Message-----
> From: Gavin Hu (Arm Technology China) [mailto:Gavin.Hu@arm.com]
> Sent: Wednesday, September 25, 2019 5:29 PM
> To: Liu, Yong <yong.liu@intel.com>; maxime.coquelin@redhat.com; Bie, Tiwei
> <tiwei.bie@intel.com>; Wang, Zhihong <zhihong.wang@intel.com>
> Cc: dev@dpdk.org; nd <nd@arm.com>; nd <nd@arm.com>
> Subject: RE: [dpdk-dev] [PATCH v2 03/16] vhost: add burst enqueue function
> for packed ring
> 
> Hi Marvin,
> 
> > -----Original Message-----
> > From: Liu, Yong <yong.liu@intel.com>
> > Sent: Tuesday, September 24, 2019 11:31 AM
> > To: Gavin Hu (Arm Technology China) <Gavin.Hu@arm.com>;
> > maxime.coquelin@redhat.com; Bie, Tiwei <tiwei.bie@intel.com>; Wang,
> > Zhihong <zhihong.wang@intel.com>
> > Cc: dev@dpdk.org; nd <nd@arm.com>
> > Subject: RE: [dpdk-dev] [PATCH v2 03/16] vhost: add burst enqueue
> function
> > for packed ring
> >
> > Thanks, Gavin. My comments are inline.
> >
> > > -----Original Message-----
> > > From: Gavin Hu (Arm Technology China) [mailto:Gavin.Hu@arm.com]
> > > Sent: Monday, September 23, 2019 7:09 PM
> > > To: Liu, Yong <yong.liu@intel.com>; maxime.coquelin@redhat.com; Bie,
> > Tiwei
> > > <tiwei.bie@intel.com>; Wang, Zhihong <zhihong.wang@intel.com>
> > > Cc: dev@dpdk.org; nd <nd@arm.com>
> > > Subject: RE: [dpdk-dev] [PATCH v2 03/16] vhost: add burst enqueue
> > function
> > > for packed ring
> > >
> > > Hi Marvin,
> > >
> > > Is it possible to vectorize the processing?
> > > Other comments inline:
> > > /Gavin
> >
> > Gavin,
> > According to our experiment, only vectorize some parts in [ed]nqueue
> > function can't benefit performance.
> > Functions like vhost_iova_to_vva and virtio_enqueue_offload can't be
> > easily vectorized as they are full of judgment conditions.
> >
> > Thanks,
> > Marvin
> >
> > > > -----Original Message-----
> > > > From: dev <dev-bounces@dpdk.org> On Behalf Of Marvin Liu
> > > > Sent: Friday, September 20, 2019 12:37 AM
> > > > To: maxime.coquelin@redhat.com; tiwei.bie@intel.com;
> > > > zhihong.wang@intel.com
> > > > Cc: dev@dpdk.org; Marvin Liu <yong.liu@intel.com>
> > > > Subject: [dpdk-dev] [PATCH v2 03/16] vhost: add burst enqueue
> function
> > > for
> > > > packed ring
> > > >
> > > > Burst enqueue function will first check whether descriptors are cache
> > > > aligned. It will also check prerequisites in the beginning. Burst
> > > > enqueue function not support chained mbufs, single packet enqueue
> > > > function will handle it.
> > > >
> > > > Signed-off-by: Marvin Liu <yong.liu@intel.com>
> > > >
> > > > diff --git a/lib/librte_vhost/vhost.h b/lib/librte_vhost/vhost.h
> > > > index 5074226f0..67889c80a 100644
> > > > --- a/lib/librte_vhost/vhost.h
> > > > +++ b/lib/librte_vhost/vhost.h
> > > > @@ -39,6 +39,9 @@
> > > >
> > > >  #define VHOST_LOG_CACHE_NR 32
> > > >
> > > > +#define PACKED_DESCS_BURST (RTE_CACHE_LINE_SIZE / \
> > > > +			    sizeof(struct vring_packed_desc))
> > > > +
> > > >  #ifdef SUPPORT_GCC_UNROLL_PRAGMA
> > > >  #define PRAGMA_PARAM "GCC unroll 4"
> > > >  #endif
> > > > @@ -57,6 +60,8 @@
> > > >  #define UNROLL_PRAGMA(param) do {} while(0);
> > > >  #endif
> > > >
> > > > +#define PACKED_BURST_MASK (PACKED_DESCS_BURST - 1)
> > > > +
> > > >  /**
> > > >   * Structure contains buffer address, length and descriptor index
> > > >   * from vring to do scatter RX.
> > > > diff --git a/lib/librte_vhost/virtio_net.c
> > > b/lib/librte_vhost/virtio_net.c
> > > > index 2b5c47145..c664b27c5 100644
> > > > --- a/lib/librte_vhost/virtio_net.c
> > > > +++ b/lib/librte_vhost/virtio_net.c
> > > > @@ -895,6 +895,84 @@ virtio_dev_rx_split(struct virtio_net *dev,
> > struct
> > > > vhost_virtqueue *vq,
> > > >  	return pkt_idx;
> > > >  }
> > > >
> > > > +static __rte_unused __rte_always_inline int
> > > I remember "__rte_always_inline" should start at the first and separate
> > > line, otherwise you will get a style issue.
> > > /Gavin
> > > > +virtio_dev_rx_burst_packed(struct virtio_net *dev, struct
> > > vhost_virtqueue
> > > > *vq,
> > > > +	 struct rte_mbuf **pkts)
> > > > +{
> > > > +	bool wrap_counter = vq->avail_wrap_counter;
> > > > +	struct vring_packed_desc *descs = vq->desc_packed;
> > > > +	uint16_t avail_idx = vq->last_avail_idx;
> > > > +
> > > > +	uint64_t desc_addrs[PACKED_DESCS_BURST];
> > > > +	struct virtio_net_hdr_mrg_rxbuf *hdrs[PACKED_DESCS_BURST];
> > > > +	uint32_t buf_offset = dev->vhost_hlen;
> > > > +	uint64_t lens[PACKED_DESCS_BURST];
> > > > +
> > > > +	uint16_t i;
> > > > +
> > > > +	if (unlikely(avail_idx & PACKED_BURST_MASK))
> > > > +		return -1;
> > > > +
> > > > +	UNROLL_PRAGMA(PRAGMA_PARAM)
> > > > +	for (i = 0; i < PACKED_DESCS_BURST; i++) {
> > > > +		if (unlikely(pkts[i]->next != NULL))
> > > > +			return -1;
> > > > +		if (unlikely(!desc_is_avail(&descs[avail_idx + i],
> > > > +					    wrap_counter)))
> > > > +			return -1;
> > > > +	}
> > > > +
> > > > +	rte_smp_rmb();
> > > > +
> > > > +	UNROLL_PRAGMA(PRAGMA_PARAM)
> > > > +	for (i = 0; i < PACKED_DESCS_BURST; i++)
> > > > +		lens[i] = descs[avail_idx + i].len;
> > > Looks like the code is a strong candidate for vectorization.
> > > > +
> > > > +	UNROLL_PRAGMA(PRAGMA_PARAM)
> > > > +	for (i = 0; i < PACKED_DESCS_BURST; i++) {
> > > > +		if (unlikely(pkts[i]->pkt_len > (lens[i] - buf_offset)))
> > > > +			return -1;
> > > > +	}
> > > > +
> > > > +	UNROLL_PRAGMA(PRAGMA_PARAM)
> > > > +	for (i = 0; i < PACKED_DESCS_BURST; i++)
> > > > +		desc_addrs[i] = vhost_iova_to_vva(dev, vq,
> > > > +						 descs[avail_idx + i].addr,
> > > > +						 &lens[i],
> > > > +						 VHOST_ACCESS_RW);
> > > > +	UNROLL_PRAGMA(PRAGMA_PARAM)
> > > > +	for (i = 0; i < PACKED_DESCS_BURST; i++) {
> > > > +		if (unlikely(lens[i] != descs[avail_idx + i].len))
> > > > +			return -1;
> > > > +	}
> > > > +
> > > > +	UNROLL_PRAGMA(PRAGMA_PARAM)
> > > > +	for (i = 0; i < PACKED_DESCS_BURST; i++) {
> > > > +		rte_prefetch0((void *)(uintptr_t)desc_addrs[i]);
> > > > +		hdrs[i] = (struct virtio_net_hdr_mrg_rxbuf
> *)desc_addrs[i];
> > > > +		lens[i] = pkts[i]->pkt_len + dev->vhost_hlen;
> > > > +	}
> > > > +
> > > > +	UNROLL_PRAGMA(PRAGMA_PARAM)
> > > > +	for (i = 0; i < PACKED_DESCS_BURST; i++)
> > > > +		virtio_enqueue_offload(pkts[i], &hdrs[i]->hdr);
> > > > +
> > > A store barrier here is missing, last_avail_idx may be observed before
> the
> > > above enqueue completion on weak memory order architectures.
> > > For x86, a compiler barrier is also required.
> > >
> > Thanks a lot for point out. I guess your mention is that need to add
> barrier
> > between memcpy and enqueue.
> > last_avail_idx is just local variable, no barrier is need to protect it.
> 
> Sorry I was wrong, yes, last_avail_idx is a local variable(or we may call
> it meta data).
> Copying the headers and payload does not need to be ordered, we just need
> to ensure all these happen before updating the idx, which is the single
> synchronization point.
> In one word, no barriers are required here.
> /Gavin
> >

NP :) Nothing changed here in the V3 patch. Thanks for the kind review.

> > > > +	vq->last_avail_idx += PACKED_DESCS_BURST;
> > > > +	if (vq->last_avail_idx >= vq->size) {
> > > > +		vq->last_avail_idx -= vq->size;
> > > > +		vq->avail_wrap_counter ^= 1;
> > > > +	}
> > > > +
> > > > +	UNROLL_PRAGMA(PRAGMA_PARAM)
> > > > +	for (i = 0; i < PACKED_DESCS_BURST; i++) {
> > > > +		rte_memcpy((void *)(uintptr_t)(desc_addrs[i] +
> buf_offset),
> > > > +			   rte_pktmbuf_mtod_offset(pkts[i], void *, 0),
> > > > +			   pkts[i]->pkt_len);
> > > > +	}
> > > > +
> > > > +	return 0;
> > > > +}
> > > > +
> > > >  static __rte_unused int16_t
> > > >  virtio_dev_rx_single_packed(struct virtio_net *dev, struct
> > > vhost_virtqueue
> > > > *vq,
> > > >  	struct rte_mbuf *pkt)
> > > > --
> > > > 2.17.1
  

Patch

diff --git a/lib/librte_vhost/vhost.h b/lib/librte_vhost/vhost.h
index 5074226f0..67889c80a 100644
--- a/lib/librte_vhost/vhost.h
+++ b/lib/librte_vhost/vhost.h
@@ -39,6 +39,9 @@ 
 
 #define VHOST_LOG_CACHE_NR 32
 
+#define PACKED_DESCS_BURST (RTE_CACHE_LINE_SIZE / \
+			    sizeof(struct vring_packed_desc))
+
 #ifdef SUPPORT_GCC_UNROLL_PRAGMA
 #define PRAGMA_PARAM "GCC unroll 4"
 #endif
@@ -57,6 +60,8 @@ 
 #define UNROLL_PRAGMA(param) do {} while(0);
 #endif
 
+#define PACKED_BURST_MASK (PACKED_DESCS_BURST - 1)
+
 /**
  * Structure contains buffer address, length and descriptor index
  * from vring to do scatter RX.
diff --git a/lib/librte_vhost/virtio_net.c b/lib/librte_vhost/virtio_net.c
index 2b5c47145..c664b27c5 100644
--- a/lib/librte_vhost/virtio_net.c
+++ b/lib/librte_vhost/virtio_net.c
@@ -895,6 +895,84 @@  virtio_dev_rx_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
 	return pkt_idx;
 }
 
+static __rte_unused __rte_always_inline int
+virtio_dev_rx_burst_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
+	 struct rte_mbuf **pkts)
+{
+	bool wrap_counter = vq->avail_wrap_counter;
+	struct vring_packed_desc *descs = vq->desc_packed;
+	uint16_t avail_idx = vq->last_avail_idx;
+
+	uint64_t desc_addrs[PACKED_DESCS_BURST];
+	struct virtio_net_hdr_mrg_rxbuf *hdrs[PACKED_DESCS_BURST];
+	uint32_t buf_offset = dev->vhost_hlen;
+	uint64_t lens[PACKED_DESCS_BURST];
+
+	uint16_t i;
+
+	if (unlikely(avail_idx & PACKED_BURST_MASK))
+		return -1;
+
+	UNROLL_PRAGMA(PRAGMA_PARAM)
+	for (i = 0; i < PACKED_DESCS_BURST; i++) {
+		if (unlikely(pkts[i]->next != NULL))
+			return -1;
+		if (unlikely(!desc_is_avail(&descs[avail_idx + i],
+					    wrap_counter)))
+			return -1;
+	}
+
+	rte_smp_rmb();
+
+	UNROLL_PRAGMA(PRAGMA_PARAM)
+	for (i = 0; i < PACKED_DESCS_BURST; i++)
+		lens[i] = descs[avail_idx + i].len;
+
+	UNROLL_PRAGMA(PRAGMA_PARAM)
+	for (i = 0; i < PACKED_DESCS_BURST; i++) {
+		if (unlikely(pkts[i]->pkt_len > (lens[i] - buf_offset)))
+			return -1;
+	}
+
+	UNROLL_PRAGMA(PRAGMA_PARAM)
+	for (i = 0; i < PACKED_DESCS_BURST; i++)
+		desc_addrs[i] = vhost_iova_to_vva(dev, vq,
+						 descs[avail_idx + i].addr,
+						 &lens[i],
+						 VHOST_ACCESS_RW);
+	UNROLL_PRAGMA(PRAGMA_PARAM)
+	for (i = 0; i < PACKED_DESCS_BURST; i++) {
+		if (unlikely(lens[i] != descs[avail_idx + i].len))
+			return -1;
+	}
+
+	UNROLL_PRAGMA(PRAGMA_PARAM)
+	for (i = 0; i < PACKED_DESCS_BURST; i++) {
+		rte_prefetch0((void *)(uintptr_t)desc_addrs[i]);
+		hdrs[i] = (struct virtio_net_hdr_mrg_rxbuf *)desc_addrs[i];
+		lens[i] = pkts[i]->pkt_len + dev->vhost_hlen;
+	}
+
+	UNROLL_PRAGMA(PRAGMA_PARAM)
+	for (i = 0; i < PACKED_DESCS_BURST; i++)
+		virtio_enqueue_offload(pkts[i], &hdrs[i]->hdr);
+
+	vq->last_avail_idx += PACKED_DESCS_BURST;
+	if (vq->last_avail_idx >= vq->size) {
+		vq->last_avail_idx -= vq->size;
+		vq->avail_wrap_counter ^= 1;
+	}
+
+	UNROLL_PRAGMA(PRAGMA_PARAM)
+	for (i = 0; i < PACKED_DESCS_BURST; i++) {
+		rte_memcpy((void *)(uintptr_t)(desc_addrs[i] + buf_offset),
+			   rte_pktmbuf_mtod_offset(pkts[i], void *, 0),
+			   pkts[i]->pkt_len);
+	}
+
+	return 0;
+}
+
 static __rte_unused int16_t
 virtio_dev_rx_single_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
 	struct rte_mbuf *pkt)