[v6,04/13] vhost: add packed ring batch enqueue

Message ID 20191015160739.51940-5-yong.liu@intel.com (mailing list archive)
State Superseded, archived
Delegated to: Maxime Coquelin
Series vhost packed ring performance optimization

Checks

Context Check Description
ci/checkpatch success coding style OK
ci/Intel-compilation success Compilation OK

Commit Message

Marvin Liu Oct. 15, 2019, 4:07 p.m. UTC
  The batch enqueue function first checks that its prerequisites are met,
including that the descriptors are cache aligned. Chained mbufs are not
supported by the batch path; they are handled by the single packet
enqueue function.

Signed-off-by: Marvin Liu <yong.liu@intel.com>
Reviewed-by: Maxime Coquelin <maxime.coquelin@redhat.com>
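
For readers new to the series: the batch path added here is meant to be tried
first, with the single packet path as a fallback. The dispatch loop itself
(virtio_dev_rx_packed) only lands in a later patch, so the code below is an
illustrative sketch that assumes the helper names introduced by this patch;
shadow used-ring flush and guest notification are omitted.

static __rte_noinline uint32_t
virtio_dev_rx_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
		     struct rte_mbuf **pkts, uint32_t count)
{
	uint32_t pkt_idx = 0;
	uint32_t remained = count;

	/* Caller is assumed to guarantee count > 0. */
	do {
		rte_prefetch0(&vq->desc_packed[vq->last_avail_idx]);

		if (remained >= PACKED_BATCH_SIZE) {
			/* Batch path: returns 0 on success, -1 to fall back. */
			if (!virtio_dev_rx_batch_packed(dev, vq,
							&pkts[pkt_idx])) {
				pkt_idx += PACKED_BATCH_SIZE;
				remained -= PACKED_BATCH_SIZE;
				continue;
			}
		}

		/* Single-packet path handles chained mbufs and leftovers. */
		if (virtio_dev_rx_single_packed(dev, vq, pkts[pkt_idx]))
			break;
		pkt_idx++;
		remained--;
	} while (pkt_idx < count);

	return pkt_idx;
}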
  

Comments

Gavin Hu Oct. 15, 2019, 11:35 a.m. UTC | #1
Hi Marvin,

> -----Original Message-----
> From: Marvin Liu <yong.liu@intel.com>
> Sent: Wednesday, October 16, 2019 12:08 AM
> To: maxime.coquelin@redhat.com; tiwei.bie@intel.com;
> zhihong.wang@intel.com; stephen@networkplumber.org; Gavin Hu (Arm
> Technology China) <Gavin.Hu@arm.com>
> Cc: dev@dpdk.org; Marvin Liu <yong.liu@intel.com>
> Subject: [PATCH v6 04/13] vhost: add packed ring batch enqueue
> 
> The batch enqueue function first checks that its prerequisites are met,
> including that the descriptors are cache aligned. Chained mbufs are not
> supported by the batch path; they are handled by the single packet
> enqueue function.
> 
> Signed-off-by: Marvin Liu <yong.liu@intel.com>
> Reviewed-by: Maxime Coquelin <maxime.coquelin@redhat.com>
> 
> diff --git a/lib/librte_vhost/virtio_net.c b/lib/librte_vhost/virtio_net.c
> index 142c14e04..a8130dc06 100644
> --- a/lib/librte_vhost/virtio_net.c
> +++ b/lib/librte_vhost/virtio_net.c
> @@ -881,6 +881,76 @@ virtio_dev_rx_split(struct virtio_net *dev, struct
> vhost_virtqueue *vq,
>  	return pkt_idx;
>  }
> 
> +static __rte_unused int
> +virtio_dev_rx_batch_packed(struct virtio_net *dev,
> +			   struct vhost_virtqueue *vq,
> +			   struct rte_mbuf **pkts)
> +{
> +	bool wrap_counter = vq->avail_wrap_counter;
> +	struct vring_packed_desc *descs = vq->desc_packed;
> +	uint16_t avail_idx = vq->last_avail_idx;
> +	uint64_t desc_addrs[PACKED_BATCH_SIZE];
> +	struct virtio_net_hdr_mrg_rxbuf *hdrs[PACKED_BATCH_SIZE];
> +	uint32_t buf_offset = dev->vhost_hlen;
> +	uint64_t lens[PACKED_BATCH_SIZE];
> +	uint16_t i;
> +
> +	if (unlikely(avail_idx & PACKED_BATCH_MASK))
> +		return -1;
> +
> +	if (unlikely((avail_idx + PACKED_BATCH_SIZE) > vq->size))
> +		return -1;
> +
> +	for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
> +		if (unlikely(pkts[i]->next != NULL))
> +			return -1;
> +		if (unlikely(!desc_is_avail(&descs[avail_idx + i],
> +					    wrap_counter)))
> +			return -1;
> +	}
> +
> +	rte_smp_rmb();
> +
> +	for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
> +		lens[i] = descs[avail_idx + i].len;
> +
> +	for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
> +		if (unlikely(pkts[i]->pkt_len > (lens[i] - buf_offset)))
> +			return -1;
> +	}
> +
> +	for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
> +		desc_addrs[i] = vhost_iova_to_vva(dev, vq,
> +						  descs[avail_idx + i].addr,
> +						  &lens[i],
> +						  VHOST_ACCESS_RW);
> +
> +	for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
> +		if (unlikely(lens[i] != descs[avail_idx + i].len))
> +			return -1;
> +	}
> +
> +	for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
> +		rte_prefetch0((void *)(uintptr_t)desc_addrs[i]);
> +		hdrs[i] = (struct virtio_net_hdr_mrg_rxbuf *)
> +					(uintptr_t)desc_addrs[i];
> +		lens[i] = pkts[i]->pkt_len + dev->vhost_hlen;
> +	}
> +
> +	for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
> +		virtio_enqueue_offload(pkts[i], &hdrs[i]->hdr);
> +
> +	vq_inc_last_avail_packed(vq, PACKED_BATCH_SIZE);

Is last_avail_idx a shared variable? Why is it updated before the following payload copy?
Won't this cause the other side to see the data before it has actually arrived?
/Gavin
> +
> +	for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
> +		rte_memcpy((void *)(uintptr_t)(desc_addrs[i] + buf_offset),
> +			   rte_pktmbuf_mtod_offset(pkts[i], void *, 0),
> +			   pkts[i]->pkt_len);
> +	}
> +
> +	return 0;
> +}
> +
>  static __rte_unused int16_t
>  virtio_dev_rx_single_packed(struct virtio_net *dev,
>  			    struct vhost_virtqueue *vq,
> --
> 2.17.1
  
Marvin Liu Oct. 15, 2019, 12:45 p.m. UTC | #2
> -----Original Message-----
> From: Gavin Hu (Arm Technology China) [mailto:Gavin.Hu@arm.com]
> Sent: Tuesday, October 15, 2019 7:36 PM
> To: Liu, Yong <yong.liu@intel.com>; maxime.coquelin@redhat.com; Bie, Tiwei
> <tiwei.bie@intel.com>; Wang, Zhihong <zhihong.wang@intel.com>;
> stephen@networkplumber.org
> Cc: dev@dpdk.org; nd <nd@arm.com>
> Subject: RE: [PATCH v6 04/13] vhost: add packed ring batch enqueue
> 
> Hi Marvin,
> 
> > -----Original Message-----
> > From: Marvin Liu <yong.liu@intel.com>
> > Sent: Wednesday, October 16, 2019 12:08 AM
> > To: maxime.coquelin@redhat.com; tiwei.bie@intel.com;
> > zhihong.wang@intel.com; stephen@networkplumber.org; Gavin Hu (Arm
> > Technology China) <Gavin.Hu@arm.com>
> > Cc: dev@dpdk.org; Marvin Liu <yong.liu@intel.com>
> > Subject: [PATCH v6 04/13] vhost: add packed ring batch enqueue
> >
> > The batch enqueue function first checks that its prerequisites are met,
> > including that the descriptors are cache aligned. Chained mbufs are not
> > supported by the batch path; they are handled by the single packet
> > enqueue function.
> >
> > Signed-off-by: Marvin Liu <yong.liu@intel.com>
> > Reviewed-by: Maxime Coquelin <maxime.coquelin@redhat.com>
> >
> > diff --git a/lib/librte_vhost/virtio_net.c
> b/lib/librte_vhost/virtio_net.c
> > index 142c14e04..a8130dc06 100644
> > --- a/lib/librte_vhost/virtio_net.c
> > +++ b/lib/librte_vhost/virtio_net.c
> > @@ -881,6 +881,76 @@ virtio_dev_rx_split(struct virtio_net *dev, struct
> > vhost_virtqueue *vq,
> >  	return pkt_idx;
> >  }
> >
> > +static __rte_unused int
> > +virtio_dev_rx_batch_packed(struct virtio_net *dev,
> > +			   struct vhost_virtqueue *vq,
> > +			   struct rte_mbuf **pkts)
> > +{
> > +	bool wrap_counter = vq->avail_wrap_counter;
> > +	struct vring_packed_desc *descs = vq->desc_packed;
> > +	uint16_t avail_idx = vq->last_avail_idx;
> > +	uint64_t desc_addrs[PACKED_BATCH_SIZE];
> > +	struct virtio_net_hdr_mrg_rxbuf *hdrs[PACKED_BATCH_SIZE];
> > +	uint32_t buf_offset = dev->vhost_hlen;
> > +	uint64_t lens[PACKED_BATCH_SIZE];
> > +	uint16_t i;
> > +
> > +	if (unlikely(avail_idx & PACKED_BATCH_MASK))
> > +		return -1;
> > +
> > +	if (unlikely((avail_idx + PACKED_BATCH_SIZE) > vq->size))
> > +		return -1;
> > +
> > +	for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
> > +		if (unlikely(pkts[i]->next != NULL))
> > +			return -1;
> > +		if (unlikely(!desc_is_avail(&descs[avail_idx + i],
> > +					    wrap_counter)))
> > +			return -1;
> > +	}
> > +
> > +	rte_smp_rmb();
> > +
> > +	for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
> > +		lens[i] = descs[avail_idx + i].len;
> > +
> > +	for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
> > +		if (unlikely(pkts[i]->pkt_len > (lens[i] - buf_offset)))
> > +			return -1;
> > +	}
> > +
> > +	for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
> > +		desc_addrs[i] = vhost_iova_to_vva(dev, vq,
> > +						  descs[avail_idx + i].addr,
> > +						  &lens[i],
> > +						  VHOST_ACCESS_RW);
> > +
> > +	for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
> > +		if (unlikely(lens[i] != descs[avail_idx + i].len))
> > +			return -1;
> > +	}
> > +
> > +	for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
> > +		rte_prefetch0((void *)(uintptr_t)desc_addrs[i]);
> > +		hdrs[i] = (struct virtio_net_hdr_mrg_rxbuf *)
> > +					(uintptr_t)desc_addrs[i];
> > +		lens[i] = pkts[i]->pkt_len + dev->vhost_hlen;
> > +	}
> > +
> > +	for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
> > +		virtio_enqueue_offload(pkts[i], &hdrs[i]->hdr);
> > +
> > +	vq_inc_last_avail_packed(vq, PACKED_BATCH_SIZE);
> 
> Is last_avail_idx a shared variable? Why is it updated before the
> following payload copy?
> Won't this cause the other side to see the data before it has actually
> arrived?
> /Gavin

Hi Gavin,
last_avail_idx and last_used_idx are both vhost-local variables; they are
not shared with the guest. They track the next available and the next
used index of the virtqueue. last_avail_idx should advance once the
available descriptors have been consumed, while last_used_idx should only
advance after the used-side descriptor flags have been updated, which
happens after the payload copy, so the other side cannot observe the data
early.

Thanks,
Marvin
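
To make the wrap handling concrete, here is a sketch of the index helper,
assuming the definition in lib/librte_vhost/vhost.h at this point of the
series; it only touches vhost-local bookkeeping state.

static __rte_always_inline void
vq_inc_last_avail_packed(struct vhost_virtqueue *vq, uint16_t num)
{
	/* Local bookkeeping only: nothing here is visible to the guest. */
	vq->last_avail_idx += num;
	if (vq->last_avail_idx >= vq->size) {
		vq->avail_wrap_counter ^= 1;
		vq->last_avail_idx -= vq->size;
	}
}

The guest only treats the buffers as used once the used-side descriptor
flags are written, which takes place after the payload copies.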

> > +
> > +	for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
> > +		rte_memcpy((void *)(uintptr_t)(desc_addrs[i] + buf_offset),
> > +			   rte_pktmbuf_mtod_offset(pkts[i], void *, 0),
> > +			   pkts[i]->pkt_len);
> > +	}
> > +
> > +	return 0;
> > +}
> > +
> >  static __rte_unused int16_t
> >  virtio_dev_rx_single_packed(struct virtio_net *dev,
> >  			    struct vhost_virtqueue *vq,
> > --
> > 2.17.1
  

Patch

diff --git a/lib/librte_vhost/virtio_net.c b/lib/librte_vhost/virtio_net.c
index 142c14e04..a8130dc06 100644
--- a/lib/librte_vhost/virtio_net.c
+++ b/lib/librte_vhost/virtio_net.c
@@ -881,6 +881,76 @@  virtio_dev_rx_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
 	return pkt_idx;
 }
 
+static __rte_unused int
+virtio_dev_rx_batch_packed(struct virtio_net *dev,
+			   struct vhost_virtqueue *vq,
+			   struct rte_mbuf **pkts)
+{
+	bool wrap_counter = vq->avail_wrap_counter;
+	struct vring_packed_desc *descs = vq->desc_packed;
+	uint16_t avail_idx = vq->last_avail_idx;
+	uint64_t desc_addrs[PACKED_BATCH_SIZE];
+	struct virtio_net_hdr_mrg_rxbuf *hdrs[PACKED_BATCH_SIZE];
+	uint32_t buf_offset = dev->vhost_hlen;
+	uint64_t lens[PACKED_BATCH_SIZE];
+	uint16_t i;
+
+	if (unlikely(avail_idx & PACKED_BATCH_MASK))
+		return -1;
+
+	if (unlikely((avail_idx + PACKED_BATCH_SIZE) > vq->size))
+		return -1;
+
+	for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
+		if (unlikely(pkts[i]->next != NULL))
+			return -1;
+		if (unlikely(!desc_is_avail(&descs[avail_idx + i],
+					    wrap_counter)))
+			return -1;
+	}
+
+	rte_smp_rmb();
+
+	for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
+		lens[i] = descs[avail_idx + i].len;
+
+	for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
+		if (unlikely(pkts[i]->pkt_len > (lens[i] - buf_offset)))
+			return -1;
+	}
+
+	for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
+		desc_addrs[i] = vhost_iova_to_vva(dev, vq,
+						  descs[avail_idx + i].addr,
+						  &lens[i],
+						  VHOST_ACCESS_RW);
+
+	for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
+		if (unlikely(lens[i] != descs[avail_idx + i].len))
+			return -1;
+	}
+
+	for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
+		rte_prefetch0((void *)(uintptr_t)desc_addrs[i]);
+		hdrs[i] = (struct virtio_net_hdr_mrg_rxbuf *)
+					(uintptr_t)desc_addrs[i];
+		lens[i] = pkts[i]->pkt_len + dev->vhost_hlen;
+	}
+
+	for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
+		virtio_enqueue_offload(pkts[i], &hdrs[i]->hdr);
+
+	vq_inc_last_avail_packed(vq, PACKED_BATCH_SIZE);
+
+	for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
+		rte_memcpy((void *)(uintptr_t)(desc_addrs[i] + buf_offset),
+			   rte_pktmbuf_mtod_offset(pkts[i], void *, 0),
+			   pkts[i]->pkt_len);
+	}
+
+	return 0;
+}
+
 static __rte_unused int16_t
 virtio_dev_rx_single_packed(struct virtio_net *dev,
 			    struct vhost_virtqueue *vq,