[dpdk-dev,RFC,v2] virtio ring layout optimization and vectorization.

Message ID 1443117780-4771-1-git-send-email-huawei.xie@intel.com
State Superseded, archived
Headers show

Commit Message

Huawei Xie Sept. 24, 2015, 6:03 p.m.
This single patch is for people to get familiar with the optimization and is for collecting feedbacks.
It isn't splitted because it is straightforward.
Haven't finished the cleanups.
The description and illustration of the idea is in a previous mail titled "virtio optimization idea".

v2 changes:
adapt to new rte_mbuf format
fixes pkt_mb[6] and [7] pkt_len issue 

Signed-off-by: Huawei Xie <huawei.xie@intel.com>
---
 config/common_linuxapp                  |   1 +
 drivers/net/virtio/Makefile             |   2 +-
 drivers/net/virtio/virtio_ethdev.c      |  19 ++
 drivers/net/virtio/virtio_ethdev.h      |   5 +
 drivers/net/virtio/virtio_rxtx.c        |  62 ++++-
 drivers/net/virtio/virtio_rxtx.h        |  41 ++++
 drivers/net/virtio/virtio_rxtx_simple.c | 386 ++++++++++++++++++++++++++++++++
 drivers/net/virtio/virtqueue.h          |   5 +
 8 files changed, 519 insertions(+), 2 deletions(-)
 create mode 100644 drivers/net/virtio/virtio_rxtx.h
 create mode 100644 drivers/net/virtio/virtio_rxtx_simple.c

Patch

diff --git a/config/common_linuxapp b/config/common_linuxapp
index 0de43d5..b597617 100644
--- a/config/common_linuxapp
+++ b/config/common_linuxapp
@@ -241,6 +241,7 @@  CONFIG_RTE_LIBRTE_ENIC_DEBUG=n
 # Compile burst-oriented VIRTIO PMD driver
 #
 CONFIG_RTE_LIBRTE_VIRTIO_PMD=y
+CONFIG_RTE_VIRTIO_SIMPLE=n
 CONFIG_RTE_LIBRTE_VIRTIO_DEBUG_INIT=n
 CONFIG_RTE_LIBRTE_VIRTIO_DEBUG_RX=n
 CONFIG_RTE_LIBRTE_VIRTIO_DEBUG_TX=n
diff --git a/drivers/net/virtio/Makefile b/drivers/net/virtio/Makefile
index 930b60f..89a8a37 100644
--- a/drivers/net/virtio/Makefile
+++ b/drivers/net/virtio/Makefile
@@ -50,7 +50,7 @@  SRCS-$(CONFIG_RTE_LIBRTE_VIRTIO_PMD) += virtqueue.c
 SRCS-$(CONFIG_RTE_LIBRTE_VIRTIO_PMD) += virtio_pci.c
 SRCS-$(CONFIG_RTE_LIBRTE_VIRTIO_PMD) += virtio_rxtx.c
 SRCS-$(CONFIG_RTE_LIBRTE_VIRTIO_PMD) += virtio_ethdev.c
-
+SRCS-$(CONFIG_RTE_VIRTIO_SIMPLE)     += virtio_rxtx_simple.c
 
 # this lib depends upon:
 DEPDIRS-$(CONFIG_RTE_LIBRTE_VIRTIO_PMD) += lib/librte_eal lib/librte_ether
diff --git a/drivers/net/virtio/virtio_ethdev.c b/drivers/net/virtio/virtio_ethdev.c
index 465d3cd..9cd8a8b 100644
--- a/drivers/net/virtio/virtio_ethdev.c
+++ b/drivers/net/virtio/virtio_ethdev.c
@@ -61,6 +61,7 @@ 
 #include "virtio_pci.h"
 #include "virtio_logs.h"
 #include "virtqueue.h"
+#include "virtio_rxtx.h"
 
 
 static int eth_virtio_dev_init(struct rte_eth_dev *eth_dev);
@@ -246,6 +247,9 @@  virtio_dev_queue_release(struct virtqueue *vq) {
 		VIRTIO_WRITE_REG_2(hw, VIRTIO_PCI_QUEUE_SEL, vq->queue_id);
 		VIRTIO_WRITE_REG_4(hw, VIRTIO_PCI_QUEUE_PFN, 0);
 
+		if (vq->sw_ring)
+			rte_free(vq->sw_ring);
+
 		rte_free(vq);
 		vq = NULL;
 	}
@@ -291,6 +295,9 @@  int virtio_dev_queue_setup(struct rte_eth_dev *dev,
 			dev->data->port_id, queue_idx);
 		vq = rte_zmalloc(vq_name, sizeof(struct virtqueue) +
 			vq_size * sizeof(struct vq_desc_extra), RTE_CACHE_LINE_SIZE);
+		vq->sw_ring = rte_zmalloc_socket("rxq->sw_ring",
+			(RTE_PMD_VIRTIO_RX_MAX_BURST + vq_size) * sizeof(vq->sw_ring[0]),
+			RTE_CACHE_LINE_SIZE, socket_id);
 	} else if (queue_type == VTNET_TQ) {
 		snprintf(vq_name, sizeof(vq_name), "port%d_tvq%d",
 			dev->data->port_id, queue_idx);
@@ -307,6 +314,10 @@  int virtio_dev_queue_setup(struct rte_eth_dev *dev,
 		PMD_INIT_LOG(ERR, "%s: Can not allocate virtqueue", __func__);
 		return (-ENOMEM);
 	}
+	if (queue_type == VTNET_RQ && vq->sw_ring == NULL) {
+		PMD_INIT_LOG(ERR, "%s: Can not allocate RX soft ring", __func__);
+		return -ENOMEM;
+	}
 
 	vq->hw = hw;
 	vq->port_id = dev->data->port_id;
@@ -1150,6 +1161,14 @@  rx_func_get(struct rte_eth_dev *eth_dev)
 		eth_dev->rx_pkt_burst = &virtio_recv_mergeable_pkts;
 	else
 		eth_dev->rx_pkt_burst = &virtio_recv_pkts;
+
+#ifdef RTE_VIRTIO_SIMPLE
+	if (!vtpci_with_feature(hw, VIRTIO_NET_F_MRG_RXBUF)) {
+		printf("use simple rxtx\n");
+		eth_dev->rx_pkt_burst = &virtio_recv_pkts_vec;
+		eth_dev->tx_pkt_burst = &virtio_xmit_pkts_simple;
+	}
+#endif
 }
 
 /*
diff --git a/drivers/net/virtio/virtio_ethdev.h b/drivers/net/virtio/virtio_ethdev.h
index 9026d42..fa11afa 100644
--- a/drivers/net/virtio/virtio_ethdev.h
+++ b/drivers/net/virtio/virtio_ethdev.h
@@ -108,6 +108,11 @@  uint16_t virtio_recv_mergeable_pkts(void *rx_queue, struct rte_mbuf **rx_pkts,
 uint16_t virtio_xmit_pkts(void *tx_queue, struct rte_mbuf **tx_pkts,
 		uint16_t nb_pkts);
 
+uint16_t virtio_recv_pkts_vec(void *rx_queue, struct rte_mbuf **rx_pkts,
+                uint16_t nb_pkts);
+
+uint16_t virtio_xmit_pkts_simple(void *tx_queue, struct rte_mbuf **tx_pkts,
+                uint16_t nb_pkts);
 
 /*
  * The VIRTIO_NET_F_GUEST_TSO[46] features permit the host to send us
diff --git a/drivers/net/virtio/virtio_rxtx.c b/drivers/net/virtio/virtio_rxtx.c
index c5b53bb..0c729b3 100644
--- a/drivers/net/virtio/virtio_rxtx.c
+++ b/drivers/net/virtio/virtio_rxtx.c
@@ -54,6 +54,7 @@ 
 #include "virtio_logs.h"
 #include "virtio_ethdev.h"
 #include "virtqueue.h"
+#include "virtio_rxtx.h"
 
 #ifdef RTE_LIBRTE_VIRTIO_DEBUG_DUMP
 #define VIRTIO_DUMP_PACKET(m, len) rte_pktmbuf_dump(stdout, m, len)
@@ -298,6 +299,18 @@  virtio_dev_vring_start(struct virtqueue *vq, int queue_type)
 		/* Allocate blank mbufs for the each rx descriptor */
 		nbufs = 0;
 		error = ENOSPC;
+
+#ifdef RTE_VIRTIO_SIMPLE
+		for (i = 0; i < vq->vq_nentries; i++) {
+			vq->vq_ring.avail->ring[i] = i;
+			vq->vq_ring.desc[i].flags = VRING_DESC_F_WRITE;
+		}
+
+		memset(&vq->fake_mbuf, 0, sizeof(vq->fake_mbuf));
+		for (i = 0; i < RTE_PMD_VIRTIO_RX_MAX_BURST; i++)
+			vq->sw_ring[vq->vq_nentries + i] = &vq->fake_mbuf;
+#endif
+
 		while (!virtqueue_full(vq)) {
 			m = rte_rxmbuf_alloc(vq->mpool);
 			if (m == NULL)
@@ -306,8 +319,11 @@  virtio_dev_vring_start(struct virtqueue *vq, int queue_type)
 			/******************************************
 			*         Enqueue allocated buffers        *
 			*******************************************/
+#ifdef RTE_VIRTIO_SIMPLE
+			error = virtqueue_enqueue_recv_refill_simple(vq, m);
+#else
 			error = virtqueue_enqueue_recv_refill(vq, m);
-
+#endif
 			if (error) {
 				rte_pktmbuf_free(m);
 				break;
@@ -324,6 +340,23 @@  virtio_dev_vring_start(struct virtqueue *vq, int queue_type)
 		VIRTIO_WRITE_REG_4(vq->hw, VIRTIO_PCI_QUEUE_PFN,
 			vq->mz->phys_addr >> VIRTIO_PCI_QUEUE_ADDR_SHIFT);
 	} else if (queue_type == VTNET_TQ) {
+#ifdef RTE_VIRTIO_SIMPLE
+		int mid_idx  = vq->vq_nentries >> 1;
+		for (i = 0; i < mid_idx; i++) {
+			vq->vq_ring.avail->ring[i] = i + mid_idx;
+			vq->vq_ring.desc[i + mid_idx].next = i;
+			vq->vq_ring.desc[i + mid_idx].addr =
+				vq->virtio_net_hdr_mem +
+					mid_idx * vq->hw->vtnet_hdr_size;
+			vq->vq_ring.desc[i + mid_idx].len =
+				vq->hw->vtnet_hdr_size;
+			vq->vq_ring.desc[i + mid_idx].flags = VRING_DESC_F_NEXT;
+			vq->vq_ring.desc[i].flags = 0;
+		}
+		for (i = mid_idx; i < vq->vq_nentries; i++)
+                        vq->vq_ring.avail->ring[i] = i;
+#endif
+
 		VIRTIO_WRITE_REG_2(vq->hw, VIRTIO_PCI_QUEUE_SEL,
 			vq->vq_queue_index);
 		VIRTIO_WRITE_REG_4(vq->hw, VIRTIO_PCI_QUEUE_PFN,
@@ -398,6 +431,9 @@  virtio_dev_rx_queue_setup(struct rte_eth_dev *dev,
 	vq->mpool = mp;
 
 	dev->data->rx_queues[queue_idx] = vq;
+
+	virtio_rxq_vec_setup(vq);
+
 	return 0;
 }
 
@@ -825,3 +861,27 @@  virtio_xmit_pkts(void *tx_queue, struct rte_mbuf **tx_pkts, uint16_t nb_pkts)
 
 	return nb_tx;
 }
+
+uint16_t __attribute__((weak))
+virtio_recv_pkts_vec(
+	void __rte_unused *rx_queue,
+	struct rte_mbuf __rte_unused **rx_pkts,
+	uint16_t __rte_unused nb_pkts)
+{
+	return 0;
+}
+
+uint16_t __attribute__((weak))
+virtio_xmit_pkts_simple(
+	void __rte_unused *tx_queue,
+	struct rte_mbuf __rte_unused **tx_pkts,
+	uint16_t __rte_unused nb_pkts)
+{
+	return 0;
+}
+
+int __attribute__((weak))
+virtio_rxq_vec_setup(struct virtqueue __rte_unused *rxq)
+{
+	return -1;
+}
diff --git a/drivers/net/virtio/virtio_rxtx.h b/drivers/net/virtio/virtio_rxtx.h
new file mode 100644
index 0000000..8bd8260
--- /dev/null
+++ b/drivers/net/virtio/virtio_rxtx.h
@@ -0,0 +1,41 @@ 
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright(c) 2010-2015 Intel Corporation. All rights reserved.
+ *   All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of Intel Corporation nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#define RTE_PMD_VIRTIO_RX_MAX_BURST 64
+
+int virtio_rxq_vec_setup(struct virtqueue *rxq);
+
+#ifdef RTE_VIRTIO_SIMPLE
+int virtqueue_enqueue_recv_refill_simple(struct virtqueue *vq,
+	struct rte_mbuf *m);
+#endif
diff --git a/drivers/net/virtio/virtio_rxtx_simple.c b/drivers/net/virtio/virtio_rxtx_simple.c
new file mode 100644
index 0000000..4b45594
--- /dev/null
+++ b/drivers/net/virtio/virtio_rxtx_simple.c
@@ -0,0 +1,386 @@ 
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright(c) 2010-2015 Intel Corporation. All rights reserved.
+ *   All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of Intel Corporation nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <errno.h>
+
+#include <tmmintrin.h>
+
+#include <rte_cycles.h>
+#include <rte_memory.h>
+#include <rte_memzone.h>
+#include <rte_branch_prediction.h>
+#include <rte_mempool.h>
+#include <rte_malloc.h>
+#include <rte_mbuf.h>
+#include <rte_ether.h>
+#include <rte_ethdev.h>
+#include <rte_prefetch.h>
+#include <rte_string_fns.h>
+#include <rte_errno.h>
+#include <rte_byteorder.h>
+
+#include "virtio_logs.h"
+#include "virtio_ethdev.h"
+#include "virtqueue.h"
+#include "virtio_rxtx.h"
+
+#define RTE_VIRTIO_VPMD_RX_BURST 32
+#define RTE_VIRTIO_DESC_PER_LOOP 8
+#define RTE_VIRTIO_VPMD_RX_REARM_THRESH RTE_VIRTIO_VPMD_RX_BURST
+
+int __attribute__((cold))
+virtqueue_enqueue_recv_refill_simple(struct virtqueue *vq,
+	struct rte_mbuf *cookie)
+{
+	struct vq_desc_extra *dxp;
+	struct vring_desc *start_dp;
+	uint16_t desc_idx;
+
+	desc_idx = vq->vq_avail_idx & (vq->vq_nentries - 1);
+	dxp = &vq->vq_descx[desc_idx];
+	dxp->cookie = (void *)cookie;
+	vq->sw_ring[desc_idx] = cookie;
+
+	start_dp = vq->vq_ring.desc;
+	start_dp[desc_idx].addr = (uint64_t)((uintptr_t)cookie->buf_physaddr +
+		RTE_PKTMBUF_HEADROOM - sizeof(struct virtio_net_hdr));
+	start_dp[desc_idx].len = cookie->buf_len -
+		RTE_PKTMBUF_HEADROOM + sizeof(struct virtio_net_hdr);
+
+	vq->vq_free_cnt--;
+	vq->vq_avail_idx++;
+
+	return 0;
+}
+
+static void
+virtio_rxq_rearm_vec(struct virtqueue *rxvq)
+{
+	int i;
+	uint16_t desc_idx;
+	struct rte_mbuf **sw_ring;
+	struct vring_desc *start_dp;
+	int ret;
+
+	desc_idx = rxvq->vq_avail_idx & (rxvq->vq_nentries - 1);
+	sw_ring = &rxvq->sw_ring[desc_idx];
+	start_dp = &rxvq->vq_ring.desc[desc_idx];
+
+	ret = rte_mempool_get_bulk(rxvq->mpool, (void **)sw_ring,
+		RTE_VIRTIO_VPMD_RX_REARM_THRESH);
+	if (unlikely(ret))
+		return;
+
+	for (i = 0; i < RTE_VIRTIO_VPMD_RX_REARM_THRESH; i++) {
+		uintptr_t p;
+		p = (uintptr_t)&sw_ring[i]->rearm_data;
+		*(uint64_t *)p = rxvq->mbuf_initializer;
+		start_dp[i].addr =
+			(uint64_t)((uintptr_t)sw_ring[i]->buf_physaddr +
+			RTE_PKTMBUF_HEADROOM - sizeof(struct virtio_net_hdr));
+		start_dp[i].len = sw_ring[i]->buf_len -
+			RTE_PKTMBUF_HEADROOM + sizeof(struct virtio_net_hdr);
+	}
+
+	rxvq->vq_avail_idx += RTE_VIRTIO_VPMD_RX_REARM_THRESH;
+	rxvq->vq_free_cnt -= RTE_VIRTIO_VPMD_RX_REARM_THRESH;
+	vq_update_avail_idx(rxvq);
+}
+
+uint16_t
+virtio_recv_pkts_vec(void *rx_queue, struct rte_mbuf **rx_pkts,
+	uint16_t nb_pkts)
+{
+	struct virtqueue *rxvq = rx_queue;
+	uint16_t nb_used;
+
+	uint16_t desc_idx;
+	struct vring_used_elem *rused;
+	struct rte_mbuf **sw_ring;
+	struct rte_mbuf **sw_ring_end;
+	uint16_t nb_pkts_received;
+
+	__m128i shuf_msk1, shuf_msk2, len_adjust;
+
+	shuf_msk1 = _mm_set_epi8(
+		0xFF, 0xFF, 0xFF, 0xFF,
+		0xFF, 0xFF,		/* vlan tci */
+		5, 4,			/* dat len */
+		0xFF, 0xFF, 5, 4,	/* pkt len */
+		0xFF, 0xFF, 0xFF, 0xFF	/* packet type */
+
+	);
+
+	shuf_msk2 = _mm_set_epi8(
+		0xFF, 0xFF, 0xFF, 0xFF,
+		0xFF, 0xFF,		/* vlan tci */
+		13, 12,			/* dat len */
+		0xFF, 0xFF, 13, 12,	/* pkt len */
+		0xFF, 0xFF, 0xFF, 0xFF	/* packet type */
+	);
+
+	len_adjust = _mm_set_epi16(
+		0, 0,
+		0,
+		(uint16_t) -sizeof(struct virtio_net_hdr),
+		0, (uint16_t) -sizeof(struct virtio_net_hdr),
+		0, 0);
+
+	if (unlikely(nb_pkts < RTE_VIRTIO_VPMD_RX_BURST))
+		return 0;
+
+	nb_used = *(volatile uint16_t *)&rxvq->vq_ring.used->idx -
+		rxvq->vq_used_cons_idx;
+
+	rte_compiler_barrier();
+
+	if (unlikely(nb_used == 0))
+		return 0;
+
+	nb_used = likely(nb_used < RTE_VIRTIO_VPMD_RX_BURST) ?
+			nb_used : RTE_VIRTIO_VPMD_RX_BURST;
+
+	desc_idx = (uint16_t)(rxvq->vq_used_cons_idx & (rxvq->vq_nentries - 1));
+	rused = &rxvq->vq_ring.used->ring[desc_idx];
+	sw_ring  = &rxvq->sw_ring[desc_idx];
+	sw_ring_end = &rxvq->sw_ring[rxvq->vq_nentries];
+
+	_mm_prefetch((const void *)rused, _MM_HINT_T0);
+
+	if (rxvq->vq_free_cnt >= RTE_VIRTIO_VPMD_RX_REARM_THRESH) {
+		virtio_rxq_rearm_vec(rxvq);
+		if (unlikely(virtqueue_kick_prepare(rxvq)))
+			virtqueue_notify(rxvq);
+	}
+
+	for (nb_pkts_received = 0;
+		nb_pkts_received < RTE_VIRTIO_VPMD_RX_BURST;) {
+		__m128i desc[RTE_VIRTIO_DESC_PER_LOOP / 2];
+		__m128i mbp[RTE_VIRTIO_DESC_PER_LOOP / 2];
+		__m128i pkt_mb[RTE_VIRTIO_DESC_PER_LOOP];
+
+		mbp[0] = _mm_loadu_si128((__m128i *)(sw_ring + 0));
+		desc[0] = _mm_loadu_si128((__m128i *)(rused + 0));
+		_mm_storeu_si128((__m128i *)&rx_pkts[0], mbp[0]);
+
+		mbp[1] = _mm_loadu_si128((__m128i *)(sw_ring + 2));
+		desc[1] = _mm_loadu_si128((__m128i *)(rused + 2));
+		_mm_storeu_si128((__m128i *)&rx_pkts[2], mbp[1]);
+
+		mbp[2] = _mm_loadu_si128((__m128i *)(sw_ring + 4));
+		desc[2] = _mm_loadu_si128((__m128i *)(rused + 4));
+		_mm_storeu_si128((__m128i *)&rx_pkts[4], mbp[2]);
+
+		mbp[3] = _mm_loadu_si128((__m128i *)(sw_ring + 6));
+		desc[3] = _mm_loadu_si128((__m128i *)(rused + 6));
+		_mm_storeu_si128((__m128i *)&rx_pkts[6], mbp[3]);
+
+		pkt_mb[1] = _mm_shuffle_epi8(desc[0], shuf_msk2);
+		pkt_mb[0] = _mm_shuffle_epi8(desc[0], shuf_msk1);
+		pkt_mb[1] = _mm_add_epi16(pkt_mb[1], len_adjust);
+		pkt_mb[0] = _mm_add_epi16(pkt_mb[0], len_adjust);
+		_mm_storeu_si128((void *)&rx_pkts[1]->rx_descriptor_fields1,
+			pkt_mb[1]);
+		_mm_storeu_si128((void *)&rx_pkts[0]->rx_descriptor_fields1,
+			pkt_mb[0]);
+
+		pkt_mb[3] = _mm_shuffle_epi8(desc[1], shuf_msk2);
+		pkt_mb[2] = _mm_shuffle_epi8(desc[1], shuf_msk1);
+		pkt_mb[3] = _mm_add_epi16(pkt_mb[3], len_adjust);
+		pkt_mb[2] = _mm_add_epi16(pkt_mb[2], len_adjust);
+		_mm_storeu_si128((void *)&rx_pkts[3]->rx_descriptor_fields1,
+			pkt_mb[3]);
+		_mm_storeu_si128((void *)&rx_pkts[2]->rx_descriptor_fields1,
+			pkt_mb[2]);
+
+		pkt_mb[5] = _mm_shuffle_epi8(desc[2], shuf_msk2);
+		pkt_mb[4] = _mm_shuffle_epi8(desc[2], shuf_msk1);
+		pkt_mb[5] = _mm_add_epi16(pkt_mb[5], len_adjust);
+		pkt_mb[4] = _mm_add_epi16(pkt_mb[4], len_adjust);
+		_mm_storeu_si128((void *)&rx_pkts[5]->rx_descriptor_fields1,
+			pkt_mb[5]);
+		_mm_storeu_si128((void *)&rx_pkts[4]->rx_descriptor_fields1,
+			pkt_mb[4]);
+
+		pkt_mb[7] = _mm_shuffle_epi8(desc[3], shuf_msk2);
+		pkt_mb[6] = _mm_shuffle_epi8(desc[3], shuf_msk1);
+		pkt_mb[7] = _mm_add_epi16(pkt_mb[7], len_adjust);
+		pkt_mb[6] = _mm_add_epi16(pkt_mb[6], len_adjust);
+		_mm_storeu_si128((void *)&rx_pkts[7]->rx_descriptor_fields1,
+			pkt_mb[7]);
+		_mm_storeu_si128((void *)&rx_pkts[6]->rx_descriptor_fields1,
+			pkt_mb[6]);
+
+		if (unlikely(nb_used <= RTE_VIRTIO_DESC_PER_LOOP)) {
+			if (sw_ring + nb_used <= sw_ring_end)
+				nb_pkts_received += nb_used;
+			else
+				nb_pkts_received += sw_ring_end - sw_ring;
+			break;
+		} else {
+			if (unlikely(sw_ring + RTE_VIRTIO_DESC_PER_LOOP >=
+				sw_ring_end)) {
+				nb_pkts_received += sw_ring_end - sw_ring;
+				break;
+			} else {
+				nb_pkts_received += RTE_VIRTIO_DESC_PER_LOOP;
+
+				rx_pkts += RTE_VIRTIO_DESC_PER_LOOP;
+				sw_ring += RTE_VIRTIO_DESC_PER_LOOP;
+				rused   += RTE_VIRTIO_DESC_PER_LOOP;
+				nb_used -= RTE_VIRTIO_DESC_PER_LOOP;
+			}
+		}
+	}
+	rxvq->vq_used_cons_idx += nb_pkts_received;
+	rxvq->vq_free_cnt += nb_pkts_received;
+	rxvq->packets += nb_pkts_received;
+	return nb_pkts_received;
+}
+
+#define VIRTIO_TX_FREE_THRESH 32
+#define VIRTIO_TX_MAX_FREE_BUF_SZ 32
+#define VIRTIO_TX_FREE_NR 32
+/* vq->tx_free_cnt could mean num of free slots so we could avoid shift */
+static inline void __attribute__((always_inline))
+virtio_xmit_cleanup(struct virtqueue *vq)
+{
+	uint16_t i, desc_idx;
+	int nb_free = 0;
+	struct rte_mbuf *m, *free[VIRTIO_TX_MAX_FREE_BUF_SZ];
+
+	desc_idx = (uint16_t)(vq->vq_used_cons_idx &
+		((vq->vq_nentries >> 1) - 1));
+	free[0] = (struct rte_mbuf *)vq->vq_descx[desc_idx++].cookie;
+	nb_free = 1;
+
+	for (i = 1; i < VIRTIO_TX_FREE_NR; i++) {
+		m = (struct rte_mbuf *)vq->vq_descx[desc_idx++].cookie;
+		if (likely(m->pool == free[0]->pool))
+			free[nb_free++] = m;
+		else {
+			rte_mempool_put_bulk(free[0]->pool, (void **)free,
+				nb_free);
+			free[0] = m;
+			nb_free = 1;
+		}
+	}
+
+	rte_mempool_put_bulk(free[0]->pool, (void **)free, nb_free);
+	vq->vq_used_cons_idx += VIRTIO_TX_FREE_NR;
+	vq->vq_free_cnt += (VIRTIO_TX_FREE_NR << 1);
+
+	return;
+}
+
+uint16_t
+virtio_xmit_pkts_simple(void *tx_queue, struct rte_mbuf **tx_pkts,
+	uint16_t nb_pkts)
+{
+	struct virtqueue *txvq = tx_queue;
+	uint16_t nb_used;
+	uint16_t desc_idx;
+	struct vring_desc *start_dp;
+	uint16_t n, nb_commit;
+	int i;
+	uint16_t desc_idx_max = (txvq->vq_nentries >> 1) - 1;
+
+	nb_used = VIRTQUEUE_NUSED(txvq);
+	rte_compiler_barrier();
+
+	nb_commit = nb_pkts = RTE_MIN((txvq->vq_free_cnt >> 1), nb_pkts);
+	desc_idx = (uint16_t) (txvq->vq_avail_idx & desc_idx_max);
+	start_dp = txvq->vq_ring.desc;
+	n = (uint16_t) (desc_idx_max + 1 - desc_idx);
+
+	if (nb_used >= VIRTIO_TX_FREE_THRESH)
+		virtio_xmit_cleanup(tx_queue);
+
+	if (nb_commit >= n) {
+		for (i = 0; i < n; i++)
+			txvq->vq_descx[desc_idx + i].cookie = tx_pkts[i];
+		for (i = 0; i < n; i++) {
+			start_dp[desc_idx].addr =
+				RTE_MBUF_DATA_DMA_ADDR(*tx_pkts);
+			start_dp[desc_idx].len = (*tx_pkts)->pkt_len;
+			tx_pkts++;
+			desc_idx++;
+		}
+		nb_commit -= n;
+		desc_idx = 0;
+	}
+	for (i = 0; i < nb_commit; i++)
+		txvq->vq_descx[desc_idx + i].cookie = tx_pkts[i];
+	for (i = 0; i < nb_commit; i++) {
+		start_dp[desc_idx].addr = RTE_MBUF_DATA_DMA_ADDR(*tx_pkts);
+		start_dp[desc_idx].len = (*tx_pkts)->pkt_len;
+		tx_pkts++;
+		desc_idx++;
+	}
+
+	rte_compiler_barrier();
+
+	txvq->vq_free_cnt -= (uint16_t)(nb_pkts << 1);
+	txvq->vq_avail_idx += nb_pkts;
+	txvq->vq_ring.avail->idx = txvq->vq_avail_idx;
+
+	if (likely(nb_pkts)) {
+		if (unlikely(virtqueue_kick_prepare(txvq)))
+			virtqueue_notify(txvq);
+	}
+
+	return nb_pkts;
+}
+
+int __attribute__((cold))
+virtio_rxq_vec_setup(struct virtqueue *rxq)
+{
+	uintptr_t p;
+	struct rte_mbuf mb_def = { .buf_addr = 0 }; /* zeroed mbuf */
+
+	mb_def.nb_segs = 1;
+	mb_def.data_off = RTE_PKTMBUF_HEADROOM;
+	mb_def.port = rxq->port_id;
+	rte_mbuf_refcnt_set(&mb_def, 1);
+
+	/* prevent compiler reordering: rearm_data covers previous fields */
+	rte_compiler_barrier();
+	p = (uintptr_t)&mb_def.rearm_data;
+	rxq->mbuf_initializer = *(uint64_t *)p;
+
+	return 0;
+}
diff --git a/drivers/net/virtio/virtqueue.h b/drivers/net/virtio/virtqueue.h
index 7789411..22b1f87 100644
--- a/drivers/net/virtio/virtqueue.h
+++ b/drivers/net/virtio/virtqueue.h
@@ -188,6 +188,7 @@  struct virtqueue {
 	 */
 	uint16_t vq_used_cons_idx;
 	uint16_t vq_avail_idx;
+	uint64_t mbuf_initializer; /**< value to init mbufs. */
 	phys_addr_t virtio_net_hdr_mem; /**< hdr for each xmit packet */
 
 	/* Statistics */
@@ -195,6 +196,10 @@  struct virtqueue {
 	uint64_t	bytes;
 	uint64_t	errors;
 
+	struct rte_mbuf **sw_ring; /**< RX software ring. */
+	/* dummy mbuf, for wraparound when processing RX ring. */
+	struct rte_mbuf fake_mbuf;
+
 	struct vq_desc_extra {
 		void              *cookie;
 		uint16_t          ndescs;