[dpdk-dev,v4,2/6] vhost: rewrite enqueue

Message ID 1472528164-54296-3-git-send-email-zhihong.wang@intel.com (mailing list archive)
State Superseded, archived
Delegated to: Yuanhan Liu
Headers

Commit Message

Zhihong Wang Aug. 30, 2016, 3:36 a.m. UTC
  This patch implements the vhost logic from scratch into a single function
designed for high performance and better maintainability.

This is the baseline version of the new code, more optimization will be
added in the following patches in this patch set.

---
Changes in v4:

 1. Refactor the code for clearer logic.

 2. Add PRINT_PACKET for debugging.

---
Changes in v3:

 1. Rewrite enqueue and delete the obsolete in the same patch.

Signed-off-by: Zhihong Wang <zhihong.wang@intel.com>
---
 lib/librte_vhost/vhost_rxtx.c | 525 ++++++++++++------------------------------
 1 file changed, 145 insertions(+), 380 deletions(-)
  

Comments

Yuanhan Liu Sept. 5, 2016, 6:39 a.m. UTC | #1
On Mon, Aug 29, 2016 at 11:36:00PM -0400, Zhihong Wang wrote:
> This patch implements the vhost logic from scratch into a single function
> designed for high performance and better maintainability.
> 
> This is the baseline version of the new code, more optimization will be
> added in the following patches in this patch set.
> 
> ---
> Changes in v4:
> 
>  1. Refactor the code for clearer logic.
> 
>  2. Add PRINT_PACKET for debugging.
> 
> ---
> Changes in v3:
> 
>  1. Rewrite enqueue and delete the obsolete in the same patch.

Change log should go ----> 

> Signed-off-by: Zhihong Wang <zhihong.wang@intel.com>
> ---

... here, after the SoB.

>  lib/librte_vhost/vhost_rxtx.c | 525 ++++++++++++------------------------------
>  1 file changed, 145 insertions(+), 380 deletions(-)
> 
> diff --git a/lib/librte_vhost/vhost_rxtx.c b/lib/librte_vhost/vhost_rxtx.c
> index 5806f99..629e8ae 100644
> --- a/lib/librte_vhost/vhost_rxtx.c
> +++ b/lib/librte_vhost/vhost_rxtx.c
> @@ -91,7 +91,7 @@ is_valid_virt_queue_idx(uint32_t idx, int is_tx, uint32_t qp_nb)
>  	return (is_tx ^ (idx & 1)) == 0 && idx < qp_nb * VIRTIO_QNUM;
>  }
>  
> -static void
> +static inline void __attribute__((always_inline))
>  virtio_enqueue_offload(struct rte_mbuf *m_buf, struct virtio_net_hdr *net_hdr)
>  {
>  	if (m_buf->ol_flags & PKT_TX_L4_MASK) {
> @@ -112,6 +112,10 @@ virtio_enqueue_offload(struct rte_mbuf *m_buf, struct virtio_net_hdr *net_hdr)
>  						cksum));
>  			break;
>  		}
> +	} else {
> +		net_hdr->flags = 0;
> +		net_hdr->csum_start = 0;
> +		net_hdr->csum_offset = 0;
>  	}
>  
>  	if (m_buf->ol_flags & PKT_TX_TCP_SEG) {
> @@ -122,437 +126,198 @@ virtio_enqueue_offload(struct rte_mbuf *m_buf, struct virtio_net_hdr *net_hdr)
>  		net_hdr->gso_size = m_buf->tso_segsz;
>  		net_hdr->hdr_len = m_buf->l2_len + m_buf->l3_len
>  					+ m_buf->l4_len;
> +	} else {
> +		net_hdr->gso_type = 0;
> +		net_hdr->hdr_len = 0;
> +		net_hdr->gso_size = 0;
>  	}
>  }
>  
> -static inline void
> -copy_virtio_net_hdr(struct virtio_net *dev, uint64_t desc_addr,
> -		    struct virtio_net_hdr_mrg_rxbuf hdr)
> +static inline void __attribute__((always_inline))
> +update_used_ring(struct virtio_net *dev, struct vhost_virtqueue *vq,
> +		uint32_t desc_chain_head, uint32_t desc_chain_len)
>  {
> -	if (dev->vhost_hlen == sizeof(struct virtio_net_hdr_mrg_rxbuf))
> -		*(struct virtio_net_hdr_mrg_rxbuf *)(uintptr_t)desc_addr = hdr;
> -	else
> -		*(struct virtio_net_hdr *)(uintptr_t)desc_addr = hdr.hdr;
> +	uint32_t used_idx_round = vq->last_used_idx & (vq->size - 1);

I'd suggest using "used_idx" instead of "used_idx_round".

> +
> +	vq->used->ring[used_idx_round].id = desc_chain_head;
> +	vq->used->ring[used_idx_round].len = desc_chain_len;
> +	vhost_log_used_vring(dev, vq, offsetof(struct vring_used,
> +				ring[used_idx_round]),
> +			sizeof(vq->used->ring[used_idx_round]));
>  }
>  
> -static inline int __attribute__((always_inline))
> -copy_mbuf_to_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
> -		  struct rte_mbuf *m, uint16_t desc_idx)
> +static inline uint32_t __attribute__((always_inline))
> +enqueue_packet(struct virtio_net *dev, struct vhost_virtqueue *vq,
> +		uint16_t avail_idx, struct rte_mbuf *mbuf,
> +		uint32_t is_mrg_rxbuf)
>  {
> -	uint32_t desc_avail, desc_offset;
> -	uint32_t mbuf_avail, mbuf_offset;
> -	uint32_t cpy_len;
> +	struct virtio_net_hdr_mrg_rxbuf *virtio_hdr;
>  	struct vring_desc *desc;
>  	uint64_t desc_addr;
> -	struct virtio_net_hdr_mrg_rxbuf virtio_hdr = {{0, 0, 0, 0, 0, 0}, 0};
> -
> -	desc = &vq->desc[desc_idx];
> +	uint32_t desc_chain_head;
> +	uint32_t desc_chain_len;
> +	uint32_t desc_current;
> +	uint32_t desc_offset;
> +	uint32_t mbuf_len;
> +	uint32_t mbuf_avail;
> +	uint32_t copy_len;
> +	uint32_t extra_buffers = 0;

I'd name it "num_buffers", to stay consistent with the virtio hdr
naming style.

> +
> +	/* start with the first mbuf of the packet */
> +	mbuf_len = rte_pktmbuf_data_len(mbuf);
> +	mbuf_avail = mbuf_len;
> +
> +	/* get the current desc */
> +	desc_current = vq->avail->ring[(vq->last_used_idx) & (vq->size - 1)];
> +	desc_chain_head = desc_current;
> +	desc = &vq->desc[desc_current];
>  	desc_addr = gpa_to_vva(dev, desc->addr);
> -	/*
> -	 * Checking of 'desc_addr' placed outside of 'unlikely' macro to avoid
> -	 * performance issue with some versions of gcc (4.8.4 and 5.3.0) which
> -	 * otherwise stores offset on the stack instead of in a register.
> -	 */
> -	if (unlikely(desc->len < dev->vhost_hlen) || !desc_addr)
> -		return -1;
> +	if (unlikely(!desc_addr))
> +		goto error;
>  
> -	rte_prefetch0((void *)(uintptr_t)desc_addr);
> +	/* handle virtio header */
> +	virtio_hdr = (struct virtio_net_hdr_mrg_rxbuf *)(uintptr_t)desc_addr;
> +	virtio_enqueue_offload(mbuf, &(virtio_hdr->hdr));
> +	if (is_mrg_rxbuf)
> +		virtio_hdr->num_buffers = extra_buffers + 1;
>  
> -	virtio_enqueue_offload(m, &virtio_hdr.hdr);
> -	copy_virtio_net_hdr(dev, desc_addr, virtio_hdr);
>  	vhost_log_write(dev, desc->addr, dev->vhost_hlen);
>  	PRINT_PACKET(dev, (uintptr_t)desc_addr, dev->vhost_hlen, 0);
> -
>  	desc_offset = dev->vhost_hlen;
> -	desc_avail  = desc->len - dev->vhost_hlen;
> -
> -	mbuf_avail  = rte_pktmbuf_data_len(m);
> -	mbuf_offset = 0;
> -	while (mbuf_avail != 0 || m->next != NULL) {
> -		/* done with current mbuf, fetch next */
> -		if (mbuf_avail == 0) {
> -			m = m->next;
> -
> -			mbuf_offset = 0;
> -			mbuf_avail  = rte_pktmbuf_data_len(m);
> +	desc_chain_len = desc_offset;
> +	desc_addr += desc_offset;
> +
> +	/* start copy from mbuf to desc */
> +	while (mbuf_avail || mbuf->next) {
> +		/* get the next mbuf if the current done */
> +		if (!mbuf_avail) {
> +			mbuf = mbuf->next;
> +			mbuf_len = rte_pktmbuf_data_len(mbuf);
> +			mbuf_avail = mbuf_len;
>  		}
>  
> -		/* done with current desc buf, fetch next */
> -		if (desc_avail == 0) {
> -			if ((desc->flags & VRING_DESC_F_NEXT) == 0) {
> -				/* Room in vring buffer is not enough */
> -				return -1;
> -			}
> -			if (unlikely(desc->next >= vq->size))
> -				return -1;
> +		/* get the next desc if the current done */
> +		if (desc->len <= desc_offset) {
> +			if (desc->flags & VRING_DESC_F_NEXT) {
> +				/* go on with the current desc chain */
> +				desc_offset = 0;
> +				desc_current = desc->next;
> +				desc = &vq->desc[desc_current];
> +				desc_addr = gpa_to_vva(dev, desc->addr);
> +				if (unlikely(!desc_addr))
> +					goto error;
> +			} else if (is_mrg_rxbuf) {
> +				/* start with the next desc chain */
> +				update_used_ring(dev, vq, desc_chain_head,
> +						desc_chain_len);
> +				vq->last_used_idx++;

Why not put "vq->last_used_idx++" into update_used_ring()?

> +				extra_buffers++;
> +				virtio_hdr->num_buffers++;
> +				if (avail_idx == vq->last_used_idx)
> +					goto error;
> +
> +				desc_current =
> +					vq->avail->ring[(vq->last_used_idx) &
> +					(vq->size - 1)];
> +				desc_chain_head = desc_current;
> +				desc = &vq->desc[desc_current];
> +				desc_addr = gpa_to_vva(dev, desc->addr);
> +				if (unlikely(!desc_addr))
> +					goto error;
>  
> -			desc = &vq->desc[desc->next];
> -			desc_addr = gpa_to_vva(dev, desc->addr);
> -			if (unlikely(!desc_addr))
> -				return -1;
> -
> -			desc_offset = 0;
> -			desc_avail  = desc->len;
> +				desc_chain_len = 0;
> +				desc_offset = 0;
> +			} else
> +				goto error;
>  		}
>  
> -		cpy_len = RTE_MIN(desc_avail, mbuf_avail);
> -		rte_memcpy((void *)((uintptr_t)(desc_addr + desc_offset)),
> -			rte_pktmbuf_mtod_offset(m, void *, mbuf_offset),
> -			cpy_len);
> -		vhost_log_write(dev, desc->addr + desc_offset, cpy_len);
> -		PRINT_PACKET(dev, (uintptr_t)(desc_addr + desc_offset),
> -			     cpy_len, 0);
> -
> -		mbuf_avail  -= cpy_len;
> -		mbuf_offset += cpy_len;
> -		desc_avail  -= cpy_len;
> -		desc_offset += cpy_len;
> +		/* copy mbuf data */
> +		copy_len = RTE_MIN(desc->len - desc_offset, mbuf_avail);

TBH, I'm okay with copy_len (actually, I prefer it slightly). However,
the old code uses cpy_len, the current dequeue function also uses cpy_len,
I then see no good reason to use copy_len here. It's really not a good
idea to me to use two different naming styles in one source file.


> +		rte_memcpy((void *)(uintptr_t)desc_addr,
> +				rte_pktmbuf_mtod_offset(mbuf, void *,
> +					mbuf_len - mbuf_avail),

I would keep the old var "mbuf_offset" and not introduce "mbuf_len".
This would avoid the above calculation and make it straightforward.

> +				copy_len);
> +		vhost_log_write(dev, desc->addr + desc_offset, copy_len);
> +		PRINT_PACKET(dev, (uintptr_t)desc_addr, copy_len, 0);
> +		mbuf_avail -= copy_len;
> +		desc_offset += copy_len;
> +		desc_addr += copy_len;
> +		desc_chain_len += copy_len;

Vertical alignment[0] is not a must, but as you can see, it's a style I
prefer. Meaning, if possible, please follow it.

[0]: https://en.wikipedia.org/wiki/Programming_style#Vertical_alignment

>  	}
>  
> -	return 0;
> -}
> +	update_used_ring(dev, vq, desc_chain_head, desc_chain_len);
> +	vq->last_used_idx++;
>  
...
> -	rte_prefetch0(&vq->desc[desc_indexes[0]]);
> -	for (i = 0; i < count; i++) {
> -		uint16_t desc_idx = desc_indexes[i];
> -		int err;
> +	return 0;
>  
> -		err = copy_mbuf_to_desc(dev, vq, pkts[i], desc_idx);
> -		if (unlikely(err)) {
> -			used_idx = (start_idx + i) & (vq->size - 1);
> -			vq->used->ring[used_idx].len = dev->vhost_hlen;
> -			vhost_log_used_vring(dev, vq,
> -				offsetof(struct vring_used, ring[used_idx]),
> -				sizeof(vq->used->ring[used_idx]));
> -		}
> +error:
> +	/* rollback on any error if last_used_idx update on-the-fly */
> +	vq->last_used_idx -= extra_buffers;
>  
> -		if (i + 1 < count)
> -			rte_prefetch0(&vq->desc[desc_indexes[i+1]]);
> -	}
> +	return 1;

We normally return -1 (not 1) on error.

>  
> +static inline void __attribute__((always_inline))
> +notify_guest(struct virtio_net *dev, struct vhost_virtqueue *vq)
> +{
>  	rte_smp_wmb();
> -
> -	*(volatile uint16_t *)&vq->used->idx += count;
> -	vq->last_used_idx += count;
> -	vhost_log_used_vring(dev, vq,
> -		offsetof(struct vring_used, idx),
> -		sizeof(vq->used->idx));
> -
> -	/* flush used->idx update before we read avail->flags. */
> +	vq->used->idx = vq->last_used_idx;

I would not drop the "volatile" cast here silently. You know this kind of
stuff is tricky and would be painful to debug if it causes any issue. Such a
removal deserves its own patch, as well as some explanation.

> +	vhost_log_used_vring(dev, vq, offsetof(struct vring_used, idx),
> +			sizeof(vq->used->idx));
>  	rte_mb();
> -
> -	/* Kick the guest if necessary. */
>  	if (!(vq->avail->flags & VRING_AVAIL_F_NO_INTERRUPT)
>  			&& (vq->callfd >= 0))
>  		eventfd_write(vq->callfd, (eventfd_t)1);
> -	return count;
>  }
>  
...
> +	/* start enqueuing packets 1 by 1 */
> +	avail_idx = *((volatile uint16_t *)&vq->avail->idx);
> +	while (pkt_left && avail_idx != vq->last_used_idx) {
> +		if (enqueue_packet(dev, vq, avail_idx, pkts[pkt_idx],
> +					is_mrg_rxbuf))
>  			break;
> -		}
> -
> -		nr_used = copy_mbuf_to_desc_mergeable(dev, vq, end,
> -						      pkts[pkt_idx], buf_vec);
> -		rte_smp_wmb();
> -
> -		*(volatile uint16_t *)&vq->used->idx += nr_used;
> -		vhost_log_used_vring(dev, vq, offsetof(struct vring_used, idx),
> -			sizeof(vq->used->idx));
> -		vq->last_used_idx += nr_used;
> -	}
> -
> -	if (likely(pkt_idx)) {
> -		/* flush used->idx update before we read avail->flags. */
> -		rte_mb();
>  
> -		/* Kick the guest if necessary. */
> -		if (!(vq->avail->flags & VRING_AVAIL_F_NO_INTERRUPT)
> -				&& (vq->callfd >= 0))
> -			eventfd_write(vq->callfd, (eventfd_t)1);
> +		pkt_idx++;
> +		pkt_sent++;

pkt_idx and pkt_sent are duplicates of each other here.

	--yliu

> +		pkt_left--;
>  	}
>  
> -	return pkt_idx;
> -}
> -
> -uint16_t
> -rte_vhost_enqueue_burst(int vid, uint16_t queue_id,
> -	struct rte_mbuf **pkts, uint16_t count)
> -{
> -	struct virtio_net *dev = get_device(vid);
> -
> -	if (!dev)
> -		return 0;
> +	/* update used idx and kick the guest if necessary */
> +	if (pkt_sent)
> +		notify_guest(dev, vq);
>  
> -	if (dev->features & (1 << VIRTIO_NET_F_MRG_RXBUF))
> -		return virtio_dev_merge_rx(dev, queue_id, pkts, count);
> -	else
> -		return virtio_dev_rx(dev, queue_id, pkts, count);
> +	return pkt_sent;
>  }
>  
>  static void
> -- 
> 2.7.4
  
Yuanhan Liu Sept. 7, 2016, 5:33 a.m. UTC | #2
Hmmm, yet another email didn't send out successfully. Resend.

BTW, please work out v5 on top of the latest next-virtio tree.

Thanks.

	--yliu

On Mon, Sep 05, 2016 at 02:39:25PM +0800, Yuanhan Liu wrote:
----
On Mon, Aug 29, 2016 at 11:36:00PM -0400, Zhihong Wang wrote:
> This patch implements the vhost logic from scratch into a single function
> designed for high performance and better maintainability.
> 
> This is the baseline version of the new code, more optimization will be
> added in the following patches in this patch set.
> 
> ---
> Changes in v4:
> 
>  1. Refactor the code for clearer logic.
> 
>  2. Add PRINT_PACKET for debugging.
> 
> ---
> Changes in v3:
> 
>  1. Rewrite enqueue and delete the obsolete in the same patch.

Change log should go ----> 

> Signed-off-by: Zhihong Wang <zhihong.wang@intel.com>
> ---

... here, after the SoB.

>  lib/librte_vhost/vhost_rxtx.c | 525 ++++++++++++------------------------------
>  1 file changed, 145 insertions(+), 380 deletions(-)
> 
> diff --git a/lib/librte_vhost/vhost_rxtx.c b/lib/librte_vhost/vhost_rxtx.c
> index 5806f99..629e8ae 100644
> --- a/lib/librte_vhost/vhost_rxtx.c
> +++ b/lib/librte_vhost/vhost_rxtx.c
> @@ -91,7 +91,7 @@ is_valid_virt_queue_idx(uint32_t idx, int is_tx, uint32_t qp_nb)
>  	return (is_tx ^ (idx & 1)) == 0 && idx < qp_nb * VIRTIO_QNUM;
>  }
>  
> -static void
> +static inline void __attribute__((always_inline))
>  virtio_enqueue_offload(struct rte_mbuf *m_buf, struct virtio_net_hdr *net_hdr)
>  {
>  	if (m_buf->ol_flags & PKT_TX_L4_MASK) {
> @@ -112,6 +112,10 @@ virtio_enqueue_offload(struct rte_mbuf *m_buf, struct virtio_net_hdr *net_hdr)
>  						cksum));
>  			break;
>  		}
> +	} else {
> +		net_hdr->flags = 0;
> +		net_hdr->csum_start = 0;
> +		net_hdr->csum_offset = 0;
>  	}
>  
>  	if (m_buf->ol_flags & PKT_TX_TCP_SEG) {
> @@ -122,437 +126,198 @@ virtio_enqueue_offload(struct rte_mbuf *m_buf, struct virtio_net_hdr *net_hdr)
>  		net_hdr->gso_size = m_buf->tso_segsz;
>  		net_hdr->hdr_len = m_buf->l2_len + m_buf->l3_len
>  					+ m_buf->l4_len;
> +	} else {
> +		net_hdr->gso_type = 0;
> +		net_hdr->hdr_len = 0;
> +		net_hdr->gso_size = 0;
>  	}
>  }
>  
> -static inline void
> -copy_virtio_net_hdr(struct virtio_net *dev, uint64_t desc_addr,
> -		    struct virtio_net_hdr_mrg_rxbuf hdr)
> +static inline void __attribute__((always_inline))
> +update_used_ring(struct virtio_net *dev, struct vhost_virtqueue *vq,
> +		uint32_t desc_chain_head, uint32_t desc_chain_len)
>  {
> -	if (dev->vhost_hlen == sizeof(struct virtio_net_hdr_mrg_rxbuf))
> -		*(struct virtio_net_hdr_mrg_rxbuf *)(uintptr_t)desc_addr = hdr;
> -	else
> -		*(struct virtio_net_hdr *)(uintptr_t)desc_addr = hdr.hdr;
> +	uint32_t used_idx_round = vq->last_used_idx & (vq->size - 1);

I'd suggest using "used_idx" instead of "used_idx_round".

> +
> +	vq->used->ring[used_idx_round].id = desc_chain_head;
> +	vq->used->ring[used_idx_round].len = desc_chain_len;
> +	vhost_log_used_vring(dev, vq, offsetof(struct vring_used,
> +				ring[used_idx_round]),
> +			sizeof(vq->used->ring[used_idx_round]));
>  }
>  
> -static inline int __attribute__((always_inline))
> -copy_mbuf_to_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
> -		  struct rte_mbuf *m, uint16_t desc_idx)
> +static inline uint32_t __attribute__((always_inline))
> +enqueue_packet(struct virtio_net *dev, struct vhost_virtqueue *vq,
> +		uint16_t avail_idx, struct rte_mbuf *mbuf,
> +		uint32_t is_mrg_rxbuf)
>  {
> -	uint32_t desc_avail, desc_offset;
> -	uint32_t mbuf_avail, mbuf_offset;
> -	uint32_t cpy_len;
> +	struct virtio_net_hdr_mrg_rxbuf *virtio_hdr;
>  	struct vring_desc *desc;
>  	uint64_t desc_addr;
> -	struct virtio_net_hdr_mrg_rxbuf virtio_hdr = {{0, 0, 0, 0, 0, 0}, 0};
> -
> -	desc = &vq->desc[desc_idx];
> +	uint32_t desc_chain_head;
> +	uint32_t desc_chain_len;
> +	uint32_t desc_current;
> +	uint32_t desc_offset;
> +	uint32_t mbuf_len;
> +	uint32_t mbuf_avail;
> +	uint32_t copy_len;
> +	uint32_t extra_buffers = 0;

I'd name it "num_buffers", to stay consistent with the virtio hdr
naming style.

> +
> +	/* start with the first mbuf of the packet */
> +	mbuf_len = rte_pktmbuf_data_len(mbuf);
> +	mbuf_avail = mbuf_len;
> +
> +	/* get the current desc */
> +	desc_current = vq->avail->ring[(vq->last_used_idx) & (vq->size - 1)];
> +	desc_chain_head = desc_current;
> +	desc = &vq->desc[desc_current];
>  	desc_addr = gpa_to_vva(dev, desc->addr);
> -	/*
> -	 * Checking of 'desc_addr' placed outside of 'unlikely' macro to avoid
> -	 * performance issue with some versions of gcc (4.8.4 and 5.3.0) which
> -	 * otherwise stores offset on the stack instead of in a register.
> -	 */
> -	if (unlikely(desc->len < dev->vhost_hlen) || !desc_addr)
> -		return -1;
> +	if (unlikely(!desc_addr))
> +		goto error;
>  
> -	rte_prefetch0((void *)(uintptr_t)desc_addr);
> +	/* handle virtio header */
> +	virtio_hdr = (struct virtio_net_hdr_mrg_rxbuf *)(uintptr_t)desc_addr;
> +	virtio_enqueue_offload(mbuf, &(virtio_hdr->hdr));
> +	if (is_mrg_rxbuf)
> +		virtio_hdr->num_buffers = extra_buffers + 1;
>  
> -	virtio_enqueue_offload(m, &virtio_hdr.hdr);
> -	copy_virtio_net_hdr(dev, desc_addr, virtio_hdr);
>  	vhost_log_write(dev, desc->addr, dev->vhost_hlen);
>  	PRINT_PACKET(dev, (uintptr_t)desc_addr, dev->vhost_hlen, 0);
> -
>  	desc_offset = dev->vhost_hlen;
> -	desc_avail  = desc->len - dev->vhost_hlen;
> -
> -	mbuf_avail  = rte_pktmbuf_data_len(m);
> -	mbuf_offset = 0;
> -	while (mbuf_avail != 0 || m->next != NULL) {
> -		/* done with current mbuf, fetch next */
> -		if (mbuf_avail == 0) {
> -			m = m->next;
> -
> -			mbuf_offset = 0;
> -			mbuf_avail  = rte_pktmbuf_data_len(m);
> +	desc_chain_len = desc_offset;
> +	desc_addr += desc_offset;
> +
> +	/* start copy from mbuf to desc */
> +	while (mbuf_avail || mbuf->next) {
> +		/* get the next mbuf if the current done */
> +		if (!mbuf_avail) {
> +			mbuf = mbuf->next;
> +			mbuf_len = rte_pktmbuf_data_len(mbuf);
> +			mbuf_avail = mbuf_len;
>  		}
>  
> -		/* done with current desc buf, fetch next */
> -		if (desc_avail == 0) {
> -			if ((desc->flags & VRING_DESC_F_NEXT) == 0) {
> -				/* Room in vring buffer is not enough */
> -				return -1;
> -			}
> -			if (unlikely(desc->next >= vq->size))
> -				return -1;
> +		/* get the next desc if the current done */
> +		if (desc->len <= desc_offset) {
> +			if (desc->flags & VRING_DESC_F_NEXT) {
> +				/* go on with the current desc chain */
> +				desc_offset = 0;
> +				desc_current = desc->next;
> +				desc = &vq->desc[desc_current];
> +				desc_addr = gpa_to_vva(dev, desc->addr);
> +				if (unlikely(!desc_addr))
> +					goto error;
> +			} else if (is_mrg_rxbuf) {
> +				/* start with the next desc chain */
> +				update_used_ring(dev, vq, desc_chain_head,
> +						desc_chain_len);
> +				vq->last_used_idx++;

Why not put "vq->last_used_idx++" into update_used_ring()?

> +				extra_buffers++;
> +				virtio_hdr->num_buffers++;
> +				if (avail_idx == vq->last_used_idx)
> +					goto error;
> +
> +				desc_current =
> +					vq->avail->ring[(vq->last_used_idx) &
> +					(vq->size - 1)];
> +				desc_chain_head = desc_current;
> +				desc = &vq->desc[desc_current];
> +				desc_addr = gpa_to_vva(dev, desc->addr);
> +				if (unlikely(!desc_addr))
> +					goto error;
>  
> -			desc = &vq->desc[desc->next];
> -			desc_addr = gpa_to_vva(dev, desc->addr);
> -			if (unlikely(!desc_addr))
> -				return -1;
> -
> -			desc_offset = 0;
> -			desc_avail  = desc->len;
> +				desc_chain_len = 0;
> +				desc_offset = 0;
> +			} else
> +				goto error;
>  		}
>  
> -		cpy_len = RTE_MIN(desc_avail, mbuf_avail);
> -		rte_memcpy((void *)((uintptr_t)(desc_addr + desc_offset)),
> -			rte_pktmbuf_mtod_offset(m, void *, mbuf_offset),
> -			cpy_len);
> -		vhost_log_write(dev, desc->addr + desc_offset, cpy_len);
> -		PRINT_PACKET(dev, (uintptr_t)(desc_addr + desc_offset),
> -			     cpy_len, 0);
> -
> -		mbuf_avail  -= cpy_len;
> -		mbuf_offset += cpy_len;
> -		desc_avail  -= cpy_len;
> -		desc_offset += cpy_len;
> +		/* copy mbuf data */
> +		copy_len = RTE_MIN(desc->len - desc_offset, mbuf_avail);

TBH, I'm okay with copy_len (actually, I prefer it slightly). However,
the old code uses cpy_len, the current dequeue function also uses cpy_len,
I then see no good reason to use copy_len here. It's really not a good
idea to me to use two different naming styles in one source file.


> +		rte_memcpy((void *)(uintptr_t)desc_addr,
> +				rte_pktmbuf_mtod_offset(mbuf, void *,
> +					mbuf_len - mbuf_avail),

I would keep the old var "mbuf_offset" and not introduce "mbuf_len".
This would avoid the above calculation and make it straightforward.

> +				copy_len);
> +		vhost_log_write(dev, desc->addr + desc_offset, copy_len);
> +		PRINT_PACKET(dev, (uintptr_t)desc_addr, copy_len, 0);
> +		mbuf_avail -= copy_len;
> +		desc_offset += copy_len;
> +		desc_addr += copy_len;
> +		desc_chain_len += copy_len;

Vertical alignment[0] is not a must, but as you can see, it's a style I
prefer. Meaning, if possible, please follow it.

[0]: https://en.wikipedia.org/wiki/Programming_style#Vertical_alignment

>  	}
>  
> -	return 0;
> -}
> +	update_used_ring(dev, vq, desc_chain_head, desc_chain_len);
> +	vq->last_used_idx++;
>  
...
> -	rte_prefetch0(&vq->desc[desc_indexes[0]]);
> -	for (i = 0; i < count; i++) {
> -		uint16_t desc_idx = desc_indexes[i];
> -		int err;
> +	return 0;
>  
> -		err = copy_mbuf_to_desc(dev, vq, pkts[i], desc_idx);
> -		if (unlikely(err)) {
> -			used_idx = (start_idx + i) & (vq->size - 1);
> -			vq->used->ring[used_idx].len = dev->vhost_hlen;
> -			vhost_log_used_vring(dev, vq,
> -				offsetof(struct vring_used, ring[used_idx]),
> -				sizeof(vq->used->ring[used_idx]));
> -		}
> +error:
> +	/* rollback on any error if last_used_idx update on-the-fly */
> +	vq->last_used_idx -= extra_buffers;
>  
> -		if (i + 1 < count)
> -			rte_prefetch0(&vq->desc[desc_indexes[i+1]]);
> -	}
> +	return 1;

We normally return -1 (not 1) on error.

>  
> +static inline void __attribute__((always_inline))
> +notify_guest(struct virtio_net *dev, struct vhost_virtqueue *vq)
> +{
>  	rte_smp_wmb();
> -
> -	*(volatile uint16_t *)&vq->used->idx += count;
> -	vq->last_used_idx += count;
> -	vhost_log_used_vring(dev, vq,
> -		offsetof(struct vring_used, idx),
> -		sizeof(vq->used->idx));
> -
> -	/* flush used->idx update before we read avail->flags. */
> +	vq->used->idx = vq->last_used_idx;

I would not drop the "volatile" cast here silently. You know this kind of
stuff is tricky and would be painful to debug if it causes any issue. Such a
removal deserves its own patch, as well as some explanation.

> +	vhost_log_used_vring(dev, vq, offsetof(struct vring_used, idx),
> +			sizeof(vq->used->idx));
>  	rte_mb();
> -
> -	/* Kick the guest if necessary. */
>  	if (!(vq->avail->flags & VRING_AVAIL_F_NO_INTERRUPT)
>  			&& (vq->callfd >= 0))
>  		eventfd_write(vq->callfd, (eventfd_t)1);
> -	return count;
>  }
>  
...
> +	/* start enqueuing packets 1 by 1 */
> +	avail_idx = *((volatile uint16_t *)&vq->avail->idx);
> +	while (pkt_left && avail_idx != vq->last_used_idx) {
> +		if (enqueue_packet(dev, vq, avail_idx, pkts[pkt_idx],
> +					is_mrg_rxbuf))
>  			break;
> -		}
> -
> -		nr_used = copy_mbuf_to_desc_mergeable(dev, vq, end,
> -						      pkts[pkt_idx], buf_vec);
> -		rte_smp_wmb();
> -
> -		*(volatile uint16_t *)&vq->used->idx += nr_used;
> -		vhost_log_used_vring(dev, vq, offsetof(struct vring_used, idx),
> -			sizeof(vq->used->idx));
> -		vq->last_used_idx += nr_used;
> -	}
> -
> -	if (likely(pkt_idx)) {
> -		/* flush used->idx update before we read avail->flags. */
> -		rte_mb();
>  
> -		/* Kick the guest if necessary. */
> -		if (!(vq->avail->flags & VRING_AVAIL_F_NO_INTERRUPT)
> -				&& (vq->callfd >= 0))
> -			eventfd_write(vq->callfd, (eventfd_t)1);
> +		pkt_idx++;
> +		pkt_sent++;

pkt_idx and pkt_sent are duplicates of each other here.

	--yliu

> +		pkt_left--;
>  	}
>  
> -	return pkt_idx;
> -}
> -
> -uint16_t
> -rte_vhost_enqueue_burst(int vid, uint16_t queue_id,
> -	struct rte_mbuf **pkts, uint16_t count)
> -{
> -	struct virtio_net *dev = get_device(vid);
> -
> -	if (!dev)
> -		return 0;
> +	/* update used idx and kick the guest if necessary */
> +	if (pkt_sent)
> +		notify_guest(dev, vq);
>  
> -	if (dev->features & (1 << VIRTIO_NET_F_MRG_RXBUF))
> -		return virtio_dev_merge_rx(dev, queue_id, pkts, count);
> -	else
> -		return virtio_dev_rx(dev, queue_id, pkts, count);
> +	return pkt_sent;
>  }
>  
>  static void
> -- 
> 2.7.4
  
Zhihong Wang Sept. 7, 2016, 5:39 a.m. UTC | #3
> -----Original Message-----
> From: Yuanhan Liu [mailto:yuanhan.liu@linux.intel.com]
> Sent: Wednesday, September 7, 2016 1:33 PM
> To: Wang, Zhihong <zhihong.wang@intel.com>
> Cc: dev@dpdk.org; maxime.coquelin@redhat.com;
> thomas.monjalon@6wind.com
> Subject: Re: [PATCH v4 2/6] vhost: rewrite enqueue
> 
> Hmmm, yet another email didn't send out successfully. Resend.
> 
> BTW, please work out v5 on top of the latest next-virtio tree.
> 
> Thanks.

Okay. Thanks.

> 
> 	--yliu
> 
> On Mon, Sep 05, 2016 at 02:39:25PM +0800, Yuanhan Liu wrote:
> ----
> On Mon, Aug 29, 2016 at 11:36:00PM -0400, Zhihong Wang wrote:
> > This patch implements the vhost logic from scratch into a single function
> > designed for high performance and better maintainability.
> >
> > This is the baseline version of the new code, more optimization will be
> > added in the following patches in this patch set.
> >
> > ---
> > Changes in v4:
> >
> >  1. Refactor the code for clearer logic.
> >
> >  2. Add PRINT_PACKET for debugging.
> >
> > ---
> > Changes in v3:
> >
> >  1. Rewrite enqueue and delete the obsolete in the same patch.
> 
> Change log should go ---->
> 
> > Signed-off-by: Zhihong Wang <zhihong.wang@intel.com>
> > ---
> 
> ... here, after the SoB.
> 
> >  lib/librte_vhost/vhost_rxtx.c | 525 ++++++++++++-----------------------------
> -
> >  1 file changed, 145 insertions(+), 380 deletions(-)
> >
> > diff --git a/lib/librte_vhost/vhost_rxtx.c b/lib/librte_vhost/vhost_rxtx.c
> > index 5806f99..629e8ae 100644
> > --- a/lib/librte_vhost/vhost_rxtx.c
> > +++ b/lib/librte_vhost/vhost_rxtx.c
> > @@ -91,7 +91,7 @@ is_valid_virt_queue_idx(uint32_t idx, int is_tx,
> uint32_t qp_nb)
> >  	return (is_tx ^ (idx & 1)) == 0 && idx < qp_nb * VIRTIO_QNUM;
> >  }
> >
> > -static void
> > +static inline void __attribute__((always_inline))
> >  virtio_enqueue_offload(struct rte_mbuf *m_buf, struct virtio_net_hdr
> *net_hdr)
> >  {
> >  	if (m_buf->ol_flags & PKT_TX_L4_MASK) {
> > @@ -112,6 +112,10 @@ virtio_enqueue_offload(struct rte_mbuf *m_buf,
> struct virtio_net_hdr *net_hdr)
> >  						cksum));
> >  			break;
> >  		}
> > +	} else {
> > +		net_hdr->flags = 0;
> > +		net_hdr->csum_start = 0;
> > +		net_hdr->csum_offset = 0;
> >  	}
> >
> >  	if (m_buf->ol_flags & PKT_TX_TCP_SEG) {
> > @@ -122,437 +126,198 @@ virtio_enqueue_offload(struct rte_mbuf
> *m_buf, struct virtio_net_hdr *net_hdr)
> >  		net_hdr->gso_size = m_buf->tso_segsz;
> >  		net_hdr->hdr_len = m_buf->l2_len + m_buf->l3_len
> >  					+ m_buf->l4_len;
> > +	} else {
> > +		net_hdr->gso_type = 0;
> > +		net_hdr->hdr_len = 0;
> > +		net_hdr->gso_size = 0;
> >  	}
> >  }
> >
> > -static inline void
> > -copy_virtio_net_hdr(struct virtio_net *dev, uint64_t desc_addr,
> > -		    struct virtio_net_hdr_mrg_rxbuf hdr)
> > +static inline void __attribute__((always_inline))
> > +update_used_ring(struct virtio_net *dev, struct vhost_virtqueue *vq,
> > +		uint32_t desc_chain_head, uint32_t desc_chain_len)
> >  {
> > -	if (dev->vhost_hlen == sizeof(struct virtio_net_hdr_mrg_rxbuf))
> > -		*(struct virtio_net_hdr_mrg_rxbuf *)(uintptr_t)desc_addr =
> hdr;
> > -	else
> > -		*(struct virtio_net_hdr *)(uintptr_t)desc_addr = hdr.hdr;
> > +	uint32_t used_idx_round = vq->last_used_idx & (vq->size - 1);
> 
> I'd suggest using "used_idx" instead of "used_idx_round".
> 
> > +
> > +	vq->used->ring[used_idx_round].id = desc_chain_head;
> > +	vq->used->ring[used_idx_round].len = desc_chain_len;
> > +	vhost_log_used_vring(dev, vq, offsetof(struct vring_used,
> > +				ring[used_idx_round]),
> > +			sizeof(vq->used->ring[used_idx_round]));
> >  }
> >
> > -static inline int __attribute__((always_inline))
> > -copy_mbuf_to_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
> > -		  struct rte_mbuf *m, uint16_t desc_idx)
> > +static inline uint32_t __attribute__((always_inline))
> > +enqueue_packet(struct virtio_net *dev, struct vhost_virtqueue *vq,
> > +		uint16_t avail_idx, struct rte_mbuf *mbuf,
> > +		uint32_t is_mrg_rxbuf)
> >  {
> > -	uint32_t desc_avail, desc_offset;
> > -	uint32_t mbuf_avail, mbuf_offset;
> > -	uint32_t cpy_len;
> > +	struct virtio_net_hdr_mrg_rxbuf *virtio_hdr;
> >  	struct vring_desc *desc;
> >  	uint64_t desc_addr;
> > -	struct virtio_net_hdr_mrg_rxbuf virtio_hdr = {{0, 0, 0, 0, 0, 0}, 0};
> > -
> > -	desc = &vq->desc[desc_idx];
> > +	uint32_t desc_chain_head;
> > +	uint32_t desc_chain_len;
> > +	uint32_t desc_current;
> > +	uint32_t desc_offset;
> > +	uint32_t mbuf_len;
> > +	uint32_t mbuf_avail;
> > +	uint32_t copy_len;
> > +	uint32_t extra_buffers = 0;
> 
> I'd name it "num_buffers", to stay consistent with the virtio hdr
> naming style.
> 
> > +
> > +	/* start with the first mbuf of the packet */
> > +	mbuf_len = rte_pktmbuf_data_len(mbuf);
> > +	mbuf_avail = mbuf_len;
> > +
> > +	/* get the current desc */
> > +	desc_current = vq->avail->ring[(vq->last_used_idx) & (vq->size - 1)];
> > +	desc_chain_head = desc_current;
> > +	desc = &vq->desc[desc_current];
> >  	desc_addr = gpa_to_vva(dev, desc->addr);
> > -	/*
> > -	 * Checking of 'desc_addr' placed outside of 'unlikely' macro to avoid
> > -	 * performance issue with some versions of gcc (4.8.4 and 5.3.0)
> which
> > -	 * otherwise stores offset on the stack instead of in a register.
> > -	 */
> > -	if (unlikely(desc->len < dev->vhost_hlen) || !desc_addr)
> > -		return -1;
> > +	if (unlikely(!desc_addr))
> > +		goto error;
> >
> > -	rte_prefetch0((void *)(uintptr_t)desc_addr);
> > +	/* handle virtio header */
> > +	virtio_hdr = (struct virtio_net_hdr_mrg_rxbuf
> *)(uintptr_t)desc_addr;
> > +	virtio_enqueue_offload(mbuf, &(virtio_hdr->hdr));
> > +	if (is_mrg_rxbuf)
> > +		virtio_hdr->num_buffers = extra_buffers + 1;
> >
> > -	virtio_enqueue_offload(m, &virtio_hdr.hdr);
> > -	copy_virtio_net_hdr(dev, desc_addr, virtio_hdr);
> >  	vhost_log_write(dev, desc->addr, dev->vhost_hlen);
> >  	PRINT_PACKET(dev, (uintptr_t)desc_addr, dev->vhost_hlen, 0);
> > -
> >  	desc_offset = dev->vhost_hlen;
> > -	desc_avail  = desc->len - dev->vhost_hlen;
> > -
> > -	mbuf_avail  = rte_pktmbuf_data_len(m);
> > -	mbuf_offset = 0;
> > -	while (mbuf_avail != 0 || m->next != NULL) {
> > -		/* done with current mbuf, fetch next */
> > -		if (mbuf_avail == 0) {
> > -			m = m->next;
> > -
> > -			mbuf_offset = 0;
> > -			mbuf_avail  = rte_pktmbuf_data_len(m);
> > +	desc_chain_len = desc_offset;
> > +	desc_addr += desc_offset;
> > +
> > +	/* start copy from mbuf to desc */
> > +	while (mbuf_avail || mbuf->next) {
> > +		/* get the next mbuf if the current done */
> > +		if (!mbuf_avail) {
> > +			mbuf = mbuf->next;
> > +			mbuf_len = rte_pktmbuf_data_len(mbuf);
> > +			mbuf_avail = mbuf_len;
> >  		}
> >
> > -		/* done with current desc buf, fetch next */
> > -		if (desc_avail == 0) {
> > -			if ((desc->flags & VRING_DESC_F_NEXT) == 0) {
> > -				/* Room in vring buffer is not enough */
> > -				return -1;
> > -			}
> > -			if (unlikely(desc->next >= vq->size))
> > -				return -1;
> > +		/* get the next desc if the current done */
> > +		if (desc->len <= desc_offset) {
> > +			if (desc->flags & VRING_DESC_F_NEXT) {
> > +				/* go on with the current desc chain */
> > +				desc_offset = 0;
> > +				desc_current = desc->next;
> > +				desc = &vq->desc[desc_current];
> > +				desc_addr = gpa_to_vva(dev, desc->addr);
> > +				if (unlikely(!desc_addr))
> > +					goto error;
> > +			} else if (is_mrg_rxbuf) {
> > +				/* start with the next desc chain */
> > +				update_used_ring(dev, vq,
> desc_chain_head,
> > +						desc_chain_len);
> > +				vq->last_used_idx++;
> 
> Why not put "vq->last_used_idx++" into update_used_ring()?
> 
> > +				extra_buffers++;
> > +				virtio_hdr->num_buffers++;
> > +				if (avail_idx == vq->last_used_idx)
> > +					goto error;
> > +
> > +				desc_current =
> > +					vq->avail->ring[(vq->last_used_idx)
> &
> > +					(vq->size - 1)];
> > +				desc_chain_head = desc_current;
> > +				desc = &vq->desc[desc_current];
> > +				desc_addr = gpa_to_vva(dev, desc->addr);
> > +				if (unlikely(!desc_addr))
> > +					goto error;
> >
> > -			desc = &vq->desc[desc->next];
> > -			desc_addr = gpa_to_vva(dev, desc->addr);
> > -			if (unlikely(!desc_addr))
> > -				return -1;
> > -
> > -			desc_offset = 0;
> > -			desc_avail  = desc->len;
> > +				desc_chain_len = 0;
> > +				desc_offset = 0;
> > +			} else
> > +				goto error;
> >  		}
> >
> > -		cpy_len = RTE_MIN(desc_avail, mbuf_avail);
> > -		rte_memcpy((void *)((uintptr_t)(desc_addr + desc_offset)),
> > -			rte_pktmbuf_mtod_offset(m, void *, mbuf_offset),
> > -			cpy_len);
> > -		vhost_log_write(dev, desc->addr + desc_offset, cpy_len);
> > -		PRINT_PACKET(dev, (uintptr_t)(desc_addr + desc_offset),
> > -			     cpy_len, 0);
> > -
> > -		mbuf_avail  -= cpy_len;
> > -		mbuf_offset += cpy_len;
> > -		desc_avail  -= cpy_len;
> > -		desc_offset += cpy_len;
> > +		/* copy mbuf data */
> > +		copy_len = RTE_MIN(desc->len - desc_offset, mbuf_avail);
> 
> TBH, I'm okay with copy_len (actually, I slightly prefer it). However,
> the old code uses cpy_len, and the current dequeue function also uses
> cpy_len, so I see no good reason to use copy_len here. To me, it's really
> not a good idea to use two different naming styles in one source file.
> 
> 
> > +		rte_memcpy((void *)(uintptr_t)desc_addr,
> > +				rte_pktmbuf_mtod_offset(mbuf, void *,
> > +					mbuf_len - mbuf_avail),
> 
> I would keep the old var "mbuf_offset" and not introduce "mbuf_len".
> This would avoid the above calculation and make it straightforward.
> 
> > +				copy_len);
> > +		vhost_log_write(dev, desc->addr + desc_offset, copy_len);
> > +		PRINT_PACKET(dev, (uintptr_t)desc_addr, copy_len, 0);
> > +		mbuf_avail -= copy_len;
> > +		desc_offset += copy_len;
> > +		desc_addr += copy_len;
> > +		desc_chain_len += copy_len;
> 
> Vertical alignment[0] is not a must, but as you can see, it's a style I
> prefer. Meaning, if possible, please follow it.
> 
> [0]: https://en.wikipedia.org/wiki/Programming_style#Vertical_alignment
> 
> >  	}
> >
> > -	return 0;
> > -}
> > +	update_used_ring(dev, vq, desc_chain_head, desc_chain_len);
> > +	vq->last_used_idx++;
> >
> ...
> > -	rte_prefetch0(&vq->desc[desc_indexes[0]]);
> > -	for (i = 0; i < count; i++) {
> > -		uint16_t desc_idx = desc_indexes[i];
> > -		int err;
> > +	return 0;
> >
> > -		err = copy_mbuf_to_desc(dev, vq, pkts[i], desc_idx);
> > -		if (unlikely(err)) {
> > -			used_idx = (start_idx + i) & (vq->size - 1);
> > -			vq->used->ring[used_idx].len = dev->vhost_hlen;
> > -			vhost_log_used_vring(dev, vq,
> > -				offsetof(struct vring_used, ring[used_idx]),
> > -				sizeof(vq->used->ring[used_idx]));
> > -		}
> > +error:
> > +	/* rollback on any error if last_used_idx update on-the-fly */
> > +	vq->last_used_idx -= extra_buffers;
> >
> > -		if (i + 1 < count)
> > -			rte_prefetch0(&vq->desc[desc_indexes[i+1]]);
> > -	}
> > +	return 1;
> 
> We normally return -1 (not 1) on error.
> 
> >
> > +static inline void __attribute__((always_inline))
> > +notify_guest(struct virtio_net *dev, struct vhost_virtqueue *vq)
> > +{
> >  	rte_smp_wmb();
> > -
> > -	*(volatile uint16_t *)&vq->used->idx += count;
> > -	vq->last_used_idx += count;
> > -	vhost_log_used_vring(dev, vq,
> > -		offsetof(struct vring_used, idx),
> > -		sizeof(vq->used->idx));
> > -
> > -	/* flush used->idx update before we read avail->flags. */
> > +	vq->used->idx = vq->last_used_idx;
> 
> I would not silently drop the "volatile" cast here. You know this kind of
> stuff is tricky and would be painful to debug if it causes any issue. Such
> a removal deserves its own patch, as well as some explanation.
> 
> > +	vhost_log_used_vring(dev, vq, offsetof(struct vring_used, idx),
> > +			sizeof(vq->used->idx));
> >  	rte_mb();
> > -
> > -	/* Kick the guest if necessary. */
> >  	if (!(vq->avail->flags & VRING_AVAIL_F_NO_INTERRUPT)
> >  			&& (vq->callfd >= 0))
> >  		eventfd_write(vq->callfd, (eventfd_t)1);
> > -	return count;
> >  }
> >
> ...
> > +	/* start enqueuing packets 1 by 1 */
> > +	avail_idx = *((volatile uint16_t *)&vq->avail->idx);
> > +	while (pkt_left && avail_idx != vq->last_used_idx) {
> > +		if (enqueue_packet(dev, vq, avail_idx, pkts[pkt_idx],
> > +					is_mrg_rxbuf))
> >  			break;
> > -		}
> > -
> > -		nr_used = copy_mbuf_to_desc_mergeable(dev, vq, end,
> > -						      pkts[pkt_idx], buf_vec);
> > -		rte_smp_wmb();
> > -
> > -		*(volatile uint16_t *)&vq->used->idx += nr_used;
> > -		vhost_log_used_vring(dev, vq, offsetof(struct vring_used,
> idx),
> > -			sizeof(vq->used->idx));
> > -		vq->last_used_idx += nr_used;
> > -	}
> > -
> > -	if (likely(pkt_idx)) {
> > -		/* flush used->idx update before we read avail->flags. */
> > -		rte_mb();
> >
> > -		/* Kick the guest if necessary. */
> > -		if (!(vq->avail->flags & VRING_AVAIL_F_NO_INTERRUPT)
> > -				&& (vq->callfd >= 0))
> > -			eventfd_write(vq->callfd, (eventfd_t)1);
> > +		pkt_idx++;
> > +		pkt_sent++;
> 
> pkt_idx and pkt_sent are duplicates here.
> 
> 	--yliu
> 
> > +		pkt_left--;
> >  	}
> >
> > -	return pkt_idx;
> > -}
> > -
> > -uint16_t
> > -rte_vhost_enqueue_burst(int vid, uint16_t queue_id,
> > -	struct rte_mbuf **pkts, uint16_t count)
> > -{
> > -	struct virtio_net *dev = get_device(vid);
> > -
> > -	if (!dev)
> > -		return 0;
> > +	/* update used idx and kick the guest if necessary */
> > +	if (pkt_sent)
> > +		notify_guest(dev, vq);
> >
> > -	if (dev->features & (1 << VIRTIO_NET_F_MRG_RXBUF))
> > -		return virtio_dev_merge_rx(dev, queue_id, pkts, count);
> > -	else
> > -		return virtio_dev_rx(dev, queue_id, pkts, count);
> > +	return pkt_sent;
> >  }
> >
> >  static void
> > --
> > 2.7.4
  

Patch

diff --git a/lib/librte_vhost/vhost_rxtx.c b/lib/librte_vhost/vhost_rxtx.c
index 5806f99..629e8ae 100644
--- a/lib/librte_vhost/vhost_rxtx.c
+++ b/lib/librte_vhost/vhost_rxtx.c
@@ -91,7 +91,7 @@  is_valid_virt_queue_idx(uint32_t idx, int is_tx, uint32_t qp_nb)
 	return (is_tx ^ (idx & 1)) == 0 && idx < qp_nb * VIRTIO_QNUM;
 }
 
-static void
+static inline void __attribute__((always_inline))
 virtio_enqueue_offload(struct rte_mbuf *m_buf, struct virtio_net_hdr *net_hdr)
 {
 	if (m_buf->ol_flags & PKT_TX_L4_MASK) {
@@ -112,6 +112,10 @@  virtio_enqueue_offload(struct rte_mbuf *m_buf, struct virtio_net_hdr *net_hdr)
 						cksum));
 			break;
 		}
+	} else {
+		net_hdr->flags = 0;
+		net_hdr->csum_start = 0;
+		net_hdr->csum_offset = 0;
 	}
 
 	if (m_buf->ol_flags & PKT_TX_TCP_SEG) {
@@ -122,437 +126,198 @@  virtio_enqueue_offload(struct rte_mbuf *m_buf, struct virtio_net_hdr *net_hdr)
 		net_hdr->gso_size = m_buf->tso_segsz;
 		net_hdr->hdr_len = m_buf->l2_len + m_buf->l3_len
 					+ m_buf->l4_len;
+	} else {
+		net_hdr->gso_type = 0;
+		net_hdr->hdr_len = 0;
+		net_hdr->gso_size = 0;
 	}
 }
 
-static inline void
-copy_virtio_net_hdr(struct virtio_net *dev, uint64_t desc_addr,
-		    struct virtio_net_hdr_mrg_rxbuf hdr)
+static inline void __attribute__((always_inline))
+update_used_ring(struct virtio_net *dev, struct vhost_virtqueue *vq,
+		uint32_t desc_chain_head, uint32_t desc_chain_len)
 {
-	if (dev->vhost_hlen == sizeof(struct virtio_net_hdr_mrg_rxbuf))
-		*(struct virtio_net_hdr_mrg_rxbuf *)(uintptr_t)desc_addr = hdr;
-	else
-		*(struct virtio_net_hdr *)(uintptr_t)desc_addr = hdr.hdr;
+	uint32_t used_idx_round = vq->last_used_idx & (vq->size - 1);
+
+	vq->used->ring[used_idx_round].id = desc_chain_head;
+	vq->used->ring[used_idx_round].len = desc_chain_len;
+	vhost_log_used_vring(dev, vq, offsetof(struct vring_used,
+				ring[used_idx_round]),
+			sizeof(vq->used->ring[used_idx_round]));
 }
 
-static inline int __attribute__((always_inline))
-copy_mbuf_to_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
-		  struct rte_mbuf *m, uint16_t desc_idx)
+static inline uint32_t __attribute__((always_inline))
+enqueue_packet(struct virtio_net *dev, struct vhost_virtqueue *vq,
+		uint16_t avail_idx, struct rte_mbuf *mbuf,
+		uint32_t is_mrg_rxbuf)
 {
-	uint32_t desc_avail, desc_offset;
-	uint32_t mbuf_avail, mbuf_offset;
-	uint32_t cpy_len;
+	struct virtio_net_hdr_mrg_rxbuf *virtio_hdr;
 	struct vring_desc *desc;
 	uint64_t desc_addr;
-	struct virtio_net_hdr_mrg_rxbuf virtio_hdr = {{0, 0, 0, 0, 0, 0}, 0};
-
-	desc = &vq->desc[desc_idx];
+	uint32_t desc_chain_head;
+	uint32_t desc_chain_len;
+	uint32_t desc_current;
+	uint32_t desc_offset;
+	uint32_t mbuf_len;
+	uint32_t mbuf_avail;
+	uint32_t copy_len;
+	uint32_t extra_buffers = 0;
+
+	/* start with the first mbuf of the packet */
+	mbuf_len = rte_pktmbuf_data_len(mbuf);
+	mbuf_avail = mbuf_len;
+
+	/* get the current desc */
+	desc_current = vq->avail->ring[(vq->last_used_idx) & (vq->size - 1)];
+	desc_chain_head = desc_current;
+	desc = &vq->desc[desc_current];
 	desc_addr = gpa_to_vva(dev, desc->addr);
-	/*
-	 * Checking of 'desc_addr' placed outside of 'unlikely' macro to avoid
-	 * performance issue with some versions of gcc (4.8.4 and 5.3.0) which
-	 * otherwise stores offset on the stack instead of in a register.
-	 */
-	if (unlikely(desc->len < dev->vhost_hlen) || !desc_addr)
-		return -1;
+	if (unlikely(!desc_addr))
+		goto error;
 
-	rte_prefetch0((void *)(uintptr_t)desc_addr);
+	/* handle virtio header */
+	virtio_hdr = (struct virtio_net_hdr_mrg_rxbuf *)(uintptr_t)desc_addr;
+	virtio_enqueue_offload(mbuf, &(virtio_hdr->hdr));
+	if (is_mrg_rxbuf)
+		virtio_hdr->num_buffers = extra_buffers + 1;
 
-	virtio_enqueue_offload(m, &virtio_hdr.hdr);
-	copy_virtio_net_hdr(dev, desc_addr, virtio_hdr);
 	vhost_log_write(dev, desc->addr, dev->vhost_hlen);
 	PRINT_PACKET(dev, (uintptr_t)desc_addr, dev->vhost_hlen, 0);
-
 	desc_offset = dev->vhost_hlen;
-	desc_avail  = desc->len - dev->vhost_hlen;
-
-	mbuf_avail  = rte_pktmbuf_data_len(m);
-	mbuf_offset = 0;
-	while (mbuf_avail != 0 || m->next != NULL) {
-		/* done with current mbuf, fetch next */
-		if (mbuf_avail == 0) {
-			m = m->next;
-
-			mbuf_offset = 0;
-			mbuf_avail  = rte_pktmbuf_data_len(m);
+	desc_chain_len = desc_offset;
+	desc_addr += desc_offset;
+
+	/* start copy from mbuf to desc */
+	while (mbuf_avail || mbuf->next) {
+		/* get the next mbuf if the current done */
+		if (!mbuf_avail) {
+			mbuf = mbuf->next;
+			mbuf_len = rte_pktmbuf_data_len(mbuf);
+			mbuf_avail = mbuf_len;
 		}
 
-		/* done with current desc buf, fetch next */
-		if (desc_avail == 0) {
-			if ((desc->flags & VRING_DESC_F_NEXT) == 0) {
-				/* Room in vring buffer is not enough */
-				return -1;
-			}
-			if (unlikely(desc->next >= vq->size))
-				return -1;
+		/* get the next desc if the current done */
+		if (desc->len <= desc_offset) {
+			if (desc->flags & VRING_DESC_F_NEXT) {
+				/* go on with the current desc chain */
+				desc_offset = 0;
+				desc_current = desc->next;
+				desc = &vq->desc[desc_current];
+				desc_addr = gpa_to_vva(dev, desc->addr);
+				if (unlikely(!desc_addr))
+					goto error;
+			} else if (is_mrg_rxbuf) {
+				/* start with the next desc chain */
+				update_used_ring(dev, vq, desc_chain_head,
+						desc_chain_len);
+				vq->last_used_idx++;
+				extra_buffers++;
+				virtio_hdr->num_buffers++;
+				if (avail_idx == vq->last_used_idx)
+					goto error;
+
+				desc_current =
+					vq->avail->ring[(vq->last_used_idx) &
+					(vq->size - 1)];
+				desc_chain_head = desc_current;
+				desc = &vq->desc[desc_current];
+				desc_addr = gpa_to_vva(dev, desc->addr);
+				if (unlikely(!desc_addr))
+					goto error;
 
-			desc = &vq->desc[desc->next];
-			desc_addr = gpa_to_vva(dev, desc->addr);
-			if (unlikely(!desc_addr))
-				return -1;
-
-			desc_offset = 0;
-			desc_avail  = desc->len;
+				desc_chain_len = 0;
+				desc_offset = 0;
+			} else
+				goto error;
 		}
 
-		cpy_len = RTE_MIN(desc_avail, mbuf_avail);
-		rte_memcpy((void *)((uintptr_t)(desc_addr + desc_offset)),
-			rte_pktmbuf_mtod_offset(m, void *, mbuf_offset),
-			cpy_len);
-		vhost_log_write(dev, desc->addr + desc_offset, cpy_len);
-		PRINT_PACKET(dev, (uintptr_t)(desc_addr + desc_offset),
-			     cpy_len, 0);
-
-		mbuf_avail  -= cpy_len;
-		mbuf_offset += cpy_len;
-		desc_avail  -= cpy_len;
-		desc_offset += cpy_len;
+		/* copy mbuf data */
+		copy_len = RTE_MIN(desc->len - desc_offset, mbuf_avail);
+		rte_memcpy((void *)(uintptr_t)desc_addr,
+				rte_pktmbuf_mtod_offset(mbuf, void *,
+					mbuf_len - mbuf_avail),
+				copy_len);
+		vhost_log_write(dev, desc->addr + desc_offset, copy_len);
+		PRINT_PACKET(dev, (uintptr_t)desc_addr, copy_len, 0);
+		mbuf_avail -= copy_len;
+		desc_offset += copy_len;
+		desc_addr += copy_len;
+		desc_chain_len += copy_len;
 	}
 
-	return 0;
-}
+	update_used_ring(dev, vq, desc_chain_head, desc_chain_len);
+	vq->last_used_idx++;
 
-/**
- * This function adds buffers to the virtio devices RX virtqueue. Buffers can
- * be received from the physical port or from another virtio device. A packet
- * count is returned to indicate the number of packets that are succesfully
- * added to the RX queue. This function works when the mbuf is scattered, but
- * it doesn't support the mergeable feature.
- */
-static inline uint32_t __attribute__((always_inline))
-virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
-	      struct rte_mbuf **pkts, uint32_t count)
-{
-	struct vhost_virtqueue *vq;
-	uint16_t avail_idx, free_entries, start_idx;
-	uint16_t desc_indexes[MAX_PKT_BURST];
-	uint16_t used_idx;
-	uint32_t i;
-
-	LOG_DEBUG(VHOST_DATA, "(%d) %s\n", dev->vid, __func__);
-	if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->virt_qp_nb))) {
-		RTE_LOG(ERR, VHOST_DATA, "(%d) %s: invalid virtqueue idx %d.\n",
-			dev->vid, __func__, queue_id);
-		return 0;
-	}
-
-	vq = dev->virtqueue[queue_id];
-	if (unlikely(vq->enabled == 0))
-		return 0;
-
-	avail_idx = *((volatile uint16_t *)&vq->avail->idx);
-	start_idx = vq->last_used_idx;
-	free_entries = avail_idx - start_idx;
-	count = RTE_MIN(count, free_entries);
-	count = RTE_MIN(count, (uint32_t)MAX_PKT_BURST);
-	if (count == 0)
-		return 0;
-
-	LOG_DEBUG(VHOST_DATA, "(%d) start_idx %d | end_idx %d\n",
-		dev->vid, start_idx, start_idx + count);
-
-	/* Retrieve all of the desc indexes first to avoid caching issues. */
-	rte_prefetch0(&vq->avail->ring[start_idx & (vq->size - 1)]);
-	for (i = 0; i < count; i++) {
-		used_idx = (start_idx + i) & (vq->size - 1);
-		desc_indexes[i] = vq->avail->ring[used_idx];
-		vq->used->ring[used_idx].id = desc_indexes[i];
-		vq->used->ring[used_idx].len = pkts[i]->pkt_len +
-					       dev->vhost_hlen;
-		vhost_log_used_vring(dev, vq,
-			offsetof(struct vring_used, ring[used_idx]),
-			sizeof(vq->used->ring[used_idx]));
-	}
-
-	rte_prefetch0(&vq->desc[desc_indexes[0]]);
-	for (i = 0; i < count; i++) {
-		uint16_t desc_idx = desc_indexes[i];
-		int err;
+	return 0;
 
-		err = copy_mbuf_to_desc(dev, vq, pkts[i], desc_idx);
-		if (unlikely(err)) {
-			used_idx = (start_idx + i) & (vq->size - 1);
-			vq->used->ring[used_idx].len = dev->vhost_hlen;
-			vhost_log_used_vring(dev, vq,
-				offsetof(struct vring_used, ring[used_idx]),
-				sizeof(vq->used->ring[used_idx]));
-		}
+error:
+	/* rollback on any error if last_used_idx update on-the-fly */
+	vq->last_used_idx -= extra_buffers;
 
-		if (i + 1 < count)
-			rte_prefetch0(&vq->desc[desc_indexes[i+1]]);
-	}
+	return 1;
+}
 
+static inline void __attribute__((always_inline))
+notify_guest(struct virtio_net *dev, struct vhost_virtqueue *vq)
+{
 	rte_smp_wmb();
-
-	*(volatile uint16_t *)&vq->used->idx += count;
-	vq->last_used_idx += count;
-	vhost_log_used_vring(dev, vq,
-		offsetof(struct vring_used, idx),
-		sizeof(vq->used->idx));
-
-	/* flush used->idx update before we read avail->flags. */
+	vq->used->idx = vq->last_used_idx;
+	vhost_log_used_vring(dev, vq, offsetof(struct vring_used, idx),
+			sizeof(vq->used->idx));
 	rte_mb();
-
-	/* Kick the guest if necessary. */
 	if (!(vq->avail->flags & VRING_AVAIL_F_NO_INTERRUPT)
 			&& (vq->callfd >= 0))
 		eventfd_write(vq->callfd, (eventfd_t)1);
-	return count;
 }
 
-static inline int
-fill_vec_buf(struct vhost_virtqueue *vq, uint32_t avail_idx,
-	     uint32_t *allocated, uint32_t *vec_idx,
-	     struct buf_vector *buf_vec)
-{
-	uint16_t idx = vq->avail->ring[avail_idx & (vq->size - 1)];
-	uint32_t vec_id = *vec_idx;
-	uint32_t len    = *allocated;
-
-	while (1) {
-		if (unlikely(vec_id >= BUF_VECTOR_MAX || idx >= vq->size))
-			return -1;
-
-		len += vq->desc[idx].len;
-		buf_vec[vec_id].buf_addr = vq->desc[idx].addr;
-		buf_vec[vec_id].buf_len  = vq->desc[idx].len;
-		buf_vec[vec_id].desc_idx = idx;
-		vec_id++;
-
-		if ((vq->desc[idx].flags & VRING_DESC_F_NEXT) == 0)
-			break;
-
-		idx = vq->desc[idx].next;
-	}
-
-	*allocated = len;
-	*vec_idx   = vec_id;
-
-	return 0;
-}
-
-/*
- * Returns -1 on fail, 0 on success
- */
-static inline int
-reserve_avail_buf_mergeable(struct vhost_virtqueue *vq, uint32_t size,
-			    uint16_t *end, struct buf_vector *buf_vec)
+uint16_t
+rte_vhost_enqueue_burst(int vid, uint16_t queue_id,
+	struct rte_mbuf **pkts, uint16_t count)
 {
-	uint16_t cur_idx;
+	struct vhost_virtqueue *vq;
+	struct virtio_net *dev;
+	uint32_t pkt_left = count;
+	uint32_t pkt_idx = 0;
+	uint32_t pkt_sent = 0;
+	uint32_t is_mrg_rxbuf = 0;
 	uint16_t avail_idx;
-	uint32_t allocated = 0;
-	uint32_t vec_idx = 0;
-	uint16_t tries = 0;
 
-	cur_idx  = vq->last_used_idx;
-
-	while (1) {
-		avail_idx = *((volatile uint16_t *)&vq->avail->idx);
-		if (unlikely(cur_idx == avail_idx))
-			return -1;
-
-		if (unlikely(fill_vec_buf(vq, cur_idx, &allocated,
-					  &vec_idx, buf_vec) < 0))
-			return -1;
-
-		cur_idx++;
-		tries++;
-
-		if (allocated >= size)
-			break;
-
-		/*
-		 * if we tried all available ring items, and still
-		 * can't get enough buf, it means something abnormal
-		 * happened.
-		 */
-		if (unlikely(tries >= vq->size))
-			return -1;
-	}
-
-	*end = cur_idx;
-	return 0;
-}
-
-static inline uint32_t __attribute__((always_inline))
-copy_mbuf_to_desc_mergeable(struct virtio_net *dev, struct vhost_virtqueue *vq,
-			    uint16_t end_idx, struct rte_mbuf *m,
-			    struct buf_vector *buf_vec)
-{
-	struct virtio_net_hdr_mrg_rxbuf virtio_hdr = {{0, 0, 0, 0, 0, 0}, 0};
-	uint32_t vec_idx = 0;
-	uint16_t start_idx = vq->last_used_idx;
-	uint16_t cur_idx = start_idx;
-	uint64_t desc_addr;
-	uint32_t desc_chain_head;
-	uint32_t desc_chain_len;
-	uint32_t mbuf_offset, mbuf_avail;
-	uint32_t desc_offset, desc_avail;
-	uint32_t cpy_len;
-	uint16_t desc_idx, used_idx;
-
-	if (unlikely(m == NULL))
+	if (unlikely(!pkt_left))
 		return 0;
 
-	LOG_DEBUG(VHOST_DATA, "(%d) current index %d | end index %d\n",
-		dev->vid, cur_idx, end_idx);
+	pkt_left = RTE_MIN((uint32_t)MAX_PKT_BURST, pkt_left);
 
-	desc_addr = gpa_to_vva(dev, buf_vec[vec_idx].buf_addr);
-	if (buf_vec[vec_idx].buf_len < dev->vhost_hlen || !desc_addr)
+	dev = get_device(vid);
+	if (unlikely(!dev))
 		return 0;
 
-	rte_prefetch0((void *)(uintptr_t)desc_addr);
-
-	virtio_hdr.num_buffers = end_idx - start_idx;
-	LOG_DEBUG(VHOST_DATA, "(%d) RX: num merge buffers %d\n",
-		dev->vid, virtio_hdr.num_buffers);
-
-	virtio_enqueue_offload(m, &virtio_hdr.hdr);
-	copy_virtio_net_hdr(dev, desc_addr, virtio_hdr);
-	vhost_log_write(dev, buf_vec[vec_idx].buf_addr, dev->vhost_hlen);
-	PRINT_PACKET(dev, (uintptr_t)desc_addr, dev->vhost_hlen, 0);
-
-	desc_avail  = buf_vec[vec_idx].buf_len - dev->vhost_hlen;
-	desc_offset = dev->vhost_hlen;
-	desc_chain_head = buf_vec[vec_idx].desc_idx;
-	desc_chain_len = desc_offset;
-
-	mbuf_avail  = rte_pktmbuf_data_len(m);
-	mbuf_offset = 0;
-	while (mbuf_avail != 0 || m->next != NULL) {
-		/* done with current desc buf, get the next one */
-		if (desc_avail == 0) {
-			desc_idx = buf_vec[vec_idx].desc_idx;
-			vec_idx++;
-
-			if (!(vq->desc[desc_idx].flags & VRING_DESC_F_NEXT)) {
-				/* Update used ring with desc information */
-				used_idx = cur_idx++ & (vq->size - 1);
-				vq->used->ring[used_idx].id = desc_chain_head;
-				vq->used->ring[used_idx].len = desc_chain_len;
-				vhost_log_used_vring(dev, vq,
-					offsetof(struct vring_used,
-						 ring[used_idx]),
-					sizeof(vq->used->ring[used_idx]));
-				desc_chain_head = buf_vec[vec_idx].desc_idx;
-				desc_chain_len = 0;
-			}
-
-			desc_addr = gpa_to_vva(dev, buf_vec[vec_idx].buf_addr);
-			if (unlikely(!desc_addr))
-				return 0;
-
-			/* Prefetch buffer address. */
-			rte_prefetch0((void *)(uintptr_t)desc_addr);
-			desc_offset = 0;
-			desc_avail  = buf_vec[vec_idx].buf_len;
-		}
-
-		/* done with current mbuf, get the next one */
-		if (mbuf_avail == 0) {
-			m = m->next;
-
-			mbuf_offset = 0;
-			mbuf_avail  = rte_pktmbuf_data_len(m);
-		}
-
-		cpy_len = RTE_MIN(desc_avail, mbuf_avail);
-		rte_memcpy((void *)((uintptr_t)(desc_addr + desc_offset)),
-			rte_pktmbuf_mtod_offset(m, void *, mbuf_offset),
-			cpy_len);
-		vhost_log_write(dev, buf_vec[vec_idx].buf_addr + desc_offset,
-			cpy_len);
-		PRINT_PACKET(dev, (uintptr_t)(desc_addr + desc_offset),
-			cpy_len, 0);
-
-		mbuf_avail  -= cpy_len;
-		mbuf_offset += cpy_len;
-		desc_avail  -= cpy_len;
-		desc_offset += cpy_len;
-		desc_chain_len += cpy_len;
-	}
-
-	used_idx = cur_idx & (vq->size - 1);
-	vq->used->ring[used_idx].id = desc_chain_head;
-	vq->used->ring[used_idx].len = desc_chain_len;
-	vhost_log_used_vring(dev, vq,
-		offsetof(struct vring_used, ring[used_idx]),
-		sizeof(vq->used->ring[used_idx]));
-
-	return end_idx - start_idx;
-}
-
-static inline uint32_t __attribute__((always_inline))
-virtio_dev_merge_rx(struct virtio_net *dev, uint16_t queue_id,
-	struct rte_mbuf **pkts, uint32_t count)
-{
-	struct vhost_virtqueue *vq;
-	uint32_t pkt_idx = 0, nr_used = 0;
-	uint16_t end;
-	struct buf_vector buf_vec[BUF_VECTOR_MAX];
-
-	LOG_DEBUG(VHOST_DATA, "(%d) %s\n", dev->vid, __func__);
-	if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->virt_qp_nb))) {
-		RTE_LOG(ERR, VHOST_DATA, "(%d) %s: invalid virtqueue idx %d.\n",
-			dev->vid, __func__, queue_id);
+	if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->virt_qp_nb)))
 		return 0;
-	}
 
 	vq = dev->virtqueue[queue_id];
-	if (unlikely(vq->enabled == 0))
+	if (unlikely(!vq->enabled))
 		return 0;
 
-	count = RTE_MIN((uint32_t)MAX_PKT_BURST, count);
-	if (count == 0)
-		return 0;
-
-	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
-		uint32_t pkt_len = pkts[pkt_idx]->pkt_len + dev->vhost_hlen;
+	if (dev->features & (1ULL << VIRTIO_NET_F_MRG_RXBUF))
+		is_mrg_rxbuf = 1;
 
-		if (unlikely(reserve_avail_buf_mergeable(vq, pkt_len,
-							 &end, buf_vec) < 0)) {
-			LOG_DEBUG(VHOST_DATA,
-				"(%d) failed to get enough desc from vring\n",
-				dev->vid);
+	/* start enqueuing packets 1 by 1 */
+	avail_idx = *((volatile uint16_t *)&vq->avail->idx);
+	while (pkt_left && avail_idx != vq->last_used_idx) {
+		if (enqueue_packet(dev, vq, avail_idx, pkts[pkt_idx],
+					is_mrg_rxbuf))
 			break;
-		}
-
-		nr_used = copy_mbuf_to_desc_mergeable(dev, vq, end,
-						      pkts[pkt_idx], buf_vec);
-		rte_smp_wmb();
-
-		*(volatile uint16_t *)&vq->used->idx += nr_used;
-		vhost_log_used_vring(dev, vq, offsetof(struct vring_used, idx),
-			sizeof(vq->used->idx));
-		vq->last_used_idx += nr_used;
-	}
-
-	if (likely(pkt_idx)) {
-		/* flush used->idx update before we read avail->flags. */
-		rte_mb();
 
-		/* Kick the guest if necessary. */
-		if (!(vq->avail->flags & VRING_AVAIL_F_NO_INTERRUPT)
-				&& (vq->callfd >= 0))
-			eventfd_write(vq->callfd, (eventfd_t)1);
+		pkt_idx++;
+		pkt_sent++;
+		pkt_left--;
 	}
 
-	return pkt_idx;
-}
-
-uint16_t
-rte_vhost_enqueue_burst(int vid, uint16_t queue_id,
-	struct rte_mbuf **pkts, uint16_t count)
-{
-	struct virtio_net *dev = get_device(vid);
-
-	if (!dev)
-		return 0;
+	/* update used idx and kick the guest if necessary */
+	if (pkt_sent)
+		notify_guest(dev, vq);
 
-	if (dev->features & (1 << VIRTIO_NET_F_MRG_RXBUF))
-		return virtio_dev_merge_rx(dev, queue_id, pkts, count);
-	else
-		return virtio_dev_rx(dev, queue_id, pkts, count);
+	return pkt_sent;
 }
 
 static void