[v7,5/8] net/virtio: implement transmit path for packed queues

Message ID 20181003131118.21491-6-jfreimann@redhat.com (mailing list archive)
State Superseded, archived
Delegated to: Maxime Coquelin
Series: implement packed virtqueues

Checks

Context Check Description
ci/Intel-compilation success Compilation OK

Commit Message

Jens Freimann Oct. 3, 2018, 1:11 p.m. UTC
  This implements the transmit path for devices with
support for packed virtqueues.

Signed-off-by: Jens Freimann <jfreimann@redhat.com>
---
 drivers/net/virtio/virtio_ethdev.c |  32 ++--
 drivers/net/virtio/virtio_ethdev.h |   2 +
 drivers/net/virtio/virtio_ring.h   |  15 +-
 drivers/net/virtio/virtio_rxtx.c   | 276 +++++++++++++++++++++++++++++
 drivers/net/virtio/virtqueue.h     |  18 +-
 5 files changed, 329 insertions(+), 14 deletions(-)
  

Comments

Maxime Coquelin Oct. 10, 2018, 7:27 a.m. UTC | #1
On 10/03/2018 03:11 PM, Jens Freimann wrote:
> This implements the transmit path for devices with
> support for packed virtqueues.
> 
> Signed-off-by: Jens Freimann <jfreimann@redhat.com>
> ---
>   drivers/net/virtio/virtio_ethdev.c |  32 ++--
>   drivers/net/virtio/virtio_ethdev.h |   2 +
>   drivers/net/virtio/virtio_ring.h   |  15 +-
>   drivers/net/virtio/virtio_rxtx.c   | 276 +++++++++++++++++++++++++++++
>   drivers/net/virtio/virtqueue.h     |  18 +-
>   5 files changed, 329 insertions(+), 14 deletions(-)
> 
> diff --git a/drivers/net/virtio/virtio_ethdev.c b/drivers/net/virtio/virtio_ethdev.c
> index d6a1613dd..c65ac365c 100644
> --- a/drivers/net/virtio/virtio_ethdev.c
> +++ b/drivers/net/virtio/virtio_ethdev.c
> @@ -390,6 +390,8 @@ virtio_init_queue(struct rte_eth_dev *dev, uint16_t vtpci_queue_idx)
>   	vq->hw = hw;
>   	vq->vq_queue_index = vtpci_queue_idx;
>   	vq->vq_nentries = vq_size;
> +	if (vtpci_packed_queue(hw))
> +		vq->vq_ring.avail_wrap_counter = 1;
>   
>   	/*
>   	 * Reserve a memzone for vring elements
> @@ -496,16 +498,22 @@ virtio_init_queue(struct rte_eth_dev *dev, uint16_t vtpci_queue_idx)
>   		memset(txr, 0, vq_size * sizeof(*txr));
>   		for (i = 0; i < vq_size; i++) {
>   			struct vring_desc *start_dp = txr[i].tx_indir;
> -
> -			vring_desc_init_split(start_dp, RTE_DIM(txr[i].tx_indir));
> -
> +			struct vring_desc_packed *start_dp_packed = txr[i].tx_indir_pq;
> +	
>   			/* first indirect descriptor is always the tx header */
> -			start_dp->addr = txvq->virtio_net_hdr_mem
> -				+ i * sizeof(*txr)
> -				+ offsetof(struct virtio_tx_region, tx_hdr);
> -
> -			start_dp->len = hw->vtnet_hdr_size;
> -			start_dp->flags = VRING_DESC_F_NEXT;
> +			if (vtpci_packed_queue(hw)) {
No need to init desc here?

> +				start_dp_packed->addr = txvq->virtio_net_hdr_mem
> +					+ i * sizeof(*txr)
> +					+ offsetof(struct virtio_tx_region, tx_hdr);
> +				start_dp_packed->len = hw->vtnet_hdr_size;
> +			} else {
> +				vring_desc_init_split(start_dp, RTE_DIM(txr[i].tx_indir));
> +				start_dp->addr = txvq->virtio_net_hdr_mem
> +					+ i * sizeof(*txr)
> +					+ offsetof(struct virtio_tx_region, tx_hdr);
> +				start_dp->len = hw->vtnet_hdr_size;
> +				start_dp->flags = VRING_DESC_F_NEXT;
> +			}
>   		}
>   	}
>   
> @@ -1344,7 +1352,11 @@ set_rxtx_funcs(struct rte_eth_dev *eth_dev)
>   		eth_dev->rx_pkt_burst = &virtio_recv_pkts;
>   	}
>   
> -	if (hw->use_inorder_tx) {
> +	if (vtpci_packed_queue(hw)) {
> +		PMD_INIT_LOG(INFO, "virtio: using virtio 1.1 Tx path on port %u",
> +			eth_dev->data->port_id);
> +		eth_dev->tx_pkt_burst = virtio_xmit_pkts_packed;
> +	} else if (hw->use_inorder_tx) {
>   		PMD_INIT_LOG(INFO, "virtio: using inorder Tx path on port %u",
>   			eth_dev->data->port_id);
>   		eth_dev->tx_pkt_burst = virtio_xmit_pkts_inorder;
> diff --git a/drivers/net/virtio/virtio_ethdev.h b/drivers/net/virtio/virtio_ethdev.h
> index e0f80e5a4..05d355180 100644
> --- a/drivers/net/virtio/virtio_ethdev.h
> +++ b/drivers/net/virtio/virtio_ethdev.h
> @@ -82,6 +82,8 @@ uint16_t virtio_recv_mergeable_pkts_inorder(void *rx_queue,
>   
>   uint16_t virtio_xmit_pkts(void *tx_queue, struct rte_mbuf **tx_pkts,
>   		uint16_t nb_pkts);
> +uint16_t virtio_xmit_pkts_packed(void *tx_queue, struct rte_mbuf **tx_pkts,
> +		uint16_t nb_pkts);
>   
>   uint16_t virtio_xmit_pkts_inorder(void *tx_queue, struct rte_mbuf **tx_pkts,
>   		uint16_t nb_pkts);
> diff --git a/drivers/net/virtio/virtio_ring.h b/drivers/net/virtio/virtio_ring.h
> index b9e63d4d4..dbffd4dcd 100644
> --- a/drivers/net/virtio/virtio_ring.h
> +++ b/drivers/net/virtio/virtio_ring.h
> @@ -108,14 +108,25 @@ set_desc_avail(struct vring *vr, struct vring_desc_packed *desc)
>   }
>   
>   static inline int
> -desc_is_used(struct vring_desc_packed *desc, struct vring *vr)
> +_desc_is_used(struct vring_desc_packed *desc)
>   {
>   	uint16_t used, avail;
>   
>   	used = !!(desc->flags & VRING_DESC_F_USED(1));
>   	avail = !!(desc->flags & VRING_DESC_F_AVAIL(1));
>   
> -	return used == avail && used == vr->used_wrap_counter;
> +	return used == avail;
> +
> +}
> +
> +static inline int
> +desc_is_used(struct vring_desc_packed *desc, struct vring *vr)
> +{
> +	uint16_t used;
> +
> +	used = !!(desc->flags & VRING_DESC_F_USED(1));
> +
> +	return _desc_is_used(desc) && used == vr->used_wrap_counter;
>   }

This is not in the right patch.

>   /* The standard layout for the ring is a continuous chunk of memory which
> diff --git a/drivers/net/virtio/virtio_rxtx.c b/drivers/net/virtio/virtio_rxtx.c
> index eb891433e..4078fba8e 100644
> --- a/drivers/net/virtio/virtio_rxtx.c
> +++ b/drivers/net/virtio/virtio_rxtx.c
> @@ -38,6 +38,7 @@
>   #define  VIRTIO_DUMP_PACKET(m, len) do { } while (0)
>   #endif
>   
> +

Remove trailing line.

I still need to review the rest, but you can work on the above comments
in the meantime.

Regards,
Maxime
  
Jens Freimann Oct. 10, 2018, 11:43 a.m. UTC | #2
On Wed, Oct 10, 2018 at 09:27:31AM +0200, Maxime Coquelin wrote:
>
>
>On 10/03/2018 03:11 PM, Jens Freimann wrote:
>>This implements the transmit path for devices with
>>support for packed virtqueues.
>>
>>Signed-off-by: Jens Freimann <jfreimann@redhat.com>
>>---
>>  drivers/net/virtio/virtio_ethdev.c |  32 ++--
>>  drivers/net/virtio/virtio_ethdev.h |   2 +
>>  drivers/net/virtio/virtio_ring.h   |  15 +-
>>  drivers/net/virtio/virtio_rxtx.c   | 276 +++++++++++++++++++++++++++++
>>  drivers/net/virtio/virtqueue.h     |  18 +-
>>  5 files changed, 329 insertions(+), 14 deletions(-)
>>
>>diff --git a/drivers/net/virtio/virtio_ethdev.c b/drivers/net/virtio/virtio_ethdev.c
>>index d6a1613dd..c65ac365c 100644
>>--- a/drivers/net/virtio/virtio_ethdev.c
>>+++ b/drivers/net/virtio/virtio_ethdev.c
>>@@ -390,6 +390,8 @@ virtio_init_queue(struct rte_eth_dev *dev, uint16_t vtpci_queue_idx)
>>  	vq->hw = hw;
>>  	vq->vq_queue_index = vtpci_queue_idx;
>>  	vq->vq_nentries = vq_size;
>>+	if (vtpci_packed_queue(hw))
>>+		vq->vq_ring.avail_wrap_counter = 1;
>>  	/*
>>  	 * Reserve a memzone for vring elements
>>@@ -496,16 +498,22 @@ virtio_init_queue(struct rte_eth_dev *dev, uint16_t vtpci_queue_idx)
>>  		memset(txr, 0, vq_size * sizeof(*txr));
>>  		for (i = 0; i < vq_size; i++) {
>>  			struct vring_desc *start_dp = txr[i].tx_indir;
>>-
>>-			vring_desc_init_split(start_dp, RTE_DIM(txr[i].tx_indir));
>>-
>>+			struct vring_desc_packed *start_dp_packed = txr[i].tx_indir_pq;
>>+	
>>  			/* first indirect descriptor is always the tx header */
>>-			start_dp->addr = txvq->virtio_net_hdr_mem
>>-				+ i * sizeof(*txr)
>>-				+ offsetof(struct virtio_tx_region, tx_hdr);
>>-
>>-			start_dp->len = hw->vtnet_hdr_size;
>>-			start_dp->flags = VRING_DESC_F_NEXT;
>>+			if (vtpci_packed_queue(hw)) {
>No need to init desc here?

No, for split rings this is only done to chain descriptors (set the
next field). For packed rings we don't need this.
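
For reference, this is roughly what the split-ring helper does (a
minimal sketch, simplified from vring_desc_init_split; the packed
descriptor layout has no next field, so there is nothing to pre-chain):

#include <stdint.h>

#define VQ_RING_DESC_CHAIN_END 32768

struct vring_desc {             /* split-ring descriptor */
	uint64_t addr;
	uint32_t len;
	uint16_t flags;
	uint16_t next;          /* chaining field; absent in packed rings */
};

static inline void
vring_desc_init_split(struct vring_desc *dp, uint16_t n)
{
	int i;

	for (i = 0; i < n - 1; i++)
		dp[i].next = (uint16_t)(i + 1); /* chain slot i to i + 1 */
	dp[i].next = VQ_RING_DESC_CHAIN_END;    /* terminate the chain */
}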

>
>>+				start_dp_packed->addr = txvq->virtio_net_hdr_mem
>>+					+ i * sizeof(*txr)
>>+					+ offsetof(struct virtio_tx_region, tx_hdr);
>>+				start_dp_packed->len = hw->vtnet_hdr_size;
>>+			} else {
>>+				vring_desc_init_split(start_dp, RTE_DIM(txr[i].tx_indir));
>>+				start_dp->addr = txvq->virtio_net_hdr_mem
>>+					+ i * sizeof(*txr)
>>+					+ offsetof(struct virtio_tx_region, tx_hdr);
>>+				start_dp->len = hw->vtnet_hdr_size;
>>+				start_dp->flags = VRING_DESC_F_NEXT;
>>+			}
>>  		}
>>  	}
>>@@ -1344,7 +1352,11 @@ set_rxtx_funcs(struct rte_eth_dev *eth_dev)
>>  		eth_dev->rx_pkt_burst = &virtio_recv_pkts;
>>  	}
>>-	if (hw->use_inorder_tx) {
>>+	if (vtpci_packed_queue(hw)) {
>>+		PMD_INIT_LOG(INFO, "virtio: using virtio 1.1 Tx path on port %u",
>>+			eth_dev->data->port_id);
>>+		eth_dev->tx_pkt_burst = virtio_xmit_pkts_packed;
>>+	} else if (hw->use_inorder_tx) {
>>  		PMD_INIT_LOG(INFO, "virtio: using inorder Tx path on port %u",
>>  			eth_dev->data->port_id);
>>  		eth_dev->tx_pkt_burst = virtio_xmit_pkts_inorder;
>>diff --git a/drivers/net/virtio/virtio_ethdev.h b/drivers/net/virtio/virtio_ethdev.h
>>index e0f80e5a4..05d355180 100644
>>--- a/drivers/net/virtio/virtio_ethdev.h
>>+++ b/drivers/net/virtio/virtio_ethdev.h
>>@@ -82,6 +82,8 @@ uint16_t virtio_recv_mergeable_pkts_inorder(void *rx_queue,
>>  uint16_t virtio_xmit_pkts(void *tx_queue, struct rte_mbuf **tx_pkts,
>>  		uint16_t nb_pkts);
>>+uint16_t virtio_xmit_pkts_packed(void *tx_queue, struct rte_mbuf **tx_pkts,
>>+		uint16_t nb_pkts);
>>  uint16_t virtio_xmit_pkts_inorder(void *tx_queue, struct rte_mbuf **tx_pkts,
>>  		uint16_t nb_pkts);
>>diff --git a/drivers/net/virtio/virtio_ring.h b/drivers/net/virtio/virtio_ring.h
>>index b9e63d4d4..dbffd4dcd 100644
>>--- a/drivers/net/virtio/virtio_ring.h
>>+++ b/drivers/net/virtio/virtio_ring.h
>>@@ -108,14 +108,25 @@ set_desc_avail(struct vring *vr, struct vring_desc_packed *desc)
>>  }
>>  static inline int
>>-desc_is_used(struct vring_desc_packed *desc, struct vring *vr)
>>+_desc_is_used(struct vring_desc_packed *desc)
>>  {
>>  	uint16_t used, avail;
>>  	used = !!(desc->flags & VRING_DESC_F_USED(1));
>>  	avail = !!(desc->flags & VRING_DESC_F_AVAIL(1));
>>-	return used == avail && used == vr->used_wrap_counter;
>>+	return used == avail;
>>+
>>+}
>>+
>>+static inline int
>>+desc_is_used(struct vring_desc_packed *desc, struct vring *vr)
>>+{
>>+	uint16_t used;
>>+
>>+	used = !!(desc->flags & VRING_DESC_F_USED(1));
>>+
>>+	return _desc_is_used(desc) && used == vr->used_wrap_counter;
>>  }
>
>This is not in the right patch.

yes, fixed.

>>  /* The standard layout for the ring is a continuous chunk of memory which
>>diff --git a/drivers/net/virtio/virtio_rxtx.c b/drivers/net/virtio/virtio_rxtx.c
>>index eb891433e..4078fba8e 100644
>>--- a/drivers/net/virtio/virtio_rxtx.c
>>+++ b/drivers/net/virtio/virtio_rxtx.c
>>@@ -38,6 +38,7 @@
>>  #define  VIRTIO_DUMP_PACKET(m, len) do { } while (0)
>>  #endif
>>+
>
>Remove trailing line.
>
>I still need to review the rest, but you can work on the above comments
>in the meantime.

done. Thanks for the review!

regards,
Jens
  
Maxime Coquelin Oct. 11, 2018, 5:31 p.m. UTC | #3
I'm testing your series, and it gets stuck after 256 packets in the
transmit path. When it happens, the descriptor flags indicate it has been
made available by the driver (desc->flags = 0x80), but that is not
consistent with the expected wrap counter value (0).
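
For reference, a minimal sketch of how those flag bits decode (macro
names follow this series' virtio_ring.h; bit positions are from the
virtio 1.1 spec, AVAIL = 1 << 7 and USED = 1 << 15). flags = 0x80 means
AVAIL = 1 and USED = 0, so with the device still expecting wrap counter
0 the descriptor is not seen as available:

#include <stdint.h>

#define VRING_DESC_F_AVAIL(b) ((uint16_t)(b) << 7)
#define VRING_DESC_F_USED(b)  ((uint16_t)(b) << 15)

/* A descriptor is available to the device only when AVAIL matches the
 * expected wrap counter and USED does not. */
static inline int
desc_is_avail(uint16_t flags, uint16_t wrap_counter)
{
	uint16_t avail = !!(flags & VRING_DESC_F_AVAIL(1));
	uint16_t used  = !!(flags & VRING_DESC_F_USED(1));

	return avail == wrap_counter && used != wrap_counter;
}

/* desc_is_avail(0x80, 0) == 0: the device stalls, as described above. */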

Not sure this is the root cause, but it seems the code below is broken:

On 10/03/2018 03:11 PM, Jens Freimann wrote:
> +static inline void
> +virtqueue_enqueue_xmit_packed(struct virtnet_tx *txvq, struct rte_mbuf *cookie,
> +			uint16_t needed, int use_indirect, int can_push,
> +			int in_order)
> +{
> +	struct virtio_tx_region *txr = txvq->virtio_net_hdr_mz->addr;
> +	struct vq_desc_extra *dxp, *head_dxp;
> +	struct virtqueue *vq = txvq->vq;
> +	struct vring_desc_packed *start_dp, *head_dp;
> +	uint16_t seg_num = cookie->nb_segs;
> +	uint16_t idx, head_id;
> +	uint16_t head_size = vq->hw->vtnet_hdr_size;
> +	struct virtio_net_hdr *hdr;
> +	int wrap_counter = vq->vq_ring.avail_wrap_counter;
> +
> +	head_id = vq->vq_desc_head_idx;
> +	idx = head_id;	
> +	start_dp = vq->vq_ring.desc_packed;
> +	dxp = &vq->vq_descx[idx];
> +	dxp->ndescs = needed;
> +
> +	head_dp = &vq->vq_ring.desc_packed[head_id];
> +	head_dxp = &vq->vq_descx[head_id];
> +	head_dxp->cookie = (void *) cookie;
> +
> +	if (can_push) {
> +		/* prepend cannot fail, checked by caller */
> +		hdr = (struct virtio_net_hdr *)
> +			rte_pktmbuf_prepend(cookie, head_size);
> +		/* rte_pktmbuf_prepend() counts the hdr size to the pkt length,
> +		 * which is wrong. Below subtract restores correct pkt size.
> +		 */
> +		cookie->pkt_len -= head_size;
> +
> +		/* if offload disabled, it is not zeroed below, do it now */
> +		if (!vq->hw->has_tx_offload) {
> +			ASSIGN_UNLESS_EQUAL(hdr->csum_start, 0);
> +			ASSIGN_UNLESS_EQUAL(hdr->csum_offset, 0);
> +			ASSIGN_UNLESS_EQUAL(hdr->flags, 0);
> +			ASSIGN_UNLESS_EQUAL(hdr->gso_type, 0);
> +			ASSIGN_UNLESS_EQUAL(hdr->gso_size, 0);
> +			ASSIGN_UNLESS_EQUAL(hdr->hdr_len, 0);
> +		}
> +	} else if (use_indirect) {
> +		/* setup tx ring slot to point to indirect
> +		 * descriptor list stored in reserved region.
> +		 *
> +		 * the first slot in indirect ring is already preset
> +		 * to point to the header in reserved region
> +		 */
> +		start_dp[idx].addr  = txvq->virtio_net_hdr_mem +
> +			RTE_PTR_DIFF(&txr[idx].tx_indir_pq, txr);
> +		start_dp[idx].len   = (seg_num + 1) * sizeof(struct vring_desc_packed);
> +		start_dp[idx].flags = VRING_DESC_F_INDIRECT;
> +		hdr = (struct virtio_net_hdr *)&txr[idx].tx_hdr;
> +
> +		/* loop below will fill in rest of the indirect elements */
> +		start_dp = txr[idx].tx_indir_pq;
> +		idx = 1;
> +	} else {
> +		/* setup first tx ring slot to point to header
> +		 * stored in reserved region.
> +		 */
> +		start_dp[idx].addr  = txvq->virtio_net_hdr_mem +
> +			RTE_PTR_DIFF(&txr[idx].tx_hdr, txr);
> +		start_dp[idx].len   = vq->hw->vtnet_hdr_size;
> +		start_dp[idx].flags = VRING_DESC_F_NEXT;
> +		start_dp[idx].flags |=
> +			VRING_DESC_F_AVAIL(vq->vq_ring.avail_wrap_counter) |
> +			VRING_DESC_F_USED(!vq->vq_ring.avail_wrap_counter);
> +		hdr = (struct virtio_net_hdr *)&txr[idx].tx_hdr;
> +		idx = dxp->next;
> +	}
> +
> +	virtqueue_xmit_offload(hdr, cookie, vq->hw->has_tx_offload);
> +
> +	do {
> +		if (idx >= vq->vq_nentries) {
> +			idx = 0;
> +			vq->vq_ring.avail_wrap_counter ^= 1;
> +		}
> +		start_dp[idx].addr  = VIRTIO_MBUF_DATA_DMA_ADDR(cookie, vq);
> +		start_dp[idx].len   = cookie->data_len;
> +		start_dp[idx].flags = cookie->next ? VRING_DESC_F_NEXT : 0;
> +		start_dp[idx].flags |=
> +			VRING_DESC_F_AVAIL(vq->vq_ring.avail_wrap_counter) |
> +			VRING_DESC_F_USED(!vq->vq_ring.avail_wrap_counter);
> +		if (use_indirect) {
> +			if (++idx >= (seg_num + 1))
> +				break;
> +		} else {
> +			dxp = &vq->vq_descx[idx];
> +			idx = dxp->next;
> +		}

Imagine the current idx is 255; dxp->next will give idx 0, right?
In that case, for desc[0], on the next iteration the flags won't be set
available properly, as vq->vq_ring.avail_wrap_counter isn't updated.

I'm not sure how it can work like this; shouldn't dxp save the wrap
counter value in the out-of-order case?
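
One possible shape of that suggestion, as a sketch only (the wrap field
is hypothetical and not part of this series): latch the wrap counter per
slot when it is allocated, and derive the flag bits from the latched
value instead of from vq->vq_ring.avail_wrap_counter, which may have
been flipped mid-chain.

#include <stdint.h>

#define VRING_DESC_F_AVAIL(b) ((uint16_t)(b) << 7)
#define VRING_DESC_F_USED(b)  ((uint16_t)(b) << 15)

/* Hypothetical per-slot state; 'wrap' does not exist in vq_desc_extra
 * in this series. */
struct desc_extra_sketch {
	uint16_t next;
	uint8_t wrap;   /* avail_wrap_counter captured at allocation time */
};

static inline uint16_t
avail_flags(const struct desc_extra_sketch *dxp)
{
	return VRING_DESC_F_AVAIL(dxp->wrap) |
	       VRING_DESC_F_USED(!dxp->wrap);
}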
  
Jens Freimann Oct. 12, 2018, 7:24 a.m. UTC | #4
On Thu, Oct 11, 2018 at 07:31:57PM +0200, Maxime Coquelin wrote:
>
>I'm testing your series, and it gets stuck after 256 packets in the
>transmit path. When it happens, the descriptor flags indicate it has been
>made available by the driver (desc->flags = 0x80), but that is not
>consistent with the expected wrap counter value (0).
>
>Not sure this is the root cause, but it seems the code below is broken:

>
>On 10/03/2018 03:11 PM, Jens Freimann wrote:

[snip]
>+
>>+	do {
>>+		if (idx >= vq->vq_nentries) {
>>+			idx = 0;
>>+			vq->vq_ring.avail_wrap_counter ^= 1;
>>+		}
>>+		start_dp[idx].addr  = VIRTIO_MBUF_DATA_DMA_ADDR(cookie, vq);
>>+		start_dp[idx].len   = cookie->data_len;
>>+		start_dp[idx].flags = cookie->next ? VRING_DESC_F_NEXT : 0;
>>+		start_dp[idx].flags |=
>>+			VRING_DESC_F_AVAIL(vq->vq_ring.avail_wrap_counter) |
>>+			VRING_DESC_F_USED(!vq->vq_ring.avail_wrap_counter);
>>+		if (use_indirect) {
>>+			if (++idx >= (seg_num + 1))
>>+				break;
>>+		} else {
>>+			dxp = &vq->vq_descx[idx];
>>+			idx = dxp->next;
>>+		}
>
>Imagine the current idx is 255; dxp->next will give idx 0, right?

No, it will be VQ_RING_DESC_CHAIN_END, which is defined as 32768.

>In that case, for desc[0], on the next iteration the flags won't be set
>available properly, as vq->vq_ring.avail_wrap_counter isn't updated.

It will wrap because VQ_RING_DESC_CHAIN_END is > ring size.
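
A worked trace of that wrap, as a standalone sketch (ring size of 256
assumed, matching the stall after 256 packets):

#include <stdint.h>
#include <stdio.h>

#define VQ_RING_DESC_CHAIN_END 32768

int main(void)
{
	uint16_t vq_nentries = 256;
	uint16_t idx = VQ_RING_DESC_CHAIN_END; /* dxp->next past the last slot */
	int avail_wrap_counter = 1;

	/* Same check as in the do/while loop quoted above: 32768 >= 256,
	 * so the index wraps to 0 and the wrap counter flips. */
	if (idx >= vq_nentries) {
		idx = 0;
		avail_wrap_counter ^= 1;
	}
	printf("idx=%u, wrap=%d\n", (unsigned)idx, avail_wrap_counter);
	return 0;
}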

I can't reproduce what you see. Do you test with txonly fwd mode?

regards,
Jens
  
Maxime Coquelin Oct. 12, 2018, 7:41 a.m. UTC | #5
On 10/12/2018 09:24 AM, Jens Freimann wrote:
> On Thu, Oct 11, 2018 at 07:31:57PM +0200, Maxime Coquelin wrote:
>>
>> I'm testing your series, and it gets stuck after 256 packets in the
>> transmit path. When it happens, the descriptor flags indicate it has been
>> made available by the driver (desc->flags = 0x80), but that is not
>> consistent with the expected wrap counter value (0).
>>
>> Not sure this is the root cause, but it seems the code below is broken:
> 
>>
>> On 10/03/2018 03:11 PM, Jens Freimann wrote:
> 
> [snip]
>> +
>>> +    do {
>>> +        if (idx >= vq->vq_nentries) {
>>> +            idx = 0;
>>> +            vq->vq_ring.avail_wrap_counter ^= 1;
>>> +        }
>>> +        start_dp[idx].addr  = VIRTIO_MBUF_DATA_DMA_ADDR(cookie, vq);
>>> +        start_dp[idx].len   = cookie->data_len;
>>> +        start_dp[idx].flags = cookie->next ? VRING_DESC_F_NEXT : 0;
>>> +        start_dp[idx].flags |=
>>> +            VRING_DESC_F_AVAIL(vq->vq_ring.avail_wrap_counter) |
>>> +            VRING_DESC_F_USED(!vq->vq_ring.avail_wrap_counter);
>>> +        if (use_indirect) {
>>> +            if (++idx >= (seg_num + 1))
>>> +                break;
>>> +        } else {
>>> +            dxp = &vq->vq_descx[idx];
>>> +            idx = dxp->next;
>>> +        }
>>
>> Imagine the current idx is 255; dxp->next will give idx 0, right?
> 
> No, it will be VQ_RING_DESC_CHAIN_END, which is defined as 32768.
>> In that case, for desc[0], on the next iteration the flags won't be set
>> available properly, as vq->vq_ring.avail_wrap_counter isn't updated.
> 
> It will wrap because VQ_RING_DESC_CHAIN_END is > ring size.

I'm not sure I understand. I'll dig a bit deeper into the code and may
come back if I have questions.

> I can't reproduce what you see. Do you test with txonly fwd mode?

Yes, txonly fwd mode.

On the host side, I have latest master with my two-patch series enabling
packed ring in the vhost backend, plus Tiwei's patch fixing notification
disablement.

For QEMU, I use the branch you shared and started it with the command line below:
./x86_64-softmmu/qemu-system-x86_64 -enable-kvm -m 3072 -smp 3 \
    -machine q35 -cpu host \
    -chardev socket,id=char0,path=/tmp/vhost-user1 \
    -netdev type=vhost-user,id=hn2,chardev=char0,vhostforce,queues=1 \
    -device virtio-net-pci,netdev=hn2,id=v0,mq=off,mrg_rxbuf=off,ring_packed=on,mac=52:54:00:11:22:11 \
    -object memory-backend-file,id=mem,size=3G,mem-path=/dev/hugepages,share=on \
    -numa node,memdev=mem -mem-prealloc -k fr \
    -net user,hostfwd=tcp::10021-:22 -net nic \
    -serial stdio /home/virt/rhel7.6-1.qcow2

Regards,
Maxime

> regards,
> Jens
  
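
For readers without the earlier patches of the series at hand, the
packed descriptor layout assumed throughout is roughly the following
(a sketch; the field the driver code reads as desc->index is called id
in the virtio 1.1 spec):

#include <stdint.h>

struct vring_desc_packed {
	uint64_t addr;   /* buffer guest-physical address */
	uint32_t len;    /* buffer length */
	uint16_t index;  /* buffer id ('id' in the virtio 1.1 spec) */
	uint16_t flags;  /* NEXT/INDIRECT plus AVAIL (bit 7) / USED (bit 15) */
};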

Patch

diff --git a/drivers/net/virtio/virtio_ethdev.c b/drivers/net/virtio/virtio_ethdev.c
index d6a1613dd..c65ac365c 100644
--- a/drivers/net/virtio/virtio_ethdev.c
+++ b/drivers/net/virtio/virtio_ethdev.c
@@ -390,6 +390,8 @@  virtio_init_queue(struct rte_eth_dev *dev, uint16_t vtpci_queue_idx)
 	vq->hw = hw;
 	vq->vq_queue_index = vtpci_queue_idx;
 	vq->vq_nentries = vq_size;
+	if (vtpci_packed_queue(hw))
+		vq->vq_ring.avail_wrap_counter = 1;
 
 	/*
 	 * Reserve a memzone for vring elements
@@ -496,16 +498,22 @@  virtio_init_queue(struct rte_eth_dev *dev, uint16_t vtpci_queue_idx)
 		memset(txr, 0, vq_size * sizeof(*txr));
 		for (i = 0; i < vq_size; i++) {
 			struct vring_desc *start_dp = txr[i].tx_indir;
-
-			vring_desc_init_split(start_dp, RTE_DIM(txr[i].tx_indir));
-
+			struct vring_desc_packed *start_dp_packed = txr[i].tx_indir_pq;
+	
 			/* first indirect descriptor is always the tx header */
-			start_dp->addr = txvq->virtio_net_hdr_mem
-				+ i * sizeof(*txr)
-				+ offsetof(struct virtio_tx_region, tx_hdr);
-
-			start_dp->len = hw->vtnet_hdr_size;
-			start_dp->flags = VRING_DESC_F_NEXT;
+			if (vtpci_packed_queue(hw)) {
+				start_dp_packed->addr = txvq->virtio_net_hdr_mem
+					+ i * sizeof(*txr)
+					+ offsetof(struct virtio_tx_region, tx_hdr);
+				start_dp_packed->len = hw->vtnet_hdr_size;
+			} else {
+				vring_desc_init_split(start_dp, RTE_DIM(txr[i].tx_indir));
+				start_dp->addr = txvq->virtio_net_hdr_mem
+					+ i * sizeof(*txr)
+					+ offsetof(struct virtio_tx_region, tx_hdr);
+				start_dp->len = hw->vtnet_hdr_size;
+				start_dp->flags = VRING_DESC_F_NEXT;
+			}
 		}
 	}
 
@@ -1344,7 +1352,11 @@  set_rxtx_funcs(struct rte_eth_dev *eth_dev)
 		eth_dev->rx_pkt_burst = &virtio_recv_pkts;
 	}
 
-	if (hw->use_inorder_tx) {
+	if (vtpci_packed_queue(hw)) {
+		PMD_INIT_LOG(INFO, "virtio: using virtio 1.1 Tx path on port %u",
+			eth_dev->data->port_id);
+		eth_dev->tx_pkt_burst = virtio_xmit_pkts_packed;
+	} else if (hw->use_inorder_tx) {
 		PMD_INIT_LOG(INFO, "virtio: using inorder Tx path on port %u",
 			eth_dev->data->port_id);
 		eth_dev->tx_pkt_burst = virtio_xmit_pkts_inorder;
diff --git a/drivers/net/virtio/virtio_ethdev.h b/drivers/net/virtio/virtio_ethdev.h
index e0f80e5a4..05d355180 100644
--- a/drivers/net/virtio/virtio_ethdev.h
+++ b/drivers/net/virtio/virtio_ethdev.h
@@ -82,6 +82,8 @@  uint16_t virtio_recv_mergeable_pkts_inorder(void *rx_queue,
 
 uint16_t virtio_xmit_pkts(void *tx_queue, struct rte_mbuf **tx_pkts,
 		uint16_t nb_pkts);
+uint16_t virtio_xmit_pkts_packed(void *tx_queue, struct rte_mbuf **tx_pkts,
+		uint16_t nb_pkts);
 
 uint16_t virtio_xmit_pkts_inorder(void *tx_queue, struct rte_mbuf **tx_pkts,
 		uint16_t nb_pkts);
diff --git a/drivers/net/virtio/virtio_ring.h b/drivers/net/virtio/virtio_ring.h
index b9e63d4d4..dbffd4dcd 100644
--- a/drivers/net/virtio/virtio_ring.h
+++ b/drivers/net/virtio/virtio_ring.h
@@ -108,14 +108,25 @@  set_desc_avail(struct vring *vr, struct vring_desc_packed *desc)
 }
 
 static inline int
-desc_is_used(struct vring_desc_packed *desc, struct vring *vr)
+_desc_is_used(struct vring_desc_packed *desc)
 {
 	uint16_t used, avail;
 
 	used = !!(desc->flags & VRING_DESC_F_USED(1));
 	avail = !!(desc->flags & VRING_DESC_F_AVAIL(1));
 
-	return used == avail && used == vr->used_wrap_counter;
+	return used == avail;
+
+}
+
+static inline int
+desc_is_used(struct vring_desc_packed *desc, struct vring *vr)
+{
+	uint16_t used;
+
+	used = !!(desc->flags & VRING_DESC_F_USED(1));
+
+	return _desc_is_used(desc) && used == vr->used_wrap_counter;
 }
 
 /* The standard layout for the ring is a continuous chunk of memory which
diff --git a/drivers/net/virtio/virtio_rxtx.c b/drivers/net/virtio/virtio_rxtx.c
index eb891433e..4078fba8e 100644
--- a/drivers/net/virtio/virtio_rxtx.c
+++ b/drivers/net/virtio/virtio_rxtx.c
@@ -38,6 +38,7 @@ 
 #define  VIRTIO_DUMP_PACKET(m, len) do { } while (0)
 #endif
 
+
 int
 virtio_dev_rx_queue_done(void *rxq, uint16_t offset)
 {
@@ -88,6 +89,41 @@  vq_ring_free_chain(struct virtqueue *vq, uint16_t desc_idx)
 	dp->next = VQ_RING_DESC_CHAIN_END;
 }
 
+void
+vq_ring_free_chain_packed(struct virtqueue *vq, uint16_t desc_idx)
+{
+	struct vring_desc_packed *dp;
+	struct vq_desc_extra *dxp = NULL, *dxp_tail = NULL;
+	uint16_t desc_idx_last = desc_idx;
+
+	dp  = &vq->vq_ring.desc_packed[desc_idx];
+	dxp = &vq->vq_descx[desc_idx];
+	vq->vq_free_cnt = (uint16_t)(vq->vq_free_cnt + dxp->ndescs);
+	if ((dp->flags & VRING_DESC_F_INDIRECT) == 0) {
+		while (dp->flags & VRING_DESC_F_NEXT) {
+			desc_idx_last = dxp->next;
+			dp = &vq->vq_ring.desc_packed[dxp->next];
+			dxp = &vq->vq_descx[dxp->next];
+		}
+	}
+
+	/*
+	 * We must append the existing free chain, if any, to the end of
+	 * newly freed chain. If the virtqueue was completely used, then
+	 * head would be VQ_RING_DESC_CHAIN_END (ASSERTed above).
+	 */
+	if (vq->vq_desc_tail_idx == VQ_RING_DESC_CHAIN_END) {
+		vq->vq_desc_head_idx = desc_idx;
+	} 
+	else {
+		dxp_tail = &vq->vq_descx[vq->vq_desc_tail_idx];
+		dxp_tail->next = desc_idx;
+	}
+
+	vq->vq_desc_tail_idx = desc_idx_last;
+	dxp->next = VQ_RING_DESC_CHAIN_END;
+}
+
 static uint16_t
 virtqueue_dequeue_burst_rx(struct virtqueue *vq, struct rte_mbuf **rx_pkts,
 			   uint32_t *len, uint16_t num)
@@ -165,6 +201,33 @@  virtqueue_dequeue_rx_inorder(struct virtqueue *vq,
 #endif
 
 /* Cleanup from completed transmits. */
+static void
+virtio_xmit_cleanup_packed(struct virtqueue *vq)
+{
+	uint16_t used_idx, id;
+	uint16_t size = vq->vq_nentries;
+	struct vring_desc_packed *desc = vq->vq_ring.desc_packed;
+	struct vq_desc_extra *dxp;
+
+	used_idx = vq->vq_used_cons_idx;
+	while (desc_is_used(&desc[used_idx], &vq->vq_ring) &&
+	       vq->vq_free_cnt < size) {
+		used_idx = vq->vq_used_cons_idx;
+		id = desc[used_idx].index; 
+		dxp = &vq->vq_descx[id];
+		if (++vq->vq_used_cons_idx >= size) {
+			vq->vq_used_cons_idx -= size;
+			vq->vq_ring.used_wrap_counter ^= 1;
+		}
+		vq_ring_free_chain_packed(vq, id);
+		if (dxp->cookie != NULL) {
+			rte_pktmbuf_free(dxp->cookie);
+			dxp->cookie = NULL;
+		}
+		used_idx = vq->vq_used_cons_idx;
+	}
+}
+
 static void
 virtio_xmit_cleanup(struct virtqueue *vq, uint16_t num)
 {
@@ -456,6 +519,128 @@  virtqueue_enqueue_xmit_inorder(struct virtnet_tx *txvq,
 	vq->vq_desc_head_idx = idx & (vq->vq_nentries - 1);
 }
 
+static inline void
+virtqueue_enqueue_xmit_packed(struct virtnet_tx *txvq, struct rte_mbuf *cookie,
+			uint16_t needed, int use_indirect, int can_push,
+			int in_order)
+{
+	struct virtio_tx_region *txr = txvq->virtio_net_hdr_mz->addr;
+	struct vq_desc_extra *dxp, *head_dxp;
+	struct virtqueue *vq = txvq->vq;
+	struct vring_desc_packed *start_dp, *head_dp;
+	uint16_t seg_num = cookie->nb_segs;
+	uint16_t idx, head_id;
+	uint16_t head_size = vq->hw->vtnet_hdr_size;
+	struct virtio_net_hdr *hdr;
+	int wrap_counter = vq->vq_ring.avail_wrap_counter;
+
+	head_id = vq->vq_desc_head_idx;
+	idx = head_id;	
+	start_dp = vq->vq_ring.desc_packed;
+	dxp = &vq->vq_descx[idx];
+	dxp->ndescs = needed;
+
+	head_dp = &vq->vq_ring.desc_packed[head_id];
+	head_dxp = &vq->vq_descx[head_id];
+	head_dxp->cookie = (void *) cookie;
+
+	if (can_push) {
+		/* prepend cannot fail, checked by caller */
+		hdr = (struct virtio_net_hdr *)
+			rte_pktmbuf_prepend(cookie, head_size);
+		/* rte_pktmbuf_prepend() counts the hdr size to the pkt length,
+		 * which is wrong. Below subtract restores correct pkt size.
+		 */
+		cookie->pkt_len -= head_size;
+
+		/* if offload disabled, it is not zeroed below, do it now */
+		if (!vq->hw->has_tx_offload) {
+			ASSIGN_UNLESS_EQUAL(hdr->csum_start, 0);
+			ASSIGN_UNLESS_EQUAL(hdr->csum_offset, 0);
+			ASSIGN_UNLESS_EQUAL(hdr->flags, 0);
+			ASSIGN_UNLESS_EQUAL(hdr->gso_type, 0);
+			ASSIGN_UNLESS_EQUAL(hdr->gso_size, 0);
+			ASSIGN_UNLESS_EQUAL(hdr->hdr_len, 0);
+		}
+	} else if (use_indirect) {
+		/* setup tx ring slot to point to indirect
+		 * descriptor list stored in reserved region.
+		 *
+		 * the first slot in indirect ring is already preset
+		 * to point to the header in reserved region
+		 */
+		start_dp[idx].addr  = txvq->virtio_net_hdr_mem +
+			RTE_PTR_DIFF(&txr[idx].tx_indir_pq, txr);
+		start_dp[idx].len   = (seg_num + 1) * sizeof(struct vring_desc_packed);
+		start_dp[idx].flags = VRING_DESC_F_INDIRECT;
+		hdr = (struct virtio_net_hdr *)&txr[idx].tx_hdr;
+
+		/* loop below will fill in rest of the indirect elements */
+		start_dp = txr[idx].tx_indir_pq;
+		idx = 1;
+	} else {
+		/* setup first tx ring slot to point to header
+		 * stored in reserved region.
+		 */
+		start_dp[idx].addr  = txvq->virtio_net_hdr_mem +
+			RTE_PTR_DIFF(&txr[idx].tx_hdr, txr);
+		start_dp[idx].len   = vq->hw->vtnet_hdr_size;
+		start_dp[idx].flags = VRING_DESC_F_NEXT;
+		start_dp[idx].flags |=
+			VRING_DESC_F_AVAIL(vq->vq_ring.avail_wrap_counter) |
+			VRING_DESC_F_USED(!vq->vq_ring.avail_wrap_counter);
+		hdr = (struct virtio_net_hdr *)&txr[idx].tx_hdr;
+		idx = dxp->next;
+	}
+
+	virtqueue_xmit_offload(hdr, cookie, vq->hw->has_tx_offload);
+
+	do {
+		if (idx >= vq->vq_nentries) {
+			idx = 0;
+			vq->vq_ring.avail_wrap_counter ^= 1;
+		}
+		start_dp[idx].addr  = VIRTIO_MBUF_DATA_DMA_ADDR(cookie, vq);
+		start_dp[idx].len   = cookie->data_len;
+		start_dp[idx].flags = cookie->next ? VRING_DESC_F_NEXT : 0;
+		start_dp[idx].flags |=
+			VRING_DESC_F_AVAIL(vq->vq_ring.avail_wrap_counter) |
+			VRING_DESC_F_USED(!vq->vq_ring.avail_wrap_counter);
+		if (use_indirect) {
+			if (++idx >= (seg_num + 1))
+				break;
+		} else {
+			dxp = &vq->vq_descx[idx];
+			idx = dxp->next;
+		}
+	} while ((cookie = cookie->next) != NULL);
+
+	if (use_indirect)
+		idx = vq->vq_ring.desc_packed[head_id].index;
+
+	vq->vq_free_cnt = (uint16_t)(vq->vq_free_cnt - needed);
+
+	rte_smp_wmb();
+	if (needed > 1)
+		head_dp->flags = VRING_DESC_F_NEXT |
+			VRING_DESC_F_AVAIL(wrap_counter) |
+			VRING_DESC_F_USED(!wrap_counter);
+	rte_smp_mb();
+
+	if (!in_order) {
+		if (vq->vq_desc_head_idx == VQ_RING_DESC_CHAIN_END)
+			vq->vq_desc_tail_idx = idx;
+	}
+	if (idx >= vq->vq_nentries) {
+		idx = 0;
+		vq->vq_ring.avail_wrap_counter ^= 1;
+	}
+	vq->vq_desc_head_idx = idx;
+	vq->vq_avail_idx = idx;
+
+}
+
+
 static inline void
 virtqueue_enqueue_xmit(struct virtnet_tx *txvq, struct rte_mbuf *cookie,
 			uint16_t needed, int use_indirect, int can_push,
@@ -1346,6 +1531,97 @@  virtio_recv_mergeable_pkts(void *rx_queue,
 	return nb_rx;
 }
 
+uint16_t
+virtio_xmit_pkts_packed(void *tx_queue, struct rte_mbuf **tx_pkts, uint16_t nb_pkts)
+{
+	struct virtnet_tx *txvq = tx_queue;
+	struct virtqueue *vq = txvq->vq;
+	struct virtio_hw *hw = vq->hw;
+	uint16_t hdr_size = hw->vtnet_hdr_size;
+	uint16_t nb_tx = 0;
+	int error;
+
+	if (unlikely(hw->started == 0 && tx_pkts != hw->inject_pkts))
+		return nb_tx;
+
+	if (unlikely(nb_pkts < 1))
+		return nb_pkts;
+
+	PMD_TX_LOG(DEBUG, "%d packets to xmit", nb_pkts);
+
+	virtio_rmb();
+	if (likely(nb_pkts > vq->vq_nentries - vq->vq_free_thresh))
+		virtio_xmit_cleanup_packed(vq);
+
+	for (nb_tx = 0; nb_tx < nb_pkts; nb_tx++) {
+		struct rte_mbuf *txm = tx_pkts[nb_tx];
+		int can_push = 0, use_indirect = 0, slots, need;
+
+		/* Do VLAN tag insertion */
+		if (unlikely(txm->ol_flags & PKT_TX_VLAN_PKT)) {
+			error = rte_vlan_insert(&txm);
+			if (unlikely(error)) {
+				rte_pktmbuf_free(txm);
+				continue;
+			}
+		}
+
+		/* optimize ring usage */
+		if ((vtpci_with_feature(hw, VIRTIO_F_ANY_LAYOUT) ||
+		      vtpci_with_feature(hw, VIRTIO_F_VERSION_1)) &&
+		    rte_mbuf_refcnt_read(txm) == 1 &&
+		    RTE_MBUF_DIRECT(txm) &&
+		    txm->nb_segs == 1 &&
+		    rte_pktmbuf_headroom(txm) >= hdr_size &&
+		    rte_is_aligned(rte_pktmbuf_mtod(txm, char *),
+				   __alignof__(struct virtio_net_hdr_mrg_rxbuf)))
+			can_push = 1;
+		else if (vtpci_with_feature(hw, VIRTIO_RING_F_INDIRECT_DESC) &&
+			 txm->nb_segs < VIRTIO_MAX_TX_INDIRECT)
+			use_indirect = 1;
+
+		/* How many main ring entries are needed to this Tx?
+		 * any_layout => number of segments
+		 * indirect   => 1
+		 * default    => number of segments + 1
+		 */
+		slots = use_indirect ? 1 : (txm->nb_segs + !can_push);
+		need = slots - vq->vq_free_cnt;
+
+		/* Positive value indicates it need free vring descriptors */
+		if (unlikely(need > 0)) {
+			virtio_rmb();
+			need = RTE_MIN(need, (int)nb_pkts);
+
+			virtio_xmit_cleanup_packed(vq);
+			need = slots - vq->vq_free_cnt;
+			if (unlikely(need > 0)) {
+				PMD_TX_LOG(ERR,
+					   "No free tx descriptors to transmit");
+				break;
+			}
+		}
+
+		/* Enqueue Packet buffers */
+		virtqueue_enqueue_xmit_packed(txvq, txm, slots, use_indirect,
+			can_push, 0);
+
+		txvq->stats.bytes += txm->pkt_len;
+		virtio_update_packet_stats(&txvq->stats, txm);
+	}
+
+	txvq->stats.packets += nb_tx;
+
+	if (likely(nb_tx)) {
+		if (unlikely(virtqueue_kick_prepare_packed(vq))) {
+			virtqueue_notify(vq);
+			PMD_TX_LOG(DEBUG, "Notified backend after xmit");
+		}
+	}
+
+	return nb_tx;
+}
+
 uint16_t
 virtio_xmit_pkts(void *tx_queue, struct rte_mbuf **tx_pkts, uint16_t nb_pkts)
 {
diff --git a/drivers/net/virtio/virtqueue.h b/drivers/net/virtio/virtqueue.h
index 549962c28..58f600648 100644
--- a/drivers/net/virtio/virtqueue.h
+++ b/drivers/net/virtio/virtqueue.h
@@ -242,8 +242,12 @@  struct virtio_net_hdr_mrg_rxbuf {
 #define VIRTIO_MAX_TX_INDIRECT 8
 struct virtio_tx_region {
 	struct virtio_net_hdr_mrg_rxbuf tx_hdr;
-	struct vring_desc tx_indir[VIRTIO_MAX_TX_INDIRECT]
-			   __attribute__((__aligned__(16)));
+	union {
+		struct vring_desc tx_indir[VIRTIO_MAX_TX_INDIRECT]
+			__attribute__((__aligned__(16)));
+		struct vring_desc_packed tx_indir_pq[VIRTIO_MAX_TX_INDIRECT]
+			__attribute__((__aligned__(16)));
+	};
 };
 
 static inline uint16_t
@@ -328,6 +332,7 @@  virtio_get_queue_type(struct virtio_hw *hw, uint16_t vtpci_queue_idx)
 #define VIRTQUEUE_NUSED(vq) ((uint16_t)((vq)->vq_ring.used->idx - (vq)->vq_used_cons_idx))
 
 void vq_ring_free_chain(struct virtqueue *vq, uint16_t desc_idx);
+void vq_ring_free_chain_packed(struct virtqueue *vq, uint16_t desc_idx);
 void vq_ring_free_inorder(struct virtqueue *vq, uint16_t desc_idx,
 			  uint16_t num);
 
@@ -361,6 +366,15 @@  virtqueue_kick_prepare(struct virtqueue *vq)
 	return !(vq->vq_ring.used->flags & VRING_USED_F_NO_NOTIFY);
 }
 
+static inline int
+virtqueue_kick_prepare_packed(struct virtqueue *vq)
+{
+	uint16_t flags;
+
+	flags = vq->vq_ring.device_event->desc_event_flags & RING_EVENT_FLAGS_DESC;
+	return (flags != RING_EVENT_FLAGS_DISABLE);
+}
+
 static inline void
 virtqueue_notify(struct virtqueue *vq)
 {