[dpdk-dev,v2,1/5] lib_vhost: Fix enqueue/dequeue can't handle chained vring descriptors

Message ID 1432826207-8428-2-git-send-email-changchun.ouyang@intel.com (mailing list archive)
State Superseded, archived
Headers

Commit Message

Ouyang Changchun May 28, 2015, 3:16 p.m. UTC
  Vring enqueue needs to consider 2 cases:
 1. Vring descriptors chained together, the first one is for the virtio header, the rest are for real
    data; the virtio driver in Linux usually uses this scheme;
 2. Only one descriptor, where the virtio header and real data share one single descriptor; the virtio-net pmd uses
    such a scheme;

The same applies to vring dequeue: it should not assume the vring descriptor is chained or not chained; virtio in
different Linux versions has different behavior, e.g. Fedora 20 uses chained vring descriptors, while
Fedora 21 uses one single vring descriptor for tx.

Changes in v2
  - drop the uncompleted packet
  - refine code logic

Signed-off-by: Changchun Ouyang <changchun.ouyang@intel.com>
---
 lib/librte_vhost/vhost_rxtx.c | 65 +++++++++++++++++++++++++++++++++----------
 1 file changed, 50 insertions(+), 15 deletions(-)
  

Comments

Huawei Xie May 31, 2015, 5:03 a.m. UTC | #1
On 5/28/2015 11:17 PM, Ouyang, Changchun wrote:
> Vring enqueue need consider the 2 cases:
>  1. Vring descriptors chained together, the first one is for virtio header, the rest are for real
>     data, virtio driver in Linux usually use this scheme;
>  2. Only one descriptor, virtio header and real data share one single descriptor, virtio-net pmd use
>     such scheme;
For the commit message, :), actually we should consider the desc chain
as a logically continuous memory space, so there is also a case like
desc 1: virtio header and data; descs that follow: data only.

> So does vring dequeue, it should not assume vring descriptor is chained or not chained, virtio in
> different Linux version has different behavior, e.g. fedora 20 use chained vring descriptor, while
> fedora 21 use one single vring descriptor for tx.
This behavior could be configured. Besides, it is not bound to the
distribution but to the virtio-net driver.
The key thing is we should consider the generic case, rather than
fitting the requirement of the existing virtio-net implementation, so I
suggest removing the above message.
>
> Changes in v2
>   - drop the uncompleted packet
>   - refine code logic
>
> Signed-off-by: Changchun Ouyang <changchun.ouyang@intel.com>
> ---
>  lib/librte_vhost/vhost_rxtx.c | 65 +++++++++++++++++++++++++++++++++----------
>  1 file changed, 50 insertions(+), 15 deletions(-)
>
> diff --git a/lib/librte_vhost/vhost_rxtx.c b/lib/librte_vhost/vhost_rxtx.c
> index 4809d32..06ae2df 100644
> --- a/lib/librte_vhost/vhost_rxtx.c
> +++ b/lib/librte_vhost/vhost_rxtx.c
> @@ -59,7 +59,7 @@ virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
>  	struct virtio_net_hdr_mrg_rxbuf virtio_hdr = {{0, 0, 0, 0, 0, 0}, 0};
>  	uint64_t buff_addr = 0;
>  	uint64_t buff_hdr_addr = 0;
> -	uint32_t head[MAX_PKT_BURST], packet_len = 0;
> +	uint32_t head[MAX_PKT_BURST];
>  	uint32_t head_idx, packet_success = 0;
>  	uint16_t avail_idx, res_cur_idx;
>  	uint16_t res_base_idx, res_end_idx;
> @@ -113,6 +113,10 @@ virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
>  	rte_prefetch0(&vq->desc[head[packet_success]]);
>  
>  	while (res_cur_idx != res_end_idx) {
> +		uint32_t offset = 0;
> +		uint32_t data_len, len_to_cpy;
> +		uint8_t hdr = 0, uncompleted_pkt = 0;
> +
>  		/* Get descriptor from available ring */
>  		desc = &vq->desc[head[packet_success]];
>  
> @@ -125,7 +129,6 @@ virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
>  
>  		/* Copy virtio_hdr to packet and increment buffer address */
>  		buff_hdr_addr = buff_addr;
> -		packet_len = rte_pktmbuf_data_len(buff) + vq->vhost_hlen;
>  
>  		/*
>  		 * If the descriptors are chained the header and data are
> @@ -136,28 +139,55 @@ virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
>  			desc = &vq->desc[desc->next];
>  			/* Buffer address translation. */
>  			buff_addr = gpa_to_vva(dev, desc->addr);
I am wondering if there is the possibility that [GPA, GPA+desc->len]
could cross multiple memory regions.
Don't expect to fix it in this patch, :).
> -			desc->len = rte_pktmbuf_data_len(buff);
>  		} else {
>  			buff_addr += vq->vhost_hlen;
> -			desc->len = packet_len;
> +			hdr = 1;
>  		}
>  
> +		data_len = rte_pktmbuf_data_len(buff);
> +		len_to_cpy = RTE_MIN(data_len,
> +			hdr ? desc->len - vq->vhost_hlen : desc->len);
> +		while (len_to_cpy > 0) {
> +			/* Copy mbuf data to buffer */
> +			rte_memcpy((void *)(uintptr_t)buff_addr,
> +				(const void *)(rte_pktmbuf_mtod(buff, const char *) + offset),
> +				len_to_cpy);
> +			PRINT_PACKET(dev, (uintptr_t)buff_addr,
> +				len_to_cpy, 0);
> +
> +			offset += len_to_cpy;
> +
> +			if (offset == data_len)
> +				break;
I don't understand here. If offset reaches the end of the first segment,
why don't we continue to copy from the next segment?

> +
> +			if (desc->flags & VRING_DESC_F_NEXT) {
> +				desc = &vq->desc[desc->next];
> +				buff_addr = gpa_to_vva(dev, desc->addr);
> +				len_to_cpy = RTE_MIN(data_len - offset, desc->len);
> +			} else {
> +				/* Room in vring buffer is not enough */
> +				uncompleted_pkt = 1;
> +				break;
> +			}
> +		};
> +
>  		/* Update used ring with desc information */
>  		vq->used->ring[res_cur_idx & (vq->size - 1)].id =
>  							head[packet_success];
> -		vq->used->ring[res_cur_idx & (vq->size - 1)].len = packet_len;
>  
> -		/* Copy mbuf data to buffer */
> -		/* FIXME for sg mbuf and the case that desc couldn't hold the mbuf data */
> -		rte_memcpy((void *)(uintptr_t)buff_addr,
> -			rte_pktmbuf_mtod(buff, const void *),
> -			rte_pktmbuf_data_len(buff));
> -		PRINT_PACKET(dev, (uintptr_t)buff_addr,
> -			rte_pktmbuf_data_len(buff), 0);
> +		/* Drop the packet if it is uncompleted */
> +		if (unlikely(uncompleted_pkt == 1))
> +			vq->used->ring[res_cur_idx & (vq->size - 1)].len = 0;
> +		else
> +			vq->used->ring[res_cur_idx & (vq->size - 1)].len =
> +							offset + vq->vhost_hlen;
>  
>  		res_cur_idx++;
>  		packet_success++;
>  
> +		if (unlikely(uncompleted_pkt == 1))
> +			continue;
> +
>  		rte_memcpy((void *)(uintptr_t)buff_hdr_addr,
>  			(const void *)&virtio_hdr, vq->vhost_hlen);
>  
> @@ -589,7 +619,14 @@ rte_vhost_dequeue_burst(struct virtio_net *dev, uint16_t queue_id,
>  		desc = &vq->desc[head[entry_success]];
>  
>  		/* Discard first buffer as it is the virtio header */
> -		desc = &vq->desc[desc->next];
> +		if (desc->flags & VRING_DESC_F_NEXT) {
> +			desc = &vq->desc[desc->next];
> +			vb_offset = 0;
> +			vb_avail = desc->len;
> +		} else {
> +			vb_offset = vq->vhost_hlen;
> +			vb_avail = desc->len - vb_offset;
> +		}
>  
>  		/* Buffer address translation. */
>  		vb_addr = gpa_to_vva(dev, desc->addr);
> @@ -608,8 +645,6 @@ rte_vhost_dequeue_burst(struct virtio_net *dev, uint16_t queue_id,
>  		vq->used->ring[used_idx].id = head[entry_success];
>  		vq->used->ring[used_idx].len = 0;
>  
> -		vb_offset = 0;
> -		vb_avail = desc->len;
>  		/* Allocate an mbuf and populate the structure. */
>  		m = rte_pktmbuf_alloc(mbuf_pool);
>  		if (unlikely(m == NULL)) {
  
Huawei Xie May 31, 2015, 8:40 a.m. UTC | #2
On 5/28/2015 11:17 PM, Ouyang, Changchun wrote:
> Vring enqueue need consider the 2 cases:
>  1. Vring descriptors chained together, the first one is for virtio header, the rest are for real
>     data, virtio driver in Linux usually use this scheme;
>  2. Only one descriptor, virtio header and real data share one single descriptor, virtio-net pmd use
>     such scheme;
>
> So does vring dequeue, it should not assume vring descriptor is chained or not chained, virtio in
> different Linux version has different behavior, e.g. fedora 20 use chained vring descriptor, while
> fedora 21 use one single vring descriptor for tx.
>
> Changes in v2
>   - drop the uncompleted packet
>   - refine code logic
>
> Signed-off-by: Changchun Ouyang <changchun.ouyang@intel.com>
> ---
>  lib/librte_vhost/vhost_rxtx.c | 65 +++++++++++++++++++++++++++++++++----------
>  1 file changed, 50 insertions(+), 15 deletions(-)
>
> diff --git a/lib/librte_vhost/vhost_rxtx.c b/lib/librte_vhost/vhost_rxtx.c
> index 4809d32..06ae2df 100644
> --- a/lib/librte_vhost/vhost_rxtx.c
> +++ b/lib/librte_vhost/vhost_rxtx.c
> @@ -59,7 +59,7 @@ virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
>  	struct virtio_net_hdr_mrg_rxbuf virtio_hdr = {{0, 0, 0, 0, 0, 0}, 0};
>  	uint64_t buff_addr = 0;
>  	uint64_t buff_hdr_addr = 0;
> -	uint32_t head[MAX_PKT_BURST], packet_len = 0;
> +	uint32_t head[MAX_PKT_BURST];
>  	uint32_t head_idx, packet_success = 0;
>  	uint16_t avail_idx, res_cur_idx;
>  	uint16_t res_base_idx, res_end_idx;
> @@ -113,6 +113,10 @@ virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
>  	rte_prefetch0(&vq->desc[head[packet_success]]);
>  
>  	while (res_cur_idx != res_end_idx) {
> +		uint32_t offset = 0;
> +		uint32_t data_len, len_to_cpy;
> +		uint8_t hdr = 0, uncompleted_pkt = 0;
> +
>  		/* Get descriptor from available ring */
>  		desc = &vq->desc[head[packet_success]];
>  
> @@ -125,7 +129,6 @@ virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
>  
>  		/* Copy virtio_hdr to packet and increment buffer address */
>  		buff_hdr_addr = buff_addr;
> -		packet_len = rte_pktmbuf_data_len(buff) + vq->vhost_hlen;
>  
>  		/*
>  		 * If the descriptors are chained the header and data are
> @@ -136,28 +139,55 @@ virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
>  			desc = &vq->desc[desc->next];
>  			/* Buffer address translation. */
>  			buff_addr = gpa_to_vva(dev, desc->addr);
> -			desc->len = rte_pktmbuf_data_len(buff);
Did we get confirmation from the virtio spec that it is OK to only update used->len?
>  		} else {
>  			buff_addr += vq->vhost_hlen;
> -			desc->len = packet_len;
> +			hdr = 1;
>  		}
>  
> +		data_len = rte_pktmbuf_data_len(buff);
> +		len_to_cpy = RTE_MIN(data_len,
> +			hdr ? desc->len - vq->vhost_hlen : desc->len);
> +		while (len_to_cpy > 0) {
> +			/* Copy mbuf data to buffer */
> +			rte_memcpy((void *)(uintptr_t)buff_addr,
> +				(const void *)(rte_pktmbuf_mtod(buff, const char *) + offset),
> +				len_to_cpy);
> +			PRINT_PACKET(dev, (uintptr_t)buff_addr,
> +				len_to_cpy, 0);
> +
> +			offset += len_to_cpy;
> +
> +			if (offset == data_len)
> +				break;
Ok, i see scatter gather case handling is in patch 5.
> +
> +			if (desc->flags & VRING_DESC_F_NEXT) {
> +				desc = &vq->desc[desc->next];
> +				buff_addr = gpa_to_vva(dev, desc->addr);
> +				len_to_cpy = RTE_MIN(data_len - offset, desc->len);
> +			} else {
> +				/* Room in vring buffer is not enough */
> +				uncompleted_pkt = 1;
> +				break;
> +			}
> +		};
> +
>  		/* Update used ring with desc information */
>  		vq->used->ring[res_cur_idx & (vq->size - 1)].id =
>  							head[packet_success];
> -		vq->used->ring[res_cur_idx & (vq->size - 1)].len = packet_len;
>  
> -		/* Copy mbuf data to buffer */
> -		/* FIXME for sg mbuf and the case that desc couldn't hold the mbuf data */
> -		rte_memcpy((void *)(uintptr_t)buff_addr,
> -			rte_pktmbuf_mtod(buff, const void *),
> -			rte_pktmbuf_data_len(buff));
> -		PRINT_PACKET(dev, (uintptr_t)buff_addr,
> -			rte_pktmbuf_data_len(buff), 0);
> +		/* Drop the packet if it is uncompleted */
> +		if (unlikely(uncompleted_pkt == 1))
> +			vq->used->ring[res_cur_idx & (vq->size - 1)].len = 0;
Here things become complicated with the previous lockless reserve.
What is the consequence when guest sees zero in used->len? At least, do
we check with virtio-net implementation?

> +		else
> +			vq->used->ring[res_cur_idx & (vq->size - 1)].len =
> +							offset + vq->vhost_hlen;
Two questions here,
1.  add virtio header len?
2.  Why not use packet_len rather than offset?
>  
>  		res_cur_idx++;
>  		packet_success++;
>  
> +		if (unlikely(uncompleted_pkt == 1))
> +			continue;
> +
>  		rte_memcpy((void *)(uintptr_t)buff_hdr_addr,
>  			(const void *)&virtio_hdr, vq->vhost_hlen);
>  
> @@ -589,7 +619,14 @@ rte_vhost_dequeue_burst(struct virtio_net *dev, uint16_t queue_id,
>  		desc = &vq->desc[head[entry_success]];
>  
>  		/* Discard first buffer as it is the virtio header */
> -		desc = &vq->desc[desc->next];
> +		if (desc->flags & VRING_DESC_F_NEXT) {
> +			desc = &vq->desc[desc->next];
> +			vb_offset = 0;
> +			vb_avail = desc->len;
> +		} else {
> +			vb_offset = vq->vhost_hlen;
> +			vb_avail = desc->len - vb_offset;
> +		}
>  
>  		/* Buffer address translation. */
>  		vb_addr = gpa_to_vva(dev, desc->addr);
> @@ -608,8 +645,6 @@ rte_vhost_dequeue_burst(struct virtio_net *dev, uint16_t queue_id,
>  		vq->used->ring[used_idx].id = head[entry_success];
>  		vq->used->ring[used_idx].len = 0;
>  
> -		vb_offset = 0;
> -		vb_avail = desc->len;
>  		/* Allocate an mbuf and populate the structure. */
>  		m = rte_pktmbuf_alloc(mbuf_pool);
>  		if (unlikely(m == NULL)) {
  
Ouyang Changchun May 31, 2015, 12:59 p.m. UTC | #3
> -----Original Message-----
> From: Xie, Huawei
> Sent: Sunday, May 31, 2015 4:41 PM
> To: Ouyang, Changchun; dev@dpdk.org
> Cc: Cao, Waterman
> Subject: Re: [PATCH v2 1/5] lib_vhost: Fix enqueue/dequeue can't handle
> chained vring descriptors
> 
> On 5/28/2015 11:17 PM, Ouyang, Changchun wrote:
> > Vring enqueue need consider the 2 cases:
> >  1. Vring descriptors chained together, the first one is for virtio header, the
> rest are for real
> >     data, virtio driver in Linux usually use this scheme;  2. Only one
> > descriptor, virtio header and real data share one single descriptor, virtio-
> net pmd use
> >     such scheme;
> >
> > So does vring dequeue, it should not assume vring descriptor is
> > chained or not chained, virtio in different Linux version has
> > different behavior, e.g. fedora 20 use chained vring descriptor, while
> fedora 21 use one single vring descriptor for tx.
> >
> > Changes in v2
> >   - drop the uncompleted packet
> >   - refine code logic
> >
> > Signed-off-by: Changchun Ouyang <changchun.ouyang@intel.com>
> > ---
> >  lib/librte_vhost/vhost_rxtx.c | 65
> > +++++++++++++++++++++++++++++++++----------
> >  1 file changed, 50 insertions(+), 15 deletions(-)
> >
> > diff --git a/lib/librte_vhost/vhost_rxtx.c
> > b/lib/librte_vhost/vhost_rxtx.c index 4809d32..06ae2df 100644
> > --- a/lib/librte_vhost/vhost_rxtx.c
> > +++ b/lib/librte_vhost/vhost_rxtx.c
> > @@ -59,7 +59,7 @@ virtio_dev_rx(struct virtio_net *dev, uint16_t
> queue_id,
> >  	struct virtio_net_hdr_mrg_rxbuf virtio_hdr = {{0, 0, 0, 0, 0, 0}, 0};
> >  	uint64_t buff_addr = 0;
> >  	uint64_t buff_hdr_addr = 0;
> > -	uint32_t head[MAX_PKT_BURST], packet_len = 0;
> > +	uint32_t head[MAX_PKT_BURST];
> >  	uint32_t head_idx, packet_success = 0;
> >  	uint16_t avail_idx, res_cur_idx;
> >  	uint16_t res_base_idx, res_end_idx;
> > @@ -113,6 +113,10 @@ virtio_dev_rx(struct virtio_net *dev, uint16_t
> queue_id,
> >  	rte_prefetch0(&vq->desc[head[packet_success]]);
> >
> >  	while (res_cur_idx != res_end_idx) {
> > +		uint32_t offset = 0;
> > +		uint32_t data_len, len_to_cpy;
> > +		uint8_t hdr = 0, uncompleted_pkt = 0;
> > +
> >  		/* Get descriptor from available ring */
> >  		desc = &vq->desc[head[packet_success]];
> >
> > @@ -125,7 +129,6 @@ virtio_dev_rx(struct virtio_net *dev, uint16_t
> > queue_id,
> >
> >  		/* Copy virtio_hdr to packet and increment buffer address */
> >  		buff_hdr_addr = buff_addr;
> > -		packet_len = rte_pktmbuf_data_len(buff) + vq->vhost_hlen;
> >
> >  		/*
> >  		 * If the descriptors are chained the header and data are @@
> > -136,28 +139,55 @@ virtio_dev_rx(struct virtio_net *dev, uint16_t
> queue_id,
> >  			desc = &vq->desc[desc->next];
> >  			/* Buffer address translation. */
> >  			buff_addr = gpa_to_vva(dev, desc->addr);
> > -			desc->len = rte_pktmbuf_data_len(buff);
> Do we got confirm from virtio SPEC that it is OK to only update used->len?

The virtio spec doesn't require vhost to update desc->len.


> >  		} else {
> >  			buff_addr += vq->vhost_hlen;
> > -			desc->len = packet_len;
> > +			hdr = 1;
> >  		}
> >
> > +		data_len = rte_pktmbuf_data_len(buff);
> > +		len_to_cpy = RTE_MIN(data_len,
> > +			hdr ? desc->len - vq->vhost_hlen : desc->len);
> > +		while (len_to_cpy > 0) {
> > +			/* Copy mbuf data to buffer */
> > +			rte_memcpy((void *)(uintptr_t)buff_addr,
> > +				(const void *)(rte_pktmbuf_mtod(buff,
> const char *) + offset),
> > +				len_to_cpy);
> > +			PRINT_PACKET(dev, (uintptr_t)buff_addr,
> > +				len_to_cpy, 0);
> > +
> > +			offset += len_to_cpy;
> > +
> > +			if (offset == data_len)
> > +				break;
> Ok, i see scatter gather case handling is in patch 5.
> > +
> > +			if (desc->flags & VRING_DESC_F_NEXT) {
> > +				desc = &vq->desc[desc->next];
> > +				buff_addr = gpa_to_vva(dev, desc->addr);
> > +				len_to_cpy = RTE_MIN(data_len - offset,
> desc->len);
> > +			} else {
> > +				/* Room in vring buffer is not enough */
> > +				uncompleted_pkt = 1;
> > +				break;
> > +			}
> > +		};
> > +
> >  		/* Update used ring with desc information */
> >  		vq->used->ring[res_cur_idx & (vq->size - 1)].id =
> >
> 	head[packet_success];
> > -		vq->used->ring[res_cur_idx & (vq->size - 1)].len =
> packet_len;
> >
> > -		/* Copy mbuf data to buffer */
> > -		/* FIXME for sg mbuf and the case that desc couldn't hold the
> mbuf data */
> > -		rte_memcpy((void *)(uintptr_t)buff_addr,
> > -			rte_pktmbuf_mtod(buff, const void *),
> > -			rte_pktmbuf_data_len(buff));
> > -		PRINT_PACKET(dev, (uintptr_t)buff_addr,
> > -			rte_pktmbuf_data_len(buff), 0);
> > +		/* Drop the packet if it is uncompleted */
> > +		if (unlikely(uncompleted_pkt == 1))
> > +			vq->used->ring[res_cur_idx & (vq->size - 1)].len = 0;
> Here things become complicated with the previous lockless reserve.

Why it become complicated? Len = 0 means it contain any meaningful data in the buffer.

> What is the consequence when guest sees zero in used->len? At least, do we
> check with virtio-net implementation?

> 
> > +		else
> > +			vq->used->ring[res_cur_idx & (vq->size - 1)].len =
> > +							offset + vq-
> >vhost_hlen;
> Two questions here,
> 1.  add virtio header len?
> 2.  Why not use packet_len rather than offset?
> >
> >  		res_cur_idx++;
> >  		packet_success++;
> >
> > +		if (unlikely(uncompleted_pkt == 1))
> > +			continue;
> > +
> >  		rte_memcpy((void *)(uintptr_t)buff_hdr_addr,
> >  			(const void *)&virtio_hdr, vq->vhost_hlen);
> >
> > @@ -589,7 +619,14 @@ rte_vhost_dequeue_burst(struct virtio_net *dev,
> uint16_t queue_id,
> >  		desc = &vq->desc[head[entry_success]];
> >
> >  		/* Discard first buffer as it is the virtio header */
> > -		desc = &vq->desc[desc->next];
> > +		if (desc->flags & VRING_DESC_F_NEXT) {
> > +			desc = &vq->desc[desc->next];
> > +			vb_offset = 0;
> > +			vb_avail = desc->len;
> > +		} else {
> > +			vb_offset = vq->vhost_hlen;
> > +			vb_avail = desc->len - vb_offset;
> > +		}
> >
> >  		/* Buffer address translation. */
> >  		vb_addr = gpa_to_vva(dev, desc->addr); @@ -608,8 +645,6
> @@
> > rte_vhost_dequeue_burst(struct virtio_net *dev, uint16_t queue_id,
> >  		vq->used->ring[used_idx].id = head[entry_success];
> >  		vq->used->ring[used_idx].len = 0;
> >
> > -		vb_offset = 0;
> > -		vb_avail = desc->len;
> >  		/* Allocate an mbuf and populate the structure. */
> >  		m = rte_pktmbuf_alloc(mbuf_pool);
> >  		if (unlikely(m == NULL)) {
  
Ouyang Changchun May 31, 2015, 1:20 p.m. UTC | #4
> -----Original Message-----
> From: Xie, Huawei
> Sent: Sunday, May 31, 2015 1:04 PM
> To: Ouyang, Changchun; dev@dpdk.org
> Cc: Cao, Waterman
> Subject: Re: [PATCH v2 1/5] lib_vhost: Fix enqueue/dequeue can't handle
> chained vring descriptors
> 
> On 5/28/2015 11:17 PM, Ouyang, Changchun wrote:
> > Vring enqueue need consider the 2 cases:
> >  1. Vring descriptors chained together, the first one is for virtio header, the
> rest are for real
> >     data, virtio driver in Linux usually use this scheme;  2. Only one
> > descriptor, virtio header and real data share one single descriptor, virtio-
> net pmd use
> >     such scheme;
> For the commit message, :), actually we should consider the desc chain as
> logically continuous memory space, so there is also the case like desc 1: virtio
> header and data; descs followed: data only.
> 

Ok, makes sense, will update the description a bit in the next version. 

> > So does vring dequeue, it should not assume vring descriptor is
> > chained or not chained, virtio in different Linux version has
> > different behavior, e.g. fedora 20 use chained vring descriptor, while
> fedora 21 use one single vring descriptor for tx.
> This behavior could be configured. Besides it is not bound to distribution but
> virtio-net driver.
> They key thing is we should consider the generic case, rather than fitting the
> requirement of existing  virtio-net implementation, so suggest remove the
> above message.
It also makes sense; I can remove it in the next version. 
> >
> > Changes in v2
> >   - drop the uncompleted packet
> >   - refine code logic
> >
> > Signed-off-by: Changchun Ouyang <changchun.ouyang@intel.com>
> > ---
> >  lib/librte_vhost/vhost_rxtx.c | 65
> > +++++++++++++++++++++++++++++++++----------
> >  1 file changed, 50 insertions(+), 15 deletions(-)
> >
> > diff --git a/lib/librte_vhost/vhost_rxtx.c
> > b/lib/librte_vhost/vhost_rxtx.c index 4809d32..06ae2df 100644
> > --- a/lib/librte_vhost/vhost_rxtx.c
> > +++ b/lib/librte_vhost/vhost_rxtx.c
> > @@ -59,7 +59,7 @@ virtio_dev_rx(struct virtio_net *dev, uint16_t
> queue_id,
> >  	struct virtio_net_hdr_mrg_rxbuf virtio_hdr = {{0, 0, 0, 0, 0, 0}, 0};
> >  	uint64_t buff_addr = 0;
> >  	uint64_t buff_hdr_addr = 0;
> > -	uint32_t head[MAX_PKT_BURST], packet_len = 0;
> > +	uint32_t head[MAX_PKT_BURST];
> >  	uint32_t head_idx, packet_success = 0;
> >  	uint16_t avail_idx, res_cur_idx;
> >  	uint16_t res_base_idx, res_end_idx;
> > @@ -113,6 +113,10 @@ virtio_dev_rx(struct virtio_net *dev, uint16_t
> queue_id,
> >  	rte_prefetch0(&vq->desc[head[packet_success]]);
> >
> >  	while (res_cur_idx != res_end_idx) {
> > +		uint32_t offset = 0;
> > +		uint32_t data_len, len_to_cpy;
> > +		uint8_t hdr = 0, uncompleted_pkt = 0;
> > +
> >  		/* Get descriptor from available ring */
> >  		desc = &vq->desc[head[packet_success]];
> >
> > @@ -125,7 +129,6 @@ virtio_dev_rx(struct virtio_net *dev, uint16_t
> > queue_id,
> >
> >  		/* Copy virtio_hdr to packet and increment buffer address */
> >  		buff_hdr_addr = buff_addr;
> > -		packet_len = rte_pktmbuf_data_len(buff) + vq->vhost_hlen;
> >
> >  		/*
> >  		 * If the descriptors are chained the header and data are @@
> > -136,28 +139,55 @@ virtio_dev_rx(struct virtio_net *dev, uint16_t
> queue_id,
> >  			desc = &vq->desc[desc->next];
> >  			/* Buffer address translation. */
> >  			buff_addr = gpa_to_vva(dev, desc->addr);
> I am wondering if there is the possibility the [GPA, GPA+desc->len] could
> cross multiple memory regions.
> Don't expect to fix in this patch, :).
> > -			desc->len = rte_pktmbuf_data_len(buff);
> >  		} else {
> >  			buff_addr += vq->vhost_hlen;
> > -			desc->len = packet_len;
> > +			hdr = 1;
> >  		}
> >
> > +		data_len = rte_pktmbuf_data_len(buff);
> > +		len_to_cpy = RTE_MIN(data_len,
> > +			hdr ? desc->len - vq->vhost_hlen : desc->len);
> > +		while (len_to_cpy > 0) {
> > +			/* Copy mbuf data to buffer */
> > +			rte_memcpy((void *)(uintptr_t)buff_addr,
> > +				(const void *)(rte_pktmbuf_mtod(buff,
> const char *) + offset),
> > +				len_to_cpy);
> > +			PRINT_PACKET(dev, (uintptr_t)buff_addr,
> > +				len_to_cpy, 0);
> > +
> > +			offset += len_to_cpy;
> > +
> > +			if (offset == data_len)
> > +				break;
> I don't understand here. If offset reaches the end of the first segment, why
> don't we continue to copy from the next segment?
data_len is the total length of the whole packet rather than the length of just one segment.
When offset equals data_len, the whole packet has been copied into the vring buffer, so the loop can break
and continue on to handle the next packet.

> 
> > +
> > +			if (desc->flags & VRING_DESC_F_NEXT) {
> > +				desc = &vq->desc[desc->next];
> > +				buff_addr = gpa_to_vva(dev, desc->addr);
> > +				len_to_cpy = RTE_MIN(data_len - offset,
> desc->len);
> > +			} else {
> > +				/* Room in vring buffer is not enough */
> > +				uncompleted_pkt = 1;
> > +				break;
> > +			}
> > +		};
> > +
> >  		/* Update used ring with desc information */
> >  		vq->used->ring[res_cur_idx & (vq->size - 1)].id =
> >
> 	head[packet_success];
> > -		vq->used->ring[res_cur_idx & (vq->size - 1)].len =
> packet_len;
> >
> > -		/* Copy mbuf data to buffer */
> > -		/* FIXME for sg mbuf and the case that desc couldn't hold the
> mbuf data */
> > -		rte_memcpy((void *)(uintptr_t)buff_addr,
> > -			rte_pktmbuf_mtod(buff, const void *),
> > -			rte_pktmbuf_data_len(buff));
> > -		PRINT_PACKET(dev, (uintptr_t)buff_addr,
> > -			rte_pktmbuf_data_len(buff), 0);
> > +		/* Drop the packet if it is uncompleted */
> > +		if (unlikely(uncompleted_pkt == 1))
> > +			vq->used->ring[res_cur_idx & (vq->size - 1)].len = 0;
> > +		else
> > +			vq->used->ring[res_cur_idx & (vq->size - 1)].len =
> > +							offset + vq-
> >vhost_hlen;
> >
> >  		res_cur_idx++;
> >  		packet_success++;
> >
> > +		if (unlikely(uncompleted_pkt == 1))
> > +			continue;
> > +
> >  		rte_memcpy((void *)(uintptr_t)buff_hdr_addr,
> >  			(const void *)&virtio_hdr, vq->vhost_hlen);
> >
> > @@ -589,7 +619,14 @@ rte_vhost_dequeue_burst(struct virtio_net *dev,
> uint16_t queue_id,
> >  		desc = &vq->desc[head[entry_success]];
> >
> >  		/* Discard first buffer as it is the virtio header */
> > -		desc = &vq->desc[desc->next];
> > +		if (desc->flags & VRING_DESC_F_NEXT) {
> > +			desc = &vq->desc[desc->next];
> > +			vb_offset = 0;
> > +			vb_avail = desc->len;
> > +		} else {
> > +			vb_offset = vq->vhost_hlen;
> > +			vb_avail = desc->len - vb_offset;
> > +		}
> >
> >  		/* Buffer address translation. */
> >  		vb_addr = gpa_to_vva(dev, desc->addr); @@ -608,8 +645,6
> @@
> > rte_vhost_dequeue_burst(struct virtio_net *dev, uint16_t queue_id,
> >  		vq->used->ring[used_idx].id = head[entry_success];
> >  		vq->used->ring[used_idx].len = 0;
> >
> > -		vb_offset = 0;
> > -		vb_avail = desc->len;
> >  		/* Allocate an mbuf and populate the structure. */
> >  		m = rte_pktmbuf_alloc(mbuf_pool);
> >  		if (unlikely(m == NULL)) {
  
Ouyang Changchun May 31, 2015, 1:22 p.m. UTC | #5
> -----Original Message-----
> From: Ouyang, Changchun
> Sent: Sunday, May 31, 2015 9:00 PM
> To: Xie, Huawei; dev@dpdk.org
> Cc: Cao, Waterman; Ouyang, Changchun
> Subject: RE: [PATCH v2 1/5] lib_vhost: Fix enqueue/dequeue can't handle
> chained vring descriptors
> 
> 
> 
> > -----Original Message-----
> > From: Xie, Huawei
> > Sent: Sunday, May 31, 2015 4:41 PM
> > To: Ouyang, Changchun; dev@dpdk.org
> > Cc: Cao, Waterman
> > Subject: Re: [PATCH v2 1/5] lib_vhost: Fix enqueue/dequeue can't
> > handle chained vring descriptors
> >
> > On 5/28/2015 11:17 PM, Ouyang, Changchun wrote:
> > > Vring enqueue need consider the 2 cases:
> > >  1. Vring descriptors chained together, the first one is for virtio
> > > header, the
> > rest are for real
> > >     data, virtio driver in Linux usually use this scheme;  2. Only
> > > one descriptor, virtio header and real data share one single
> > > descriptor, virtio-
> > net pmd use
> > >     such scheme;
> > >
> > > So does vring dequeue, it should not assume vring descriptor is
> > > chained or not chained, virtio in different Linux version has
> > > different behavior, e.g. fedora 20 use chained vring descriptor,
> > > while
> > fedora 21 use one single vring descriptor for tx.
> > >
> > > Changes in v2
> > >   - drop the uncompleted packet
> > >   - refine code logic
> > >
> > > Signed-off-by: Changchun Ouyang <changchun.ouyang@intel.com>
> > > ---
> > >  lib/librte_vhost/vhost_rxtx.c | 65
> > > +++++++++++++++++++++++++++++++++----------
> > >  1 file changed, 50 insertions(+), 15 deletions(-)
> > >
> > > diff --git a/lib/librte_vhost/vhost_rxtx.c
> > > b/lib/librte_vhost/vhost_rxtx.c index 4809d32..06ae2df 100644
> > > --- a/lib/librte_vhost/vhost_rxtx.c
> > > +++ b/lib/librte_vhost/vhost_rxtx.c
> > > @@ -59,7 +59,7 @@ virtio_dev_rx(struct virtio_net *dev, uint16_t
> > queue_id,
> > >  	struct virtio_net_hdr_mrg_rxbuf virtio_hdr = {{0, 0, 0, 0, 0, 0}, 0};
> > >  	uint64_t buff_addr = 0;
> > >  	uint64_t buff_hdr_addr = 0;
> > > -	uint32_t head[MAX_PKT_BURST], packet_len = 0;
> > > +	uint32_t head[MAX_PKT_BURST];
> > >  	uint32_t head_idx, packet_success = 0;
> > >  	uint16_t avail_idx, res_cur_idx;
> > >  	uint16_t res_base_idx, res_end_idx; @@ -113,6 +113,10 @@
> > > virtio_dev_rx(struct virtio_net *dev, uint16_t
> > queue_id,
> > >  	rte_prefetch0(&vq->desc[head[packet_success]]);
> > >
> > >  	while (res_cur_idx != res_end_idx) {
> > > +		uint32_t offset = 0;
> > > +		uint32_t data_len, len_to_cpy;
> > > +		uint8_t hdr = 0, uncompleted_pkt = 0;
> > > +
> > >  		/* Get descriptor from available ring */
> > >  		desc = &vq->desc[head[packet_success]];
> > >
> > > @@ -125,7 +129,6 @@ virtio_dev_rx(struct virtio_net *dev, uint16_t
> > > queue_id,
> > >
> > >  		/* Copy virtio_hdr to packet and increment buffer address */
> > >  		buff_hdr_addr = buff_addr;
> > > -		packet_len = rte_pktmbuf_data_len(buff) + vq->vhost_hlen;
> > >
> > >  		/*
> > >  		 * If the descriptors are chained the header and data are @@
> > > -136,28 +139,55 @@ virtio_dev_rx(struct virtio_net *dev, uint16_t
> > queue_id,
> > >  			desc = &vq->desc[desc->next];
> > >  			/* Buffer address translation. */
> > >  			buff_addr = gpa_to_vva(dev, desc->addr);
> > > -			desc->len = rte_pktmbuf_data_len(buff);
> > Do we got confirm from virtio SPEC that it is OK to only update used->len?
> 
> Virtio Spec don't require vhost update desc->len.
> 
> 
> > >  		} else {
> > >  			buff_addr += vq->vhost_hlen;
> > > -			desc->len = packet_len;
> > > +			hdr = 1;
> > >  		}
> > >
> > > +		data_len = rte_pktmbuf_data_len(buff);
> > > +		len_to_cpy = RTE_MIN(data_len,
> > > +			hdr ? desc->len - vq->vhost_hlen : desc->len);
> > > +		while (len_to_cpy > 0) {
> > > +			/* Copy mbuf data to buffer */
> > > +			rte_memcpy((void *)(uintptr_t)buff_addr,
> > > +				(const void *)(rte_pktmbuf_mtod(buff,
> > const char *) + offset),
> > > +				len_to_cpy);
> > > +			PRINT_PACKET(dev, (uintptr_t)buff_addr,
> > > +				len_to_cpy, 0);
> > > +
> > > +			offset += len_to_cpy;
> > > +
> > > +			if (offset == data_len)
> > > +				break;
> > Ok, i see scatter gather case handling is in patch 5.
> > > +
> > > +			if (desc->flags & VRING_DESC_F_NEXT) {
> > > +				desc = &vq->desc[desc->next];
> > > +				buff_addr = gpa_to_vva(dev, desc->addr);
> > > +				len_to_cpy = RTE_MIN(data_len - offset,
> > desc->len);
> > > +			} else {
> > > +				/* Room in vring buffer is not enough */
> > > +				uncompleted_pkt = 1;
> > > +				break;
> > > +			}
> > > +		};
> > > +
> > >  		/* Update used ring with desc information */
> > >  		vq->used->ring[res_cur_idx & (vq->size - 1)].id =
> > >
> > 	head[packet_success];
> > > -		vq->used->ring[res_cur_idx & (vq->size - 1)].len =
> > packet_len;
> > >
> > > -		/* Copy mbuf data to buffer */
> > > -		/* FIXME for sg mbuf and the case that desc couldn't hold the
> > mbuf data */
> > > -		rte_memcpy((void *)(uintptr_t)buff_addr,
> > > -			rte_pktmbuf_mtod(buff, const void *),
> > > -			rte_pktmbuf_data_len(buff));
> > > -		PRINT_PACKET(dev, (uintptr_t)buff_addr,
> > > -			rte_pktmbuf_data_len(buff), 0);
> > > +		/* Drop the packet if it is uncompleted */
> > > +		if (unlikely(uncompleted_pkt == 1))
> > > +			vq->used->ring[res_cur_idx & (vq->size - 1)].len = 0;
> > Here things become complicated with the previous lockless reserve.
> 
> Why it become complicated? Len = 0 means it contain any meaningful data in
> the buffer.
Sorry typo here, Len = 0 means it doesn't' contain any meaningful data in
 the buffer.

> 
> > What is the consequence when guest sees zero in used->len? At least,
> > do we check with virtio-net implementation?
> 
> >
> > > +		else
> > > +			vq->used->ring[res_cur_idx & (vq->size - 1)].len =
> > > +							offset + vq-
> > >vhost_hlen;
> > Two questions here,
> > 1.  add virtio header len?
> > 2.  Why not use packet_len rather than offset?
> > >
> > >  		res_cur_idx++;
> > >  		packet_success++;
> > >
> > > +		if (unlikely(uncompleted_pkt == 1))
> > > +			continue;
> > > +
> > >  		rte_memcpy((void *)(uintptr_t)buff_hdr_addr,
> > >  			(const void *)&virtio_hdr, vq->vhost_hlen);
> > >
> > > @@ -589,7 +619,14 @@ rte_vhost_dequeue_burst(struct virtio_net
> *dev,
> > uint16_t queue_id,
> > >  		desc = &vq->desc[head[entry_success]];
> > >
> > >  		/* Discard first buffer as it is the virtio header */
> > > -		desc = &vq->desc[desc->next];
> > > +		if (desc->flags & VRING_DESC_F_NEXT) {
> > > +			desc = &vq->desc[desc->next];
> > > +			vb_offset = 0;
> > > +			vb_avail = desc->len;
> > > +		} else {
> > > +			vb_offset = vq->vhost_hlen;
> > > +			vb_avail = desc->len - vb_offset;
> > > +		}
> > >
> > >  		/* Buffer address translation. */
> > >  		vb_addr = gpa_to_vva(dev, desc->addr); @@ -608,8 +645,6
> > @@
> > > rte_vhost_dequeue_burst(struct virtio_net *dev, uint16_t queue_id,
> > >  		vq->used->ring[used_idx].id = head[entry_success];
> > >  		vq->used->ring[used_idx].len = 0;
> > >
> > > -		vb_offset = 0;
> > > -		vb_avail = desc->len;
> > >  		/* Allocate an mbuf and populate the structure. */
> > >  		m = rte_pktmbuf_alloc(mbuf_pool);
> > >  		if (unlikely(m == NULL)) {
  
Ouyang Changchun May 31, 2015, 1:33 p.m. UTC | #6
> -----Original Message-----
> From: Xie, Huawei
> Sent: Sunday, May 31, 2015 4:41 PM
> To: Ouyang, Changchun; dev@dpdk.org
> Cc: Cao, Waterman
> Subject: Re: [PATCH v2 1/5] lib_vhost: Fix enqueue/dequeue can't handle
> chained vring descriptors
> 
> On 5/28/2015 11:17 PM, Ouyang, Changchun wrote:
> > Vring enqueue need consider the 2 cases:
> >  1. Vring descriptors chained together, the first one is for virtio header, the
> rest are for real
> >     data, virtio driver in Linux usually use this scheme;  2. Only one
> > descriptor, virtio header and real data share one single descriptor, virtio-
> net pmd use
> >     such scheme;
> >
> > So does vring dequeue, it should not assume vring descriptor is
> > chained or not chained, virtio in different Linux version has
> > different behavior, e.g. fedora 20 use chained vring descriptor, while
> fedora 21 use one single vring descriptor for tx.
> >
> > Changes in v2
> >   - drop the uncompleted packet
> >   - refine code logic
> >
> > Signed-off-by: Changchun Ouyang <changchun.ouyang@intel.com>
> > ---
> >  lib/librte_vhost/vhost_rxtx.c | 65
> > +++++++++++++++++++++++++++++++++----------
> >  1 file changed, 50 insertions(+), 15 deletions(-)
> >
> > diff --git a/lib/librte_vhost/vhost_rxtx.c
> > b/lib/librte_vhost/vhost_rxtx.c index 4809d32..06ae2df 100644
> > --- a/lib/librte_vhost/vhost_rxtx.c
> > +++ b/lib/librte_vhost/vhost_rxtx.c
> > @@ -59,7 +59,7 @@ virtio_dev_rx(struct virtio_net *dev, uint16_t
> queue_id,
> >  	struct virtio_net_hdr_mrg_rxbuf virtio_hdr = {{0, 0, 0, 0, 0, 0}, 0};
> >  	uint64_t buff_addr = 0;
> >  	uint64_t buff_hdr_addr = 0;
> > -	uint32_t head[MAX_PKT_BURST], packet_len = 0;
> > +	uint32_t head[MAX_PKT_BURST];
> >  	uint32_t head_idx, packet_success = 0;
> >  	uint16_t avail_idx, res_cur_idx;
> >  	uint16_t res_base_idx, res_end_idx;
> > @@ -113,6 +113,10 @@ virtio_dev_rx(struct virtio_net *dev, uint16_t
> queue_id,
> >  	rte_prefetch0(&vq->desc[head[packet_success]]);
> >
> >  	while (res_cur_idx != res_end_idx) {
> > +		uint32_t offset = 0;
> > +		uint32_t data_len, len_to_cpy;
> > +		uint8_t hdr = 0, uncompleted_pkt = 0;
> > +
> >  		/* Get descriptor from available ring */
> >  		desc = &vq->desc[head[packet_success]];
> >
> > @@ -125,7 +129,6 @@ virtio_dev_rx(struct virtio_net *dev, uint16_t
> > queue_id,
> >
> >  		/* Copy virtio_hdr to packet and increment buffer address */
> >  		buff_hdr_addr = buff_addr;
> > -		packet_len = rte_pktmbuf_data_len(buff) + vq->vhost_hlen;
> >
> >  		/*
> >  		 * If the descriptors are chained the header and data are @@
> > -136,28 +139,55 @@ virtio_dev_rx(struct virtio_net *dev, uint16_t
> queue_id,
> >  			desc = &vq->desc[desc->next];
> >  			/* Buffer address translation. */
> >  			buff_addr = gpa_to_vva(dev, desc->addr);
> > -			desc->len = rte_pktmbuf_data_len(buff);
> Do we got confirm from virtio SPEC that it is OK to only update used->len?
> >  		} else {
> >  			buff_addr += vq->vhost_hlen;
> > -			desc->len = packet_len;
> > +			hdr = 1;
> >  		}
> >
> > +		data_len = rte_pktmbuf_data_len(buff);
> > +		len_to_cpy = RTE_MIN(data_len,
> > +			hdr ? desc->len - vq->vhost_hlen : desc->len);
> > +		while (len_to_cpy > 0) {
> > +			/* Copy mbuf data to buffer */
> > +			rte_memcpy((void *)(uintptr_t)buff_addr,
> > +				(const void *)(rte_pktmbuf_mtod(buff,
> const char *) + offset),
> > +				len_to_cpy);
> > +			PRINT_PACKET(dev, (uintptr_t)buff_addr,
> > +				len_to_cpy, 0);
> > +
> > +			offset += len_to_cpy;
> > +
> > +			if (offset == data_len)
> > +				break;
> Ok, i see scatter gather case handling is in patch 5.
> > +
> > +			if (desc->flags & VRING_DESC_F_NEXT) {
> > +				desc = &vq->desc[desc->next];
> > +				buff_addr = gpa_to_vva(dev, desc->addr);
> > +				len_to_cpy = RTE_MIN(data_len - offset,
> desc->len);
> > +			} else {
> > +				/* Room in vring buffer is not enough */
> > +				uncompleted_pkt = 1;
> > +				break;
> > +			}
> > +		};
> > +
> >  		/* Update used ring with desc information */
> >  		vq->used->ring[res_cur_idx & (vq->size - 1)].id =
> >
> 	head[packet_success];
> > -		vq->used->ring[res_cur_idx & (vq->size - 1)].len =
> packet_len;
> >
> > -		/* Copy mbuf data to buffer */
> > -		/* FIXME for sg mbuf and the case that desc couldn't hold the
> mbuf data */
> > -		rte_memcpy((void *)(uintptr_t)buff_addr,
> > -			rte_pktmbuf_mtod(buff, const void *),
> > -			rte_pktmbuf_data_len(buff));
> > -		PRINT_PACKET(dev, (uintptr_t)buff_addr,
> > -			rte_pktmbuf_data_len(buff), 0);
> > +		/* Drop the packet if it is uncompleted */
> > +		if (unlikely(uncompleted_pkt == 1))
> > +			vq->used->ring[res_cur_idx & (vq->size - 1)].len = 0;
> Here things become complicated with the previous lockless reserve.
> What is the consequence when guest sees zero in used->len? At least, do we
> check with virtio-net implementation?
> 
> > +		else
> > +			vq->used->ring[res_cur_idx & (vq->size - 1)].len =
> > +							offset + vq-
> >vhost_hlen;
> Two questions here,
> 1.  add virtio header len?
Will double-check whether it is necessary to add the header length here.

> 2.  Why not use packet_len rather than offset?
For me, they are the exact same value if the whole packet is fully copied.
Do you have any concern about using offset here?

> >
> >  		res_cur_idx++;
> >  		packet_success++;
> >
> > +		if (unlikely(uncompleted_pkt == 1))
> > +			continue;
> > +
> >  		rte_memcpy((void *)(uintptr_t)buff_hdr_addr,
> >  			(const void *)&virtio_hdr, vq->vhost_hlen);
> >
> > @@ -589,7 +619,14 @@ rte_vhost_dequeue_burst(struct virtio_net *dev,
> uint16_t queue_id,
> >  		desc = &vq->desc[head[entry_success]];
> >
> >  		/* Discard first buffer as it is the virtio header */
> > -		desc = &vq->desc[desc->next];
> > +		if (desc->flags & VRING_DESC_F_NEXT) {
> > +			desc = &vq->desc[desc->next];
> > +			vb_offset = 0;
> > +			vb_avail = desc->len;
> > +		} else {
> > +			vb_offset = vq->vhost_hlen;
> > +			vb_avail = desc->len - vb_offset;
> > +		}
> >
> >  		/* Buffer address translation. */
> >  		vb_addr = gpa_to_vva(dev, desc->addr); @@ -608,8 +645,6
> @@
> > rte_vhost_dequeue_burst(struct virtio_net *dev, uint16_t queue_id,
> >  		vq->used->ring[used_idx].id = head[entry_success];
> >  		vq->used->ring[used_idx].len = 0;
> >
> > -		vb_offset = 0;
> > -		vb_avail = desc->len;
> >  		/* Allocate an mbuf and populate the structure. */
> >  		m = rte_pktmbuf_alloc(mbuf_pool);
> >  		if (unlikely(m == NULL)) {
  

Patch

diff --git a/lib/librte_vhost/vhost_rxtx.c b/lib/librte_vhost/vhost_rxtx.c
index 4809d32..06ae2df 100644
--- a/lib/librte_vhost/vhost_rxtx.c
+++ b/lib/librte_vhost/vhost_rxtx.c
@@ -59,7 +59,7 @@  virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
 	struct virtio_net_hdr_mrg_rxbuf virtio_hdr = {{0, 0, 0, 0, 0, 0}, 0};
 	uint64_t buff_addr = 0;
 	uint64_t buff_hdr_addr = 0;
-	uint32_t head[MAX_PKT_BURST], packet_len = 0;
+	uint32_t head[MAX_PKT_BURST];
 	uint32_t head_idx, packet_success = 0;
 	uint16_t avail_idx, res_cur_idx;
 	uint16_t res_base_idx, res_end_idx;
@@ -113,6 +113,10 @@  virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
 	rte_prefetch0(&vq->desc[head[packet_success]]);
 
 	while (res_cur_idx != res_end_idx) {
+		uint32_t offset = 0;
+		uint32_t data_len, len_to_cpy;
+		uint8_t hdr = 0, uncompleted_pkt = 0;
+
 		/* Get descriptor from available ring */
 		desc = &vq->desc[head[packet_success]];
 
@@ -125,7 +129,6 @@  virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
 
 		/* Copy virtio_hdr to packet and increment buffer address */
 		buff_hdr_addr = buff_addr;
-		packet_len = rte_pktmbuf_data_len(buff) + vq->vhost_hlen;
 
 		/*
 		 * If the descriptors are chained the header and data are
@@ -136,28 +139,55 @@  virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
 			desc = &vq->desc[desc->next];
 			/* Buffer address translation. */
 			buff_addr = gpa_to_vva(dev, desc->addr);
-			desc->len = rte_pktmbuf_data_len(buff);
 		} else {
 			buff_addr += vq->vhost_hlen;
-			desc->len = packet_len;
+			hdr = 1;
 		}
 
+		data_len = rte_pktmbuf_data_len(buff);
+		len_to_cpy = RTE_MIN(data_len,
+			hdr ? desc->len - vq->vhost_hlen : desc->len);
+		while (len_to_cpy > 0) {
+			/* Copy mbuf data to buffer */
+			rte_memcpy((void *)(uintptr_t)buff_addr,
+				(const void *)(rte_pktmbuf_mtod(buff, const char *) + offset),
+				len_to_cpy);
+			PRINT_PACKET(dev, (uintptr_t)buff_addr,
+				len_to_cpy, 0);
+
+			offset += len_to_cpy;
+
+			if (offset == data_len)
+				break;
+
+			if (desc->flags & VRING_DESC_F_NEXT) {
+				desc = &vq->desc[desc->next];
+				buff_addr = gpa_to_vva(dev, desc->addr);
+				len_to_cpy = RTE_MIN(data_len - offset, desc->len);
+			} else {
+				/* Room in vring buffer is not enough */
+				uncompleted_pkt = 1;
+				break;
+			}
+		};
+
 		/* Update used ring with desc information */
 		vq->used->ring[res_cur_idx & (vq->size - 1)].id =
 							head[packet_success];
-		vq->used->ring[res_cur_idx & (vq->size - 1)].len = packet_len;
 
-		/* Copy mbuf data to buffer */
-		/* FIXME for sg mbuf and the case that desc couldn't hold the mbuf data */
-		rte_memcpy((void *)(uintptr_t)buff_addr,
-			rte_pktmbuf_mtod(buff, const void *),
-			rte_pktmbuf_data_len(buff));
-		PRINT_PACKET(dev, (uintptr_t)buff_addr,
-			rte_pktmbuf_data_len(buff), 0);
+		/* Drop the packet if it is uncompleted */
+		if (unlikely(uncompleted_pkt == 1))
+			vq->used->ring[res_cur_idx & (vq->size - 1)].len = 0;
+		else
+			vq->used->ring[res_cur_idx & (vq->size - 1)].len =
+							offset + vq->vhost_hlen;
 
 		res_cur_idx++;
 		packet_success++;
 
+		if (unlikely(uncompleted_pkt == 1))
+			continue;
+
 		rte_memcpy((void *)(uintptr_t)buff_hdr_addr,
 			(const void *)&virtio_hdr, vq->vhost_hlen);
 
@@ -589,7 +619,14 @@  rte_vhost_dequeue_burst(struct virtio_net *dev, uint16_t queue_id,
 		desc = &vq->desc[head[entry_success]];
 
 		/* Discard first buffer as it is the virtio header */
-		desc = &vq->desc[desc->next];
+		if (desc->flags & VRING_DESC_F_NEXT) {
+			desc = &vq->desc[desc->next];
+			vb_offset = 0;
+			vb_avail = desc->len;
+		} else {
+			vb_offset = vq->vhost_hlen;
+			vb_avail = desc->len - vb_offset;
+		}
 
 		/* Buffer address translation. */
 		vb_addr = gpa_to_vva(dev, desc->addr);
@@ -608,8 +645,6 @@  rte_vhost_dequeue_burst(struct virtio_net *dev, uint16_t queue_id,
 		vq->used->ring[used_idx].id = head[entry_success];
 		vq->used->ring[used_idx].len = 0;
 
-		vb_offset = 0;
-		vb_avail = desc->len;
 		/* Allocate an mbuf and populate the structure. */
 		m = rte_pktmbuf_alloc(mbuf_pool);
 		if (unlikely(m == NULL)) {