net/vhost: support asynchronous data path

Message ID 20220814150636.2260317-1-jiayu.hu@intel.com (mailing list archive)
State Superseded, archived
Delegated to: Maxime Coquelin
Headers
Series net/vhost: support asynchronous data path |

Checks

Context Check Description
ci/checkpatch warning coding style issues
ci/Intel-compilation fail Compilation issues
ci/github-robot: build fail github build: failed
ci/iol-testing fail build patch failure

Commit Message

Hu, Jiayu Aug. 14, 2022, 3:06 p.m. UTC
  Vhost asynchronous data-path offloads packet copy from the CPU
to the DMA engine. As a result, large packet copy can be accelerated
by the DMA engine, and vhost can free CPU cycles for higher level
functions.

In this patch, we enable asynchronous data-path for vhostpmd.
Asynchronous data path is enabled per tx/rx queue, and users need
to specify the DMA device used by the tx/rx queue. Each tx/rx queue
only supports to use one DMA device, but one DMA device can be shared
among multiple tx/rx queues of different vhostpmd ports.

Two PMD parameters are added:
- dmas:	specify the used DMA device for a tx/rx queue
	(Default: no queues enable asynchronous data path)
- dma-ring-size: DMA ring size.
	(Default: 2048).

Here is an example:
--vdev 'eth_vhost0,iface=./s0,dmas=[txq0@0000:00.01.0;rxq0@0000:00.01.1],dma-ring-size=4096'

Signed-off-by: Jiayu Hu <jiayu.hu@intel.com>
Signed-off-by: Yuan Wang <yuanx.wang@intel.com>
Signed-off-by: Wenwu Ma <wenwux.ma@intel.com>
---
 drivers/net/vhost/meson.build     |   1 +
 drivers/net/vhost/rte_eth_vhost.c | 488 ++++++++++++++++++++++++++++--
 drivers/net/vhost/rte_eth_vhost.h |  14 +
 3 files changed, 470 insertions(+), 33 deletions(-)
  

Patch

diff --git a/drivers/net/vhost/meson.build b/drivers/net/vhost/meson.build
index f481a3a4b8..22a0ab3a58 100644
--- a/drivers/net/vhost/meson.build
+++ b/drivers/net/vhost/meson.build
@@ -9,4 +9,5 @@  endif
 
 deps += 'vhost'
 sources = files('rte_eth_vhost.c')
+testpmd_sources = files('vhost_testpmd.c')
 headers = files('rte_eth_vhost.h')
diff --git a/drivers/net/vhost/rte_eth_vhost.c b/drivers/net/vhost/rte_eth_vhost.c
index 7e512d94bf..361e5d66c6 100644
--- a/drivers/net/vhost/rte_eth_vhost.c
+++ b/drivers/net/vhost/rte_eth_vhost.c
@@ -17,6 +17,8 @@ 
 #include <rte_kvargs.h>
 #include <rte_vhost.h>
 #include <rte_spinlock.h>
+#include <rte_vhost_async.h>
+#include <rte_dmadev.h>
 
 #include "rte_eth_vhost.h"
 
@@ -36,8 +38,13 @@  enum {VIRTIO_RXQ, VIRTIO_TXQ, VIRTIO_QNUM};
 #define ETH_VHOST_LINEAR_BUF		"linear-buffer"
 #define ETH_VHOST_EXT_BUF		"ext-buffer"
 #define ETH_VHOST_LEGACY_OL_FLAGS	"legacy-ol-flags"
+#define ETH_VHOST_DMA_ARG		"dmas"
+#define ETH_VHOST_DMA_RING_SIZE		"dma-ring-size"
 #define VHOST_MAX_PKT_BURST 32
 
+#define INVALID_DMA_ID		-1
+#define DEFAULT_DMA_RING_SIZE	2048
+
 static const char *valid_arguments[] = {
 	ETH_VHOST_IFACE_ARG,
 	ETH_VHOST_QUEUES_ARG,
@@ -48,6 +55,8 @@  static const char *valid_arguments[] = {
 	ETH_VHOST_LINEAR_BUF,
 	ETH_VHOST_EXT_BUF,
 	ETH_VHOST_LEGACY_OL_FLAGS,
+	ETH_VHOST_DMA_ARG,
+	ETH_VHOST_DMA_RING_SIZE,
 	NULL
 };
 
@@ -79,8 +88,39 @@  struct vhost_queue {
 	struct vhost_stats stats;
 	int intr_enable;
 	rte_spinlock_t intr_lock;
+
+	/* Flag of enabling async data path */
+	bool async_register;
+	/* DMA device ID */
+	int16_t dma_id;
+	/**
+	 * For a Rx queue, "txq" points to its peer Tx queue.
+	 * For a Tx queue, "txq" is never used.
+	 */
+	struct vhost_queue *txq;
+	/* Array to keep DMA completed packets */
+	struct rte_mbuf *cmpl_pkts[VHOST_MAX_PKT_BURST];
 };
 
+struct dma_input_info {
+	int16_t dmas[RTE_MAX_QUEUES_PER_PORT * 2];
+	uint16_t dma_ring_size;
+};
+
+static int16_t configured_dmas[RTE_DMADEV_DEFAULT_MAX];
+static int dma_count;
+
+/**
+ * By default, its Rx path to call rte_vhost_poll_enqueue_completed() for enqueue operations.
+ * However, Rx function is never been called in testpmd "txonly" mode, thus causing virtio
+ * cannot receive DMA completed packets. To make txonly mode work correctly, we provide a
+ * command in testpmd to call rte_vhost_poll_enqueue_completed() in Tx path.
+ *
+ * When set async_tx_poll_completed to true, Tx path calls rte_vhost_poll_enqueue_completed();
+ * otherwise, Rx path calls it.
+ */
+bool async_tx_poll_completed;
+
 struct pmd_internal {
 	rte_atomic32_t dev_attached;
 	char *iface_name;
@@ -93,6 +133,10 @@  struct pmd_internal {
 	bool vlan_strip;
 	bool rx_sw_csum;
 	bool tx_sw_csum;
+	struct {
+		int16_t dma_id;
+		bool async_register;
+	} queue_dmas[RTE_MAX_QUEUES_PER_PORT * 2];
 };
 
 struct internal_list {
@@ -123,6 +167,17 @@  struct rte_vhost_vring_state {
 
 static struct rte_vhost_vring_state *vring_states[RTE_MAX_ETHPORTS];
 
+static bool
+dma_is_configured(int16_t dma_id)
+{
+	int i;
+
+	for (i = 0; i < dma_count; i++)
+		if (configured_dmas[i] == dma_id)
+			return true;
+	return false;
+}
+
 static int
 vhost_dev_xstats_reset(struct rte_eth_dev *dev)
 {
@@ -395,6 +450,17 @@  vhost_dev_rx_sw_csum(struct rte_mbuf *mbuf)
 	mbuf->ol_flags |= RTE_MBUF_F_RX_L4_CKSUM_GOOD;
 }
 
+static inline void
+vhost_tx_free_completed(uint16_t vid, uint16_t virtqueue_id, int16_t dma_id,
+		struct rte_mbuf **pkts, uint16_t count)
+{
+	uint16_t i, ret;
+
+	ret = rte_vhost_poll_enqueue_completed(vid, virtqueue_id, pkts, count, dma_id, 0);
+	for (i = 0; likely(i < ret); i++)
+		rte_pktmbuf_free(pkts[i]);
+}
+
 static uint16_t
 eth_vhost_rx(void *q, struct rte_mbuf **bufs, uint16_t nb_bufs)
 {
@@ -403,7 +469,7 @@  eth_vhost_rx(void *q, struct rte_mbuf **bufs, uint16_t nb_bufs)
 	uint16_t nb_receive = nb_bufs;
 
 	if (unlikely(rte_atomic32_read(&r->allow_queuing) == 0))
-		return 0;
+		goto tx_poll;
 
 	rte_atomic32_set(&r->while_queuing, 1);
 
@@ -411,19 +477,36 @@  eth_vhost_rx(void *q, struct rte_mbuf **bufs, uint16_t nb_bufs)
 		goto out;
 
 	/* Dequeue packets from guest TX queue */
-	while (nb_receive) {
-		uint16_t nb_pkts;
-		uint16_t num = (uint16_t)RTE_MIN(nb_receive,
-						 VHOST_MAX_PKT_BURST);
-
-		nb_pkts = rte_vhost_dequeue_burst(r->vid, r->virtqueue_id,
-						  r->mb_pool, &bufs[nb_rx],
-						  num);
-
-		nb_rx += nb_pkts;
-		nb_receive -= nb_pkts;
-		if (nb_pkts < num)
-			break;
+	if (!r->async_register) {
+		while (nb_receive) {
+			uint16_t nb_pkts;
+			uint16_t num = (uint16_t)RTE_MIN(nb_receive,
+							VHOST_MAX_PKT_BURST);
+
+			nb_pkts = rte_vhost_dequeue_burst(r->vid, r->virtqueue_id,
+						r->mb_pool, &bufs[nb_rx],
+						num);
+
+			nb_rx += nb_pkts;
+			nb_receive -= nb_pkts;
+			if (nb_pkts < num)
+				break;
+		}
+	} else {
+		while (nb_receive) {
+			uint16_t nb_pkts;
+			uint16_t num = (uint16_t)RTE_MIN(nb_receive, VHOST_MAX_PKT_BURST);
+			int nr_inflight;
+
+			nb_pkts = rte_vhost_async_try_dequeue_burst(r->vid, r->virtqueue_id,
+						r->mb_pool, &bufs[nb_rx], num, &nr_inflight,
+						r->dma_id, 0);
+
+			nb_rx += nb_pkts;
+			nb_receive -= nb_pkts;
+			if (nb_pkts < num)
+				break;
+		}
 	}
 
 	r->stats.pkts += nb_rx;
@@ -444,6 +527,17 @@  eth_vhost_rx(void *q, struct rte_mbuf **bufs, uint16_t nb_bufs)
 out:
 	rte_atomic32_set(&r->while_queuing, 0);
 
+tx_poll:
+	/**
+	 * Poll and free completed packets for the virtqueue of Tx queue.
+	 * Note that we access Tx queue's virtqueue, which is protected
+	 * by vring lock.
+	 */
+	if (!async_tx_poll_completed && r->txq->async_register) {
+		vhost_tx_free_completed(r->vid, r->txq->virtqueue_id, r->txq->dma_id,
+				r->cmpl_pkts, VHOST_MAX_PKT_BURST);
+	}
+
 	return nb_rx;
 }
 
@@ -485,31 +579,53 @@  eth_vhost_tx(void *q, struct rte_mbuf **bufs, uint16_t nb_bufs)
 	}
 
 	/* Enqueue packets to guest RX queue */
-	while (nb_send) {
-		uint16_t nb_pkts;
-		uint16_t num = (uint16_t)RTE_MIN(nb_send,
-						 VHOST_MAX_PKT_BURST);
+	if (!r->async_register) {
+		while (nb_send) {
+			uint16_t nb_pkts;
+			uint16_t num = (uint16_t)RTE_MIN(nb_send, VHOST_MAX_PKT_BURST);
+
+			nb_pkts = rte_vhost_enqueue_burst(r->vid, r->virtqueue_id,
+						&bufs[nb_tx], num);
+
+			nb_tx += nb_pkts;
+			nb_send -= nb_pkts;
+			if (nb_pkts < num)
+				break;
+		}
 
-		nb_pkts = rte_vhost_enqueue_burst(r->vid, r->virtqueue_id,
-						  &bufs[nb_tx], num);
+		for (i = 0; likely(i < nb_tx); i++) {
+			nb_bytes += bufs[i]->pkt_len;
+			rte_pktmbuf_free(bufs[i]);
+		}
 
-		nb_tx += nb_pkts;
-		nb_send -= nb_pkts;
-		if (nb_pkts < num)
-			break;
-	}
+	} else {
+		while (nb_send) {
+			uint16_t nb_pkts;
+			uint16_t num = (uint16_t)RTE_MIN(nb_send, VHOST_MAX_PKT_BURST);
 
-	for (i = 0; likely(i < nb_tx); i++)
-		nb_bytes += bufs[i]->pkt_len;
+			nb_pkts = rte_vhost_submit_enqueue_burst(r->vid, r->virtqueue_id,
+							&bufs[nb_tx], num, r->dma_id, 0);
 
-	nb_missed = nb_bufs - nb_tx;
+			nb_tx += nb_pkts;
+			nb_send -= nb_pkts;
+			if (nb_pkts < num)
+				break;
+		}
 
+		for (i = 0; likely(i < nb_tx); i++)
+			nb_bytes += bufs[i]->pkt_len;
+
+		if (unlikely(async_tx_poll_completed)) {
+			vhost_tx_free_completed(r->vid, r->virtqueue_id, r->dma_id, r->cmpl_pkts,
+					VHOST_MAX_PKT_BURST);
+		}
+	}
+
+	nb_missed = nb_bufs - nb_tx;
 	r->stats.pkts += nb_tx;
 	r->stats.bytes += nb_bytes;
 	r->stats.missed_pkts += nb_missed;
 
-	for (i = 0; likely(i < nb_tx); i++)
-		rte_pktmbuf_free(bufs[i]);
 out:
 	rte_atomic32_set(&r->while_queuing, 0);
 
@@ -797,6 +913,8 @@  queue_setup(struct rte_eth_dev *eth_dev, struct pmd_internal *internal)
 		vq->vid = internal->vid;
 		vq->internal = internal;
 		vq->port = eth_dev->data->port_id;
+		if (i < eth_dev->data->nb_tx_queues)
+			vq->txq = eth_dev->data->tx_queues[i];
 	}
 	for (i = 0; i < eth_dev->data->nb_tx_queues; i++) {
 		vq = eth_dev->data->tx_queues[i];
@@ -982,6 +1100,9 @@  vring_state_changed(int vid, uint16_t vring, int enable)
 	struct rte_vhost_vring_state *state;
 	struct rte_eth_dev *eth_dev;
 	struct internal_list *list;
+	struct vhost_queue *queue;
+	struct pmd_internal *internal;
+	int qid;
 	char ifname[PATH_MAX];
 
 	rte_vhost_get_ifname(vid, ifname, sizeof(ifname));
@@ -1010,6 +1131,65 @@  vring_state_changed(int vid, uint16_t vring, int enable)
 
 	update_queuing_status(eth_dev, false);
 
+	qid = vring / VIRTIO_QNUM;
+	if (vring % VIRTIO_QNUM == VIRTIO_RXQ)
+		queue = eth_dev->data->tx_queues[qid];
+	else
+		queue = eth_dev->data->rx_queues[qid];
+
+	if (!queue)
+		goto skip;
+
+	internal = eth_dev->data->dev_private;
+
+	/* Register async data path for the queue assigned valid DMA device */
+	if (internal->queue_dmas[queue->virtqueue_id].dma_id == INVALID_DMA_ID)
+		goto skip;
+
+	if (enable && !queue->async_register) {
+		if (rte_vhost_async_channel_register_thread_unsafe(vid, vring)) {
+			VHOST_LOG(ERR, "Failed to register async for vid-%u vring-%u!\n", vid,
+					vring);
+			return -1;
+		}
+
+		queue->async_register = true;
+		internal->queue_dmas[vring].async_register = true;
+
+		VHOST_LOG(INFO, "Succeed to register async for vid-%u vring-%u\n", vid, vring);
+	}
+
+	if (!enable && queue->async_register) {
+		struct rte_mbuf *pkts[VHOST_MAX_PKT_BURST];
+		uint16_t ret, i, nr_done = 0;
+		uint16_t dma_id = queue->dma_id;
+
+		while (rte_vhost_async_get_inflight_thread_unsafe(vid, vring) > 0) {
+			ret = rte_vhost_clear_queue_thread_unsafe(vid, vring, pkts,
+					VHOST_MAX_PKT_BURST, dma_id, 0);
+
+			for (i = 0; i < ret ; i++)
+				rte_pktmbuf_free(pkts[i]);
+
+			nr_done += ret;
+		}
+
+		VHOST_LOG(INFO, "Completed %u in-flight pkts for vid-%u vring-%u\n", nr_done, vid,
+				vring);
+
+		if (rte_vhost_async_channel_unregister_thread_unsafe(vid, vring)) {
+			VHOST_LOG(ERR, "Failed to unregister async for vid-%u vring-%u\n", vid,
+					vring);
+			return -1;
+		}
+
+		queue->async_register = false;
+		internal->queue_dmas[vring].async_register = false;
+
+		VHOST_LOG(INFO, "Succeed to unregister async for vid-%u vring-%u\n", vid, vring);
+	}
+
+skip:
 	VHOST_LOG(INFO, "vring%u is %s\n",
 			vring, enable ? "enabled" : "disabled");
 
@@ -1214,11 +1394,37 @@  eth_dev_stop(struct rte_eth_dev *dev)
 	return 0;
 }
 
+static inline int
+async_clear_virtqueue(uint16_t vid, uint16_t virtqueue_id, int16_t dma_id)
+{
+	struct rte_mbuf *pkts[VHOST_MAX_PKT_BURST];
+	uint16_t i, ret, nr_done = 0;
+
+	while (rte_vhost_async_get_inflight(vid, virtqueue_id) > 0) {
+		ret = rte_vhost_clear_queue(vid, virtqueue_id, pkts, VHOST_MAX_PKT_BURST, dma_id,
+				0);
+		for (i = 0; i < ret ; i++)
+			rte_pktmbuf_free(pkts[i]);
+
+		nr_done += ret;
+	}
+	VHOST_LOG(INFO, "Completed %u pkts for vid-%u vring-%u\n", nr_done, vid, virtqueue_id);
+
+	if (rte_vhost_async_channel_unregister(vid, virtqueue_id)) {
+		VHOST_LOG(ERR, "Failed to unregister async for vid-%u vring-%u\n", vid,
+				virtqueue_id);
+		return -1;
+	}
+
+	return nr_done;
+}
+
 static int
 eth_dev_close(struct rte_eth_dev *dev)
 {
 	struct pmd_internal *internal;
 	struct internal_list *list;
+	struct vhost_queue *queue;
 	unsigned int i, ret;
 
 	if (rte_eal_process_type() != RTE_PROC_PRIMARY)
@@ -1232,6 +1438,27 @@  eth_dev_close(struct rte_eth_dev *dev)
 
 	list = find_internal_resource(internal->iface_name);
 	if (list) {
+		/* Make sure all in-flight packets are completed before destroy virtio */
+		if (dev->data->rx_queues) {
+			for (i = 0; i < dev->data->nb_rx_queues; i++) {
+				queue = dev->data->rx_queues[i];
+				if (queue->async_register) {
+					async_clear_virtqueue(queue->vid, queue->virtqueue_id,
+							queue->dma_id);
+				}
+			}
+		}
+
+		if (dev->data->tx_queues) {
+			for (i = 0; i < dev->data->nb_tx_queues; i++) {
+				queue = dev->data->tx_queues[i];
+				if (queue->async_register) {
+					async_clear_virtqueue(queue->vid, queue->virtqueue_id,
+							queue->dma_id);
+				}
+			}
+		}
+
 		rte_vhost_driver_unregister(internal->iface_name);
 		pthread_mutex_lock(&internal_list_lock);
 		TAILQ_REMOVE(&internal_list, list, next);
@@ -1266,6 +1493,7 @@  eth_rx_queue_setup(struct rte_eth_dev *dev, uint16_t rx_queue_id,
 		   struct rte_mempool *mb_pool)
 {
 	struct vhost_queue *vq;
+	struct pmd_internal *internal = dev->data->dev_private;
 
 	vq = rte_zmalloc_socket(NULL, sizeof(struct vhost_queue),
 			RTE_CACHE_LINE_SIZE, socket_id);
@@ -1276,6 +1504,8 @@  eth_rx_queue_setup(struct rte_eth_dev *dev, uint16_t rx_queue_id,
 
 	vq->mb_pool = mb_pool;
 	vq->virtqueue_id = rx_queue_id * VIRTIO_QNUM + VIRTIO_TXQ;
+	vq->async_register = internal->queue_dmas[vq->virtqueue_id].async_register;
+	vq->dma_id = internal->queue_dmas[vq->virtqueue_id].dma_id;
 	rte_spinlock_init(&vq->intr_lock);
 	dev->data->rx_queues[rx_queue_id] = vq;
 
@@ -1289,6 +1519,7 @@  eth_tx_queue_setup(struct rte_eth_dev *dev, uint16_t tx_queue_id,
 		   const struct rte_eth_txconf *tx_conf __rte_unused)
 {
 	struct vhost_queue *vq;
+	struct pmd_internal *internal = dev->data->dev_private;
 
 	vq = rte_zmalloc_socket(NULL, sizeof(struct vhost_queue),
 			RTE_CACHE_LINE_SIZE, socket_id);
@@ -1298,6 +1529,8 @@  eth_tx_queue_setup(struct rte_eth_dev *dev, uint16_t tx_queue_id,
 	}
 
 	vq->virtqueue_id = tx_queue_id * VIRTIO_QNUM + VIRTIO_RXQ;
+	vq->async_register = internal->queue_dmas[vq->virtqueue_id].async_register;
+	vq->dma_id = internal->queue_dmas[vq->virtqueue_id].dma_id;
 	rte_spinlock_init(&vq->intr_lock);
 	dev->data->tx_queues[tx_queue_id] = vq;
 
@@ -1508,13 +1741,14 @@  static const struct eth_dev_ops ops = {
 static int
 eth_dev_vhost_create(struct rte_vdev_device *dev, char *iface_name,
 	int16_t queues, const unsigned int numa_node, uint64_t flags,
-	uint64_t disable_flags)
+	uint64_t disable_flags, struct dma_input_info *dma_input)
 {
 	const char *name = rte_vdev_device_name(dev);
 	struct rte_eth_dev_data *data;
 	struct pmd_internal *internal = NULL;
 	struct rte_eth_dev *eth_dev = NULL;
 	struct rte_ether_addr *eth_addr = NULL;
+	int i = 0;
 
 	VHOST_LOG(INFO, "Creating VHOST-USER backend on numa socket %u\n",
 		numa_node);
@@ -1563,6 +1797,12 @@  eth_dev_vhost_create(struct rte_vdev_device *dev, char *iface_name,
 	eth_dev->rx_pkt_burst = eth_vhost_rx;
 	eth_dev->tx_pkt_burst = eth_vhost_tx;
 
+	for (i = 0; i < RTE_MAX_QUEUES_PER_PORT * 2; i++) {
+		/* Invalid DMA ID indicates the queue does not want to enable async data path */
+		internal->queue_dmas[i].dma_id = dma_input->dmas[i];
+		internal->queue_dmas[i].async_register = false;
+	}
+
 	rte_eth_dev_probing_finish(eth_dev);
 	return 0;
 
@@ -1602,6 +1842,153 @@  open_int(const char *key __rte_unused, const char *value, void *extra_args)
 	return 0;
 }
 
+static int
+init_dma(int16_t dma_id, uint16_t ring_size)
+{
+	struct rte_dma_info info;
+	struct rte_dma_conf dev_config = { .nb_vchans = 1 };
+	struct rte_dma_vchan_conf qconf = {
+		.direction = RTE_DMA_DIR_MEM_TO_MEM,
+	};
+	int ret = 0;
+
+	if (dma_is_configured(dma_id))
+		goto out;
+
+	if (rte_dma_info_get(dma_id, &info) != 0) {
+		VHOST_LOG(ERR, "dma %u get info failed\n", dma_id);
+		ret = -1;
+		goto out;
+	}
+
+	if (info.max_vchans < 1) {
+		VHOST_LOG(ERR, "No channels available on dma %d\n", dma_id);
+		ret = -1;
+		goto out;
+	}
+
+	if (rte_dma_configure(dma_id, &dev_config) != 0) {
+		VHOST_LOG(ERR, "dma %u configure failed\n", dma_id);
+		ret = -1;
+		goto out;
+	}
+
+	rte_dma_info_get(dma_id, &info);
+	if (info.nb_vchans != 1) {
+		VHOST_LOG(ERR, "dma %u has no queues\n", dma_id);
+		ret = -1;
+		goto out;
+	}
+
+	qconf.nb_desc = RTE_MIN(ring_size, info.max_desc);
+	if (rte_dma_vchan_setup(dma_id, 0, &qconf) != 0) {
+		VHOST_LOG(ERR, "dma %u queue setup failed\n", dma_id);
+		ret = -1;
+		goto out;
+	}
+
+	if (rte_dma_start(dma_id) != 0) {
+		VHOST_LOG(ERR, "dma %u start failed\n", dma_id);
+		ret = -1;
+		goto out;
+	}
+
+	configured_dmas[dma_count++] = dma_id;
+
+out:
+	return ret;
+}
+
+static int
+open_dma(const char *key __rte_unused, const char *value, void *extra_args)
+{
+	struct dma_input_info *dma_input = extra_args;
+	char *input = strndup(value, strlen(value) + 1);
+	char *addrs = input;
+	char *ptrs[2];
+	char *start, *end, *substr;
+	uint16_t qid, virtqueue_id;
+	int16_t dma_id;
+	int ret = 0;
+
+	while (isblank(*addrs))
+		addrs++;
+	if (*addrs == '\0') {
+		VHOST_LOG(ERR, "No input DMA addresses\n");
+		ret = -1;
+		goto out;
+	}
+
+	/* process DMA devices within bracket. */
+	addrs++;
+	substr = strtok(addrs, ";]");
+	if (!substr) {
+		VHOST_LOG(ERR, "No input DMA addresse\n");
+		ret = -1;
+		goto out;
+	}
+
+	do {
+		rte_strsplit(substr, strlen(substr), ptrs, 2, '@');
+
+		char *txq, *rxq;
+		bool is_txq;
+
+		txq = strstr(ptrs[0], "txq");
+		rxq = strstr(ptrs[0], "rxq");
+		if (txq == NULL && rxq == NULL) {
+			VHOST_LOG(ERR, "Illegal queue\n");
+			ret = -1;
+			goto out;
+		} else if (txq) {
+			is_txq = true;
+			start = txq;
+		} else {
+			is_txq = false;
+			start = rxq;
+		}
+
+		start += 3;
+		qid = strtol(start, &end, 0);
+		if (end == start) {
+			VHOST_LOG(ERR, "No input queue ID\n");
+			ret = -1;
+			goto out;
+		}
+
+		virtqueue_id = is_txq ? qid * 2 + VIRTIO_RXQ : qid * 2 + VIRTIO_TXQ;
+
+		dma_id = rte_dma_get_dev_id_by_name(ptrs[1]);
+		if (dma_id < 0) {
+			VHOST_LOG(ERR, "Fail to find DMA device %s.\n", ptrs[1]);
+			ret = -1;
+			goto out;
+		}
+
+		ret = init_dma(dma_id, dma_input->dma_ring_size);
+		if (ret != 0) {
+			VHOST_LOG(ERR, "Fail to initialize DMA %u\n", dma_id);
+			ret = -1;
+			break;
+		}
+
+		dma_input->dmas[virtqueue_id] = dma_id;
+
+		substr = strtok(NULL, ";]");
+	} while (substr);
+
+	for (int i = 0; i < dma_count; i++) {
+		if (rte_vhost_async_dma_configure(configured_dmas[i], 0) < 0) {
+			VHOST_LOG(ERR, "Fail to configure DMA %u to vhost\n", configured_dmas[i]);
+			ret = -1;
+		}
+	}
+
+out:
+	free(input);
+	return ret;
+}
+
 static int
 rte_pmd_vhost_probe(struct rte_vdev_device *dev)
 {
@@ -1620,6 +2007,10 @@  rte_pmd_vhost_probe(struct rte_vdev_device *dev)
 	int legacy_ol_flags = 0;
 	struct rte_eth_dev *eth_dev;
 	const char *name = rte_vdev_device_name(dev);
+	struct dma_input_info dma_input;
+
+	memset(dma_input.dmas, INVALID_DMA_ID, sizeof(dma_input.dmas));
+	dma_input.dma_ring_size = DEFAULT_DMA_RING_SIZE;
 
 	VHOST_LOG(INFO, "Initializing pmd_vhost for %s\n", name);
 
@@ -1735,6 +2126,35 @@  rte_pmd_vhost_probe(struct rte_vdev_device *dev)
 			goto out_free;
 	}
 
+	if (rte_kvargs_count(kvlist, ETH_VHOST_DMA_RING_SIZE) == 1) {
+		ret = rte_kvargs_process(kvlist, ETH_VHOST_DMA_RING_SIZE,
+				&open_int, &dma_input.dma_ring_size);
+		if (ret < 0)
+			goto out_free;
+
+		if (!rte_is_power_of_2(dma_input.dma_ring_size)) {
+			dma_input.dma_ring_size = rte_align32pow2(dma_input.dma_ring_size);
+			VHOST_LOG(INFO, "Convert dma_ring_size to the power of two %u\n",
+					dma_input.dma_ring_size);
+		}
+	}
+
+	if (rte_kvargs_count(kvlist, ETH_VHOST_DMA_ARG) == 1) {
+		ret = rte_kvargs_process(kvlist, ETH_VHOST_DMA_ARG,
+					 &open_dma, &dma_input);
+		if (ret < 0) {
+			VHOST_LOG(ERR, "Failed to parse %s\n", ETH_VHOST_DMA_ARG);
+			goto out_free;
+		}
+
+		flags |= RTE_VHOST_USER_ASYNC_COPY;
+		/**
+		 * Don't support live migration when enable
+		 * DMA acceleration.
+		 */
+		disable_flags |= (1ULL << VHOST_F_LOG_ALL);
+	}
+
 	if (legacy_ol_flags == 0)
 		flags |= RTE_VHOST_USER_NET_COMPLIANT_OL_FLAGS;
 
@@ -1742,7 +2162,7 @@  rte_pmd_vhost_probe(struct rte_vdev_device *dev)
 		dev->device.numa_node = rte_socket_id();
 
 	ret = eth_dev_vhost_create(dev, iface_name, queues,
-				   dev->device.numa_node, flags, disable_flags);
+			dev->device.numa_node, flags, disable_flags, &dma_input);
 	if (ret == -1)
 		VHOST_LOG(ERR, "Failed to create %s\n", name);
 
@@ -1786,4 +2206,6 @@  RTE_PMD_REGISTER_PARAM_STRING(net_vhost,
 	"postcopy-support=<0|1> "
 	"tso=<0|1> "
 	"linear-buffer=<0|1> "
-	"ext-buffer=<0|1>");
+	"ext-buffer=<0|1> "
+	"dma-ring-size=<int>"
+	"dmas=[txq0@dma_addr;rxq0@dma_addr] ");
diff --git a/drivers/net/vhost/rte_eth_vhost.h b/drivers/net/vhost/rte_eth_vhost.h
index 0e68b9f668..96e0e8e7be 100644
--- a/drivers/net/vhost/rte_eth_vhost.h
+++ b/drivers/net/vhost/rte_eth_vhost.h
@@ -14,6 +14,8 @@  extern "C" {
 
 #include <rte_vhost.h>
 
+extern bool async_tx_poll_completed;
+
 /*
  * Event description.
  */
@@ -52,6 +54,18 @@  int rte_eth_vhost_get_queue_event(uint16_t port_id,
  */
 int rte_eth_vhost_get_vid_from_port_id(uint16_t port_id);
 
+/**
+ * Ask Tx side to poll completed packets
+ *
+ * @param flag
+ *  flag.
+ */
+static __rte_always_inline void
+rte_eth_vhost_async_tx_poll_completed(bool enable)
+{
+	async_tx_poll_completed = enable;
+}
+
 #ifdef __cplusplus
 }
 #endif