[RFC,v2,2/2] net/vhost_dma: add vHost DMA driver
diff mbox series

Message ID 1572598450-245091-3-git-send-email-jiayu.hu@intel.com
State RFC, archived
Delegated to: Maxime Coquelin
Headers show
Series
  • Add a PMD for DMA-accelerated vhost-user
Related show

Checks

Context Check Description
ci/checkpatch warning coding style issues
ci/Intel-compilation success Compilation OK

Commit Message

Hu, Jiayu Nov. 1, 2019, 8:54 a.m. UTC
This patch introduces a new PMD for DMA accelerated vhost-user, which
provides basic functionality of packet reception and transmission. This
PMD leverages librte_vhost to handle vhost messages, but it implements
own vring's enqueue and dequeue operations.

The PMD leverages DMA engines (e.g., I/OAT, a DMA engine in Intel's
processor), to accelerate large data movements in enqueue and dequeue
operations. Large copies are offloaded to the DMA in an asynchronous mode.
That is, the CPU just submits copy jobs to the DMA but without waiting
for its completion; there is no CPU intervention during data transfer.
Thus, we can save precious CPU cycles and improve the overall performance
for vhost-user based applications, like OVS. The PMD still uses the CPU to
performs small copies, due to startup overheads associated with the DMA.

Note that the PMD is able to support various DMA engines to accelerate
data movements in enqueue and dequeue operations; currently the supported
DMA engine is I/OAT. The PMD just supports I/OAT acceleration in the
enqueue operation, and it still uses the CPU to perform all copies in
the dequeue operation. In addition, the PMD only supports the split ring.

The DMA device used by a queue is assigned by users; for the queue
without assigning a DMA device, the PMD will use the CPU to perform
all copies for both enqueue and dequeue operations. Currently, the PMD
just supports I/OAT, and a queue can only use one I/OAT device, and an
I/OAT device can only be used by one queue at a time.

The PMD has 4 parameters.
 - iface: The parameter is used to specify a path to connect to a
 	front end device.
 - queues: The parameter is used to specify the number of the queues
 	front end device has (Default is 1).
 - client: The parameter is used to specify the vhost port working as
 	client mode or server mode (Default is server mode).
 - dmas: This parameter is used to specify the assigned DMA device
 	of a queue.

Here is an example.
$ ./testpmd -c f -n 4 \
--vdev 'dma_vhost0,iface=/tmp/sock0,queues=1,dmas=txq0@00:04.0,client=0'

Signed-off-by: Jiayu Hu <jiayu.hu@intel.com>
---
 config/common_base                                 |    2 +
 config/common_linux                                |    1 +
 drivers/Makefile                                   |    2 +-
 drivers/net/Makefile                               |    1 +
 drivers/net/vhost_dma/Makefile                     |   31 +
 drivers/net/vhost_dma/eth_vhost.c                  | 1495 ++++++++++++++++++++
 drivers/net/vhost_dma/eth_vhost.h                  |  264 ++++
 drivers/net/vhost_dma/internal.h                   |  225 +++
 .../net/vhost_dma/rte_pmd_vhost_dma_version.map    |    4 +
 drivers/net/vhost_dma/virtio_net.c                 | 1234 ++++++++++++++++
 mk/rte.app.mk                                      |    1 +
 11 files changed, 3259 insertions(+), 1 deletion(-)
 create mode 100644 drivers/net/vhost_dma/Makefile
 create mode 100644 drivers/net/vhost_dma/eth_vhost.c
 create mode 100644 drivers/net/vhost_dma/eth_vhost.h
 create mode 100644 drivers/net/vhost_dma/internal.h
 create mode 100644 drivers/net/vhost_dma/rte_pmd_vhost_dma_version.map
 create mode 100644 drivers/net/vhost_dma/virtio_net.c

Comments

Maxime Coquelin Dec. 17, 2019, 8:27 a.m. UTC | #1
Hi Jiayu,

On 11/1/19 9:54 AM, Jiayu Hu wrote:
> This patch introduces a new PMD for DMA accelerated vhost-user, which
> provides basic functionality of packet reception and transmission. This
> PMD leverages librte_vhost to handle vhost messages, but it implements
> own vring's enqueue and dequeue operations.
> 
> The PMD leverages DMA engines (e.g., I/OAT, a DMA engine in Intel's
> processor), to accelerate large data movements in enqueue and dequeue
> operations. Large copies are offloaded to the DMA in an asynchronous mode.
> That is, the CPU just submits copy jobs to the DMA but without waiting
> for its completion; there is no CPU intervention during data transfer.
> Thus, we can save precious CPU cycles and improve the overall performance
> for vhost-user based applications, like OVS. The PMD still uses the CPU to
> performs small copies, due to startup overheads associated with the DMA.
> 
> Note that the PMD is able to support various DMA engines to accelerate
> data movements in enqueue and dequeue operations; currently the supported
> DMA engine is I/OAT. The PMD just supports I/OAT acceleration in the
> enqueue operation, and it still uses the CPU to perform all copies in
> the dequeue operation. In addition, the PMD only supports the split ring.
> 
> The DMA device used by a queue is assigned by users; for the queue
> without assigning a DMA device, the PMD will use the CPU to perform
> all copies for both enqueue and dequeue operations. Currently, the PMD
> just supports I/OAT, and a queue can only use one I/OAT device, and an
> I/OAT device can only be used by one queue at a time.
> 
> The PMD has 4 parameters.
>  - iface: The parameter is used to specify a path to connect to a
>  	front end device.
>  - queues: The parameter is used to specify the number of the queues
>  	front end device has (Default is 1).
>  - client: The parameter is used to specify the vhost port working as
>  	client mode or server mode (Default is server mode).
>  - dmas: This parameter is used to specify the assigned DMA device
>  	of a queue.
> 
> Here is an example.
> $ ./testpmd -c f -n 4 \
> --vdev 'dma_vhost0,iface=/tmp/sock0,queues=1,dmas=txq0@00:04.0,client=0'

dma_vhost0 is not a good name, you have to mention it is net specific.

Is there a tool to list available DMA engines?

> 
> Signed-off-by: Jiayu Hu <jiayu.hu@intel.com>
> ---
>  config/common_base                                 |    2 +
>  config/common_linux                                |    1 +
>  drivers/Makefile                                   |    2 +-
>  drivers/net/Makefile                               |    1 +
>  drivers/net/vhost_dma/Makefile                     |   31 +
>  drivers/net/vhost_dma/eth_vhost.c                  | 1495 ++++++++++++++++++++
>  drivers/net/vhost_dma/eth_vhost.h                  |  264 ++++
>  drivers/net/vhost_dma/internal.h                   |  225 +++
>  .../net/vhost_dma/rte_pmd_vhost_dma_version.map    |    4 +
>  drivers/net/vhost_dma/virtio_net.c                 | 1234 ++++++++++++++++
>  mk/rte.app.mk                                      |    1 +
>  11 files changed, 3259 insertions(+), 1 deletion(-)
>  create mode 100644 drivers/net/vhost_dma/Makefile
>  create mode 100644 drivers/net/vhost_dma/eth_vhost.c
>  create mode 100644 drivers/net/vhost_dma/eth_vhost.h
>  create mode 100644 drivers/net/vhost_dma/internal.h
>  create mode 100644 drivers/net/vhost_dma/rte_pmd_vhost_dma_version.map
>  create mode 100644 drivers/net/vhost_dma/virtio_net.c

You need to add Meson support.


More generally, I have been through the series and I'm not sure having a
dedicated PMD driver for this is a good idea due to all the code
duplication it implies.

I understand it has been done this way to avoid impacting the pure SW
datapath implementation. But I'm sure the series could be reduced to a
few hundred of lines if it was integrated in vhost-user library.
Moreover, your series does not support packed ring, so it means even
more code would need to be duplicated in the end.

What do you think?

Thanks,
Maxime
Maxime Coquelin Dec. 17, 2019, 10:20 a.m. UTC | #2
On 12/17/19 9:27 AM, Maxime Coquelin wrote:
> Hi Jiayu,
> 
> On 11/1/19 9:54 AM, Jiayu Hu wrote:
>> This patch introduces a new PMD for DMA accelerated vhost-user, which
>> provides basic functionality of packet reception and transmission. This
>> PMD leverages librte_vhost to handle vhost messages, but it implements
>> own vring's enqueue and dequeue operations.
>>
>> The PMD leverages DMA engines (e.g., I/OAT, a DMA engine in Intel's
>> processor), to accelerate large data movements in enqueue and dequeue
>> operations. Large copies are offloaded to the DMA in an asynchronous mode.
>> That is, the CPU just submits copy jobs to the DMA but without waiting
>> for its completion; there is no CPU intervention during data transfer.
>> Thus, we can save precious CPU cycles and improve the overall performance
>> for vhost-user based applications, like OVS. The PMD still uses the CPU to
>> performs small copies, due to startup overheads associated with the DMA.
>>
>> Note that the PMD is able to support various DMA engines to accelerate
>> data movements in enqueue and dequeue operations; currently the supported
>> DMA engine is I/OAT. The PMD just supports I/OAT acceleration in the
>> enqueue operation, and it still uses the CPU to perform all copies in
>> the dequeue operation. In addition, the PMD only supports the split ring.
>>
>> The DMA device used by a queue is assigned by users; for the queue
>> without assigning a DMA device, the PMD will use the CPU to perform
>> all copies for both enqueue and dequeue operations. Currently, the PMD
>> just supports I/OAT, and a queue can only use one I/OAT device, and an
>> I/OAT device can only be used by one queue at a time.
>>
>> The PMD has 4 parameters.
>>  - iface: The parameter is used to specify a path to connect to a
>>  	front end device.
>>  - queues: The parameter is used to specify the number of the queues
>>  	front end device has (Default is 1).
>>  - client: The parameter is used to specify the vhost port working as
>>  	client mode or server mode (Default is server mode).
>>  - dmas: This parameter is used to specify the assigned DMA device
>>  	of a queue.
>>
>> Here is an example.
>> $ ./testpmd -c f -n 4 \
>> --vdev 'dma_vhost0,iface=/tmp/sock0,queues=1,dmas=txq0@00:04.0,client=0'
> 
> dma_vhost0 is not a good name, you have to mention it is net specific.
> 
> Is there a tool to list available DMA engines?

Thinking at it again, wouldn't it be possible that the user doesn't
specify a specific DMA device ID, but instead allocate one device
at init time by specifying all the capabilities the DMA device need
to match?

If no DMA device available with matching capabilities, then fallback to
SW mode.

Also, I think we don't want to call directly IOAT API directly here, but
instead introduce a DMA library so that the Vhost DMA stuff isn't vendor
specific.

Adding a few ARM people in cc, to know whether they have plan/interrest
in supporting DMA acceleration for Vhost.

Regards,
Maxime

>>
>> Signed-off-by: Jiayu Hu <jiayu.hu@intel.com>
>> ---
>>  config/common_base                                 |    2 +
>>  config/common_linux                                |    1 +
>>  drivers/Makefile                                   |    2 +-
>>  drivers/net/Makefile                               |    1 +
>>  drivers/net/vhost_dma/Makefile                     |   31 +
>>  drivers/net/vhost_dma/eth_vhost.c                  | 1495 ++++++++++++++++++++
>>  drivers/net/vhost_dma/eth_vhost.h                  |  264 ++++
>>  drivers/net/vhost_dma/internal.h                   |  225 +++
>>  .../net/vhost_dma/rte_pmd_vhost_dma_version.map    |    4 +
>>  drivers/net/vhost_dma/virtio_net.c                 | 1234 ++++++++++++++++
>>  mk/rte.app.mk                                      |    1 +
>>  11 files changed, 3259 insertions(+), 1 deletion(-)
>>  create mode 100644 drivers/net/vhost_dma/Makefile
>>  create mode 100644 drivers/net/vhost_dma/eth_vhost.c
>>  create mode 100644 drivers/net/vhost_dma/eth_vhost.h
>>  create mode 100644 drivers/net/vhost_dma/internal.h
>>  create mode 100644 drivers/net/vhost_dma/rte_pmd_vhost_dma_version.map
>>  create mode 100644 drivers/net/vhost_dma/virtio_net.c
> 
> You need to add Meson support.
> 
> 
> More generally, I have been through the series and I'm not sure having a
> dedicated PMD driver for this is a good idea due to all the code
> duplication it implies.
> 
> I understand it has been done this way to avoid impacting the pure SW
> datapath implementation. But I'm sure the series could be reduced to a
> few hundred of lines if it was integrated in vhost-user library.
> Moreover, your series does not support packed ring, so it means even
> more code would need to be duplicated in the end.
> 
> What do you think?
> 
> Thanks,
> Maxime
>
Hu, Jiayu Dec. 18, 2019, 2:51 a.m. UTC | #3
Hi Maxime,

Thanks for your suggestions. Replies are inline.

> -----Original Message-----
> From: Maxime Coquelin <maxime.coquelin@redhat.com>
> Sent: Tuesday, December 17, 2019 6:20 PM
> To: Hu, Jiayu <jiayu.hu@intel.com>; dev@dpdk.org
> Cc: Bie, Tiwei <tiwei.bie@intel.com>; Wang, Zhihong
> <zhihong.wang@intel.com>; Richardson, Bruce
> <bruce.richardson@intel.com>; Honnappa Nagarahalli
> <Honnappa.Nagarahalli@arm.com>; Hemant Agrawal
> <hemant.agrawal@nxp.com>; jerinj@marvell.com
> Subject: Re: [dpdk-dev] [RFC v2 2/2] net/vhost_dma: add vHost DMA driver
> 
> 
> >> The PMD has 4 parameters.
> >>  - iface: The parameter is used to specify a path to connect to a
> >>  	front end device.
> >>  - queues: The parameter is used to specify the number of the queues
> >>  	front end device has (Default is 1).
> >>  - client: The parameter is used to specify the vhost port working as
> >>  	client mode or server mode (Default is server mode).
> >>  - dmas: This parameter is used to specify the assigned DMA device
> >>  	of a queue.
> >>
> >> Here is an example.
> >> $ ./testpmd -c f -n 4 \
> >> --vdev
> 'dma_vhost0,iface=/tmp/sock0,queues=1,dmas=txq0@00:04.0,client=0'
> >
> > dma_vhost0 is not a good name, you have to mention it is net specific.
> >
> > Is there a tool to list available DMA engines?
> 
> Thinking at it again, wouldn't it be possible that the user doesn't
> specify a specific DMA device ID, but instead allocate one device
> at init time by specifying all the capabilities the DMA device need
> to match?
> 
> If no DMA device available with matching capabilities, then fallback to
> SW mode.

I think it’s a good way to allocate DMAs by capabilities, like NUMA node. But
the problem is how to express the relationship between queues and DMAs.
As RX/TX is per-queue basis, we need to know if a queue needs DMA acceleration
or not. Current vdev parameters explicitly specify which queue uses which
DMA; if we use DMA capability, will the parameters look too complex? Like
dmas={txq0@(numa0, other capabilities);txq1@(numa0, ...)}

Any suggestions?

> 
> Also, I think we don't want to call directly IOAT API directly here, but
> instead introduce a DMA library so that the Vhost DMA stuff isn't vendor
> specific.

As I know, all DMA engines in DPDK are implemented as rawdev and there is no
unified API for them. If it's necessary, we can provide a DMA library for them
in the future, so that more scenarios can benefit from the DMA engines.

IMO, vhost is one of consumers of the DMA engines, so the DMA library doesn't
have to be inside vhost; in addition, we just know IOAT can accelerate vhost, but
don't know about other DMA engines. Therefore, at this point, I think it's OK to
directly call IOAT API. Furthermore, we can replace IOAT specific API in vhost PMD,
if there is a DMA library in the future; as long as current vdev parameters
for DMA are generic enough, internal API replacement will not be visible to users.

How do you think?

Regards,
Jiayu

> 
> Adding a few ARM people in cc, to know whether they have plan/interrest
> in supporting DMA acceleration for Vhost.
> 
> Regards,
> Maxime
>
Hu, Jiayu Dec. 18, 2019, 3:11 a.m. UTC | #4
Hi Maxime,

Replies are inline.

> -----Original Message-----
> From: Maxime Coquelin <maxime.coquelin@redhat.com>
> Sent: Tuesday, December 17, 2019 4:27 PM
> To: Hu, Jiayu <jiayu.hu@intel.com>; dev@dpdk.org
> Cc: Bie, Tiwei <tiwei.bie@intel.com>; Wang, Zhihong
> <zhihong.wang@intel.com>; Richardson, Bruce
> <bruce.richardson@intel.com>
> Subject: Re: [RFC v2 2/2] net/vhost_dma: add vHost DMA driver
> 
> Hi Jiayu,
> > Here is an example.
> > $ ./testpmd -c f -n 4 \
> > --vdev
> 'dma_vhost0,iface=/tmp/sock0,queues=1,dmas=txq0@00:04.0,client=0'
> 
> dma_vhost0 is not a good name, you have to mention it is net specific.
> 
> Is there a tool to list available DMA engines?

Yes, you can use dpdk-devbind.py to list available DMA engines.

> 
> >
> > Signed-off-by: Jiayu Hu <jiayu.hu@intel.com>
> > ---
> >  config/common_base                                 |    2 +
> >  config/common_linux                                |    1 +
> >  drivers/Makefile                                   |    2 +-
> >  drivers/net/Makefile                               |    1 +
> >  drivers/net/vhost_dma/Makefile                     |   31 +
> >  drivers/net/vhost_dma/eth_vhost.c                  | 1495
> ++++++++++++++++++++
> >  drivers/net/vhost_dma/eth_vhost.h                  |  264 ++++
> >  drivers/net/vhost_dma/internal.h                   |  225 +++
> >  .../net/vhost_dma/rte_pmd_vhost_dma_version.map    |    4 +
> >  drivers/net/vhost_dma/virtio_net.c                 | 1234 ++++++++++++++++
> >  mk/rte.app.mk                                      |    1 +
> >  11 files changed, 3259 insertions(+), 1 deletion(-)
> >  create mode 100644 drivers/net/vhost_dma/Makefile
> >  create mode 100644 drivers/net/vhost_dma/eth_vhost.c
> >  create mode 100644 drivers/net/vhost_dma/eth_vhost.h
> >  create mode 100644 drivers/net/vhost_dma/internal.h
> >  create mode 100644
> drivers/net/vhost_dma/rte_pmd_vhost_dma_version.map
> >  create mode 100644 drivers/net/vhost_dma/virtio_net.c
> 
> You need to add Meson support.

Will add meson build later.

> 
> 
> More generally, I have been through the series and I'm not sure having a
> dedicated PMD driver for this is a good idea due to all the code
> duplication it implies.
> 
> I understand it has been done this way to avoid impacting the pure SW
> datapath implementation. But I'm sure the series could be reduced to a
> few hundred of lines if it was integrated in vhost-user library.
> Moreover, your series does not support packed ring, so it means even
> more code would need to be duplicated in the end.

Yes, providing a new PMD is to avoid impacting vhost library. To avoid
too much duplicated code, we can just provide a separate DMA accelerated
data-path in vhost-user PMD, rather than introducing a new PMD. How
do you think?

Thanks,
Jiayu
> 
> What do you think?
> 
> Thanks,
> Maxime

Patch
diff mbox series

diff --git a/config/common_base b/config/common_base
index b2be3d9..8029519 100644
--- a/config/common_base
+++ b/config/common_base
@@ -1026,6 +1026,8 @@  CONFIG_RTE_LIBRTE_VHOST_DEBUG=n
 #
 CONFIG_RTE_LIBRTE_PMD_VHOST=n
 
+CONFIG_RTE_LIBRTE_PMD_VHOST_DMA=n
+
 #
 # Compile IFC driver
 # To compile, CONFIG_RTE_LIBRTE_VHOST and CONFIG_RTE_EAL_VFIO
diff --git a/config/common_linux b/config/common_linux
index 96e2e1f..1b2b1f2 100644
--- a/config/common_linux
+++ b/config/common_linux
@@ -17,6 +17,7 @@  CONFIG_RTE_LIBRTE_VHOST=y
 CONFIG_RTE_LIBRTE_VHOST_NUMA=y
 CONFIG_RTE_LIBRTE_VHOST_POSTCOPY=n
 CONFIG_RTE_LIBRTE_PMD_VHOST=y
+CONFIG_RTE_LIBRTE_PMD_VHOST_DMA=y
 CONFIG_RTE_LIBRTE_IFC_PMD=y
 CONFIG_RTE_LIBRTE_PMD_AF_PACKET=y
 CONFIG_RTE_LIBRTE_PMD_MEMIF=y
diff --git a/drivers/Makefile b/drivers/Makefile
index 7d5da5d..6bdab76 100644
--- a/drivers/Makefile
+++ b/drivers/Makefile
@@ -9,7 +9,7 @@  DEPDIRS-bus := common
 DIRS-y += mempool
 DEPDIRS-mempool := common bus
 DIRS-y += net
-DEPDIRS-net := common bus mempool
+DEPDIRS-net := common bus mempool raw
 DIRS-$(CONFIG_RTE_LIBRTE_BBDEV) += baseband
 DEPDIRS-baseband := common bus mempool
 DIRS-$(CONFIG_RTE_LIBRTE_CRYPTODEV) += crypto
diff --git a/drivers/net/Makefile b/drivers/net/Makefile
index cee3036..5535ca5 100644
--- a/drivers/net/Makefile
+++ b/drivers/net/Makefile
@@ -71,6 +71,7 @@  endif # $(CONFIG_RTE_LIBRTE_SCHED)
 
 ifeq ($(CONFIG_RTE_LIBRTE_VHOST),y)
 DIRS-$(CONFIG_RTE_LIBRTE_PMD_VHOST) += vhost
+DIRS-$(CONFIG_RTE_LIBRTE_PMD_VHOST_DMA) += vhost_dma
 ifeq ($(CONFIG_RTE_EAL_VFIO),y)
 DIRS-$(CONFIG_RTE_LIBRTE_IFC_PMD) += ifc
 endif
diff --git a/drivers/net/vhost_dma/Makefile b/drivers/net/vhost_dma/Makefile
new file mode 100644
index 0000000..da4dd49
--- /dev/null
+++ b/drivers/net/vhost_dma/Makefile
@@ -0,0 +1,31 @@ 
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2019 Intel Corporation
+
+include $(RTE_SDK)/mk/rte.vars.mk
+
+#
+# library name
+#
+LIB = librte_pmd_vhost_dma.a
+
+LDLIBS += -lpthread
+LDLIBS += -lrte_eal -lrte_mbuf -lrte_mempool -lrte_ring
+LDLIBS += -lrte_ethdev -lrte_net -lrte_kvargs -lrte_vhost
+LDLIBS += -lrte_bus_vdev
+LDLIBS += -lrte_rawdev -lrte_rawdev_ioat
+
+CFLAGS += -O3
+CFLAGS += $(WERROR_FLAGS)
+CFLAGS += -fno-strict-aliasing
+CFLAGS += -DALLOW_EXPERIMENTAL_API
+
+EXPORT_MAP := rte_pmd_vhost_dma_version.map
+
+LIBABIVER := 1
+
+#
+# all source are stored in SRCS-y
+#
+SRCS-$(CONFIG_RTE_LIBRTE_PMD_VHOST_DMA) += eth_vhost.c virtio_net.c
+
+include $(RTE_SDK)/mk/rte.lib.mk
diff --git a/drivers/net/vhost_dma/eth_vhost.c b/drivers/net/vhost_dma/eth_vhost.c
new file mode 100644
index 0000000..72e199e
--- /dev/null
+++ b/drivers/net/vhost_dma/eth_vhost.c
@@ -0,0 +1,1495 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2019 Intel Corporation
+ */
+#include <unistd.h>
+#include <pthread.h>
+#include <stdbool.h>
+
+#include <rte_mbuf.h>
+#include <rte_ethdev_driver.h>
+#include <rte_ethdev_vdev.h>
+#include <rte_malloc.h>
+#include <rte_memcpy.h>
+#include <rte_bus_vdev.h>
+#include <rte_kvargs.h>
+#include <rte_vhost.h>
+#include <rte_spinlock.h>
+#include <rte_log.h>
+#include <rte_string_fns.h>
+#include <rte_rawdev.h>
+#include <rte_ioat_rawdev.h>
+
+#include "eth_vhost.h"
+
+int vhost_dma_logtype;
+
+enum {VIRTIO_RXQ, VIRTIO_TXQ, VIRTIO_QNUM};
+
+#define ETH_VHOST_IFACE_ARG		"iface"
+#define ETH_VHOST_QUEUES_ARG		"queues"
+#define ETH_VHOST_CLIENT_ARG		"client"
+#define ETH_VHOST_DMA_ARG		"dmas"
+
+static const char *valid_arguments[] = {
+	ETH_VHOST_IFACE_ARG,
+	ETH_VHOST_QUEUES_ARG,
+	ETH_VHOST_CLIENT_ARG,
+	ETH_VHOST_DMA_ARG,
+	NULL
+};
+
+static struct rte_ether_addr base_eth_addr = {
+	.addr_bytes = {
+		0x56 /* V */,
+		0x48 /* H */,
+		0x4F /* O */,
+		0x53 /* S */,
+		0x54 /* T */,
+		0x00
+	}
+};
+
+struct internal_list {
+	TAILQ_ENTRY(internal_list) next;
+	struct rte_eth_dev *eth_dev;
+};
+
+TAILQ_HEAD(internal_list_head, internal_list);
+static struct internal_list_head internal_list =
+	TAILQ_HEAD_INITIALIZER(internal_list);
+
+static pthread_mutex_t internal_list_lock = PTHREAD_MUTEX_INITIALIZER;
+
+static struct rte_eth_link pmd_link = {
+		.link_speed = 10000,
+		.link_duplex = ETH_LINK_FULL_DUPLEX,
+		.link_status = ETH_LINK_DOWN
+};
+
+#define VHOST_XSTATS_NAME_SIZE 64
+
+struct vhost_xstats_name_off {
+	char name[VHOST_XSTATS_NAME_SIZE];
+	uint64_t offset;
+};
+
+/* [rx]_is prepended to the name string here */
+static const struct vhost_xstats_name_off vhost_rxport_stat_strings[] = {
+	{"good_packets",
+	 offsetof(struct vhost_queue, stats.pkts)},
+	{"total_bytes",
+	 offsetof(struct vhost_queue, stats.bytes)},
+	{"missed_pkts",
+	 offsetof(struct vhost_queue, stats.missed_pkts)},
+	{"broadcast_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_BROADCAST_PKT])},
+	{"multicast_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_MULTICAST_PKT])},
+	{"unicast_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_UNICAST_PKT])},
+	 {"undersize_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_UNDERSIZE_PKT])},
+	{"size_64_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_64_PKT])},
+	{"size_65_to_127_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_65_TO_127_PKT])},
+	{"size_128_to_255_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_128_TO_255_PKT])},
+	{"size_256_to_511_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_256_TO_511_PKT])},
+	{"size_512_to_1023_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_512_TO_1023_PKT])},
+	{"size_1024_to_1522_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_1024_TO_1522_PKT])},
+	{"size_1523_to_max_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_1523_TO_MAX_PKT])},
+	{"errors_with_bad_CRC",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_ERRORS_PKT])},
+	{"fragmented_errors",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_ERRORS_FRAGMENTED])},
+	{"jabber_errors",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_ERRORS_JABBER])},
+	{"unknown_protos_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_UNKNOWN_PROTOCOL])},
+};
+
+/* [tx]_ is prepended to the name string here */
+static const struct vhost_xstats_name_off vhost_txport_stat_strings[] = {
+	{"good_packets",
+	 offsetof(struct vhost_queue, stats.pkts)},
+	{"total_bytes",
+	 offsetof(struct vhost_queue, stats.bytes)},
+	{"missed_pkts",
+	 offsetof(struct vhost_queue, stats.missed_pkts)},
+	{"broadcast_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_BROADCAST_PKT])},
+	{"multicast_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_MULTICAST_PKT])},
+	{"unicast_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_UNICAST_PKT])},
+	{"undersize_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_UNDERSIZE_PKT])},
+	{"size_64_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_64_PKT])},
+	{"size_65_to_127_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_65_TO_127_PKT])},
+	{"size_128_to_255_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_128_TO_255_PKT])},
+	{"size_256_to_511_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_256_TO_511_PKT])},
+	{"size_512_to_1023_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_512_TO_1023_PKT])},
+	{"size_1024_to_1522_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_1024_TO_1522_PKT])},
+	{"size_1523_to_max_packets",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_1523_TO_MAX_PKT])},
+	{"errors_with_bad_CRC",
+	 offsetof(struct vhost_queue, stats.xstats[VHOST_ERRORS_PKT])},
+};
+
+#define VHOST_NB_XSTATS_RXPORT (sizeof(vhost_rxport_stat_strings) / \
+				sizeof(vhost_rxport_stat_strings[0]))
+
+#define VHOST_NB_XSTATS_TXPORT (sizeof(vhost_txport_stat_strings) / \
+				sizeof(vhost_txport_stat_strings[0]))
+
+static int
+vhost_dev_xstats_reset(struct rte_eth_dev *dev)
+{
+	struct vhost_queue *vq = NULL;
+	unsigned int i = 0;
+
+	for (i = 0; i < dev->data->nb_rx_queues; i++) {
+		vq = dev->data->rx_queues[i];
+		if (!vq)
+			continue;
+		memset(&vq->stats, 0, sizeof(vq->stats));
+	}
+	for (i = 0; i < dev->data->nb_tx_queues; i++) {
+		vq = dev->data->tx_queues[i];
+		if (!vq)
+			continue;
+		memset(&vq->stats, 0, sizeof(vq->stats));
+	}
+
+	return 0;
+}
+
+static int
+vhost_dev_xstats_get_names(struct rte_eth_dev *dev __rte_unused,
+			   struct rte_eth_xstat_name *xstats_names,
+			   unsigned int limit __rte_unused)
+{
+	unsigned int t = 0;
+	int count = 0;
+	int nstats = VHOST_NB_XSTATS_RXPORT + VHOST_NB_XSTATS_TXPORT;
+
+	if (!xstats_names)
+		return nstats;
+	for (t = 0; t < VHOST_NB_XSTATS_RXPORT; t++) {
+		snprintf(xstats_names[count].name,
+			 sizeof(xstats_names[count].name),
+			 "rx_%s", vhost_rxport_stat_strings[t].name);
+		count++;
+	}
+	for (t = 0; t < VHOST_NB_XSTATS_TXPORT; t++) {
+		snprintf(xstats_names[count].name,
+			 sizeof(xstats_names[count].name),
+			 "tx_%s", vhost_txport_stat_strings[t].name);
+		count++;
+	}
+	return count;
+}
+
+static int
+vhost_dev_xstats_get(struct rte_eth_dev *dev, struct rte_eth_xstat *xstats,
+		     unsigned int n)
+{
+	unsigned int i;
+	unsigned int t;
+	unsigned int count = 0;
+	struct vhost_queue *vq = NULL;
+	unsigned int nxstats = VHOST_NB_XSTATS_RXPORT + VHOST_NB_XSTATS_TXPORT;
+
+	if (n < nxstats)
+		return nxstats;
+
+	for (i = 0; i < dev->data->nb_rx_queues; i++) {
+		vq = dev->data->rx_queues[i];
+		if (!vq)
+			continue;
+		vq->stats.xstats[VHOST_UNICAST_PKT] = vq->stats.pkts
+				- (vq->stats.xstats[VHOST_BROADCAST_PKT]
+				+ vq->stats.xstats[VHOST_MULTICAST_PKT]);
+	}
+	for (i = 0; i < dev->data->nb_tx_queues; i++) {
+		vq = dev->data->tx_queues[i];
+		if (!vq)
+			continue;
+		vq->stats.xstats[VHOST_UNICAST_PKT] = vq->stats.pkts
+				+ vq->stats.missed_pkts
+				- (vq->stats.xstats[VHOST_BROADCAST_PKT]
+				+ vq->stats.xstats[VHOST_MULTICAST_PKT]);
+	}
+	for (t = 0; t < VHOST_NB_XSTATS_RXPORT; t++) {
+		xstats[count].value = 0;
+		for (i = 0; i < dev->data->nb_rx_queues; i++) {
+			vq = dev->data->rx_queues[i];
+			if (!vq)
+				continue;
+			xstats[count].value +=
+				*(uint64_t *)(((char *)vq)
+				+ vhost_rxport_stat_strings[t].offset);
+		}
+		xstats[count].id = count;
+		count++;
+	}
+	for (t = 0; t < VHOST_NB_XSTATS_TXPORT; t++) {
+		xstats[count].value = 0;
+		for (i = 0; i < dev->data->nb_tx_queues; i++) {
+			vq = dev->data->tx_queues[i];
+			if (!vq)
+				continue;
+			xstats[count].value +=
+				*(uint64_t *)(((char *)vq)
+				+ vhost_txport_stat_strings[t].offset);
+		}
+		xstats[count].id = count;
+		count++;
+	}
+	return count;
+}
+
+static inline void
+vhost_count_multicast_broadcast(struct vhost_queue *vq,
+				struct rte_mbuf *mbuf)
+{
+	struct rte_ether_addr *ea = NULL;
+	struct vhost_stats *pstats = &vq->stats;
+
+	ea = rte_pktmbuf_mtod(mbuf, struct rte_ether_addr *);
+	if (rte_is_multicast_ether_addr(ea)) {
+		if (rte_is_broadcast_ether_addr(ea))
+			pstats->xstats[VHOST_BROADCAST_PKT]++;
+		else
+			pstats->xstats[VHOST_MULTICAST_PKT]++;
+	}
+}
+
+static void
+vhost_update_packet_xstats(struct vhost_queue *vq,
+			   struct rte_mbuf **bufs,
+			   uint16_t count)
+{
+	uint32_t pkt_len = 0;
+	uint64_t i = 0;
+	uint64_t index;
+	struct vhost_stats *pstats = &vq->stats;
+
+	for (i = 0; i < count ; i++) {
+		pkt_len = bufs[i]->pkt_len;
+		if (pkt_len == 64) {
+			pstats->xstats[VHOST_64_PKT]++;
+		} else if (pkt_len > 64 && pkt_len < 1024) {
+			index = (sizeof(pkt_len) * 8)
+				- __builtin_clz(pkt_len) - 5;
+			pstats->xstats[index]++;
+		} else {
+			if (pkt_len < 64)
+				pstats->xstats[VHOST_UNDERSIZE_PKT]++;
+			else if (pkt_len <= 1522)
+				pstats->xstats[VHOST_1024_TO_1522_PKT]++;
+			else if (pkt_len > 1522)
+				pstats->xstats[VHOST_1523_TO_MAX_PKT]++;
+		}
+		vhost_count_multicast_broadcast(vq, bufs[i]);
+	}
+}
+
+static uint16_t
+eth_vhost_rx(void *q, struct rte_mbuf **bufs, uint16_t nb_bufs)
+{
+	struct vhost_queue *queue = q;
+	uint16_t i, nb_rx = 0;
+	uint16_t nb_receive = nb_bufs;
+	struct pmd_internal *dev = queue->internal;
+
+	if (unlikely(rte_atomic32_read(&queue->allow_queuing) == 0))
+		return 0;
+
+	rte_atomic32_set(&queue->while_queuing, 1);
+
+	if (unlikely(rte_atomic32_read(&queue->allow_queuing) == 0))
+		goto out;
+
+	/* get packets from guest's TX queue */
+	while (nb_receive) {
+		uint16_t nb_pkts;
+		uint16_t num = (uint16_t)RTE_MIN(nb_receive,
+						 VHOST_MAX_PKT_BURST);
+
+		nb_pkts = vhost_dma_dequeue_burst(dev, queue->dma_vring,
+						   queue->mb_pool, &bufs[nb_rx],
+						   num);
+
+		nb_rx += nb_pkts;
+		nb_receive -= nb_pkts;
+		if (nb_pkts < num)
+			break;
+	}
+
+	queue->stats.pkts += nb_rx;
+
+	for (i = 0; likely(i < nb_rx); i++) {
+		bufs[i]->port = queue->port;
+		bufs[i]->vlan_tci = 0;
+
+		if (queue->internal->vlan_strip)
+			rte_vlan_strip(bufs[i]);
+
+		queue->stats.bytes += bufs[i]->pkt_len;
+	}
+
+	vhost_update_packet_xstats(queue, bufs, nb_rx);
+
+out:
+	rte_atomic32_set(&queue->while_queuing, 0);
+
+	return nb_rx;
+}
+
+static uint16_t
+eth_vhost_tx(void *q, struct rte_mbuf **bufs, uint16_t nb_bufs)
+{
+	struct vhost_queue *queue = q;
+	struct pmd_internal *dev = queue->internal;
+	uint16_t i, nb_tx = 0;
+	uint16_t nb_send = 0;
+
+	if (unlikely(rte_atomic32_read(&queue->allow_queuing) == 0))
+		return 0;
+
+	rte_atomic32_set(&queue->while_queuing, 1);
+
+	if (unlikely(rte_atomic32_read(&queue->allow_queuing) == 0))
+		goto out;
+
+	for (i = 0; i < nb_bufs; i++) {
+		struct rte_mbuf *m = bufs[i];
+
+		/* do VLAN tag insertion */
+		if (m->ol_flags & PKT_TX_VLAN_PKT) {
+			int error = rte_vlan_insert(&m);
+
+			if (unlikely(error)) {
+				rte_pktmbuf_free(m);
+				continue;
+			}
+		}
+
+		bufs[nb_send] = m;
+		++nb_send;
+	}
+
+	/* send packets to guest's RX queue */
+	while (nb_send) {
+		uint16_t nb_pkts;
+		uint16_t num = (uint16_t)RTE_MIN(nb_send,
+						 VHOST_MAX_PKT_BURST);
+
+		nb_pkts = vhost_dma_enqueue_burst(dev, queue->dma_vring,
+						   &bufs[nb_tx], num);
+
+		nb_tx += nb_pkts;
+		nb_send -= nb_pkts;
+		if (nb_pkts < num)
+			break;
+	}
+
+	queue->stats.pkts += nb_tx;
+	queue->stats.missed_pkts += nb_bufs - nb_tx;
+
+	for (i = 0; likely(i < nb_tx); i++)
+		queue->stats.bytes += bufs[i]->pkt_len;
+
+	vhost_update_packet_xstats(queue, bufs, nb_tx);
+
+	/**
+	 * According to RFC2863 page42 section ifHCOutMulticastPkts and
+	 * ifHCOutBroadcastPkts, the counters "multicast" and "broadcast"
+	 * are increased when packets are not transmitted successfully.
+	 */
+	for (i = nb_tx; i < nb_bufs; i++)
+		vhost_count_multicast_broadcast(queue, bufs[i]);
+out:
+	rte_atomic32_set(&queue->while_queuing, 0);
+
+	return nb_tx;
+}
+
+static int
+eth_dev_configure(struct rte_eth_dev *dev __rte_unused)
+{
+	struct pmd_internal *internal = dev->data->dev_private;
+	const struct rte_eth_rxmode *rxmode = &dev->data->dev_conf.rxmode;
+
+	internal->vlan_strip = !!(rxmode->offloads & DEV_RX_OFFLOAD_VLAN_STRIP);
+
+	return 0;
+}
+
+static inline struct internal_list *
+find_internal_resource(char *ifname)
+{
+	int found = 0;
+	struct internal_list *list;
+	struct pmd_internal *internal;
+
+	if (!ifname)
+		return NULL;
+
+	pthread_mutex_lock(&internal_list_lock);
+
+	TAILQ_FOREACH(list, &internal_list, next) {
+		internal = list->eth_dev->data->dev_private;
+		if (!strcmp(internal->iface_name, ifname)) {
+			found = 1;
+			break;
+		}
+	}
+
+	pthread_mutex_unlock(&internal_list_lock);
+
+	if (!found)
+		return NULL;
+
+	return list;
+}
+
+static int
+eth_rxq_intr_enable(struct rte_eth_dev *dev, uint16_t qid)
+{
+	struct vhost_queue *vq;
+
+	vq = dev->data->rx_queues[qid];
+	if (!vq) {
+		VHOST_LOG(ERR, "rxq%d is not setup yet\n", qid);
+		return -1;
+	}
+
+	VHOST_LOG(INFO, "Enable interrupt for rxq%d\n", qid);
+	vhost_dma_enable_guest_notification(dev->data->dev_private,
+					     vq->dma_vring, 1);
+	rte_wmb();
+
+	return 0;
+}
+
+static int
+eth_rxq_intr_disable(struct rte_eth_dev *dev, uint16_t qid)
+{
+	struct vhost_queue *vq;
+
+	vq = dev->data->rx_queues[qid];
+	if (!vq) {
+		VHOST_LOG(ERR, "rxq%d is not setup yet\n", qid);
+		return -1;
+	}
+
+	VHOST_LOG(INFO, "Disable interrupt for rxq%d\n", qid);
+	vhost_dma_enable_guest_notification(dev->data->dev_private,
+					     vq->dma_vring, 0);
+	rte_wmb();
+
+	return 0;
+}
+
+static void
+eth_vhost_uninstall_intr(struct rte_eth_dev *dev)
+{
+	struct rte_intr_handle *intr_handle = dev->intr_handle;
+
+	if (intr_handle) {
+		if (intr_handle->intr_vec)
+			free(intr_handle->intr_vec);
+		free(intr_handle);
+	}
+
+	dev->intr_handle = NULL;
+}
+
+static int
+eth_vhost_install_intr(struct rte_eth_dev *dev)
+{
+	struct rte_vhost_vring *vv;
+	struct vhost_queue *vq;
+	int count = 0;
+	int nb_rxq = dev->data->nb_rx_queues;
+	int i;
+
+	/* uninstall firstly if we are reconnecting */
+	if (dev->intr_handle)
+		eth_vhost_uninstall_intr(dev);
+
+	dev->intr_handle = malloc(sizeof(*dev->intr_handle));
+	if (!dev->intr_handle) {
+		VHOST_LOG(ERR, "Fail to allocate intr_handle\n");
+		return -ENOMEM;
+	}
+	memset(dev->intr_handle, 0, sizeof(*dev->intr_handle));
+
+	dev->intr_handle->efd_counter_size = sizeof(uint64_t);
+
+	dev->intr_handle->intr_vec =
+		malloc(nb_rxq * sizeof(dev->intr_handle->intr_vec[0]));
+
+	if (!dev->intr_handle->intr_vec) {
+		VHOST_LOG(ERR,
+			  "Failed to allocate memory for interrupt vector\n");
+		free(dev->intr_handle);
+		return -ENOMEM;
+	}
+
+	VHOST_LOG(INFO, "Prepare intr vec\n");
+	for (i = 0; i < nb_rxq; i++) {
+		vq = dev->data->rx_queues[i];
+		if (!vq) {
+			VHOST_LOG(INFO, "rxq-%d not setup yet, skip!\n", i);
+			continue;
+		}
+
+		vv = &vq->dma_vring->vr;
+		if (vv->kickfd < 0) {
+			VHOST_LOG(INFO,
+				  "rxq-%d's kickfd is invalid, skip!\n", i);
+			continue;
+		}
+		dev->intr_handle->intr_vec[i] = RTE_INTR_VEC_RXTX_OFFSET + i;
+		dev->intr_handle->efds[i] = vv->kickfd;
+		count++;
+		VHOST_LOG(INFO, "Installed intr vec for rxq-%d\n", i);
+	}
+
+	dev->intr_handle->nb_efd = count;
+	dev->intr_handle->max_intr = count + 1;
+	dev->intr_handle->type = RTE_INTR_HANDLE_VDEV;
+
+	return 0;
+}
+
+static void
+update_queuing_status(struct rte_eth_dev *dev)
+{
+	struct pmd_internal *internal = dev->data->dev_private;
+	struct vhost_queue *vq;
+	unsigned int i;
+	int allow_queuing = 1;
+
+	if (!dev->data->rx_queues || !dev->data->tx_queues)
+		return;
+
+	if (rte_atomic32_read(&internal->started) == 0 ||
+	    rte_atomic32_read(&internal->dev_attached) == 0)
+		allow_queuing = 0;
+
+	/* wait until rx/tx_pkt_burst stops accessing vhost device */
+	for (i = 0; i < dev->data->nb_rx_queues; i++) {
+		vq = dev->data->rx_queues[i];
+		if (vq == NULL)
+			continue;
+		rte_atomic32_set(&vq->allow_queuing, allow_queuing);
+		while (rte_atomic32_read(&vq->while_queuing))
+			rte_pause();
+	}
+
+	for (i = 0; i < dev->data->nb_tx_queues; i++) {
+		vq = dev->data->tx_queues[i];
+		if (vq == NULL)
+			continue;
+		rte_atomic32_set(&vq->allow_queuing, allow_queuing);
+		while (rte_atomic32_read(&vq->while_queuing))
+			rte_pause();
+	}
+}
+
+static void
+queue_setup(struct rte_eth_dev *eth_dev, struct pmd_internal *internal)
+{
+	struct vhost_queue *vq;
+	int i;
+
+	for (i = 0; i < eth_dev->data->nb_rx_queues; i++) {
+		vq = eth_dev->data->rx_queues[i];
+		if (!vq)
+			continue;
+		vq->vid = internal->vid;
+		vq->internal = internal;
+		vq->port = eth_dev->data->port_id;
+		vq->dma_vring = &internal->dma_vrings[vq->virtqueue_id];
+	}
+	for (i = 0; i < eth_dev->data->nb_tx_queues; i++) {
+		vq = eth_dev->data->tx_queues[i];
+		if (!vq)
+			continue;
+		vq->vid = internal->vid;
+		vq->internal = internal;
+		vq->port = eth_dev->data->port_id;
+		vq->dma_vring = &internal->dma_vrings[vq->virtqueue_id];
+	}
+}
+
+static int
+new_device(int vid)
+{
+	struct rte_eth_dev *eth_dev;
+	struct internal_list *list;
+	struct pmd_internal *internal;
+	struct rte_eth_conf *dev_conf;
+	unsigned i;
+	char ifname[PATH_MAX];
+
+	rte_vhost_get_ifname(vid, ifname, sizeof(ifname));
+	list = find_internal_resource(ifname);
+	if (list == NULL) {
+		VHOST_LOG(INFO, "Invalid device name: %s\n", ifname);
+		return -1;
+	}
+
+	eth_dev = list->eth_dev;
+	internal = eth_dev->data->dev_private;
+	dev_conf = &eth_dev->data->dev_conf;
+
+	internal->vid = vid;
+
+	if (vhost_dma_setup(internal) < 0) {
+		VHOST_LOG(ERR, "Failed to set up vring operations\n");
+		return -1;
+	}
+
+	if (rte_atomic32_read(&internal->started) == 1) {
+		queue_setup(eth_dev, internal);
+
+		if (dev_conf->intr_conf.rxq) {
+			if (eth_vhost_install_intr(eth_dev) < 0) {
+				VHOST_LOG(INFO, "Failed to install "
+					  "interrupt handler.");
+				return -1;
+			}
+		}
+	} else {
+		VHOST_LOG(INFO, "RX/TX queues not exist yet\n");
+	}
+
+	for (i = 0; i < rte_vhost_get_vring_num(vid); i++) {
+		vhost_dma_enable_guest_notification(internal,
+						    &internal->dma_vrings[i],
+						    0);
+	}
+
+	rte_vhost_get_mtu(vid, &eth_dev->data->mtu);
+
+	eth_dev->data->dev_link.link_status = ETH_LINK_UP;
+
+	rte_atomic32_set(&internal->dev_attached, 1);
+	update_queuing_status(eth_dev);
+
+	VHOST_LOG(INFO, "vHost DMA device %d created\n", vid);
+
+	_rte_eth_dev_callback_process(eth_dev, RTE_ETH_EVENT_INTR_LSC, NULL);
+
+	return 0;
+}
+
+static void
+destroy_device(int vid)
+{
+	struct rte_eth_dev *eth_dev;
+	struct pmd_internal *internal;
+	struct vhost_queue *vq;
+	struct internal_list *list;
+	char ifname[PATH_MAX];
+	unsigned i;
+
+	rte_vhost_get_ifname(vid, ifname, sizeof(ifname));
+	list = find_internal_resource(ifname);
+	if (list == NULL) {
+		VHOST_LOG(ERR, "Invalid interface name: %s\n", ifname);
+		return;
+	}
+	eth_dev = list->eth_dev;
+	internal = eth_dev->data->dev_private;
+
+	rte_atomic32_set(&internal->dev_attached, 0);
+	update_queuing_status(eth_dev);
+
+	eth_dev->data->dev_link.link_status = ETH_LINK_DOWN;
+
+	/**
+	 * Before destroy front end's information, we must guarantee
+	 * that RX/TX threads have stopped accessing queues.
+	 */
+	vhost_dma_remove(internal);
+
+	if (eth_dev->data->rx_queues && eth_dev->data->tx_queues) {
+		for (i = 0; i < eth_dev->data->nb_rx_queues; i++) {
+			vq = eth_dev->data->rx_queues[i];
+			if (!vq)
+				continue;
+			vq->vid = -1;
+		}
+		for (i = 0; i < eth_dev->data->nb_tx_queues; i++) {
+			vq = eth_dev->data->tx_queues[i];
+			if (!vq)
+				continue;
+			vq->vid = -1;
+		}
+	}
+
+	VHOST_LOG(INFO, "vHost DMA device %d destroyed\n", vid);
+	eth_vhost_uninstall_intr(eth_dev);
+
+	_rte_eth_dev_callback_process(eth_dev, RTE_ETH_EVENT_INTR_LSC, NULL);
+}
+
+#define IOAT_RING_SIZE 1024
+
+static int
+vring_state_changed(int vid, uint16_t vring, int enable)
+{
+	struct rte_eth_dev *eth_dev;
+	struct internal_list *list;
+	char ifname[PATH_MAX];
+	struct pmd_internal *dev;
+	struct dma_vring *dma_vr;
+	struct rte_ioat_rawdev_config config;
+	struct rte_rawdev_info info = { .dev_private = &config };
+	char name[32];
+
+	rte_vhost_get_ifname(vid, ifname, sizeof(ifname));
+	list = find_internal_resource(ifname);
+	if (list == NULL) {
+		VHOST_LOG(ERR, "Invalid interface name: %s\n", ifname);
+		return -1;
+	}
+
+	VHOST_LOG(INFO, "vring %u is %s\n", vring,
+		  enable ? "enabled" : "disabled");
+
+	eth_dev = list->eth_dev;
+	_rte_eth_dev_callback_process(eth_dev, RTE_ETH_EVENT_QUEUE_STATE, NULL);
+
+	if (!enable)
+		return 0;
+
+	dev = eth_dev->data->dev_private;
+
+	/**
+	 * a vring can only use one DMA device. If it has been
+	 * assigned one, return immediately.
+	 */
+	dma_vr = &dev->dma_vrings[vring];
+	if (dma_vr->dma_enabled)
+		return 0;
+
+	if (!dev->dmas[vring].is_valid)
+		return 0;
+
+	/**
+	 * attach the given DMA device to the queue and
+	 * configure it. Currently, we only support I/OAT.
+	 */
+	if (dev->dmas[vring].type != IOAT) {
+		VHOST_LOG(DEBUG, "Cannot enable DMA for queue %d, since "
+			  "it is not an I/OAT device\n",
+			  dev->dmas[vring].dev_id);
+		return 0;
+	}
+
+	rte_pci_device_name(&dev->dmas[vring].addr, name, sizeof(name));
+	rte_rawdev_info_get(dev->dmas[vring].dev_id, &info);
+	config.ring_size = IOAT_RING_SIZE;
+	if (rte_rawdev_configure(dev->dmas[vring].dev_id, &info) < 0) {
+		VHOST_LOG(ERR, "Config the DMA device %s failed\n", name);
+		return -1;
+	}
+
+	rte_rawdev_start(dev->dmas[vring].dev_id);
+	memcpy(&dma_vr->dma_addr, &dev->dmas[vring].addr,
+	       sizeof(struct rte_pci_addr));
+	dma_vr->dev_id = dev->dmas[vring].dev_id;
+	dma_vr->dma_enabled = true;
+	dma_vr->nr_inflight = 0;
+	dma_vr->nr_batching = 0;
+
+	VHOST_LOG(INFO, "Attach DMA %s to the TX queue %u of port %u\n",
+		  name, vring / VIRTIO_QNUM, eth_dev->data->port_id);
+	return 0;
+}
+
+static struct vhost_device_ops vhost_ops = {
+	.new_device          = new_device,
+	.destroy_device      = destroy_device,
+	.vring_state_changed = vring_state_changed,
+};
+
+static int
+eth_dev_start(struct rte_eth_dev *eth_dev)
+{
+	struct pmd_internal *internal = eth_dev->data->dev_private;
+	struct rte_eth_conf *dev_conf = &eth_dev->data->dev_conf;
+
+	queue_setup(eth_dev, internal);
+
+	if (rte_atomic32_read(&internal->dev_attached) == 1) {
+		if (dev_conf->intr_conf.rxq) {
+			if (eth_vhost_install_intr(eth_dev) < 0) {
+				VHOST_LOG(INFO, "Failed to install "
+					  "interrupt handler.");
+				return -1;
+			}
+		}
+	}
+
+	rte_atomic32_set(&internal->started, 1);
+	update_queuing_status(eth_dev);
+
+	return 0;
+}
+
+static void
+eth_dev_stop(struct rte_eth_dev *dev)
+{
+	struct pmd_internal *internal = dev->data->dev_private;
+
+	rte_atomic32_set(&internal->started, 0);
+	update_queuing_status(dev);
+}
+
+static void
+eth_dev_close(struct rte_eth_dev *dev)
+{
+	struct pmd_internal *internal;
+	struct internal_list *list;
+	unsigned int i;
+
+	internal = dev->data->dev_private;
+	if (!internal)
+		return;
+
+	eth_dev_stop(dev);
+
+	rte_vhost_driver_unregister(internal->iface_name);
+
+	list = find_internal_resource(internal->iface_name);
+	if (!list)
+		return;
+
+	pthread_mutex_lock(&internal_list_lock);
+	TAILQ_REMOVE(&internal_list, list, next);
+	pthread_mutex_unlock(&internal_list_lock);
+	rte_free(list);
+
+	if (dev->data->rx_queues)
+		for (i = 0; i < dev->data->nb_rx_queues; i++)
+			rte_free(dev->data->rx_queues[i]);
+
+	if (dev->data->tx_queues)
+		for (i = 0; i < dev->data->nb_tx_queues; i++)
+			rte_free(dev->data->tx_queues[i]);
+
+	free(internal->dev_name);
+	free(internal->iface_name);
+	rte_free(internal);
+
+	dev->data->dev_private = NULL;
+}
+
+static int
+eth_rx_queue_setup(struct rte_eth_dev *dev, uint16_t rx_queue_id,
+		   uint16_t nb_rx_desc __rte_unused,
+		   unsigned int socket_id,
+		   const struct rte_eth_rxconf *rx_conf __rte_unused,
+		   struct rte_mempool *mb_pool)
+{
+	struct vhost_queue *vq;
+
+	vq = rte_zmalloc_socket(NULL, sizeof(struct vhost_queue),
+				RTE_CACHE_LINE_SIZE, socket_id);
+	if (vq == NULL) {
+		VHOST_LOG(ERR, "Failed to allocate memory for rx queue\n");
+		return -ENOMEM;
+	}
+
+	vq->mb_pool = mb_pool;
+	vq->virtqueue_id = rx_queue_id * VIRTIO_QNUM + VIRTIO_TXQ;
+	dev->data->rx_queues[rx_queue_id] = vq;
+
+	return 0;
+}
+
+static int
+eth_tx_queue_setup(struct rte_eth_dev *dev, uint16_t tx_queue_id,
+		   uint16_t nb_tx_desc __rte_unused,
+		   unsigned int socket_id,
+		   const struct rte_eth_txconf *tx_conf __rte_unused)
+{
+	struct vhost_queue *vq;
+
+	vq = rte_zmalloc_socket(NULL, sizeof(struct vhost_queue),
+				RTE_CACHE_LINE_SIZE, socket_id);
+	if (vq == NULL) {
+		VHOST_LOG(ERR, "Failed to allocate memory for tx queue\n");
+		return -ENOMEM;
+	}
+
+	vq->virtqueue_id = tx_queue_id * VIRTIO_QNUM + VIRTIO_RXQ;
+	dev->data->tx_queues[tx_queue_id] = vq;
+
+	return 0;
+}
+
+static int
+eth_dev_info(struct rte_eth_dev *dev,
+	     struct rte_eth_dev_info *dev_info)
+{
+	struct pmd_internal *internal;
+
+	internal = dev->data->dev_private;
+	if (internal == NULL) {
+		VHOST_LOG(ERR, "Invalid device specified\n");
+		return -ENODEV;
+	}
+
+	dev_info->max_mac_addrs = 1;
+	dev_info->max_rx_pktlen = (uint32_t)-1;
+	dev_info->max_rx_queues = internal->max_queues;
+	dev_info->max_tx_queues = internal->max_queues;
+	dev_info->min_rx_bufsize = 0;
+
+	dev_info->tx_offload_capa = DEV_TX_OFFLOAD_MULTI_SEGS |
+				DEV_TX_OFFLOAD_VLAN_INSERT;
+	dev_info->rx_offload_capa = DEV_RX_OFFLOAD_VLAN_STRIP;
+
+	return 0;
+}
+
+static int
+eth_stats_get(struct rte_eth_dev *dev, struct rte_eth_stats *stats)
+{
+	unsigned i;
+	unsigned long rx_total = 0, tx_total = 0;
+	unsigned long rx_total_bytes = 0, tx_total_bytes = 0;
+	struct vhost_queue *vq;
+
+	for (i = 0; i < RTE_ETHDEV_QUEUE_STAT_CNTRS &&
+	     i < dev->data->nb_rx_queues; i++) {
+		if (dev->data->rx_queues[i] == NULL)
+			continue;
+		vq = dev->data->rx_queues[i];
+		stats->q_ipackets[i] = vq->stats.pkts;
+		rx_total += stats->q_ipackets[i];
+
+		stats->q_ibytes[i] = vq->stats.bytes;
+		rx_total_bytes += stats->q_ibytes[i];
+	}
+
+	for (i = 0; i < RTE_ETHDEV_QUEUE_STAT_CNTRS &&
+	     i < dev->data->nb_tx_queues; i++) {
+		if (dev->data->tx_queues[i] == NULL)
+			continue;
+		vq = dev->data->tx_queues[i];
+		stats->q_opackets[i] = vq->stats.pkts;
+		tx_total += stats->q_opackets[i];
+
+		stats->q_obytes[i] = vq->stats.bytes;
+		tx_total_bytes += stats->q_obytes[i];
+	}
+
+	stats->ipackets = rx_total;
+	stats->opackets = tx_total;
+	stats->ibytes = rx_total_bytes;
+	stats->obytes = tx_total_bytes;
+
+	return 0;
+}
+
+static int
+eth_stats_reset(struct rte_eth_dev *dev)
+{
+	struct vhost_queue *vq;
+	unsigned i;
+
+	for (i = 0; i < dev->data->nb_rx_queues; i++) {
+		if (dev->data->rx_queues[i] == NULL)
+			continue;
+		vq = dev->data->rx_queues[i];
+		vq->stats.pkts = 0;
+		vq->stats.bytes = 0;
+	}
+	for (i = 0; i < dev->data->nb_tx_queues; i++) {
+		if (dev->data->tx_queues[i] == NULL)
+			continue;
+		vq = dev->data->tx_queues[i];
+		vq->stats.pkts = 0;
+		vq->stats.bytes = 0;
+		vq->stats.missed_pkts = 0;
+	}
+
+	return 0;
+}
+
+static void
+eth_queue_release(void *q)
+{
+	rte_free(q);
+}
+
+static int
+eth_tx_done_cleanup(void *txq __rte_unused, uint32_t free_cnt __rte_unused)
+{
+	/**
+	 * vHost does not hang onto mbuf. eth_vhost_tx() copies packet data
+	 * and releases mbuf, so nothing to cleanup.
+	 */
+	return 0;
+}
+
+static int
+eth_link_update(struct rte_eth_dev *dev __rte_unused,
+		int wait_to_complete __rte_unused)
+{
+	return 0;
+}
+
+static uint32_t
+eth_rx_queue_count(struct rte_eth_dev *dev, uint16_t rx_queue_id)
+{
+	struct vhost_queue *vq;
+
+	vq = dev->data->rx_queues[rx_queue_id];
+	if (unlikely(vq == NULL))
+		return 0;
+
+	return rte_vhost_rx_queue_count(vq->vid, vq->virtqueue_id);
+}
+
+static const struct eth_dev_ops ops = {
+	.dev_start = eth_dev_start,
+	.dev_stop = eth_dev_stop,
+	.dev_close = eth_dev_close,
+	.dev_configure = eth_dev_configure,
+	.dev_infos_get = eth_dev_info,
+	.rx_queue_setup = eth_rx_queue_setup,
+	.tx_queue_setup = eth_tx_queue_setup,
+	.rx_queue_release = eth_queue_release,
+	.tx_queue_release = eth_queue_release,
+	.tx_done_cleanup = eth_tx_done_cleanup,
+	.rx_queue_count = eth_rx_queue_count,
+	.link_update = eth_link_update,
+	.stats_get = eth_stats_get,
+	.stats_reset = eth_stats_reset,
+	.xstats_reset = vhost_dev_xstats_reset,
+	.xstats_get = vhost_dev_xstats_get,
+	.xstats_get_names = vhost_dev_xstats_get_names,
+	.rx_queue_intr_enable = eth_rxq_intr_enable,
+	.rx_queue_intr_disable = eth_rxq_intr_disable,
+};
+
+static int
+eth_dev_vhost_create(struct rte_vdev_device *dev, char *iface_name,
+		     int16_t queues, const unsigned int numa_node,
+		     uint64_t flags, struct dma_info *dmas)
+{
+	const char *name = rte_vdev_device_name(dev);
+	struct rte_eth_dev_data *data;
+	struct pmd_internal *internal = NULL;
+	struct rte_eth_dev *eth_dev = NULL;
+	struct rte_ether_addr *eth_addr = NULL;
+	struct internal_list *list = NULL;
+
+	VHOST_LOG(INFO, "Creating vHost-DMA backend on numa socket %u\n",
+		  numa_node);
+
+	list = rte_zmalloc_socket(name, sizeof(*list), 0, numa_node);
+	if (list == NULL)
+		goto error;
+
+	/* reserve an ethdev entry */
+	eth_dev = rte_eth_vdev_allocate(dev, sizeof(*internal));
+	if (eth_dev == NULL)
+		goto error;
+	data = eth_dev->data;
+
+	eth_addr = rte_zmalloc_socket(name, sizeof(*eth_addr), 0, numa_node);
+	if (eth_addr == NULL)
+		goto error;
+	data->mac_addrs = eth_addr;
+	*eth_addr = base_eth_addr;
+	eth_addr->addr_bytes[5] = eth_dev->data->port_id;
+
+	/**
+	 * now put it all together
+	 * - store queue data in internal,
+	 * - point eth_dev_data to internal
+	 * - and point eth_dev structure to new eth_dev_data structure
+	 */
+	internal = eth_dev->data->dev_private;
+	internal->dev_name = strdup(name);
+	if (internal->dev_name == NULL)
+		goto error;
+	internal->iface_name = strdup(iface_name);
+	if (internal->iface_name == NULL)
+		goto error;
+
+	list->eth_dev = eth_dev;
+	pthread_mutex_lock(&internal_list_lock);
+	TAILQ_INSERT_TAIL(&internal_list, list, next);
+	pthread_mutex_unlock(&internal_list_lock);
+
+	data->nb_rx_queues = queues;
+	data->nb_tx_queues = queues;
+	internal->max_queues = queues;
+	internal->vid = -1;
+
+	memcpy(internal->dmas, dmas, sizeof(struct dma_info) * 2 *
+	       RTE_MAX_QUEUES_PER_PORT);
+
+	data->dev_link = pmd_link;
+	data->dev_flags = RTE_ETH_DEV_INTR_LSC | RTE_ETH_DEV_CLOSE_REMOVE;
+
+	eth_dev->dev_ops = &ops;
+
+	/* assign rx and tx ops */
+	eth_dev->rx_pkt_burst = eth_vhost_rx;
+	eth_dev->tx_pkt_burst = eth_vhost_tx;
+
+	if (rte_vhost_driver_register(iface_name, flags))
+		goto error;
+
+	if (rte_vhost_driver_disable_features(iface_name,
+					      VHOST_DMA_UNSUPPORTED_FEATURES) <
+	    0)
+		goto error;
+
+	if (rte_vhost_driver_callback_register(iface_name, &vhost_ops) < 0) {
+		VHOST_LOG(ERR, "Can't register callbacks\n");
+		goto error;
+	}
+
+	if (rte_vhost_driver_start(iface_name) < 0) {
+		VHOST_LOG(ERR, "Failed to start driver for %s\n", iface_name);
+		goto error;
+	}
+
+	rte_eth_dev_probing_finish(eth_dev);
+	return data->port_id;
+
+error:
+	if (internal) {
+		free(internal->iface_name);
+		free(internal->dev_name);
+	}
+	rte_eth_dev_release_port(eth_dev);
+	rte_free(list);
+
+	return -1;
+}
+
+static inline int
+open_iface(const char *key __rte_unused, const char *value, void *extra_args)
+{
+	const char **iface_name = extra_args;
+
+	if (value == NULL)
+		return -1;
+
+	*iface_name = value;
+
+	return 0;
+}
+
+struct dma_info_input {
+	struct dma_info dmas[RTE_MAX_QUEUES_PER_PORT * 2];
+	uint16_t nr;
+};
+
+static inline int
+open_dma(const char *key __rte_unused, const char *value, void *extra_args)
+{
+	struct dma_info_input *dma_info = extra_args;
+	char *input = strndup(value, strlen(value) + 1);
+	char *addrs = input;
+	char *ptrs[2];
+	char *start, *end, *substr;
+	int64_t qid, vring_id;
+	struct rte_ioat_rawdev_config config;
+	struct rte_rawdev_info info = { .dev_private = &config };
+	char name[32];
+	int dev_id;
+	int ret = 0;
+
+	while (isblank(*addrs))
+		addrs++;
+	if (addrs == '\0') {
+		VHOST_LOG(ERR, "No input DMA addresses\n");
+		ret = -1;
+		goto out;
+	}
+
+	/* process single DMA device */
+	if (*addrs != '(') {
+		rte_strsplit(addrs, strlen(addrs), ptrs, 2, '@');
+
+		start = strstr(ptrs[0], "txq");
+		if (start == NULL) {
+			VHOST_LOG(ERR, "We only support DMA for TX "
+				  "queues currently\n");
+			ret = -1;
+			goto out;
+		}
+
+		start += 3;
+		qid = strtol(start, &end, 0);
+		if (end == start) {
+			VHOST_LOG(ERR, "No input queue ID\n");
+			ret = -1;
+			goto out;
+		}
+
+		vring_id = qid * 2 + VIRTIO_RXQ;
+		/* parse PCI address and check if the input DMA is supported */
+		if (rte_pci_addr_parse(ptrs[1],
+		    &dma_info->dmas[vring_id].addr) < 0) {
+			VHOST_LOG(ERR, "Invalid DMA address %s\n", ptrs[1]);
+			return -1;
+		}
+
+		rte_pci_device_name(&dma_info->dmas[vring_id].addr,
+				    name, sizeof(name));
+		dev_id = rte_rawdev_get_dev_id(name);
+		if (dev_id == (uint16_t)(-ENODEV) ||
+		    dev_id == (uint16_t)(-EINVAL)) {
+			VHOST_LOG(ERR, "Cannot find device %s.\n", name);
+			ret = -1;
+			goto out;
+		}
+
+		if (rte_rawdev_info_get(dev_id, &info) < 0 ||
+		    strstr(info.driver_name, "ioat") == NULL) {
+			VHOST_LOG(ERR, "The input device %s is invalid or "
+				  "it is not an I/OAT device\n", name);
+			ret = -1;
+			goto out;
+		}
+
+		dma_info->dmas[vring_id].dev_id = dev_id;
+		dma_info->dmas[vring_id].type = IOAT;
+		dma_info->dmas[vring_id].is_valid = true;
+		dma_info->nr++;
+		goto out;
+	}
+
+	/* process multiple DMA devices within bracket. */
+	addrs++;
+	substr = strtok(addrs, ";");
+	if (!substr) {
+		VHOST_LOG(ERR, "No input DMA addresse\n");
+		ret = -1;
+		goto out;
+	}
+
+	do {
+		rte_strsplit(substr, strlen(substr), ptrs, 2, '@');
+
+		start = strstr(ptrs[0], "txq");
+		if (start == NULL) {
+			VHOST_LOG(ERR, "We only support DMA acceleration for "
+				  "TX queues currently\n");
+			ret = -1;
+			goto out;
+		}
+
+		start += 3;
+		qid = strtol(start, &end, 0);
+		if (end == start) {
+			VHOST_LOG(ERR, "No input queue ID\n");
+			ret = -1;
+			goto out;
+		}
+
+		vring_id = qid * 2 + VIRTIO_RXQ;
+		/* parse PCI address and check if the input DMA is supported */
+		rte_pci_addr_parse(ptrs[1], &dma_info->dmas[vring_id].addr);
+		rte_pci_device_name(&dma_info->dmas[vring_id].addr,
+				    name, sizeof(name));
+		dev_id = rte_rawdev_get_dev_id(name);
+		if (dev_id == (uint16_t)(-ENODEV) ||
+		    dev_id == (uint16_t)(-EINVAL)) {
+			VHOST_LOG(ERR, "Cannot find device %s.\n", name);
+			ret = -1;
+			goto out;
+		}
+
+		if (rte_rawdev_info_get(dev_id, &info) < 0 ||
+		    strstr(info.driver_name, "ioat") == NULL) {
+			VHOST_LOG(ERR, "The input device %s is invalid or "
+				  "it is not an I/OAT device\n", name);
+			ret = -1;
+			goto out;
+		}
+
+		dma_info->dmas[vring_id].dev_id = dev_id;
+		dma_info->dmas[vring_id].type = IOAT;
+		dma_info->dmas[vring_id].is_valid = true;
+		dma_info->nr++;
+
+		substr = strtok(NULL, ";)");
+	} while (substr);
+
+out:
+	free(input);
+	return ret;
+}
+
+static inline int
+open_int(const char *key __rte_unused, const char *value, void *extra_args)
+{
+	uint16_t *n = extra_args;
+
+	if (value == NULL || extra_args == NULL)
+		return -EINVAL;
+
+	*n = (uint16_t)strtoul(value, NULL, 0);
+	if (*n == USHRT_MAX && errno == ERANGE)
+		return -1;
+
+	return 0;
+}
+
+static int
+rte_pmd_vhost_dma_probe(struct rte_vdev_device *dev)
+{
+	struct rte_kvargs *kvlist = NULL;
+	int ret = 0;
+	char *iface_name;
+	uint16_t queues;
+	uint64_t flags = 0;
+	int client_mode = 0;
+	struct rte_eth_dev *eth_dev;
+	const char *name = rte_vdev_device_name(dev);
+	struct dma_info_input dma_info = { 0 };
+
+	VHOST_LOG(INFO, "Initializing vHost DMA for %s\n", name);
+
+	if (rte_eal_process_type() == RTE_PROC_SECONDARY) {
+		eth_dev = rte_eth_dev_attach_secondary(name);
+		if (!eth_dev) {
+			VHOST_LOG(ERR, "Failed to probe %s\n", name);
+			return -1;
+		}
+		/* TODO: request info from primary to set up Rx and Tx */
+		eth_dev->dev_ops = &ops;
+		eth_dev->device = &dev->device;
+		rte_eth_dev_probing_finish(eth_dev);
+		return 0;
+	}
+
+	kvlist = rte_kvargs_parse(rte_vdev_device_args(dev), valid_arguments);
+	if (kvlist == NULL)
+		return -1;
+
+	if (rte_kvargs_count(kvlist, ETH_VHOST_IFACE_ARG) == 1) {
+		ret = rte_kvargs_process(kvlist, ETH_VHOST_IFACE_ARG,
+					 &open_iface, &iface_name);
+		if (ret < 0)
+			goto out_free;
+	} else {
+		ret = -1;
+		goto out_free;
+	}
+
+	if (rte_kvargs_count(kvlist, ETH_VHOST_QUEUES_ARG) == 1) {
+		ret = rte_kvargs_process(kvlist, ETH_VHOST_QUEUES_ARG,
+					 &open_int, &queues);
+		if (ret < 0 || queues > RTE_MAX_QUEUES_PER_PORT)
+			goto out_free;
+
+	} else {
+		queues = 1;
+	}
+
+	if (rte_kvargs_count(kvlist, ETH_VHOST_CLIENT_ARG) == 1) {
+		ret = rte_kvargs_process(kvlist, ETH_VHOST_CLIENT_ARG,
+					 &open_int, &client_mode);
+		if (ret < 0)
+			goto out_free;
+
+		if (client_mode)
+			flags |= RTE_VHOST_USER_CLIENT;
+	}
+
+	if (rte_kvargs_count(kvlist, ETH_VHOST_DMA_ARG) == 1) {
+		ret = rte_kvargs_process(kvlist, ETH_VHOST_DMA_ARG,
+					 &open_dma, &dma_info);
+		if (ret < 0)
+			goto out_free;
+
+		if (dma_info.nr > 0)
+			flags |= RTE_VHOST_USER_DMA_COPY;
+	}
+
+	/* vHost-DMA device is in the same NUMA node as the core. */
+	if (dev->device.numa_node == SOCKET_ID_ANY)
+		dev->device.numa_node = rte_socket_id();
+
+	eth_dev_vhost_create(dev, iface_name, queues, dev->device.numa_node,
+			     flags, dma_info.dmas);
+
+out_free:
+	rte_kvargs_free(kvlist);
+	return ret;
+}
+
+static int
+rte_pmd_vhost_dma_remove(struct rte_vdev_device *dev)
+{
+	const char *name;
+	struct rte_eth_dev *eth_dev = NULL;
+
+	name = rte_vdev_device_name(dev);
+	VHOST_LOG(INFO, "Un-Initializing pmd vhost-dma for %s\n", name);
+
+	/* find an ethdev entry */
+	eth_dev = rte_eth_dev_allocated(name);
+	if (eth_dev == NULL)
+		return 0;
+
+	if (rte_eal_process_type() != RTE_PROC_PRIMARY)
+		return rte_eth_dev_release_port(eth_dev);
+
+	eth_dev_close(eth_dev);
+
+	rte_eth_dev_release_port(eth_dev);
+
+	return 0;
+}
+
+static struct rte_vdev_driver pmd_vhost_dma_drv = {
+	.probe = rte_pmd_vhost_dma_probe,
+	.remove = rte_pmd_vhost_dma_remove,
+};
+
+RTE_PMD_REGISTER_VDEV(net_dma_vhost, pmd_vhost_dma_drv);
+RTE_PMD_REGISTER_ALIAS(net_dma_vhost, dma_vhost);
+RTE_PMD_REGISTER_PARAM_STRING(net_dma_vhost,
+	"iface=<ifc> "
+	"queues=<int> "
+	"client=<0|1> "
+	"dmas=(txq0@addr0;txq1@addr1...)");
+
+RTE_INIT(vhost_dma_init_log)
+{
+	vhost_dma_logtype = rte_log_register("vhost_dma");
+	if (vhost_dma_logtype >= 0)
+		rte_log_set_level(vhost_dma_logtype, RTE_LOG_NOTICE);
+}
diff --git a/drivers/net/vhost_dma/eth_vhost.h b/drivers/net/vhost_dma/eth_vhost.h
new file mode 100644
index 0000000..9c37c4d
--- /dev/null
+++ b/drivers/net/vhost_dma/eth_vhost.h
@@ -0,0 +1,264 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2019 Intel Corporation
+ */
+#ifndef _ETH_VHOST_DMA_H_
+#define _ETH_VHOST_DMA_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <stdint.h>
+#include <stdbool.h>
+
+#include <rte_pci.h>
+#include <rte_vhost.h>
+#include <rte_log.h>
+
+#ifndef VIRTIO_F_IOMMU_PLATFORM
+#define VIRTIO_F_IOMMU_PLATFORM 33
+#endif
+
+#ifndef VIRTIO_F_RING_PACKED
+#define VIRTIO_F_RING_PACKED 34
+#endif
+
+#define VHOST_DMA_UNSUPPORTED_FEATURES ((1ULL << VHOST_F_LOG_ALL)	| \
+		(1ULL << VIRTIO_F_IOMMU_PLATFORM)	| \
+		(1ULL << VIRTIO_F_RING_PACKED))
+
+#define VHOST_MAX_PKT_BURST 32
+
+/* batching size before a DMA kick */
+#define DMA_BATCHING_SIZE 8
+/**
+ * copy length threshold for the DMA engine. We offload copy jobs whose
+ * lengths are greater than DMA_COPY_LENGTH_THRESHOLD to the DMA; for
+ * small copies, we still use the CPU to perform copies, due to startup
+ * overheads associated with the DMA.
+ */
+#define DMA_COPY_LENGTH_THRESHOLD 1024
+
+extern int vhost_dma_logtype;
+
+#define VHOST_LOG(level, fmt, args...) rte_log(RTE_LOG_ ## level,	\
+		vhost_dma_logtype, "VHOST_DMA: " fmt, ## args)
+
+#define vhost_avail_event(vr) \
+	(*(volatile uint16_t*)&(vr)->used->ring[(vr)->size])
+#define vhost_used_event(vr) \
+	(*(volatile uint16_t*)&(vr)->avail->ring[(vr)->size])
+
+enum vhost_xstats_pkts {
+	VHOST_UNDERSIZE_PKT = 0,
+	VHOST_64_PKT,
+	VHOST_65_TO_127_PKT,
+	VHOST_128_TO_255_PKT,
+	VHOST_256_TO_511_PKT,
+	VHOST_512_TO_1023_PKT,
+	VHOST_1024_TO_1522_PKT,
+	VHOST_1523_TO_MAX_PKT,
+	VHOST_BROADCAST_PKT,
+	VHOST_MULTICAST_PKT,
+	VHOST_UNICAST_PKT,
+	VHOST_ERRORS_PKT,
+	VHOST_ERRORS_FRAGMENTED,
+	VHOST_ERRORS_JABBER,
+	VHOST_UNKNOWN_PROTOCOL,
+	VHOST_XSTATS_MAX,
+};
+
+struct vhost_stats {
+	uint64_t pkts;
+	uint64_t bytes;
+	uint64_t missed_pkts;
+	uint64_t xstats[VHOST_XSTATS_MAX];
+};
+
+struct batch_copy_elem {
+	void *dst;
+	void *src;
+	uint32_t len;
+};
+
+struct guest_page {
+	uint64_t guest_phys_addr;
+	uint64_t host_phys_addr;
+	uint64_t size;
+};
+
+enum dma_type {
+	IOAT = 1
+};
+
+struct dma_vring {
+	struct rte_vhost_vring  vr;
+
+	uint16_t last_avail_idx;
+	uint16_t last_used_idx;
+
+	/* the last used index that front end can consume */
+	uint16_t copy_done_used;
+
+	uint16_t signalled_used;
+	bool signalled_used_valid;
+
+	struct vring_used_elem *shadow_used_split;
+	uint16_t shadow_used_idx;
+
+	struct batch_copy_elem  *batch_copy_elems;
+	uint16_t batch_copy_nb_elems;
+
+	bool dma_enabled;
+	/**
+	 * DMA ID. Currently, we only support I/OAT,
+	 * so it's I/OAT rawdev ID.
+	 */
+	uint16_t dev_id;
+	/* DMA address */
+	struct rte_pci_addr dma_addr;
+	/**
+	 * the number of copy jobs submitted to the DMA but may not
+	 * be completed
+	 */
+	uint64_t nr_inflight;
+	int nr_batching;
+
+	/* host physical address of the index of used ring */
+	phys_addr_t used_idx_hpa;
+
+	struct ring_index *indices;
+	uint16_t max_indices;
+};
+
+struct vhost_queue {
+	int vid;
+	rte_atomic32_t allow_queuing;
+	rte_atomic32_t while_queuing;
+	struct pmd_internal *internal;
+	struct rte_mempool *mb_pool;
+	uint16_t port;
+	uint16_t virtqueue_id;
+	struct vhost_stats stats;
+	struct dma_vring *dma_vring;
+};
+
+struct dma_info {
+	struct rte_pci_addr addr;
+	uint16_t dev_id;
+	enum dma_type type;
+	bool is_valid;
+};
+
+struct pmd_internal {
+	rte_atomic32_t dev_attached;
+	rte_atomic32_t started;
+	char *dev_name;
+	char *iface_name;
+	uint16_t max_queues;
+	int vid;
+	uint8_t vlan_strip;
+
+	struct dma_info dmas[RTE_MAX_QUEUES_PER_PORT * 2];
+
+	/* guest's memory regions */
+	struct rte_vhost_memory *mem;
+	/* address mapping table of guest and host physical addresses */
+	struct guest_page *guest_pages;
+	uint32_t nr_guest_pages;
+	uint32_t max_guest_pages;
+
+	/* guest's vrings */
+	struct dma_vring dma_vrings[RTE_MAX_QUEUES_PER_PORT * 2];
+	size_t hdr_len;
+	/* the number of vrings */
+	uint16_t nr_vrings;
+	/* negotiated features */
+	uint64_t features;
+};
+
+static inline void
+vhost_enable_notify_split(struct pmd_internal *dev, struct dma_vring *dma_vr,
+			  int enable)
+{
+	struct rte_vhost_vring *vr = &dma_vr->vr;
+
+	if (!(dev->features & (1ULL << VIRTIO_RING_F_EVENT_IDX))) {
+		if (enable)
+			vr->used->flags &= ~VRING_USED_F_NO_NOTIFY;
+		else
+			vr->used->flags |= VRING_USED_F_NO_NOTIFY;
+	} else {
+		if (enable)
+			vhost_avail_event(vr) = dma_vr->last_avail_idx;
+	}
+}
+
+/* This function is to enable front end to notify backend. */
+static inline void
+vhost_dma_enable_guest_notification(struct pmd_internal *dev,
+				     struct dma_vring *dma_vr, int enable)
+{
+	vhost_enable_notify_split(dev, dma_vr, enable);
+}
+
+/**
+ * This function gets front end's memory and vrings information.
+ * In addition, it sets up necessary data structures for enqueue
+ * and dequeue operations.
+ */
+int vhost_dma_setup(struct pmd_internal *dev);
+
+/**
+ * This function destroys front end's information and frees data
+ * structures for enqueue and dequeue operations.
+ */
+void vhost_dma_remove(struct pmd_internal *dev);
+
+/**
+ * This function sends packet buffers to front end's RX vring.
+ * It will free the mbufs of successfully transmitted packets.
+ *
+ * @param dev
+ *  vhost-dma device
+ * @param dma_vr
+ *  a front end's RX vring
+ * @param pkts
+ *  packets to send
+ * @param count
+ *  the number of packets to send
+ *
+ * @return
+ *  the number of packets successfully sent
+ */
+uint16_t vhost_dma_enqueue_burst(struct pmd_internal *dev,
+				  struct dma_vring *dma_vr,
+				  struct rte_mbuf **pkts, uint32_t count);
+
+/**
+ * This function gets packet buffers from front end's TX virtqueue.
+ *
+ * @param dev
+ *  vhost-dma device
+ * @param dma_vr
+ *  a front-end's TX vring
+ * @param mbuf_pool
+ *  mempool for allocating mbufs for received packets
+ * @param pkts
+ *  pointer array used to keep addresses of received packets
+ * @param count
+ *  the element number in 'pkts'
+ *
+ * @return
+ *  the number of packets successfully received
+ */
+uint16_t vhost_dma_dequeue_burst(struct pmd_internal *dev,
+				  struct dma_vring *dma_vr,
+				  struct rte_mempool *mbuf_pool,
+				  struct rte_mbuf **pkts, uint16_t count);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _ETH_VHOST_DMA_H_ */
diff --git a/drivers/net/vhost_dma/internal.h b/drivers/net/vhost_dma/internal.h
new file mode 100644
index 0000000..08591b3
--- /dev/null
+++ b/drivers/net/vhost_dma/internal.h
@@ -0,0 +1,225 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2019 Intel Corporation
+ */
+#ifndef _INTERNAL_H_
+#define _INTERNAL_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <unistd.h>
+
+#include "eth_vhost.h"
+
+struct buf_vector {
+	uint64_t buf_iova;
+	uint64_t buf_addr;
+	uint32_t buf_len;
+	uint32_t desc_idx;
+};
+
+#define BUF_VECTOR_MAX 256
+
+struct ring_index {
+	/* physical address of 'data' */
+	uintptr_t pa;
+	uintptr_t idx;
+	uint16_t data;
+	bool in_use;
+} __rte_cache_aligned;
+
+static __rte_always_inline int
+setup_indices(struct ring_index **indices, uint16_t num)
+{
+	struct ring_index *array;
+	uint16_t i;
+
+	array = rte_zmalloc(NULL, sizeof(struct ring_index) * num, 0);
+	if (!array) {
+		printf("Init indices failed\n");
+		*indices = NULL;
+		return -1;
+	}
+
+	for (i = 0; i < num; i++) {
+		array[i].pa = rte_mem_virt2iova(&array[i].data);
+		array[i].idx = i;
+	}
+
+	*indices = array;
+	return 0;
+}
+
+static __rte_always_inline void
+destroy_indices(struct ring_index **indices)
+{
+	if (!indices)
+		return;
+	rte_free(*indices);
+	*indices = NULL;
+}
+
+static __rte_always_inline struct ring_index *
+get_empty_indices(struct ring_index *indices, uint16_t num)
+{
+	uint16_t i;
+
+	for (i = 0; i < num; i++) {
+		if (!indices[i].in_use)
+			break;
+	}
+
+	if (unlikely(i == num))
+		return NULL;
+
+	indices[i].in_use = true;
+	return &indices[i];
+}
+
+static __rte_always_inline void
+put_used_indices(struct ring_index *indices, uint16_t idx)
+{
+	indices[idx].in_use = false;
+}
+
+static uint64_t
+get_blk_size(int fd)
+{
+	struct stat stat;
+	int ret;
+
+	ret = fstat(fd, &stat);
+	return ret == -1 ? (uint64_t)-1 : (uint64_t)stat.st_blksize;
+}
+
+static __rte_always_inline int
+add_one_guest_page(struct pmd_internal *dev, uint64_t guest_phys_addr,
+		   uint64_t host_phys_addr, uint64_t size)
+{
+	struct guest_page *page, *last_page;
+	struct guest_page *old_pages;
+
+	if (dev->nr_guest_pages == dev->max_guest_pages) {
+		dev->max_guest_pages *= 2;
+		old_pages = dev->guest_pages;
+		dev->guest_pages = realloc(dev->guest_pages,
+					   dev->max_guest_pages *
+					   sizeof(*page));
+		if (!dev->guest_pages) {
+			VHOST_LOG(ERR, "Cannot realloc guest_pages\n");
+			free(old_pages);
+			return -1;
+		}
+	}
+
+	if (dev->nr_guest_pages > 0) {
+		last_page = &dev->guest_pages[dev->nr_guest_pages - 1];
+		/* merge if the two pages are continuous */
+		if (host_phys_addr == last_page->host_phys_addr +
+		    last_page->size) {
+			last_page->size += size;
+			return 0;
+		}
+	}
+
+	page = &dev->guest_pages[dev->nr_guest_pages++];
+	page->guest_phys_addr = guest_phys_addr;
+	page->host_phys_addr  = host_phys_addr;
+	page->size = size;
+
+	return 0;
+}
+
+static __rte_always_inline int
+add_guest_page(struct pmd_internal *dev, struct rte_vhost_mem_region *reg)
+{
+	uint64_t reg_size = reg->size;
+	uint64_t host_user_addr  = reg->host_user_addr;
+	uint64_t guest_phys_addr = reg->guest_phys_addr;
+	uint64_t host_phys_addr;
+	uint64_t size, page_size;
+
+	page_size = get_blk_size(reg->fd);
+	if (page_size == (uint64_t)-1) {
+		VHOST_LOG(ERR, "Cannot get hugepage size through fstat\n");
+		return -1;
+	}
+
+	host_phys_addr = rte_mem_virt2iova((void *)(uintptr_t)host_user_addr);
+	size = page_size - (guest_phys_addr & (page_size - 1));
+	size = RTE_MIN(size, reg_size);
+
+	if (add_one_guest_page(dev, guest_phys_addr, host_phys_addr, size) < 0)
+		return -1;
+
+	host_user_addr  += size;
+	guest_phys_addr += size;
+	reg_size -= size;
+
+	while (reg_size > 0) {
+		size = RTE_MIN(reg_size, page_size);
+		host_phys_addr = rte_mem_virt2iova((void *)(uintptr_t)
+						   host_user_addr);
+		if (add_one_guest_page(dev, guest_phys_addr, host_phys_addr,
+				       size) < 0)
+			return -1;
+
+		host_user_addr  += size;
+		guest_phys_addr += size;
+		reg_size -= size;
+	}
+
+	return 0;
+}
+
+static __rte_always_inline int
+setup_guest_pages(struct pmd_internal *dev, struct rte_vhost_memory *mem)
+{
+	uint32_t nr_regions = mem->nregions;
+	uint32_t i;
+
+	dev->nr_guest_pages = 0;
+	dev->max_guest_pages = 8;
+
+	dev->guest_pages = malloc(dev->max_guest_pages *
+			sizeof(struct guest_page));
+	if (dev->guest_pages == NULL) {
+		VHOST_LOG(ERR, "(%d) failed to allocate memory "
+			  "for dev->guest_pages\n", dev->vid);
+		return -1;
+	}
+
+	for (i = 0; i < nr_regions; i++) {
+		if (add_guest_page(dev, &mem->regions[i]) < 0)
+			return -1;
+	}
+	return 0;
+}
+
+static __rte_always_inline rte_iova_t
+gpa_to_hpa(struct pmd_internal *dev, uint64_t gpa, uint64_t size)
+{
+	uint32_t i;
+	struct guest_page *page;
+
+	for (i = 0; i < dev->nr_guest_pages; i++) {
+		page = &dev->guest_pages[i];
+
+		if (gpa >= page->guest_phys_addr &&
+		    gpa + size < page->guest_phys_addr + page->size) {
+			return gpa - page->guest_phys_addr +
+			       page->host_phys_addr;
+		}
+	}
+
+	return 0;
+}
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _INTERNAL_H_ */
diff --git a/drivers/net/vhost_dma/rte_pmd_vhost_dma_version.map b/drivers/net/vhost_dma/rte_pmd_vhost_dma_version.map
new file mode 100644
index 0000000..ff357af
--- /dev/null
+++ b/drivers/net/vhost_dma/rte_pmd_vhost_dma_version.map
@@ -0,0 +1,4 @@ 
+DPDK_20.02 {
+
+	local: *;
+};
diff --git a/drivers/net/vhost_dma/virtio_net.c b/drivers/net/vhost_dma/virtio_net.c
new file mode 100644
index 0000000..7e59450
--- /dev/null
+++ b/drivers/net/vhost_dma/virtio_net.c
@@ -0,0 +1,1234 @@ 
+#include <stdint.h>
+#include <stdbool.h>
+#include <linux/virtio_net.h>
+
+#include <rte_mbuf.h>
+#include <rte_malloc.h>
+#include <rte_memcpy.h>
+#include <rte_ethdev.h>
+#include <rte_vhost.h>
+#include <rte_rawdev.h>
+#include <rte_ioat_rawdev.h>
+#include <rte_log.h>
+
+#include "internal.h"
+
+#define MAX_BATCH_LEN 256
+
+static __rte_always_inline bool
+vq_is_packed(struct pmd_internal *dev)
+{
+	return dev->features & (1ull << VIRTIO_F_RING_PACKED);
+}
+
+static __rte_always_inline int
+vhost_need_event(uint16_t event_idx, uint16_t new_idx, uint16_t old)
+{
+	return (uint16_t)(new_idx - event_idx - 1) < (uint16_t)(new_idx - old);
+}
+
+static __rte_always_inline void
+vhost_vring_call_split(struct pmd_internal *dev, struct dma_vring *dma_vr)
+{
+	struct rte_vhost_vring *vr = &dma_vr->vr;
+
+	/* flush used->idx update before we read avail->flags. */
+	rte_smp_mb();
+
+	if (dev->features & (1ULL << VIRTIO_RING_F_EVENT_IDX)) {
+		uint16_t old = dma_vr->signalled_used;
+		uint16_t new = dma_vr->copy_done_used;
+		bool signalled_used_valid = dma_vr->signalled_used_valid;
+
+		dma_vr->signalled_used = new;
+		dma_vr->signalled_used_valid = true;
+
+		VHOST_LOG(DEBUG, "%s: used_event_idx=%d, old=%d, new=%d\n",
+			  __func__, vhost_used_event(vr), old, new);
+
+		if ((vhost_need_event(vhost_used_event(vr), new, old) &&
+		     (vr->callfd >= 0)) || unlikely(!signalled_used_valid))
+			eventfd_write(vr->callfd, (eventfd_t)1);
+	} else {
+		if (!(vr->avail->flags & VRING_AVAIL_F_NO_INTERRUPT) &&
+		    (vr->callfd >= 0))
+			eventfd_write(vr->callfd, (eventfd_t)1);
+	}
+}
+
+/* notify front-end of enqueued packets */
+static __rte_always_inline void
+vhost_dma_vring_call(struct pmd_internal *dev, struct dma_vring *dma_vr)
+{
+	vhost_vring_call_split(dev, dma_vr);
+}
+
+static int
+process_dma_completed(struct pmd_internal *dev, struct dma_vring *dma_vr)
+{
+	uintptr_t flags[255], tmps[255];
+	int dma_done, i;
+	uint16_t used_idx;
+
+	dma_done = rte_ioat_completed_copies(dma_vr->dev_id, 255, flags,
+					     tmps);
+	if (unlikely(dma_done <= 0))
+		return dma_done;
+
+	dma_vr->nr_inflight -= dma_done;
+	for (i = 0; i < dma_done; i++) {
+		if ((uint64_t)flags[i] >= dma_vr->max_indices) {
+			/* the DMA finishes a packet copy job. */
+			struct rte_mbuf *pkt = (struct rte_mbuf *)flags[i];
+
+			rte_mbuf_refcnt_update(pkt, -1);
+			if (rte_mbuf_refcnt_read(pkt) == 1)
+				rte_pktmbuf_free(pkt);
+		} else {
+			/**
+			 * the DMA finishes updating index of the
+			 * used ring.
+			 */
+			uint16_t id = flags[i];
+
+			used_idx = dma_vr->indices[id].data;
+			VHOST_LOG(DEBUG, "The DMA finishes updating index %u "
+				  "for the used ring.\n", used_idx);
+
+			dma_vr->copy_done_used = used_idx;
+			vhost_dma_vring_call(dev, dma_vr);
+			put_used_indices(dma_vr->indices, id);
+		}
+	}
+	return dma_done;
+}
+
+static  __rte_always_inline bool
+rxvq_is_mergeable(struct pmd_internal *dev)
+{
+	return dev->features & (1ULL << VIRTIO_NET_F_MRG_RXBUF);
+}
+
+static __rte_always_inline void
+do_flush_shadow_used_ring_split(struct dma_vring *dma_vr, uint16_t to,
+				uint16_t from, uint16_t size)
+{
+	rte_memcpy(&dma_vr->vr.used->ring[to],
+		   &dma_vr->shadow_used_split[from],
+		   size * sizeof(struct vring_used_elem));
+}
+
+static __rte_always_inline void
+flush_shadow_used_ring_split(struct pmd_internal *dev,
+			     struct dma_vring *dma_vr)
+{
+	uint16_t used_idx = dma_vr->last_used_idx & (dma_vr->vr.size - 1);
+
+	if (used_idx + dma_vr->shadow_used_idx <= dma_vr->vr.size) {
+		do_flush_shadow_used_ring_split(dma_vr, used_idx, 0,
+						dma_vr->shadow_used_idx);
+	} else {
+		uint16_t size;
+
+		/* update used ring interval [used_idx, vr->size] */
+		size = dma_vr->vr.size - used_idx;
+		do_flush_shadow_used_ring_split(dma_vr, used_idx, 0, size);
+
+		/* update the left half used ring interval [0, left_size] */
+		do_flush_shadow_used_ring_split(dma_vr, 0, size,
+						dma_vr->shadow_used_idx -
+						size);
+	}
+	dma_vr->last_used_idx += dma_vr->shadow_used_idx;
+
+	rte_smp_wmb();
+
+	if (dma_vr->dma_enabled && dma_vr->nr_inflight > 0) {
+		struct ring_index *index;
+
+		index = get_empty_indices(dma_vr->indices,
+					  dma_vr->max_indices);
+		index->data = dma_vr->last_used_idx;
+		while (unlikely(rte_ioat_enqueue_copy(dma_vr->dev_id,
+						      index->pa,
+						      dma_vr->used_idx_hpa,
+						      sizeof(uint16_t),
+						      index->idx, 0, 0) ==
+				0)) {
+			int ret;
+
+			do {
+				ret = process_dma_completed(dev, dma_vr);
+			} while (ret <= 0);
+		}
+		dma_vr->nr_batching++;
+		dma_vr->nr_inflight++;
+	} else {
+		/**
+		 * we update index of used ring when all previous copy
+		 * jobs are completed.
+		 *
+		 * without enabling DMA copy, the CPU performs all memory
+		 * copy operations. In this case, the CPU is in charge of
+		 * updating the index of used ring.
+		 *
+		 * with enabling DMA copy, if there are outstanding copy
+		 * jobs of the DMA, to avoid the DMA overwriting the
+		 * write of the CPU, the DMA is in charge of updating
+		 * the index of used ring.
+		 */
+		*(volatile uint16_t *)&dma_vr->vr.used->idx +=
+			dma_vr->shadow_used_idx;
+		dma_vr->copy_done_used += dma_vr->shadow_used_idx;
+	}
+
+	dma_vr->shadow_used_idx = 0;
+}
+
+static __rte_always_inline void
+update_shadow_used_ring_split(struct dma_vring *dma_vr,
+			      uint16_t desc_idx, uint32_t len)
+{
+	uint16_t i = dma_vr->shadow_used_idx++;
+
+	dma_vr->shadow_used_split[i].id  = desc_idx;
+	dma_vr->shadow_used_split[i].len = len;
+}
+
+static inline void
+do_data_copy(struct dma_vring *dma_vr)
+{
+	struct batch_copy_elem *elem = dma_vr->batch_copy_elems;
+	uint16_t count = dma_vr->batch_copy_nb_elems;
+	int i;
+
+	for (i = 0; i < count; i++)
+		rte_memcpy(elem[i].dst, elem[i].src, elem[i].len);
+
+	dma_vr->batch_copy_nb_elems = 0;
+}
+
+#define ASSIGN_UNLESS_EQUAL(var, val) do {	\
+	if ((var) != (val))			\
+		(var) = (val);			\
+} while (0)
+
+static __rte_always_inline void
+virtio_enqueue_offload(struct rte_mbuf *m_buf, struct virtio_net_hdr *net_hdr)
+{
+	uint64_t csum_l4 = m_buf->ol_flags & PKT_TX_L4_MASK;
+
+	if (m_buf->ol_flags & PKT_TX_TCP_SEG)
+		csum_l4 |= PKT_TX_TCP_CKSUM;
+
+	if (csum_l4) {
+		net_hdr->flags = VIRTIO_NET_HDR_F_NEEDS_CSUM;
+		net_hdr->csum_start = m_buf->l2_len + m_buf->l3_len;
+
+		switch (csum_l4) {
+		case PKT_TX_TCP_CKSUM:
+			net_hdr->csum_offset = (offsetof(struct rte_tcp_hdr,
+						cksum));
+			break;
+		case PKT_TX_UDP_CKSUM:
+			net_hdr->csum_offset = (offsetof(struct rte_udp_hdr,
+						dgram_cksum));
+			break;
+		case PKT_TX_SCTP_CKSUM:
+			net_hdr->csum_offset = (offsetof(struct rte_sctp_hdr,
+						cksum));
+			break;
+		}
+	} else {
+		ASSIGN_UNLESS_EQUAL(net_hdr->csum_start, 0);
+		ASSIGN_UNLESS_EQUAL(net_hdr->csum_offset, 0);
+		ASSIGN_UNLESS_EQUAL(net_hdr->flags, 0);
+	}
+
+	/* IP cksum verification cannot be bypassed, then calculate here */
+	if (m_buf->ol_flags & PKT_TX_IP_CKSUM) {
+		struct rte_ipv4_hdr *ipv4_hdr;
+
+		ipv4_hdr = rte_pktmbuf_mtod_offset(m_buf, struct rte_ipv4_hdr *,
+						   m_buf->l2_len);
+		ipv4_hdr->hdr_checksum = rte_ipv4_cksum(ipv4_hdr);
+	}
+
+	if (m_buf->ol_flags & PKT_TX_TCP_SEG) {
+		if (m_buf->ol_flags & PKT_TX_IPV4)
+			net_hdr->gso_type = VIRTIO_NET_HDR_GSO_TCPV4;
+		else
+			net_hdr->gso_type = VIRTIO_NET_HDR_GSO_TCPV6;
+		net_hdr->gso_size = m_buf->tso_segsz;
+		net_hdr->hdr_len = m_buf->l2_len + m_buf->l3_len
+					+ m_buf->l4_len;
+	} else if (m_buf->ol_flags & PKT_TX_UDP_SEG) {
+		net_hdr->gso_type = VIRTIO_NET_HDR_GSO_UDP;
+		net_hdr->gso_size = m_buf->tso_segsz;
+		net_hdr->hdr_len = m_buf->l2_len + m_buf->l3_len +
+			m_buf->l4_len;
+	} else {
+		ASSIGN_UNLESS_EQUAL(net_hdr->gso_type, 0);
+		ASSIGN_UNLESS_EQUAL(net_hdr->gso_size, 0);
+		ASSIGN_UNLESS_EQUAL(net_hdr->hdr_len, 0);
+	}
+}
+
+static __rte_always_inline void *
+vhost_alloc_copy_ind_table(struct pmd_internal *dev, uint64_t desc_addr,
+			   uint64_t desc_len)
+{
+	void *idesc;
+	uint64_t src, dst;
+	uint64_t len, remain = desc_len;
+
+	idesc = rte_malloc(NULL, desc_len, 0);
+	if (unlikely(!idesc))
+		return NULL;
+
+	dst = (uint64_t)(uintptr_t)idesc;
+
+	while (remain) {
+		len = remain;
+		src = rte_vhost_va_from_guest_pa(dev->mem, desc_addr, &len);
+		if (unlikely(!src || !len)) {
+			rte_free(idesc);
+			return NULL;
+		}
+
+		rte_memcpy((void *)(uintptr_t)dst, (void *)(uintptr_t)src,
+			   len);
+
+		remain -= len;
+		dst += len;
+		desc_addr += len;
+	}
+
+	return idesc;
+}
+
+static __rte_always_inline void
+free_ind_table(void *idesc)
+{
+	rte_free(idesc);
+}
+
+static __rte_always_inline int
+map_one_desc(struct pmd_internal *dev, struct buf_vector *buf_vec,
+	     uint16_t *vec_idx, uint64_t desc_iova, uint64_t desc_len)
+{
+	uint16_t vec_id = *vec_idx;
+
+	while (desc_len) {
+		uint64_t desc_addr;
+		uint64_t desc_chunck_len = desc_len;
+
+		if (unlikely(vec_id >= BUF_VECTOR_MAX))
+			return -1;
+
+		desc_addr = rte_vhost_va_from_guest_pa(dev->mem, desc_iova,
+						       &desc_chunck_len);
+		if (unlikely(!desc_addr))
+			return -1;
+
+		rte_prefetch0((void *)(uintptr_t)desc_addr);
+
+		buf_vec[vec_id].buf_iova = desc_iova;
+		buf_vec[vec_id].buf_addr = desc_addr;
+		buf_vec[vec_id].buf_len  = desc_chunck_len;
+
+		desc_len -= desc_chunck_len;
+		desc_iova += desc_chunck_len;
+		vec_id++;
+	}
+	*vec_idx = vec_id;
+
+	return 0;
+}
+
+static __rte_always_inline int
+fill_vec_buf_split(struct pmd_internal *dev, struct dma_vring *dma_vr,
+		   uint32_t avail_idx, uint16_t *vec_idx,
+		   struct buf_vector *buf_vec, uint16_t *desc_chain_head,
+		   uint32_t *desc_chain_len)
+{
+	struct rte_vhost_vring *vr = &dma_vr->vr;
+	uint16_t idx = vr->avail->ring[avail_idx & (vr->size - 1)];
+	uint16_t vec_id = *vec_idx;
+	uint32_t len    = 0;
+	uint64_t dlen;
+	uint32_t nr_descs = vr->size;
+	uint32_t cnt    = 0;
+	struct vring_desc *descs = vr->desc;
+	struct vring_desc *idesc = NULL;
+
+	if (unlikely(idx >= vr->size))
+		return -1;
+
+	*desc_chain_head = idx;
+
+	if (vr->desc[idx].flags & VRING_DESC_F_INDIRECT) {
+		dlen = vr->desc[idx].len;
+		nr_descs = dlen / sizeof(struct vring_desc);
+		if (unlikely(nr_descs > vr->size))
+			return -1;
+
+		descs = (struct vring_desc *)(uintptr_t)
+			rte_vhost_va_from_guest_pa(dev->mem,
+						   vr->desc[idx].addr, &dlen);
+		if (unlikely(!descs))
+			return -1;
+
+		if (unlikely(dlen < vr->desc[idx].len)) {
+			/**
+			 * the indirect desc table is not contiguous
+			 * in process VA space, we have to copy it.
+			 */
+			idesc = vhost_alloc_copy_ind_table(dev,
+							   vr->desc[idx].addr,
+							   vr->desc[idx].len);
+			if (unlikely(!idesc))
+				return -1;
+
+			descs = idesc;
+		}
+
+		idx = 0;
+	}
+
+	while (1) {
+		if (unlikely(idx >= nr_descs || cnt++ >= nr_descs)) {
+			free_ind_table(idesc);
+			return -1;
+		}
+
+		len += descs[idx].len;
+
+		if (unlikely(map_one_desc(dev, buf_vec, &vec_id,
+					  descs[idx].addr, descs[idx].len))) {
+			free_ind_table(idesc);
+			return -1;
+		}
+
+		if ((descs[idx].flags & VRING_DESC_F_NEXT) == 0)
+			break;
+
+		idx = descs[idx].next;
+	}
+
+	*desc_chain_len = len;
+	*vec_idx = vec_id;
+
+	if (unlikely(!!idesc))
+		free_ind_table(idesc);
+
+	return 0;
+}
+
+static inline int
+reserve_avail_buf_split(struct pmd_internal *dev, struct dma_vring *dma_vr,
+			uint32_t size, struct buf_vector *buf_vec,
+			uint16_t *num_buffers, uint16_t avail_head,
+			uint16_t *nr_vec)
+{
+	struct rte_vhost_vring *vr = &dma_vr->vr;
+
+	uint16_t cur_idx;
+	uint16_t vec_idx = 0;
+	uint16_t max_tries, tries = 0;
+	uint16_t head_idx = 0;
+	uint32_t len = 0;
+
+	*num_buffers = 0;
+	cur_idx  = dma_vr->last_avail_idx;
+
+	if (rxvq_is_mergeable(dev))
+		max_tries = vr->size - 1;
+	else
+		max_tries = 1;
+
+	while (size > 0) {
+		if (unlikely(cur_idx == avail_head))
+			return -1;
+		/**
+		 * if we tried all available ring items, and still
+		 * can't get enough buf, it means something abnormal
+		 * happened.
+		 */
+		if (unlikely(++tries > max_tries))
+			return -1;
+
+		if (unlikely(fill_vec_buf_split(dev, dma_vr, cur_idx,
+						&vec_idx, buf_vec,
+						&head_idx, &len) < 0))
+			return -1;
+		len = RTE_MIN(len, size);
+		update_shadow_used_ring_split(dma_vr, head_idx, len);
+		size -= len;
+
+		cur_idx++;
+		*num_buffers += 1;
+	}
+
+	*nr_vec = vec_idx;
+
+	return 0;
+}
+
+static __rte_noinline void
+copy_vnet_hdr_to_desc(struct pmd_internal *dev, struct buf_vector *buf_vec,
+		      struct virtio_net_hdr_mrg_rxbuf *hdr)
+{
+	uint64_t len;
+	uint64_t remain = dev->hdr_len;
+	uint64_t src = (uint64_t)(uintptr_t)hdr, dst;
+	uint64_t iova = buf_vec->buf_iova;
+
+	while (remain) {
+		len = RTE_MIN(remain,
+			      buf_vec->buf_len);
+		dst = buf_vec->buf_addr;
+		rte_memcpy((void *)(uintptr_t)dst, (void *)(uintptr_t)src,
+			   len);
+
+		remain -= len;
+		iova += len;
+		src += len;
+		buf_vec++;
+	}
+}
+
+static __rte_always_inline int
+copy_mbuf_to_desc(struct pmd_internal *dev, struct dma_vring *dma_vr,
+		  struct rte_mbuf *m, struct buf_vector *buf_vec,
+		  uint16_t nr_vec, uint16_t num_buffers, bool *copy_done)
+{
+	uint32_t vec_idx = 0;
+	uint32_t mbuf_offset, mbuf_avail;
+	uint32_t buf_offset, buf_avail;
+	uint64_t buf_addr, buf_iova, buf_len;
+	uint32_t cpy_len;
+	uint64_t hdr_addr;
+	struct rte_mbuf *hdr_mbuf;
+	struct batch_copy_elem *batch_copy = dma_vr->batch_copy_elems;
+	struct virtio_net_hdr_mrg_rxbuf tmp_hdr, *hdr = NULL;
+	uint64_t dst, src;
+	int error = 0;
+
+	if (unlikely(m == NULL)) {
+		error = -1;
+		goto out;
+	}
+
+	*copy_done = true;
+
+	buf_addr = buf_vec[vec_idx].buf_addr;
+	buf_iova = buf_vec[vec_idx].buf_iova;
+	buf_len = buf_vec[vec_idx].buf_len;
+
+	if (unlikely(buf_len < dev->hdr_len && nr_vec <= 1)) {
+		error = -1;
+		goto out;
+	}
+
+	hdr_mbuf = m;
+	hdr_addr = buf_addr;
+	if (unlikely(buf_len < dev->hdr_len))
+		hdr = &tmp_hdr;
+	else
+		hdr = (struct virtio_net_hdr_mrg_rxbuf *)(uintptr_t)hdr_addr;
+
+	VHOST_LOG(DEBUG, "(%d) RX: num merge buffers %d\n", dev->vid,
+		  num_buffers);
+
+	if (unlikely(buf_len < dev->hdr_len)) {
+		buf_offset = dev->hdr_len - buf_len;
+		vec_idx++;
+		buf_addr = buf_vec[vec_idx].buf_addr;
+		buf_iova = buf_vec[vec_idx].buf_iova;
+		buf_len = buf_vec[vec_idx].buf_len;
+		buf_avail = buf_len - buf_offset;
+	} else {
+		buf_offset = dev->hdr_len;
+		buf_avail = buf_len - dev->hdr_len;
+	}
+
+	mbuf_avail  = rte_pktmbuf_data_len(m);
+	mbuf_offset = 0;
+	while (mbuf_avail != 0 || m->next != NULL) {
+		bool dma_copy = false;
+
+		/* done with current buf, get the next one */
+		if (buf_avail == 0) {
+			vec_idx++;
+			if (unlikely(vec_idx >= nr_vec)) {
+				error = -1;
+				goto out;
+			}
+
+			buf_addr = buf_vec[vec_idx].buf_addr;
+			buf_iova = buf_vec[vec_idx].buf_iova;
+			buf_len = buf_vec[vec_idx].buf_len;
+
+			buf_offset = 0;
+			buf_avail  = buf_len;
+		}
+
+		/* done with current mbuf, get the next one */
+		if (mbuf_avail == 0) {
+			m = m->next;
+
+			mbuf_offset = 0;
+			mbuf_avail  = rte_pktmbuf_data_len(m);
+		}
+
+		if (hdr_addr) {
+			virtio_enqueue_offload(hdr_mbuf, &hdr->hdr);
+			if (rxvq_is_mergeable(dev))
+				ASSIGN_UNLESS_EQUAL(hdr->num_buffers,
+						    num_buffers);
+
+			if (unlikely(hdr == &tmp_hdr))
+				copy_vnet_hdr_to_desc(dev, buf_vec, hdr);
+			hdr_addr = 0;
+		}
+
+		cpy_len = RTE_MIN(buf_avail, mbuf_avail);
+
+		if (dma_vr->dma_enabled && cpy_len >=
+		    DMA_COPY_LENGTH_THRESHOLD) {
+			dst = gpa_to_hpa(dev, buf_iova + buf_offset, cpy_len);
+			dma_copy = (dst != 0);
+		}
+
+		if (dma_copy) {
+			src = rte_pktmbuf_iova_offset(m, mbuf_offset);
+
+			/**
+			 * if DMA enqueue fails, we wait until there are
+			 * available DMA descriptors.
+			 */
+			while (unlikely(rte_ioat_enqueue_copy(dma_vr->dev_id,
+							      src, dst, cpy_len,
+							      (uintptr_t)
+							      hdr_mbuf, 0, 0) ==
+					0)) {
+				int ret;
+
+				do {
+					ret = process_dma_completed(dev,
+								    dma_vr);
+				} while (ret <= 0);
+			}
+
+			dma_vr->nr_batching++;
+			dma_vr->nr_inflight++;
+			rte_mbuf_refcnt_update(hdr_mbuf, 1);
+			*copy_done = false;
+		} else if (likely(cpy_len > MAX_BATCH_LEN ||
+				  dma_vr->batch_copy_nb_elems >=
+				  dma_vr->vr.size)) {
+			rte_memcpy((void *)((uintptr_t)(buf_addr + buf_offset)),
+				   rte_pktmbuf_mtod_offset(m, void *,
+							   mbuf_offset),
+				   cpy_len);
+		} else {
+			batch_copy[dma_vr->batch_copy_nb_elems].dst =
+				(void *)((uintptr_t)(buf_addr + buf_offset));
+			batch_copy[dma_vr->batch_copy_nb_elems].src =
+				rte_pktmbuf_mtod_offset(m, void *, mbuf_offset);
+			batch_copy[dma_vr->batch_copy_nb_elems].len = cpy_len;
+			dma_vr->batch_copy_nb_elems++;
+		}
+
+		mbuf_avail  -= cpy_len;
+		mbuf_offset += cpy_len;
+		buf_avail  -= cpy_len;
+		buf_offset += cpy_len;
+	}
+
+out:
+	return error;
+}
+
+static __rte_always_inline uint16_t
+vhost_dma_enqueue_split(struct pmd_internal *dev, struct dma_vring *dma_vr,
+			 struct rte_mbuf **pkts, uint32_t count)
+{
+	struct rte_vhost_vring *vr = &dma_vr->vr;
+
+	uint32_t pkt_idx = 0;
+	uint16_t num_buffers;
+	struct buf_vector buf_vec[BUF_VECTOR_MAX];
+	uint16_t avail_head;
+
+	struct rte_mbuf *done_pkts[VHOST_MAX_PKT_BURST];
+	uint32_t i, nr_done = 0;
+	bool copy_done;
+
+	if (dma_vr->dma_enabled && dma_vr->nr_inflight > 0)
+		process_dma_completed(dev, dma_vr);
+
+	avail_head = *((volatile uint16_t *)&vr->avail->idx);
+
+	/**
+	 * the ordering between avail index and
+	 * desc reads needs to be enforced.
+	 */
+	rte_smp_rmb();
+
+	rte_prefetch0(&vr->avail->ring[dma_vr->last_avail_idx &
+			(vr->size - 1)]);
+
+	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
+		uint32_t pkt_len = pkts[pkt_idx]->pkt_len + dev->hdr_len;
+		uint16_t nr_vec = 0;
+
+		if (unlikely(reserve_avail_buf_split(dev, dma_vr, pkt_len,
+						     buf_vec, &num_buffers,
+						     avail_head, &nr_vec) <
+			     0)) {
+			VHOST_LOG(INFO,
+				  "(%d) failed to get enough desc from vring\n",
+				  dev->vid);
+			dma_vr->shadow_used_idx -= num_buffers;
+			break;
+		}
+
+		VHOST_LOG(DEBUG, "(%d) current index %d | end index %d\n",
+			  dev->vid, dma_vr->last_avail_idx,
+			  dma_vr->last_avail_idx + num_buffers);
+
+		if (copy_mbuf_to_desc(dev, dma_vr, pkts[pkt_idx],
+				      buf_vec, nr_vec, num_buffers,
+				      &copy_done) < 0) {
+			dma_vr->shadow_used_idx -= num_buffers;
+			break;
+		}
+
+		if (copy_done)
+			done_pkts[nr_done++] = pkts[pkt_idx];
+
+		if (dma_vr->dma_enabled &&
+		    unlikely(dma_vr->nr_batching >= DMA_BATCHING_SIZE)) {
+			/**
+			 * kick the DMA to do copy once the number of
+			 * batching jobs reaches the batching threshold.
+			 */
+			rte_ioat_do_copies(dma_vr->dev_id);
+			dma_vr->nr_batching = 0;
+		}
+
+		dma_vr->last_avail_idx += num_buffers;
+	}
+
+	do_data_copy(dma_vr);
+
+	if (dma_vr->shadow_used_idx) {
+		flush_shadow_used_ring_split(dev, dma_vr);
+		vhost_dma_vring_call(dev, dma_vr);
+	}
+
+	if (dma_vr->dma_enabled && dma_vr->nr_batching > 0) {
+		rte_ioat_do_copies(dma_vr->dev_id);
+		dma_vr->nr_batching = 0;
+	}
+
+	/* free copy-done packets */
+	for (i = 0; i < nr_done; i++)
+		rte_pktmbuf_free(done_pkts[i]);
+
+	return pkt_idx;
+}
+
+uint16_t
+vhost_dma_enqueue_burst(struct pmd_internal *dev, struct dma_vring *dma_vr,
+			 struct rte_mbuf **pkts, uint32_t count)
+{
+	return vhost_dma_enqueue_split(dev, dma_vr, pkts, count);
+}
+
+static inline bool
+virtio_net_with_host_offload(struct pmd_internal *dev)
+{
+	if (dev->features &
+			((1ULL << VIRTIO_NET_F_CSUM) |
+			 (1ULL << VIRTIO_NET_F_HOST_ECN) |
+			 (1ULL << VIRTIO_NET_F_HOST_TSO4) |
+			 (1ULL << VIRTIO_NET_F_HOST_TSO6) |
+			 (1ULL << VIRTIO_NET_F_HOST_UFO)))
+		return true;
+
+	return false;
+}
+
+static void
+parse_ethernet(struct rte_mbuf *m, uint16_t *l4_proto, void **l4_hdr)
+{
+	struct rte_ipv4_hdr *ipv4_hdr;
+	struct rte_ipv6_hdr *ipv6_hdr;
+	void *l3_hdr = NULL;
+	struct rte_ether_hdr *eth_hdr;
+	uint16_t ethertype;
+
+	eth_hdr = rte_pktmbuf_mtod(m, struct rte_ether_hdr *);
+
+	m->l2_len = sizeof(struct rte_ether_hdr);
+	ethertype = rte_be_to_cpu_16(eth_hdr->ether_type);
+
+	if (ethertype == RTE_ETHER_TYPE_VLAN) {
+		struct rte_vlan_hdr *vlan_hdr =
+			(struct rte_vlan_hdr *)(eth_hdr + 1);
+
+		m->l2_len += sizeof(struct rte_vlan_hdr);
+		ethertype = rte_be_to_cpu_16(vlan_hdr->eth_proto);
+	}
+
+	l3_hdr = (char *)eth_hdr + m->l2_len;
+
+	switch (ethertype) {
+	case RTE_ETHER_TYPE_IPV4:
+		ipv4_hdr = l3_hdr;
+		*l4_proto = ipv4_hdr->next_proto_id;
+		m->l3_len = (ipv4_hdr->version_ihl & 0x0f) * 4;
+		*l4_hdr = (char *)l3_hdr + m->l3_len;
+		m->ol_flags |= PKT_TX_IPV4;
+		break;
+	case RTE_ETHER_TYPE_IPV6:
+		ipv6_hdr = l3_hdr;
+		*l4_proto = ipv6_hdr->proto;
+		m->l3_len = sizeof(struct rte_ipv6_hdr);
+		*l4_hdr = (char *)l3_hdr + m->l3_len;
+		m->ol_flags |= PKT_TX_IPV6;
+		break;
+	default:
+		m->l3_len = 0;
+		*l4_proto = 0;
+		*l4_hdr = NULL;
+		break;
+	}
+}
+
+static __rte_always_inline void
+vhost_dequeue_offload(struct virtio_net_hdr *hdr, struct rte_mbuf *m)
+{
+	uint16_t l4_proto = 0;
+	void *l4_hdr = NULL;
+	struct rte_tcp_hdr *tcp_hdr = NULL;
+
+	if (hdr->flags == 0 && hdr->gso_type == VIRTIO_NET_HDR_GSO_NONE)
+		return;
+
+	parse_ethernet(m, &l4_proto, &l4_hdr);
+	if (hdr->flags == VIRTIO_NET_HDR_F_NEEDS_CSUM) {
+		if (hdr->csum_start == (m->l2_len + m->l3_len)) {
+			switch (hdr->csum_offset) {
+			case (offsetof(struct rte_tcp_hdr, cksum)):
+				if (l4_proto == IPPROTO_TCP)
+					m->ol_flags |= PKT_TX_TCP_CKSUM;
+				break;
+			case (offsetof(struct rte_udp_hdr, dgram_cksum)):
+				if (l4_proto == IPPROTO_UDP)
+					m->ol_flags |= PKT_TX_UDP_CKSUM;
+				break;
+			case (offsetof(struct rte_sctp_hdr, cksum)):
+				if (l4_proto == IPPROTO_SCTP)
+					m->ol_flags |= PKT_TX_SCTP_CKSUM;
+				break;
+			default:
+				break;
+			}
+		}
+	}
+
+	if (l4_hdr && hdr->gso_type != VIRTIO_NET_HDR_GSO_NONE) {
+		switch (hdr->gso_type & ~VIRTIO_NET_HDR_GSO_ECN) {
+		case VIRTIO_NET_HDR_GSO_TCPV4:
+		case VIRTIO_NET_HDR_GSO_TCPV6:
+			tcp_hdr = l4_hdr;
+			m->ol_flags |= PKT_TX_TCP_SEG;
+			m->tso_segsz = hdr->gso_size;
+			m->l4_len = (tcp_hdr->data_off & 0xf0) >> 2;
+			break;
+		case VIRTIO_NET_HDR_GSO_UDP:
+			m->ol_flags |= PKT_TX_UDP_SEG;
+			m->tso_segsz = hdr->gso_size;
+			m->l4_len = sizeof(struct rte_udp_hdr);
+			break;
+		default:
+			VHOST_LOG(WARNING,
+				  "unsupported gso type %u.\n", hdr->gso_type);
+			break;
+		}
+	}
+}
+
+static __rte_noinline void
+copy_vnet_hdr_from_desc(struct virtio_net_hdr *hdr, struct buf_vector *buf_vec)
+{
+	uint64_t len;
+	uint64_t remain = sizeof(struct virtio_net_hdr);
+	uint64_t src;
+	uint64_t dst = (uint64_t)(uintptr_t)hdr;
+
+	while (remain) {
+		len = RTE_MIN(remain, buf_vec->buf_len);
+		src = buf_vec->buf_addr;
+		rte_memcpy((void *)(uintptr_t)dst, (void *)(uintptr_t)src,
+			   len);
+
+		remain -= len;
+		dst += len;
+		buf_vec++;
+	}
+}
+
+static __rte_always_inline int
+copy_desc_to_mbuf(struct pmd_internal *dev, struct dma_vring *dma_vr,
+		  struct buf_vector *buf_vec, uint16_t nr_vec,
+		  struct rte_mbuf *m, struct rte_mempool *mbuf_pool)
+{
+	uint32_t buf_avail, buf_offset;
+	uint64_t buf_addr, buf_iova, buf_len;
+	uint32_t mbuf_avail, mbuf_offset;
+	uint32_t cpy_len;
+	struct rte_mbuf *cur = m, *prev = m;
+	struct virtio_net_hdr tmp_hdr;
+	struct virtio_net_hdr *hdr = NULL;
+	/* a counter to avoid desc dead loop chain */
+	uint16_t vec_idx = 0;
+	struct batch_copy_elem *batch_copy = dma_vr->batch_copy_elems;
+	int error = 0;
+
+	buf_addr = buf_vec[vec_idx].buf_addr;
+	buf_iova = buf_vec[vec_idx].buf_iova;
+	buf_len = buf_vec[vec_idx].buf_len;
+
+	if (unlikely(buf_len < dev->hdr_len && nr_vec <= 1)) {
+		error = -1;
+		goto out;
+	}
+
+	if (virtio_net_with_host_offload(dev)) {
+		if (unlikely(buf_len < sizeof(struct virtio_net_hdr))) {
+			/**
+			 * no luck, the virtio-net header doesn't fit
+			 * in a contiguous virtual area.
+			 */
+			copy_vnet_hdr_from_desc(&tmp_hdr, buf_vec);
+			hdr = &tmp_hdr;
+		} else {
+			hdr = (struct virtio_net_hdr *)((uintptr_t)buf_addr);
+		}
+	}
+
+	/**
+	 * a virtio driver normally uses at least 2 desc buffers
+	 * for Tx: the first for storing the header, and others
+	 * for storing the data.
+	 */
+	if (unlikely(buf_len < dev->hdr_len)) {
+		buf_offset = dev->hdr_len - buf_len;
+		vec_idx++;
+		buf_addr = buf_vec[vec_idx].buf_addr;
+		buf_iova = buf_vec[vec_idx].buf_iova;
+		buf_len = buf_vec[vec_idx].buf_len;
+		buf_avail  = buf_len - buf_offset;
+	} else if (buf_len == dev->hdr_len) {
+		if (unlikely(++vec_idx >= nr_vec))
+			goto out;
+		buf_addr = buf_vec[vec_idx].buf_addr;
+		buf_iova = buf_vec[vec_idx].buf_iova;
+		buf_len = buf_vec[vec_idx].buf_len;
+
+		buf_offset = 0;
+		buf_avail = buf_len;
+	} else {
+		buf_offset = dev->hdr_len;
+		buf_avail = buf_vec[vec_idx].buf_len - dev->hdr_len;
+	}
+
+	mbuf_offset = 0;
+	mbuf_avail  = m->buf_len - RTE_PKTMBUF_HEADROOM;
+	while (1) {
+		cpy_len = RTE_MIN(buf_avail, mbuf_avail);
+		(void)buf_iova;
+
+		if (cpy_len > MAX_BATCH_LEN || dma_vr->batch_copy_nb_elems >=
+		    dma_vr->vr.size || (hdr && cur == m)) {
+			rte_memcpy(rte_pktmbuf_mtod_offset(cur, void *,
+							   mbuf_offset),
+				   (void *)((uintptr_t)(buf_addr + buf_offset)),
+				   cpy_len);
+		} else {
+			batch_copy[dma_vr->batch_copy_nb_elems].dst =
+				rte_pktmbuf_mtod_offset(cur, void *,
+							mbuf_offset);
+			batch_copy[dma_vr->batch_copy_nb_elems].src =
+				(void *)((uintptr_t)(buf_addr + buf_offset));
+			batch_copy[dma_vr->batch_copy_nb_elems].len = cpy_len;
+			dma_vr->batch_copy_nb_elems++;
+		}
+
+		mbuf_avail  -= cpy_len;
+		mbuf_offset += cpy_len;
+		buf_avail -= cpy_len;
+		buf_offset += cpy_len;
+
+		/* this buf reaches to its end, get the next one */
+		if (buf_avail == 0) {
+			if (++vec_idx >= nr_vec)
+				break;
+
+			buf_addr = buf_vec[vec_idx].buf_addr;
+			buf_iova = buf_vec[vec_idx].buf_iova;
+			buf_len = buf_vec[vec_idx].buf_len;
+
+			buf_offset = 0;
+			buf_avail  = buf_len;
+		}
+
+		/**
+		 * this mbuf reaches to its end, get a new one
+		 * to hold more data.
+		 */
+		if (mbuf_avail == 0) {
+			cur = rte_pktmbuf_alloc(mbuf_pool);
+			if (unlikely(cur == NULL)) {
+				VHOST_LOG(INFO, "Failed to allocate mbuf.\n");
+				error = -1;
+				goto out;
+			}
+
+			prev->next = cur;
+			prev->data_len = mbuf_offset;
+			m->nb_segs += 1;
+			m->pkt_len += mbuf_offset;
+			prev = cur;
+
+			mbuf_offset = 0;
+			mbuf_avail = cur->buf_len - RTE_PKTMBUF_HEADROOM;
+		}
+	}
+
+	prev->data_len = mbuf_offset;
+	m->pkt_len += mbuf_offset;
+
+	if (hdr)
+		vhost_dequeue_offload(hdr, m);
+
+out:
+
+	return error;
+}
+
+static __rte_always_inline uint16_t
+vhost_dma_dequeue_split(struct pmd_internal *dev, struct dma_vring *dma_vr,
+			 struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts,
+			 uint16_t count)
+{
+	struct rte_vhost_vring *vr = &dma_vr->vr;
+	uint16_t free_entries, i;
+
+	free_entries = *((volatile uint16_t *)&vr->avail->idx) -
+		dma_vr->last_avail_idx;
+	if (free_entries == 0)
+		return 0;
+
+	/**
+	 * the ordering between avail index and
+	 * desc reads needs to be enforced.
+	 */
+	rte_smp_rmb();
+
+	rte_prefetch0(&vr->avail->ring[dma_vr->last_avail_idx &
+		      (vr->size - 1)]);
+
+	count = RTE_MIN(count, VHOST_MAX_PKT_BURST);
+	count = RTE_MIN(count, free_entries);
+	VHOST_LOG(DEBUG, "(%d) about to dequeue %u buffers\n",
+		  dev->vid, count);
+
+	for (i = 0; i < count; i++) {
+		struct buf_vector buf_vec[BUF_VECTOR_MAX];
+		uint16_t head_idx;
+		uint32_t dummy_len;
+		uint16_t nr_vec = 0;
+		int err;
+
+		if (unlikely(fill_vec_buf_split(dev, dma_vr,
+						dma_vr->last_avail_idx + i,
+						&nr_vec, buf_vec,
+						&head_idx, &dummy_len) < 0))
+			break;
+
+		update_shadow_used_ring_split(dma_vr, head_idx, 0);
+
+		pkts[i] = rte_pktmbuf_alloc(mbuf_pool);
+		if (unlikely(pkts[i] == NULL)) {
+			VHOST_LOG(INFO, "Failed to allocate mbuf.\n");
+			break;
+		}
+
+		err = copy_desc_to_mbuf(dev, dma_vr, buf_vec, nr_vec, pkts[i],
+					mbuf_pool);
+		if (unlikely(err)) {
+			rte_pktmbuf_free(pkts[i]);
+			break;
+		}
+	}
+	dma_vr->last_avail_idx += i;
+
+	do_data_copy(dma_vr);
+
+	if (unlikely(i < count))
+		dma_vr->shadow_used_idx = i;
+	if (dma_vr->shadow_used_idx) {
+		flush_shadow_used_ring_split(dev, dma_vr);
+		vhost_dma_vring_call(dev, dma_vr);
+	}
+
+	return i;
+}
+
+uint16_t
+vhost_dma_dequeue_burst(struct pmd_internal *dev, struct dma_vring *dma_vr,
+			 struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts,
+			 uint16_t count)
+{
+	return vhost_dma_dequeue_split(dev, dma_vr, mbuf_pool, pkts, count);
+}
+
+int
+vhost_dma_setup(struct pmd_internal *dev)
+{
+	struct dma_vring *dma_vr;
+	int vid = dev->vid;
+	int ret;
+	uint16_t i, j, size;
+
+	rte_vhost_get_negotiated_features(vid, &dev->features);
+	if (vq_is_packed(dev)) {
+		VHOST_LOG(ERR, "vHost DMA doesn't support packed ring\n");
+		return -1;
+	}
+
+	if (dev->features & (1 << VIRTIO_NET_F_MRG_RXBUF))
+		dev->hdr_len = sizeof(struct virtio_net_hdr_mrg_rxbuf);
+	else
+		dev->hdr_len = sizeof(struct virtio_net_hdr);
+
+	dev->nr_vrings = rte_vhost_get_vring_num(vid);
+
+	if (rte_vhost_get_mem_table(vid, &dev->mem) < 0) {
+		VHOST_LOG(ERR, "Failed to get guest memory regions\n");
+		return -1;
+	}
+
+	/* set up gpa and hpa mappings */
+	if (setup_guest_pages(dev, dev->mem) < 0) {
+		VHOST_LOG(ERR, "Failed to get hpa and gpa mappings\n");
+		free(dev->mem);
+		return -1;
+	}
+
+	for (i = 0; i < dev->nr_vrings; i++) {
+		dma_vr = &dev->dma_vrings[i];
+
+		ret = rte_vhost_get_vring_base(vid, i, &dma_vr->last_avail_idx,
+					       &dma_vr->last_used_idx);
+		if (ret < 0) {
+			VHOST_LOG(ERR, "Failed to get vring index.\n");
+			goto err;
+		}
+
+		ret = rte_vhost_get_vhost_vring(vid, i, &dma_vr->vr);
+		if (ret < 0) {
+			VHOST_LOG(ERR, "Failed to get vring address.\n");
+			goto err;
+		}
+
+		size = dma_vr->vr.size;
+		dma_vr->shadow_used_split =
+			rte_malloc(NULL, size * sizeof(struct vring_used_elem),
+				   RTE_CACHE_LINE_SIZE);
+		if (dma_vr->shadow_used_split == NULL)
+			goto err;
+
+		dma_vr->batch_copy_elems =
+			rte_malloc(NULL, size * sizeof(struct batch_copy_elem),
+				   RTE_CACHE_LINE_SIZE);
+		if (dma_vr->batch_copy_elems == NULL)
+			goto err;
+
+		/* set up used index array for DMA copy */
+		dma_vr->used_idx_hpa =
+			rte_mem_virt2iova(&dma_vr->vr.used->idx);
+		dma_vr->max_indices = dma_vr->vr.size;
+		setup_indices(&dma_vr->indices, dma_vr->max_indices);
+
+		dma_vr->copy_done_used = dma_vr->last_used_idx;
+		dma_vr->signalled_used = dma_vr->last_used_idx;
+		dma_vr->signalled_used_valid = false;
+
+		dma_vr->shadow_used_idx = 0;
+		dma_vr->batch_copy_nb_elems = 0;
+	}
+
+	return 0;
+
+err:
+	for (j = 0; j <= i; j++) {
+		dma_vr = &dev->dma_vrings[j];
+		rte_free(dma_vr->shadow_used_split);
+		rte_free(dma_vr->batch_copy_elems);
+		destroy_indices(&dma_vr->indices);
+		dma_vr->batch_copy_elems = NULL;
+		dma_vr->shadow_used_split = NULL;
+	}
+
+	free(dev->mem);
+	dev->mem = NULL;
+	free(dev->guest_pages);
+	dev->guest_pages = NULL;
+
+	return -1;
+}
+
+void
+vhost_dma_remove(struct pmd_internal *dev)
+{
+	struct dma_vring *dma_vr;
+	uint16_t i;
+
+	for (i = 0; i < dev->nr_vrings; i++) {
+		dma_vr = &dev->dma_vrings[i];
+
+		if (dma_vr->dma_enabled) {
+			while (dma_vr->nr_inflight > 0)
+				process_dma_completed(dev, dma_vr);
+
+			VHOST_LOG(INFO, "Wait for outstanding DMA jobs "
+				  "of vring %u completion\n", i);
+
+			rte_rawdev_stop(dma_vr->dev_id);
+			dma_vr->dma_enabled = false;
+			dma_vr->nr_batching = 0;
+			dma_vr->dev_id = -1;
+		}
+
+		rte_free(dma_vr->shadow_used_split);
+		dma_vr->shadow_used_split = NULL;
+		rte_free(dma_vr->batch_copy_elems);
+		dma_vr->batch_copy_elems = NULL;
+		dma_vr->signalled_used_valid = false;
+		dma_vr->used_idx_hpa = 0;
+		destroy_indices(&dma_vr->indices);
+		dma_vr->max_indices = 0;
+	}
+
+	free(dev->mem);
+	dev->mem = NULL;
+	free(dev->guest_pages);
+	dev->guest_pages = NULL;
+}
diff --git a/mk/rte.app.mk b/mk/rte.app.mk
index 1f5c748..1c2e1ce 100644
--- a/mk/rte.app.mk
+++ b/mk/rte.app.mk
@@ -220,6 +220,7 @@  _LDLIBS-$(CONFIG_RTE_LIBRTE_VDEV_NETVSC_PMD) += -lrte_pmd_vdev_netvsc
 _LDLIBS-$(CONFIG_RTE_LIBRTE_VIRTIO_PMD)     += -lrte_pmd_virtio
 ifeq ($(CONFIG_RTE_LIBRTE_VHOST),y)
 _LDLIBS-$(CONFIG_RTE_LIBRTE_PMD_VHOST)      += -lrte_pmd_vhost
+_LDLIBS-$(CONFIG_RTE_LIBRTE_PMD_VHOST_DMA)      += -lrte_pmd_vhost_dma
 ifeq ($(CONFIG_RTE_EAL_VFIO),y)
 _LDLIBS-$(CONFIG_RTE_LIBRTE_IFC_PMD) += -lrte_pmd_ifc
 endif # $(CONFIG_RTE_EAL_VFIO)