[dpdk-dev,v3,15/21] vhost: packed queue enqueue path

Message ID 20180405101031.26468-16-jfreimann@redhat.com (mailing list archive)
State Changes Requested, archived
Delegated to: Maxime Coquelin
Headers

Checks

Context Check Description
ci/checkpatch warning coding style issues

Commit Message

Jens Freimann April 5, 2018, 10:10 a.m. UTC
  Implement enqueue of packets to the receive virtqueue.

Set descriptor flag VIRTQ_DESC_F_USED and toggle used wrap counter if
last descriptor in ring is used. Perform a write memory barrier before
flags are written to descriptor.

Chained descriptors are not supported with this patch.

Signed-off-by: Jens Freimann <jfreimann@redhat.com>
---
 lib/librte_vhost/virtio_net.c | 129 ++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 129 insertions(+)
  

Comments

Maxime Coquelin April 6, 2018, 9:36 a.m. UTC | #1
On 04/05/2018 12:10 PM, Jens Freimann wrote:
> Implement enqueue of packets to the receive virtqueue.
> 
> Set descriptor flag VIRTQ_DESC_F_USED and toggle used wrap counter if
> last descriptor in ring is used. Perform a write memory barrier before
> flags are written to descriptor.
> 
> Chained descriptors are not supported with this patch.
> 
> Signed-off-by: Jens Freimann <jfreimann@redhat.com>
> ---
>   lib/librte_vhost/virtio_net.c | 129 ++++++++++++++++++++++++++++++++++++++++++
>   1 file changed, 129 insertions(+)
> 
> diff --git a/lib/librte_vhost/virtio_net.c b/lib/librte_vhost/virtio_net.c
> index 7eea1da04..578e5612e 100644
> --- a/lib/librte_vhost/virtio_net.c
> +++ b/lib/librte_vhost/virtio_net.c
> @@ -695,6 +695,135 @@ virtio_dev_merge_rx(struct virtio_net *dev, uint16_t queue_id,
>   	return pkt_idx;
>   }
>   
> +static inline uint32_t __attribute__((always_inline))
> +vhost_enqueue_burst_packed(struct virtio_net *dev, uint16_t queue_id,
> +	      struct rte_mbuf **pkts, uint32_t count)
> +{
> +	struct vhost_virtqueue *vq;
> +	struct vring_desc_packed *descs;
> +	uint16_t idx;
> +	uint16_t mask;
> +	uint16_t i;
> +
> +	vq = dev->virtqueue[queue_id];
> +
> +	rte_spinlock_lock(&vq->access_lock);
> +
> +	if (unlikely(vq->enabled == 0)) {
> +		i = 0;
> +		goto out_access_unlock;
> +	}
> +
> +	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
> +		vhost_user_iotlb_rd_lock(vq);
> +
> +	descs = vq->desc_packed;
> +	mask = vq->size - 1;
> +
> +	for (i = 0; i < count; i++) {
> +		uint32_t desc_avail, desc_offset;
> +		uint32_t mbuf_avail, mbuf_offset;
> +		uint32_t cpy_len;
> +		struct vring_desc_packed *desc;
> +		uint64_t desc_addr;
> +		struct virtio_net_hdr_mrg_rxbuf *hdr;
> +		struct rte_mbuf *m = pkts[i];
> +
> +		/* XXX: there is an assumption that no desc will be chained */
Is this assumption still true?
If not, what is the plan to fix this?

> +		idx = vq->last_used_idx & mask;
> +		desc = &descs[idx];
> +
> +		if (!desc_is_avail(vq, desc))
IIUC, it means the ring is full.
I think this is an unlikely case, so it may be better to use the unlikely
macro here.

> +			break;
> +		rte_smp_rmb();
> +
> +		desc_addr = vhost_iova_to_vva(dev, vq, desc->addr,
> +					      sizeof(*desc), VHOST_ACCESS_RW);
> +		/*
> +		 * Checking of 'desc_addr' placed outside of 'unlikely' macro
> +		 * to avoid performance issue with some versions of gcc (4.8.4
> +		 * and 5.3.0) which otherwise stores offset on the stack instead
> +		 * of in a register.
> +		 */
> +		if (unlikely(desc->len < dev->vhost_hlen) || !desc_addr)
> +			break;
> +
> +		hdr = (struct virtio_net_hdr_mrg_rxbuf *)(uintptr_t)desc_addr;
> +		virtio_enqueue_offload(m, &hdr->hdr);
> +		vhost_log_write(dev, desc->addr, dev->vhost_hlen);
> +		PRINT_PACKET(dev, (uintptr_t)desc_addr, dev->vhost_hlen, 0);
> +
> +		desc_offset = dev->vhost_hlen;
> +		desc_avail  = desc->len - dev->vhost_hlen;
> +
> +		mbuf_avail  = rte_pktmbuf_data_len(m);
> +		mbuf_offset = 0;
> +		while (mbuf_avail != 0 || m->next != NULL) {
> +			/* done with current mbuf, fetch next */
> +			if (mbuf_avail == 0) {
> +				m = m->next;
> +
> +				mbuf_offset = 0;
> +				mbuf_avail  = rte_pktmbuf_data_len(m);
> +			}
> +
> +			/* done with current desc buf, fetch next */
> +			if (desc_avail == 0) {
> +				if ((desc->flags & VRING_DESC_F_NEXT) == 0) {
> +					/* Room in vring buffer is not enough */
> +					goto out;
> +				}
> +
> +				idx = (idx+1) & (vq->size - 1);
> +				desc = &descs[idx];
> +				if (unlikely(!desc_is_avail(vq, desc)))
> +					goto out ;
> +
> +				desc_addr = vhost_iova_to_vva(dev, vq, desc->addr,
> +							      sizeof(*desc),
> +							      VHOST_ACCESS_RW);
> +				if (unlikely(!desc_addr))
> +					goto out;
> +
> +				desc_offset = 0;
> +				desc_avail  = desc->len;
> +			}
> +
> +			cpy_len = RTE_MIN(desc_avail, mbuf_avail);
> +			rte_memcpy((void *)((uintptr_t)(desc_addr + desc_offset)),
> +				rte_pktmbuf_mtod_offset(m, void *, mbuf_offset),
> +				cpy_len);
> +			vhost_log_write(dev, desc->addr + desc_offset, cpy_len);
> +			PRINT_PACKET(dev, (uintptr_t)(desc_addr + desc_offset),
> +				     cpy_len, 0);
> +
> +			mbuf_avail  -= cpy_len;
> +			mbuf_offset += cpy_len;
> +			desc_avail  -= cpy_len;
> +			desc_offset += cpy_len;
> +		}
> +
> +		descs[idx].len = pkts[i]->pkt_len + dev->vhost_hlen;
> +		rte_smp_wmb();
> +		set_desc_used(vq, desc);
> +
> +		vq->last_used_idx++;
> +		if ((vq->last_used_idx & (vq->size - 1)) == 0)
> +			toggle_wrap_counter(vq);
> +	}
> +
> +out:
> +	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
> +		vhost_user_iotlb_rd_unlock(vq);
> +
> +out_access_unlock:
> +	rte_spinlock_unlock(&vq->access_lock);
> +
> +	count = i;
> +
> +	return count;
> +}
> +
>   uint16_t
>   rte_vhost_enqueue_burst(int vid, uint16_t queue_id,
>   	struct rte_mbuf **pkts, uint16_t count)
>
  
Jens Freimann April 6, 2018, 12:17 p.m. UTC | #2
On Fri, Apr 06, 2018 at 11:36:03AM +0200, Maxime Coquelin wrote:
>
>
>On 04/05/2018 12:10 PM, Jens Freimann wrote:
>>Implement enqueue of packets to the receive virtqueue.
>>
>>Set descriptor flag VIRTQ_DESC_F_USED and toggle used wrap counter if
>>last descriptor in ring is used. Perform a write memory barrier before
>>flags are written to descriptor.
>>
>>Chained descriptors are not supported with this patch.
>>
>>Signed-off-by: Jens Freimann <jfreimann@redhat.com>
>>---
>>  lib/librte_vhost/virtio_net.c | 129 ++++++++++++++++++++++++++++++++++++++++++
>>  1 file changed, 129 insertions(+)
>>
>>diff --git a/lib/librte_vhost/virtio_net.c b/lib/librte_vhost/virtio_net.c
>>index 7eea1da04..578e5612e 100644
>>--- a/lib/librte_vhost/virtio_net.c
>>+++ b/lib/librte_vhost/virtio_net.c
>>@@ -695,6 +695,135 @@ virtio_dev_merge_rx(struct virtio_net *dev, uint16_t queue_id,
>>  	return pkt_idx;
>>  }
>>+static inline uint32_t __attribute__((always_inline))
>>+vhost_enqueue_burst_packed(struct virtio_net *dev, uint16_t queue_id,
>>+	      struct rte_mbuf **pkts, uint32_t count)
>>+{
>>+	struct vhost_virtqueue *vq;
>>+	struct vring_desc_packed *descs;
>>+	uint16_t idx;
>>+	uint16_t mask;
>>+	uint16_t i;
>>+
>>+	vq = dev->virtqueue[queue_id];
>>+
>>+	rte_spinlock_lock(&vq->access_lock);
>>+
>>+	if (unlikely(vq->enabled == 0)) {
>>+		i = 0;
>>+		goto out_access_unlock;
>>+	}
>>+
>>+	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
>>+		vhost_user_iotlb_rd_lock(vq);
>>+
>>+	descs = vq->desc_packed;
>>+	mask = vq->size - 1;
>>+
>>+	for (i = 0; i < count; i++) {
>>+		uint32_t desc_avail, desc_offset;
>>+		uint32_t mbuf_avail, mbuf_offset;
>>+		uint32_t cpy_len;
>>+		struct vring_desc_packed *desc;
>>+		uint64_t desc_addr;
>>+		struct virtio_net_hdr_mrg_rxbuf *hdr;
>>+		struct rte_mbuf *m = pkts[i];
>>+
>>+		/* XXX: there is an assumption that no desc will be chained */
>Is this assumption still true?
>If not, what is the plan to fix this?

This is a leftover from the prototype code. I checked the code and
don't see what it could still relate to, unless it was supposed to say
"indirect" instead of "chained". I think the comment can be removed.
>
>>+		idx = vq->last_used_idx & mask;
>>+		desc = &descs[idx];
>>+
>>+		if (!desc_is_avail(vq, desc))
>IIUC, it means the ring is full.
>I think this is an unlikely case, so it may be better to use the unlikely
>macro here.

yes, we can use unlikely here, will fix.

thanks!

regards,
Jens
  

Patch

diff --git a/lib/librte_vhost/virtio_net.c b/lib/librte_vhost/virtio_net.c
index 7eea1da04..578e5612e 100644
--- a/lib/librte_vhost/virtio_net.c
+++ b/lib/librte_vhost/virtio_net.c
@@ -695,6 +695,135 @@  virtio_dev_merge_rx(struct virtio_net *dev, uint16_t queue_id,
 	return pkt_idx;
 }
 
+static inline uint32_t __attribute__((always_inline))
+vhost_enqueue_burst_packed(struct virtio_net *dev, uint16_t queue_id,
+	      struct rte_mbuf **pkts, uint32_t count)
+{
+	struct vhost_virtqueue *vq;
+	struct vring_desc_packed *descs;
+	uint16_t idx;
+	uint16_t mask;
+	uint16_t i;
+
+	vq = dev->virtqueue[queue_id];
+
+	rte_spinlock_lock(&vq->access_lock);
+
+	if (unlikely(vq->enabled == 0)) {
+		i = 0;
+		goto out_access_unlock;
+	}
+
+	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
+		vhost_user_iotlb_rd_lock(vq);
+
+	descs = vq->desc_packed;
+	mask = vq->size - 1;
+
+	for (i = 0; i < count; i++) {
+		uint32_t desc_avail, desc_offset;
+		uint32_t mbuf_avail, mbuf_offset;
+		uint32_t cpy_len;
+		struct vring_desc_packed *desc;
+		uint64_t desc_addr;
+		struct virtio_net_hdr_mrg_rxbuf *hdr;
+		struct rte_mbuf *m = pkts[i];
+
+		/* XXX: there is an assumption that no desc will be chained */
+		idx = vq->last_used_idx & mask;
+		desc = &descs[idx];
+
+		if (!desc_is_avail(vq, desc))
+			break;
+		rte_smp_rmb();
+
+		desc_addr = vhost_iova_to_vva(dev, vq, desc->addr,
+					      sizeof(*desc), VHOST_ACCESS_RW);
+		/*
+		 * Checking of 'desc_addr' placed outside of 'unlikely' macro
+		 * to avoid performance issue with some versions of gcc (4.8.4
+		 * and 5.3.0) which otherwise stores offset on the stack instead
+		 * of in a register.
+		 */
+		if (unlikely(desc->len < dev->vhost_hlen) || !desc_addr)
+			break;
+
+		hdr = (struct virtio_net_hdr_mrg_rxbuf *)(uintptr_t)desc_addr;
+		virtio_enqueue_offload(m, &hdr->hdr);
+		vhost_log_write(dev, desc->addr, dev->vhost_hlen);
+		PRINT_PACKET(dev, (uintptr_t)desc_addr, dev->vhost_hlen, 0);
+
+		desc_offset = dev->vhost_hlen;
+		desc_avail  = desc->len - dev->vhost_hlen;
+
+		mbuf_avail  = rte_pktmbuf_data_len(m);
+		mbuf_offset = 0;
+		while (mbuf_avail != 0 || m->next != NULL) {
+			/* done with current mbuf, fetch next */
+			if (mbuf_avail == 0) {
+				m = m->next;
+
+				mbuf_offset = 0;
+				mbuf_avail  = rte_pktmbuf_data_len(m);
+			}
+
+			/* done with current desc buf, fetch next */
+			if (desc_avail == 0) {
+				if ((desc->flags & VRING_DESC_F_NEXT) == 0) {
+					/* Room in vring buffer is not enough */
+					goto out;
+				}
+
+				idx = (idx+1) & (vq->size - 1);
+				desc = &descs[idx];
+				if (unlikely(!desc_is_avail(vq, desc)))
+					goto out ;
+
+				desc_addr = vhost_iova_to_vva(dev, vq, desc->addr,
+							      sizeof(*desc),
+							      VHOST_ACCESS_RW);
+				if (unlikely(!desc_addr))
+					goto out;
+
+				desc_offset = 0;
+				desc_avail  = desc->len;
+			}
+
+			cpy_len = RTE_MIN(desc_avail, mbuf_avail);
+			rte_memcpy((void *)((uintptr_t)(desc_addr + desc_offset)),
+				rte_pktmbuf_mtod_offset(m, void *, mbuf_offset),
+				cpy_len);
+			vhost_log_write(dev, desc->addr + desc_offset, cpy_len);
+			PRINT_PACKET(dev, (uintptr_t)(desc_addr + desc_offset),
+				     cpy_len, 0);
+
+			mbuf_avail  -= cpy_len;
+			mbuf_offset += cpy_len;
+			desc_avail  -= cpy_len;
+			desc_offset += cpy_len;
+		}
+
+		descs[idx].len = pkts[i]->pkt_len + dev->vhost_hlen;
+		rte_smp_wmb();
+		set_desc_used(vq, desc);
+
+		vq->last_used_idx++;
+		if ((vq->last_used_idx & (vq->size - 1)) == 0)
+			toggle_wrap_counter(vq);
+	}
+
+out:
+	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
+		vhost_user_iotlb_rd_unlock(vq);
+
+out_access_unlock:
+	rte_spinlock_unlock(&vq->access_lock);
+
+	count = i;
+
+	return count;
+}
+
 uint16_t
 rte_vhost_enqueue_burst(int vid, uint16_t queue_id,
 	struct rte_mbuf **pkts, uint16_t count)