[v6,06/11] net/virtio: implement transmit path for packed queues

Message ID 20180921103308.16357-7-jfreimann@redhat.com (mailing list archive)
State Superseded, archived
Delegated to: Maxime Coquelin
Series: implement packed virtqueues

Checks

Context               Check     Description
ci/Intel-compilation  success   Compilation OK

Commit Message

Jens Freimann Sept. 21, 2018, 10:33 a.m. UTC
This implements the transmit path for devices that support
packed virtqueues.

Add the feature bit and the code to add buffers to the vring
and mark descriptors as available.

Signed-off-by: Jens Freimann <jfreimann@redhat.com>
---
 drivers/net/virtio/virtio_ethdev.c |   8 +-
 drivers/net/virtio/virtio_ethdev.h |   2 +
 drivers/net/virtio/virtio_ring.h   |  15 +-
 drivers/net/virtio/virtio_rxtx.c   | 243 +++++++++++++++++++++++++++++
 drivers/net/virtio/virtqueue.h     |  17 +-
 5 files changed, 280 insertions(+), 5 deletions(-)
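
For reference (not part of the patch): a minimal sketch of the packed-ring marking scheme the commit message refers to and that the VRING_DESC_F_AVAIL()/VRING_DESC_F_USED() macros below encode, per the virtio 1.1 spec. The struct and function names here are illustrative only.

/* Illustrative sketch, not part of the patch. In a packed ring the driver
 * marks a descriptor available by setting the AVAIL bit equal to its avail
 * wrap counter and the USED bit to the inverse; the device marks the
 * descriptor used by writing both bits equal to its own used wrap counter.
 */
#include <stdint.h>

#define PQ_DESC_F_AVAIL	(1 << 7)	/* bit 7 per virtio 1.1 */
#define PQ_DESC_F_USED	(1 << 15)	/* bit 15 per virtio 1.1 */

struct pq_desc {	/* illustrative packed descriptor layout */
	uint64_t addr;
	uint32_t len;
	uint16_t id;
	uint16_t flags;
};

/* Driver side: flag a filled descriptor as available to the device. */
static inline void
pq_mark_avail(struct pq_desc *d, int avail_wrap_counter)
{
	uint16_t flags = avail_wrap_counter ? PQ_DESC_F_AVAIL : PQ_DESC_F_USED;

	/* addr/len/id must be visible before the flags; a real driver puts
	 * a write barrier (e.g. rte_smp_wmb()) here.
	 */
	d->flags = flags;
}

/* Driver side: has the device handed this descriptor back as used? */
static inline int
pq_desc_is_used(const struct pq_desc *d, int used_wrap_counter)
{
	int avail = !!(d->flags & PQ_DESC_F_AVAIL);
	int used  = !!(d->flags & PQ_DESC_F_USED);

	return avail == used && used == used_wrap_counter;
}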
  

Comments

Tiwei Bie Sept. 21, 2018, 12:26 p.m. UTC | #1
On Fri, Sep 21, 2018 at 12:33:03PM +0200, Jens Freimann wrote:
[...]
>  
>  static inline int
> -desc_is_used(struct vring_desc_packed *desc, struct vring *vr)
> +_desc_is_used(struct vring_desc_packed *desc)
>  {
>  	uint16_t used, avail;
>  
>  	used = !!(desc->flags & VRING_DESC_F_USED(1));
>  	avail = !!(desc->flags & VRING_DESC_F_AVAIL(1));
>  
> -	return used == avail && used == vr->used_wrap_counter;
> +	return used == avail;
> +
> +}
> +
> +static inline int
> +desc_is_used(struct vring_desc_packed *desc, struct vring *vr)
> +{
> +	uint16_t used;
> +
> +	used = !!(desc->flags & VRING_DESC_F_USED(1));
> +
> +	return _desc_is_used(desc) && used == vr->used_wrap_counter;
>  }
>  
>  /* The standard layout for the ring is a continuous chunk of memory which
> diff --git a/drivers/net/virtio/virtio_rxtx.c b/drivers/net/virtio/virtio_rxtx.c
> index eb891433e..ea6300563 100644
> --- a/drivers/net/virtio/virtio_rxtx.c
> +++ b/drivers/net/virtio/virtio_rxtx.c
> @@ -38,6 +38,7 @@
>  #define  VIRTIO_DUMP_PACKET(m, len) do { } while (0)
>  #endif
>  
> +
>  int
>  virtio_dev_rx_queue_done(void *rxq, uint16_t offset)
>  {
> @@ -165,6 +166,31 @@ virtqueue_dequeue_rx_inorder(struct virtqueue *vq,
>  #endif
>  
>  /* Cleanup from completed transmits. */
> +static void
> +virtio_xmit_cleanup_packed(struct virtqueue *vq)
> +{
> +	uint16_t idx;
> +	uint16_t size = vq->vq_nentries;
> +	struct vring_desc_packed *desc = vq->vq_ring.desc_packed;
> +	struct vq_desc_extra *dxp;
> +
> +	idx = vq->vq_used_cons_idx;
> +	while (_desc_is_used(&desc[idx]) &&

We can't just compare the AVAIL bit and USED bit to
check whether a desc is used.

> +	       vq->vq_free_cnt < size) {
> +		dxp = &vq->vq_descx[idx];

The code is still assuming the descs will be written
back by device in order. The vq->vq_descx[] needs to
be managed e.g. as a list to support the out-of-order
processing. IOW, we can't assume vq->vq_descx[idx]
is corresponding to desc[idx] when device may write
back the descs out of order.

> +		vq->vq_free_cnt += dxp->ndescs;
> +		idx += dxp->ndescs;
> +		idx = idx >= size ? idx - size : idx;
> +		if (idx == 0) {
> +			vq->vq_ring.used_wrap_counter ^= 1;
> +		}
> +		if (dxp->cookie != NULL) {
> +			rte_pktmbuf_free(dxp->cookie);
> +			dxp->cookie = NULL;
> +		}
> +	}
> +}
[...]
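
To illustrate the first point above (a sketch only, not the final fix): the cleanup loop needs to compare the AVAIL/USED bits against a wrap counter the driver tracks on the consumer side and toggles each time vq_used_cons_idx wraps. The vq_packed_used_wrap_counter field below is hypothetical and not in this patch; everything else uses names from the patch.

static void
virtio_xmit_cleanup_packed_sketch(struct virtqueue *vq)
{
	uint16_t idx = vq->vq_used_cons_idx;
	uint16_t size = vq->vq_nentries;
	struct vring_desc_packed *desc = vq->vq_ring.desc_packed;
	struct vq_desc_extra *dxp;

	while (vq->vq_free_cnt < size) {
		uint16_t flags = desc[idx].flags;
		int avail = !!(flags & VRING_DESC_F_AVAIL(1));
		int used = !!(flags & VRING_DESC_F_USED(1));

		/* Used only when both bits match the wrap counter the driver
		 * expects for this lap of the ring; a read barrier belongs
		 * here in real code.
		 */
		if (avail != used || used != vq->vq_packed_used_wrap_counter)
			break;

		/* Look up the entry by the id the device wrote back, not by
		 * the ring slot, so out-of-order completion is tolerated.
		 */
		dxp = &vq->vq_descx[desc[idx].index];
		vq->vq_free_cnt += dxp->ndescs;
		idx += dxp->ndescs;
		if (idx >= size) {
			idx -= size;
			vq->vq_packed_used_wrap_counter ^= 1;
		}
		if (dxp->cookie != NULL) {
			rte_pktmbuf_free(dxp->cookie);
			dxp->cookie = NULL;
		}
	}
	vq->vq_used_cons_idx = idx;
}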
  
Jens Freimann Sept. 21, 2018, 12:37 p.m. UTC | #2
On Fri, Sep 21, 2018 at 08:26:58PM +0800, Tiwei Bie wrote:
>On Fri, Sep 21, 2018 at 12:33:03PM +0200, Jens Freimann wrote:
>[...]
>>
>>  static inline int
>> -desc_is_used(struct vring_desc_packed *desc, struct vring *vr)
>> +_desc_is_used(struct vring_desc_packed *desc)
>>  {
>>  	uint16_t used, avail;
>>
>>  	used = !!(desc->flags & VRING_DESC_F_USED(1));
>>  	avail = !!(desc->flags & VRING_DESC_F_AVAIL(1));
>>
>> -	return used == avail && used == vr->used_wrap_counter;
>> +	return used == avail;
>> +
>> +}
>> +
>> +static inline int
>> +desc_is_used(struct vring_desc_packed *desc, struct vring *vr)
>> +{
>> +	uint16_t used;
>> +
>> +	used = !!(desc->flags & VRING_DESC_F_USED(1));
>> +
>> +	return _desc_is_used(desc) && used == vr->used_wrap_counter;
>>  }
>>
>>  /* The standard layout for the ring is a continuous chunk of memory which
>> diff --git a/drivers/net/virtio/virtio_rxtx.c b/drivers/net/virtio/virtio_rxtx.c
>> index eb891433e..ea6300563 100644
>> --- a/drivers/net/virtio/virtio_rxtx.c
>> +++ b/drivers/net/virtio/virtio_rxtx.c
>> @@ -38,6 +38,7 @@
>>  #define  VIRTIO_DUMP_PACKET(m, len) do { } while (0)
>>  #endif
>>
>> +
>>  int
>>  virtio_dev_rx_queue_done(void *rxq, uint16_t offset)
>>  {
>> @@ -165,6 +166,31 @@ virtqueue_dequeue_rx_inorder(struct virtqueue *vq,
>>  #endif
>>
>>  /* Cleanup from completed transmits. */
>> +static void
>> +virtio_xmit_cleanup_packed(struct virtqueue *vq)
>> +{
>> +	uint16_t idx;
>> +	uint16_t size = vq->vq_nentries;
>> +	struct vring_desc_packed *desc = vq->vq_ring.desc_packed;
>> +	struct vq_desc_extra *dxp;
>> +
>> +	idx = vq->vq_used_cons_idx;
>> +	while (_desc_is_used(&desc[idx]) &&
>
>We can't just compare the AVAIL bit and USED bit to
>check whether a desc is used.

We can't compare against the current wrap counter value either,
because it won't match the flags in the descriptors. So check against
used_wrap_counter ^= 1 then?
>
>> +	       vq->vq_free_cnt < size) {
>> +		dxp = &vq->vq_descx[idx];
>
>The code is still assuming the descs will be written
>back by device in order. The vq->vq_descx[] needs to
>be managed e.g. as a list to support the out-of-order
>processing. IOW, we can't assume vq->vq_descx[idx]
>is corresponding to desc[idx] when device may write
>back the descs out of order.

I changed it to not assume this in other spots but missed this one.  I
will check more carefully and add code to make vq_descx entries a list.

Thanks for the review!

regards,
Jens
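
One possible shape for such a list (a sketch only, assuming a hypothetical `next` field is added to struct vq_desc_extra; none of this is in the current patch): keep the vq_descx[] entries on a free chain keyed by buffer id, so entries can be released in whatever order the device completes buffers.

static uint16_t
descx_get(struct virtqueue *vq)
{
	uint16_t id = vq->vq_desc_head_idx;	/* head of the free chain */

	vq->vq_desc_head_idx = vq->vq_descx[id].next;
	return id;	/* written into the ring as the buffer id */
}

static void
descx_put(struct virtqueue *vq, uint16_t id)
{
	struct vq_desc_extra *dxp = &vq->vq_descx[id];

	vq->vq_free_cnt += dxp->ndescs;
	if (dxp->cookie != NULL) {
		rte_pktmbuf_free(dxp->cookie);
		dxp->cookie = NULL;
	}
	/* Return the entry to the free chain; since the device reports the
	 * completed buffer id, release order no longer matters.
	 */
	dxp->next = vq->vq_desc_head_idx;
	vq->vq_desc_head_idx = id;
}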
  
Tiwei Bie Sept. 21, 2018, 12:49 p.m. UTC | #3
On Fri, Sep 21, 2018 at 02:37:32PM +0200, Jens Freimann wrote:
> On Fri, Sep 21, 2018 at 08:26:58PM +0800, Tiwei Bie wrote:
> > On Fri, Sep 21, 2018 at 12:33:03PM +0200, Jens Freimann wrote:
> > [...]
> > > 
> > >  static inline int
> > > -desc_is_used(struct vring_desc_packed *desc, struct vring *vr)
> > > +_desc_is_used(struct vring_desc_packed *desc)
> > >  {
> > >  	uint16_t used, avail;
> > > 
> > >  	used = !!(desc->flags & VRING_DESC_F_USED(1));
> > >  	avail = !!(desc->flags & VRING_DESC_F_AVAIL(1));
> > > 
> > > -	return used == avail && used == vr->used_wrap_counter;
> > > +	return used == avail;
> > > +
> > > +}
> > > +
> > > +static inline int
> > > +desc_is_used(struct vring_desc_packed *desc, struct vring *vr)
> > > +{
> > > +	uint16_t used;
> > > +
> > > +	used = !!(desc->flags & VRING_DESC_F_USED(1));
> > > +
> > > +	return _desc_is_used(desc) && used == vr->used_wrap_counter;
> > >  }
> > > 
> > >  /* The standard layout for the ring is a continuous chunk of memory which
> > > diff --git a/drivers/net/virtio/virtio_rxtx.c b/drivers/net/virtio/virtio_rxtx.c
> > > index eb891433e..ea6300563 100644
> > > --- a/drivers/net/virtio/virtio_rxtx.c
> > > +++ b/drivers/net/virtio/virtio_rxtx.c
> > > @@ -38,6 +38,7 @@
> > >  #define  VIRTIO_DUMP_PACKET(m, len) do { } while (0)
> > >  #endif
> > > 
> > > +
> > >  int
> > >  virtio_dev_rx_queue_done(void *rxq, uint16_t offset)
> > >  {
> > > @@ -165,6 +166,31 @@ virtqueue_dequeue_rx_inorder(struct virtqueue *vq,
> > >  #endif
> > > 
> > >  /* Cleanup from completed transmits. */
> > > +static void
> > > +virtio_xmit_cleanup_packed(struct virtqueue *vq)
> > > +{
> > > +	uint16_t idx;
> > > +	uint16_t size = vq->vq_nentries;
> > > +	struct vring_desc_packed *desc = vq->vq_ring.desc_packed;
> > > +	struct vq_desc_extra *dxp;
> > > +
> > > +	idx = vq->vq_used_cons_idx;
> > > +	while (_desc_is_used(&desc[idx]) &&
> > 
> > We can't just compare the AVAIL bit and USED bit to
> > check whether a desc is used.
> 
> We can't compare against the current wrap counter value either,
> because it won't match the flags in the descriptors. So check against
> used_wrap_counter ^= 1 then?

I haven't looked into this series yet, so I'm not sure what the
best way is to get the wrap counter we need here. But, yes, you
need some way to get the wrap counter we should use here.

> > 
> > > +	       vq->vq_free_cnt < size) {
> > > +		dxp = &vq->vq_descx[idx];
> > 
> > The code is still assuming the descs will be written
> > back by device in order. The vq->vq_descx[] needs to
> > be managed e.g. as a list to support the out-of-order
> > processing. IOW, we can't assume vq->vq_descx[idx]
> > is corresponding to desc[idx] when device may write
> > back the descs out of order.
> 
> I changed it to not assume this in other spots but missed this one.  I
> will check more carefully and add code to make vq_descx entries a list.

After making it support out-of-order processing, we may want to do
some performance tests for the Tx path only, because I suspect
we may not get the expected performance improvements in the packed
ring due to this when the device is faster than the driver.

Thanks
  

Patch

diff --git a/drivers/net/virtio/virtio_ethdev.c b/drivers/net/virtio/virtio_ethdev.c
index 29f3e1043..5c28af282 100644
--- a/drivers/net/virtio/virtio_ethdev.c
+++ b/drivers/net/virtio/virtio_ethdev.c
@@ -384,6 +384,8 @@  virtio_init_queue(struct rte_eth_dev *dev, uint16_t vtpci_queue_idx)
 	vq->hw = hw;
 	vq->vq_queue_index = vtpci_queue_idx;
 	vq->vq_nentries = vq_size;
+	if (vtpci_packed_queue(hw))
+		vq->vq_ring.avail_wrap_counter = 1;
 
 	/*
 	 * Reserve a memzone for vring elements
@@ -1338,7 +1340,11 @@  set_rxtx_funcs(struct rte_eth_dev *eth_dev)
 		eth_dev->rx_pkt_burst = &virtio_recv_pkts;
 	}
 
-	if (hw->use_inorder_tx) {
+	if (vtpci_packed_queue(hw)) {
+		PMD_INIT_LOG(INFO, "virtio: using virtio 1.1 Tx path on port %u",
+			eth_dev->data->port_id);
+		eth_dev->tx_pkt_burst = virtio_xmit_pkts_packed;
+	} else if (hw->use_inorder_tx) {
 		PMD_INIT_LOG(INFO, "virtio: using inorder Tx path on port %u",
 			eth_dev->data->port_id);
 		eth_dev->tx_pkt_burst = virtio_xmit_pkts_inorder;
diff --git a/drivers/net/virtio/virtio_ethdev.h b/drivers/net/virtio/virtio_ethdev.h
index b726ad108..04161b461 100644
--- a/drivers/net/virtio/virtio_ethdev.h
+++ b/drivers/net/virtio/virtio_ethdev.h
@@ -79,6 +79,8 @@  uint16_t virtio_recv_mergeable_pkts_inorder(void *rx_queue,
 
 uint16_t virtio_xmit_pkts(void *tx_queue, struct rte_mbuf **tx_pkts,
 		uint16_t nb_pkts);
+uint16_t virtio_xmit_pkts_packed(void *tx_queue, struct rte_mbuf **tx_pkts,
+		uint16_t nb_pkts);
 
 uint16_t virtio_xmit_pkts_inorder(void *tx_queue, struct rte_mbuf **tx_pkts,
 		uint16_t nb_pkts);
diff --git a/drivers/net/virtio/virtio_ring.h b/drivers/net/virtio/virtio_ring.h
index b9e63d4d4..dbffd4dcd 100644
--- a/drivers/net/virtio/virtio_ring.h
+++ b/drivers/net/virtio/virtio_ring.h
@@ -108,14 +108,25 @@  set_desc_avail(struct vring *vr, struct vring_desc_packed *desc)
 }
 
 static inline int
-desc_is_used(struct vring_desc_packed *desc, struct vring *vr)
+_desc_is_used(struct vring_desc_packed *desc)
 {
 	uint16_t used, avail;
 
 	used = !!(desc->flags & VRING_DESC_F_USED(1));
 	avail = !!(desc->flags & VRING_DESC_F_AVAIL(1));
 
-	return used == avail && used == vr->used_wrap_counter;
+	return used == avail;
+
+}
+
+static inline int
+desc_is_used(struct vring_desc_packed *desc, struct vring *vr)
+{
+	uint16_t used;
+
+	used = !!(desc->flags & VRING_DESC_F_USED(1));
+
+	return _desc_is_used(desc) && used == vr->used_wrap_counter;
 }
 
 /* The standard layout for the ring is a continuous chunk of memory which
diff --git a/drivers/net/virtio/virtio_rxtx.c b/drivers/net/virtio/virtio_rxtx.c
index eb891433e..ea6300563 100644
--- a/drivers/net/virtio/virtio_rxtx.c
+++ b/drivers/net/virtio/virtio_rxtx.c
@@ -38,6 +38,7 @@ 
 #define  VIRTIO_DUMP_PACKET(m, len) do { } while (0)
 #endif
 
+
 int
 virtio_dev_rx_queue_done(void *rxq, uint16_t offset)
 {
@@ -165,6 +166,31 @@  virtqueue_dequeue_rx_inorder(struct virtqueue *vq,
 #endif
 
 /* Cleanup from completed transmits. */
+static void
+virtio_xmit_cleanup_packed(struct virtqueue *vq)
+{
+	uint16_t idx;
+	uint16_t size = vq->vq_nentries;
+	struct vring_desc_packed *desc = vq->vq_ring.desc_packed;
+	struct vq_desc_extra *dxp;
+
+	idx = vq->vq_used_cons_idx;
+	while (_desc_is_used(&desc[idx]) &&
+	       vq->vq_free_cnt < size) {
+		dxp = &vq->vq_descx[idx];
+		vq->vq_free_cnt += dxp->ndescs;
+		idx += dxp->ndescs;
+		idx = idx >= size ? idx - size : idx;
+		if (idx == 0) {
+			vq->vq_ring.used_wrap_counter ^= 1;
+		}
+		if (dxp->cookie != NULL) {
+			rte_pktmbuf_free(dxp->cookie);
+			dxp->cookie = NULL;
+		}
+	}
+}
+
 static void
 virtio_xmit_cleanup(struct virtqueue *vq, uint16_t num)
 {
@@ -456,6 +482,129 @@  virtqueue_enqueue_xmit_inorder(struct virtnet_tx *txvq,
 	vq->vq_desc_head_idx = idx & (vq->vq_nentries - 1);
 }
 
+static inline void
+virtqueue_enqueue_xmit_packed(struct virtnet_tx *txvq, struct rte_mbuf *cookie,
+			uint16_t needed, int use_indirect, int can_push,
+			int in_order)
+{
+	struct virtio_tx_region *txr = txvq->virtio_net_hdr_mz->addr;
+	struct vq_desc_extra *dxp;
+	struct virtqueue *vq = txvq->vq;
+	struct vring_desc_packed *start_dp;
+	uint16_t seg_num = cookie->nb_segs;
+	uint16_t head_idx, idx, prev;
+	uint16_t head_id;
+	uint16_t head_size = vq->hw->vtnet_hdr_size;
+	struct virtio_net_hdr *hdr;
+	int wrap_counter = vq->vq_ring.avail_wrap_counter;
+
+	head_idx = vq->vq_desc_head_idx;
+	idx = head_idx;
+	dxp = &vq->vq_descx[idx];
+	dxp->cookie = (void *)cookie;
+	dxp->ndescs = needed;
+
+	start_dp = vq->vq_ring.desc_packed;
+	head_id = start_dp[head_idx].index;
+
+	if (can_push) {
+		/* prepend cannot fail, checked by caller */
+		hdr = (struct virtio_net_hdr *)
+			rte_pktmbuf_prepend(cookie, head_size);
+		/* rte_pktmbuf_prepend() counts the hdr size to the pkt length,
+		 * which is wrong. Below subtract restores correct pkt size.
+		 */
+		cookie->pkt_len -= head_size;
+
+		/* if offload disabled, it is not zeroed below, do it now */
+		if (!vq->hw->has_tx_offload) {
+			ASSIGN_UNLESS_EQUAL(hdr->csum_start, 0);
+			ASSIGN_UNLESS_EQUAL(hdr->csum_offset, 0);
+			ASSIGN_UNLESS_EQUAL(hdr->flags, 0);
+			ASSIGN_UNLESS_EQUAL(hdr->gso_type, 0);
+			ASSIGN_UNLESS_EQUAL(hdr->gso_size, 0);
+			ASSIGN_UNLESS_EQUAL(hdr->hdr_len, 0);
+		}
+	} else if (use_indirect) {
+		/* setup tx ring slot to point to indirect
+		 * descriptor list stored in reserved region.
+		 *
+		 * the first slot in indirect ring is already preset
+		 * to point to the header in reserved region
+		 */
+		start_dp[idx].addr  = txvq->virtio_net_hdr_mem +
+			RTE_PTR_DIFF(&txr[idx].tx_indir, txr);
+		start_dp[idx].len   = (seg_num + 1) * sizeof(struct vring_desc);
+		start_dp[idx].flags = VRING_DESC_F_INDIRECT;
+		hdr = (struct virtio_net_hdr *)&txr[idx].tx_hdr;
+
+		/* loop below will fill in rest of the indirect elements */
+		start_dp = txr[idx].tx_indir_pq;
+		idx = 1;
+	} else {
+		/* setup first tx ring slot to point to header
+		 * stored in reserved region.
+		 */
+		start_dp[idx].addr  = txvq->virtio_net_hdr_mem +
+			RTE_PTR_DIFF(&txr[idx].tx_hdr, txr);
+		start_dp[idx].len   = vq->hw->vtnet_hdr_size;
+		start_dp[idx].flags = VRING_DESC_F_NEXT |
+			VRING_DESC_F_AVAIL(vq->vq_ring.avail_wrap_counter) |
+			VRING_DESC_F_USED(!vq->vq_ring.used_wrap_counter); 
+		hdr = (struct virtio_net_hdr *)&txr[idx].tx_hdr;
+
+		idx++;
+	}
+
+	virtqueue_xmit_offload(hdr, cookie, vq->hw->has_tx_offload);
+
+	do {
+		if (idx >= vq->vq_nentries) {
+			idx -= vq->vq_nentries;
+			vq->vq_ring.avail_wrap_counter ^= 1;
+			vq->vq_ring.used_wrap_counter ^= 1;
+		}
+		start_dp[idx].addr  = VIRTIO_MBUF_DATA_DMA_ADDR(cookie, vq);
+		start_dp[idx].len   = cookie->data_len;
+		start_dp[idx].flags = VRING_DESC_F_NEXT |
+			VRING_DESC_F_AVAIL(vq->vq_ring.avail_wrap_counter) |
+			VRING_DESC_F_USED(!vq->vq_ring.used_wrap_counter); 
+		idx++;
+	} while ((cookie = cookie->next) != NULL);
+
+	if (use_indirect)
+		idx = vq->vq_ring.desc_packed[head_idx].index;
+
+	if (idx >= vq->vq_nentries) {
+		idx -= vq->vq_nentries;
+		vq->vq_ring.avail_wrap_counter ^= 1;
+		vq->vq_ring.used_wrap_counter ^= 1;
+	}
+
+	vq->vq_free_cnt = (uint16_t)(vq->vq_free_cnt - needed);
+
+	if (needed > 1) {
+		prev = (idx - 1 > 0 ? idx - 1 : vq->vq_nentries) - 1;
+		start_dp[prev].index = head_id;
+		start_dp[prev].flags =
+			(VRING_DESC_F_AVAIL(wrap_counter) |
+			 VRING_DESC_F_USED(!wrap_counter));
+	}
+	start_dp[head_idx].flags =
+		(VRING_DESC_F_AVAIL(wrap_counter) |
+		 VRING_DESC_F_USED(!wrap_counter));
+	rte_smp_wmb();
+
+	vq->vq_desc_head_idx = idx;
+	vq->vq_avail_idx = idx;
+
+	if (!in_order) {
+		if (vq->vq_desc_head_idx == VQ_RING_DESC_CHAIN_END)
+			vq->vq_desc_tail_idx = idx;
+	}
+}
+
+
 static inline void
 virtqueue_enqueue_xmit(struct virtnet_tx *txvq, struct rte_mbuf *cookie,
 			uint16_t needed, int use_indirect, int can_push,
@@ -736,6 +885,9 @@  virtio_dev_tx_queue_setup_finish(struct rte_eth_dev *dev,
 	if (hw->use_inorder_tx)
 		vq->vq_ring.desc[vq->vq_nentries - 1].next = 0;
 
+	if (vtpci_packed_queue(hw))
+		vq->vq_ring.avail_wrap_counter = 1;
+
 	VIRTQUEUE_DUMP(vq);
 
 	return 0;
@@ -1346,6 +1498,97 @@  virtio_recv_mergeable_pkts(void *rx_queue,
 	return nb_rx;
 }
 
+uint16_t
+virtio_xmit_pkts_packed(void *tx_queue, struct rte_mbuf **tx_pkts, uint16_t nb_pkts)
+{
+	struct virtnet_tx *txvq = tx_queue;
+	struct virtqueue *vq = txvq->vq;
+	struct virtio_hw *hw = vq->hw;
+	uint16_t hdr_size = hw->vtnet_hdr_size;
+	uint16_t nb_tx = 0;
+	int error;
+
+	if (unlikely(hw->started == 0 && tx_pkts != hw->inject_pkts))
+		return nb_tx;
+
+	if (unlikely(nb_pkts < 1))
+		return nb_pkts;
+
+	PMD_TX_LOG(DEBUG, "%d packets to xmit", nb_pkts);
+
+	virtio_rmb();
+	if (likely(nb_pkts > vq->vq_nentries - vq->vq_free_thresh))
+		virtio_xmit_cleanup_packed(vq);
+
+	for (nb_tx = 0; nb_tx < nb_pkts; nb_tx++) {
+		struct rte_mbuf *txm = tx_pkts[nb_tx];
+		int can_push = 0, use_indirect = 0, slots, need;
+
+		/* Do VLAN tag insertion */
+		if (unlikely(txm->ol_flags & PKT_TX_VLAN_PKT)) {
+			error = rte_vlan_insert(&txm);
+			if (unlikely(error)) {
+				rte_pktmbuf_free(txm);
+				continue;
+			}
+		}
+
+		/* optimize ring usage */
+		if ((vtpci_with_feature(hw, VIRTIO_F_ANY_LAYOUT) ||
+		      vtpci_with_feature(hw, VIRTIO_F_VERSION_1)) &&
+		    rte_mbuf_refcnt_read(txm) == 1 &&
+		    RTE_MBUF_DIRECT(txm) &&
+		    txm->nb_segs == 1 &&
+		    rte_pktmbuf_headroom(txm) >= hdr_size &&
+		    rte_is_aligned(rte_pktmbuf_mtod(txm, char *),
+				   __alignof__(struct virtio_net_hdr_mrg_rxbuf)))
+			can_push = 1;
+		else if (vtpci_with_feature(hw, VIRTIO_RING_F_INDIRECT_DESC) &&
+			 txm->nb_segs < VIRTIO_MAX_TX_INDIRECT)
+			use_indirect = 1;
+
+		/* How many main ring entries are needed to this Tx?
+		 * any_layout => number of segments
+		 * indirect   => 1
+		 * default    => number of segments + 1
+		 */
+		slots = use_indirect ? 1 : (txm->nb_segs + !can_push);
+		need = slots - vq->vq_free_cnt;
+
+		/* Positive value indicates it need free vring descriptors */
+		if (unlikely(need > 0)) {
+			virtio_rmb();
+			need = RTE_MIN(need, (int)nb_pkts);
+
+			virtio_xmit_cleanup_packed(vq);
+			need = slots - vq->vq_free_cnt;
+			if (unlikely(need > 0)) {
+				PMD_TX_LOG(ERR,
+					   "No free tx descriptors to transmit");
+				break;
+			}
+		}
+
+		/* Enqueue Packet buffers */
+		virtqueue_enqueue_xmit_packed(txvq, txm, slots, use_indirect,
+			can_push, 0);
+
+		txvq->stats.bytes += txm->pkt_len;
+		virtio_update_packet_stats(&txvq->stats, txm);
+	}
+
+	txvq->stats.packets += nb_tx;
+
+	if (likely(nb_tx)) {
+		if (unlikely(virtqueue_kick_prepare_packed(vq))) {
+			virtqueue_notify(vq);
+			PMD_TX_LOG(DEBUG, "Notified backend after xmit");
+		}
+	}
+
+	return nb_tx;
+}
+
 uint16_t
 virtio_xmit_pkts(void *tx_queue, struct rte_mbuf **tx_pkts, uint16_t nb_pkts)
 {
diff --git a/drivers/net/virtio/virtqueue.h b/drivers/net/virtio/virtqueue.h
index eb220563f..ffa2d8f92 100644
--- a/drivers/net/virtio/virtqueue.h
+++ b/drivers/net/virtio/virtqueue.h
@@ -241,8 +241,12 @@  struct virtio_net_hdr_mrg_rxbuf {
 #define VIRTIO_MAX_TX_INDIRECT 8
 struct virtio_tx_region {
 	struct virtio_net_hdr_mrg_rxbuf tx_hdr;
-	struct vring_desc tx_indir[VIRTIO_MAX_TX_INDIRECT]
-			   __attribute__((__aligned__(16)));
+	union {
+		struct vring_desc tx_indir[VIRTIO_MAX_TX_INDIRECT]
+			__attribute__((__aligned__(16)));
+		struct vring_desc_packed tx_indir_pq[VIRTIO_MAX_TX_INDIRECT]
+			__attribute__((__aligned__(16)));
+	};
 };
 
 static inline uint16_t
@@ -360,6 +364,15 @@  virtqueue_kick_prepare(struct virtqueue *vq)
 	return !(vq->vq_ring.used->flags & VRING_USED_F_NO_NOTIFY);
 }
 
+static inline int
+virtqueue_kick_prepare_packed(struct virtqueue *vq)
+{
+	uint16_t flags;
+
+	flags = vq->vq_ring.device_event->desc_event_flags & RING_EVENT_FLAGS_DESC;
+	return (flags != RING_EVENT_FLAGS_DISABLE);
+}
+
 static inline void
 virtqueue_notify(struct virtqueue *vq)
 {