[1/4] raw/ntb: setup ntb queue

Message ID 20190905053933.27929-2-xiaoyun.li@intel.com (mailing list archive)
State Superseded, archived
Series: enable FIFO for NTB

Checks

Context Check Description
ci/checkpatch success coding style OK
ci/Intel-compilation fail Compilation issues
ci/iol-dpdk_compile_spdk success Compile Testing PASS
ci/iol-dpdk_compile success Compile Testing PASS
ci/iol-dpdk_compile_ovs success Compile Testing PASS
ci/intel-Performance success Performance Testing PASS
ci/mellanox-Performance success Performance Testing PASS

Commit Message

Li, Xiaoyun Sept. 5, 2019, 5:39 a.m. UTC
Set up and initialize ntb txq and rxq, and negotiate queue
information with the peer. If the queue size or the number of
queues is not consistent on both sides, return an error.

Signed-off-by: Xiaoyun Li <xiaoyun.li@intel.com>
---
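Notes (not part of the commit message): below is a minimal sketch of the
host-side flow this series expects an application to follow. The device
id, sizes and memzone name are illustrative and error handling is
trimmed; the attribute writes must precede the info query because the
driver uses queue number/size to compute the shared-memory header size.

#include <errno.h>
#include <rte_lcore.h>
#include <rte_mempool.h>
#include <rte_memzone.h>
#include <rte_rawdev.h>
#include "rte_pmd_ntb.h"

static int
app_ntb_setup(uint16_t dev_id, struct rte_mempool *mp)
{
	struct ntb_dev_info info;
	struct ntb_dev_config conf;
	struct ntb_queue_conf q_conf;
	struct rte_rawdev_info rawdev_info = { .dev_private = &info };
	const struct rte_memzone *mz[1];
	int ret;

	/* Tell the driver the queue shape first. */
	rte_rawdev_set_attr(dev_id, NTB_QUEUE_NUM_NAME, 1);
	rte_rawdev_set_attr(dev_id, NTB_QUEUE_SZ_NAME, 1024);
	rte_rawdev_info_get(dev_id, &rawdev_info);

	/* Back memory window 0 with an IOVA-contiguous memzone, aligned
	 * to the window size when the hardware requires it.
	 */
	mz[0] = rte_memzone_reserve_aligned("ntb_mw0", info.mw_size[0],
			rte_socket_id(), RTE_MEMZONE_IOVA_CONTIG,
			info.mw_size_align ? info.mw_size[0] : 0);
	if (mz[0] == NULL)
		return -ENOMEM;

	conf.num_queues = 1;
	conf.queue_size = 1024;
	conf.mz_num = 1;
	conf.mz_list = mz;
	rawdev_info.dev_private = &conf;
	ret = rte_rawdev_configure(dev_id, &rawdev_info);
	if (ret < 0)
		return ret;

	q_conf.nb_desc = 1024;
	q_conf.tx_free_thresh = 256;	/* must be < nb_desc - 3 */
	q_conf.rx_mp = mp;
	ret = rte_rawdev_queue_setup(dev_id, 0, &q_conf);
	if (ret < 0)
		return ret;

	return rte_rawdev_start(dev_id);
}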
 doc/guides/rawdevs/ntb.rst             |  39 +-
 doc/guides/rel_notes/release_19_11.rst |   4 +
 drivers/raw/ntb/Makefile               |   3 +
 drivers/raw/ntb/meson.build            |   1 +
 drivers/raw/ntb/ntb.c                  | 705 ++++++++++++++++++-------
 drivers/raw/ntb/ntb.h                  | 151 ++++--
 drivers/raw/ntb/ntb_hw_intel.c         |  26 +-
 drivers/raw/ntb/rte_pmd_ntb.h          |  43 ++
 8 files changed, 718 insertions(+), 254 deletions(-)
 create mode 100644 drivers/raw/ntb/rte_pmd_ntb.h
  

Patch

diff --git a/doc/guides/rawdevs/ntb.rst b/doc/guides/rawdevs/ntb.rst
index 0a61ec03d..99e7db441 100644
--- a/doc/guides/rawdevs/ntb.rst
+++ b/doc/guides/rawdevs/ntb.rst
@@ -45,8 +45,45 @@  to use, i.e. igb_uio, vfio. The ``dpdk-devbind.py`` script can be used to
 show devices status and to bind them to a suitable kernel driver. They will
 appear under the category of "Misc (rawdev) devices".
 
+Ring Layout
+-----------
+
+Since reads of and writes to the remote system's memory go through the
+PCI bus, a remote read is much more expensive than a remote write. Thus,
+enqueue and dequeue based on the ntb ring should avoid remote reads. The
+ring layout for ntb is as follows:
+
+- Ring Format:
+  desc_ring:
+      0               16                                              64
+      +---------------------------------------------------------------+
+      |                        buffer address                         |
+      +---------------+-----------------------------------------------+
+      | buffer length |                      resv                     |
+      +---------------+-----------------------------------------------+
+  used_ring:
+      0               16              32
+      +---------------+---------------+
+      | packet length |     flags     |
+      +---------------+---------------+
+- Ring Layout
+      +------------------------+   +------------------------+
+      | used_ring              |   | desc_ring              |
+      | +---+                  |   | +---+                  |
+      | |   |                  |   | |   |                  |
+      | +---+      +--------+  |   | +---+                  |
+      | |   | ---> | buffer | <+---+-|   |                  |
+      | +---+      +--------+  |   | +---+                  |
+      | |   |                  |   | |   |                  |
+      | +---+                  |   | +---+                  |
+      |  ...                   |   |  ...                   |
+      |                        |   |                        |
+      |            +---------+ |   |            +---------+ |
+      |            | tx_tail | |   |            | rx_tail | |
+      | System A   +---------+ |   | System B   +---------+ |
+      +------------------------+   +------------------------+
+                    <---------traffic---------
+
 Limitation
 ----------
 
-- The FIFO hasn't been introduced and will come in 19.11 release.
 - This PMD only supports Intel Skylake platform.
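To make the "no remote read" property concrete, here is an illustrative
transmit step against this layout (the transmit side is System B in the
diagram above). The real enqueue/dequeue paths arrive later in this
series; the helper name, the bar0/peer_mw_base0 parameters and the
power-of-two ring size are assumptions of this sketch, not driver API.

#include <errno.h>
#include <rte_atomic.h>
#include <rte_mbuf.h>
#include <rte_memcpy.h>
#include "ntb.h"	/* ntb_tx_queue/ntb_desc/ntb_used from this patch */

static int
ntb_tx_one(struct ntb_tx_queue *txq, struct rte_mbuf *m,
	   uint8_t *bar0, uint64_t peer_mw_base0)
{
	uint16_t idx = txq->last_used & (txq->nb_tx_desc - 1);
	volatile struct ntb_desc *d = &txq->tx_desc_ring[idx];
	struct ntb_used *u = &txq->tx_used_ring[idx];
	uint8_t *dst;

	/* avail_cnt and desc_ring are in local memory (the peer filled
	 * them through its mapped window), so polling never reads remote.
	 */
	if (txq->last_used == *txq->avail_cnt)
		return -ENOSPC;

	/* d->addr is the peer's buffer address; rebase it into the
	 * locally mapped BAR so the copy is a posted remote write.
	 */
	dst = bar0 + (d->addr - peer_mw_base0);
	rte_memcpy(dst, rte_pktmbuf_mtod(m, void *), m->data_len);

	u->len = m->data_len;			/* remote writes only */
	u->flags = NTB_FLAG_EOP;
	rte_wmb();				/* data before the count */
	*txq->used_cnt = ++txq->last_used;	/* remote write */
	return 0;
}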
diff --git a/doc/guides/rel_notes/release_19_11.rst b/doc/guides/rel_notes/release_19_11.rst
index 8490d897c..7ac3d5ca6 100644
--- a/doc/guides/rel_notes/release_19_11.rst
+++ b/doc/guides/rel_notes/release_19_11.rst
@@ -56,6 +56,10 @@  New Features
      Also, make sure to start the actual text at the margin.
      =========================================================
 
+   * **Introduced FIFO for NTB PMD.**
+
+     Introduced FIFO for NTB (Non-transparent Bridge) PMD to support
+     packet-based processing.
 
 Removed Items
 -------------
diff --git a/drivers/raw/ntb/Makefile b/drivers/raw/ntb/Makefile
index 6fe2aaf40..814cd05ca 100644
--- a/drivers/raw/ntb/Makefile
+++ b/drivers/raw/ntb/Makefile
@@ -25,4 +25,7 @@  LIBABIVER := 1
 SRCS-$(CONFIG_RTE_LIBRTE_PMD_NTB_RAWDEV) += ntb.c
 SRCS-$(CONFIG_RTE_LIBRTE_PMD_NTB_RAWDEV) += ntb_hw_intel.c
 
+# install this header file
+SYMLINK-$(CONFIG_RTE_LIBRTE_PMD_NTB_RAWDEV)-include := rte_pmd_ntb.h
+
 include $(RTE_SDK)/mk/rte.lib.mk
diff --git a/drivers/raw/ntb/meson.build b/drivers/raw/ntb/meson.build
index 7f39437f8..7a7d26126 100644
--- a/drivers/raw/ntb/meson.build
+++ b/drivers/raw/ntb/meson.build
@@ -5,4 +5,5 @@  deps += ['rawdev', 'mbuf', 'mempool',
 	 'pci', 'bus_pci']
 sources = files('ntb.c',
                 'ntb_hw_intel.c')
+install_headers('rte_pmd_ntb.h')
 allow_experimental_apis = true
diff --git a/drivers/raw/ntb/ntb.c b/drivers/raw/ntb/ntb.c
index bfecce1e4..124c82a95 100644
--- a/drivers/raw/ntb/ntb.c
+++ b/drivers/raw/ntb/ntb.c
@@ -12,6 +12,7 @@ 
 #include <rte_eal.h>
 #include <rte_log.h>
 #include <rte_pci.h>
+#include <rte_mbuf.h>
 #include <rte_bus_pci.h>
 #include <rte_memzone.h>
 #include <rte_memcpy.h>
@@ -19,6 +20,7 @@ 
 #include <rte_rawdev_pmd.h>
 
 #include "ntb_hw_intel.h"
+#include "rte_pmd_ntb.h"
 #include "ntb.h"
 
 int ntb_logtype;
@@ -28,48 +30,7 @@  static const struct rte_pci_id pci_id_ntb_map[] = {
 	{ .vendor_id = 0, /* sentinel */ },
 };
 
-static int
-ntb_set_mw(struct rte_rawdev *dev, int mw_idx, uint64_t mw_size)
-{
-	struct ntb_hw *hw = dev->dev_private;
-	char mw_name[RTE_MEMZONE_NAMESIZE];
-	const struct rte_memzone *mz;
-	int ret = 0;
-
-	if (hw->ntb_ops->mw_set_trans == NULL) {
-		NTB_LOG(ERR, "Not supported to set mw.");
-		return -ENOTSUP;
-	}
-
-	snprintf(mw_name, sizeof(mw_name), "ntb_%d_mw_%d",
-		 dev->dev_id, mw_idx);
-
-	mz = rte_memzone_lookup(mw_name);
-	if (mz)
-		return 0;
-
-	/**
-	 * Hardware requires that mapped memory base address should be
-	 * aligned with EMBARSZ and needs continuous memzone.
-	 */
-	mz = rte_memzone_reserve_aligned(mw_name, mw_size, dev->socket_id,
-				RTE_MEMZONE_IOVA_CONTIG, hw->mw_size[mw_idx]);
-	if (!mz) {
-		NTB_LOG(ERR, "Cannot allocate aligned memzone.");
-		return -EIO;
-	}
-	hw->mz[mw_idx] = mz;
-
-	ret = (*hw->ntb_ops->mw_set_trans)(dev, mw_idx, mz->iova, mw_size);
-	if (ret) {
-		NTB_LOG(ERR, "Cannot set mw translation.");
-		return ret;
-	}
-
-	return ret;
-}
-
-static void
+static inline void
 ntb_link_cleanup(struct rte_rawdev *dev)
 {
 	struct ntb_hw *hw = dev->dev_private;
@@ -89,20 +50,94 @@  ntb_link_cleanup(struct rte_rawdev *dev)
 	}
 
 	/* Clear mw so that peer cannot access local memory.*/
-	for (i = 0; i < hw->mw_cnt; i++) {
+	for (i = 0; i < hw->used_mw_num; i++) {
 		status = (*hw->ntb_ops->mw_set_trans)(dev, i, 0, 0);
 		if (status)
 			NTB_LOG(ERR, "Failed to clean mw.");
 	}
 }
 
+static inline int
+ntb_handshake_work(const struct rte_rawdev *dev)
+{
+	struct ntb_hw *hw = dev->dev_private;
+	uint32_t val;
+	int ret, i;
+
+	if (hw->ntb_ops->spad_write == NULL ||
+	    hw->ntb_ops->mw_set_trans == NULL) {
+		NTB_LOG(ERR, "Scratchpad/MW setting is not supported.");
+		return -ENOTSUP;
+	}
+
+	/* Tell peer the mw info of local side. */
+	ret = (*hw->ntb_ops->spad_write)(dev, SPAD_NUM_MWS, 1, hw->mw_cnt);
+	if (ret < 0)
+		return ret;
+	for (i = 0; i < hw->mw_cnt; i++) {
+		NTB_LOG(INFO, "Local %u mw size: 0x%"PRIx64"", i,
+				hw->mw_size[i]);
+		val = hw->mw_size[i] >> 32;
+		ret = (*hw->ntb_ops->spad_write)(dev, SPAD_MW0_SZ_H + 2 * i,
+						 1, val);
+		if (ret < 0)
+			return ret;
+		val = hw->mw_size[i];
+		ret = (*hw->ntb_ops->spad_write)(dev, SPAD_MW0_SZ_L + 2 * i,
+						 1, val);
+		if (ret < 0)
+			return ret;
+	}
+
+	/* Tell peer about the queue info and map memory to the peer. */
+	ret = (*hw->ntb_ops->spad_write)(dev, SPAD_Q_SZ, 1, hw->queue_size);
+	if (ret < 0)
+		return ret;
+	ret = (*hw->ntb_ops->spad_write)(dev, SPAD_NUM_QPS, 1,
+					 hw->queue_pairs);
+	if (ret < 0)
+		return ret;
+	ret = (*hw->ntb_ops->spad_write)(dev, SPAD_USED_MWS, 1,
+					 hw->used_mw_num);
+	if (ret < 0)
+		return ret;
+	for (i = 0; i < hw->used_mw_num; i++) {
+		val = (uint64_t)(size_t)(hw->mz[i]->addr) >> 32;
+		ret = (*hw->ntb_ops->spad_write)(dev, SPAD_MW0_BA_H + 2 * i,
+						 1, val);
+		if (ret < 0)
+			return ret;
+		val = (uint64_t)(size_t)(hw->mz[i]->addr);
+		ret = (*hw->ntb_ops->spad_write)(dev, SPAD_MW0_BA_L + 2 * i,
+						 1, val);
+		if (ret < 0)
+			return ret;
+	}
+
+	for (i = 0; i < hw->used_mw_num; i++) {
+		ret = (*hw->ntb_ops->mw_set_trans)(dev, i, hw->mz[i]->iova,
+						   hw->mz[i]->len);
+		if (ret < 0)
+			return ret;
+	}
+
+	/* Ring doorbell 0 to tell peer the device is ready. */
+	ret = (*hw->ntb_ops->peer_db_set)(dev, 0);
+	if (ret < 0)
+		return ret;
+
+	return 0;
+}
+
 static void
 ntb_dev_intr_handler(void *param)
 {
 	struct rte_rawdev *dev = (struct rte_rawdev *)param;
 	struct ntb_hw *hw = dev->dev_private;
-	uint32_t mw_size_h, mw_size_l;
+	uint32_t val_h, val_l;
+	uint64_t peer_mw_size;
 	uint64_t db_bits = 0;
+	uint8_t peer_mw_cnt;
 	int i = 0;
 
 	if (hw->ntb_ops->db_read == NULL ||
@@ -118,7 +153,7 @@  ntb_dev_intr_handler(void *param)
 
 	/* Doorbell 0 is for peer device ready. */
 	if (db_bits & 1) {
-		NTB_LOG(DEBUG, "DB0: Peer device is up.");
+		NTB_LOG(INFO, "DB0: Peer device is up.");
 		/* Clear received doorbell. */
 		(*hw->ntb_ops->db_clear)(dev, 1);
 
@@ -129,47 +164,44 @@  ntb_dev_intr_handler(void *param)
 		if (hw->peer_dev_up)
 			return;
 
-		if (hw->ntb_ops->spad_read == NULL ||
-		    hw->ntb_ops->spad_write == NULL) {
-			NTB_LOG(ERR, "Scratchpad is not supported.");
+		if (hw->ntb_ops->spad_read == NULL) {
+			NTB_LOG(ERR, "Scratchpad read is not supported.");
+			return;
+		}
+
+		/* Check if mw setting on the peer is the same as local. */
+		peer_mw_cnt = (*hw->ntb_ops->spad_read)(dev, SPAD_NUM_MWS, 0);
+		if (peer_mw_cnt != hw->mw_cnt) {
+			NTB_LOG(ERR, "Both mw cnt must be the same.");
 			return;
 		}
 
-		hw->peer_mw_cnt = (*hw->ntb_ops->spad_read)
-				  (dev, SPAD_NUM_MWS, 0);
-		hw->peer_mw_size = rte_zmalloc("uint64_t",
-				   hw->peer_mw_cnt * sizeof(uint64_t), 0);
 		for (i = 0; i < hw->mw_cnt; i++) {
-			mw_size_h = (*hw->ntb_ops->spad_read)
-				    (dev, SPAD_MW0_SZ_H + 2 * i, 0);
-			mw_size_l = (*hw->ntb_ops->spad_read)
-				    (dev, SPAD_MW0_SZ_L + 2 * i, 0);
-			hw->peer_mw_size[i] = ((uint64_t)mw_size_h << 32) |
-					      mw_size_l;
+			val_h = (*hw->ntb_ops->spad_read)
+				(dev, SPAD_MW0_SZ_H + 2 * i, 0);
+			val_l = (*hw->ntb_ops->spad_read)
+				(dev, SPAD_MW0_SZ_L + 2 * i, 0);
+			peer_mw_size = ((uint64_t)val_h << 32) | val_l;
 			NTB_LOG(DEBUG, "Peer %u mw size: 0x%"PRIx64"", i,
-					hw->peer_mw_size[i]);
+					peer_mw_size);
+			if (peer_mw_size != hw->mw_size[i]) {
+				NTB_LOG(ERR, "Mw config must be the same.");
+				return;
+			}
 		}
 
 		hw->peer_dev_up = 1;
 
 		/**
-		 * Handshake with peer. Spad_write only works when both
-		 * devices are up. So write spad again when db is received.
-		 * And set db again for the later device who may miss
+		 * Handshake with peer. Spad_write & mw_set_trans only work
+		 * when both devices are up. So write spad again when db is
+		 * received. And set db again for the later device that may miss
 		 * the 1st db.
 		 */
-		for (i = 0; i < hw->mw_cnt; i++) {
-			(*hw->ntb_ops->spad_write)(dev, SPAD_NUM_MWS,
-						   1, hw->mw_cnt);
-			mw_size_h = hw->mw_size[i] >> 32;
-			(*hw->ntb_ops->spad_write)(dev, SPAD_MW0_SZ_H + 2 * i,
-						   1, mw_size_h);
-
-			mw_size_l = hw->mw_size[i];
-			(*hw->ntb_ops->spad_write)(dev, SPAD_MW0_SZ_L + 2 * i,
-						   1, mw_size_l);
+		if (ntb_handshake_work(dev) < 0) {
+			NTB_LOG(ERR, "Handshake work failed.");
+			return;
 		}
-		(*hw->ntb_ops->peer_db_set)(dev, 0);
 
 		/* To get the link info. */
 		if (hw->ntb_ops->get_link_status == NULL) {
@@ -183,7 +215,7 @@  ntb_dev_intr_handler(void *param)
 	}
 
 	if (db_bits & (1 << 1)) {
-		NTB_LOG(DEBUG, "DB1: Peer device is down.");
+		NTB_LOG(INFO, "DB1: Peer device is down.");
 		/* Clear received doorbell. */
 		(*hw->ntb_ops->db_clear)(dev, 2);
 
@@ -197,7 +229,7 @@  ntb_dev_intr_handler(void *param)
 	}
 
 	if (db_bits & (1 << 2)) {
-		NTB_LOG(DEBUG, "DB2: Peer device agrees dev to be down.");
+		NTB_LOG(INFO, "DB2: Peer device agrees dev to be down.");
 		/* Clear received doorbell. */
 		(*hw->ntb_ops->db_clear)(dev, (1 << 2));
 		hw->peer_dev_up = 0;
@@ -206,24 +238,228 @@  ntb_dev_intr_handler(void *param)
 }
 
 static void
-ntb_queue_conf_get(struct rte_rawdev *dev __rte_unused,
-		   uint16_t queue_id __rte_unused,
-		   rte_rawdev_obj_t queue_conf __rte_unused)
+ntb_queue_conf_get(struct rte_rawdev *dev,
+		   uint16_t queue_id,
+		   rte_rawdev_obj_t queue_conf)
+{
+	struct ntb_queue_conf *q_conf = queue_conf;
+	struct ntb_hw *hw = dev->dev_private;
+
+	q_conf->tx_free_thresh = hw->tx_queues[queue_id]->tx_free_thresh;
+	q_conf->nb_desc = hw->rx_queues[queue_id]->nb_rx_desc;
+	q_conf->rx_mp = hw->rx_queues[queue_id]->mpool;
+}
+
+static void
+ntb_rxq_release_mbufs(struct ntb_rx_queue *q)
+{
+	int i;
+
+	if (!q || !q->sw_ring) {
+		NTB_LOG(ERR, "Pointer to rxq or sw_ring is NULL");
+		return;
+	}
+
+	for (i = 0; i < q->nb_rx_desc; i++) {
+		if (q->sw_ring[i].mbuf) {
+			rte_pktmbuf_free_seg(q->sw_ring[i].mbuf);
+			q->sw_ring[i].mbuf = NULL;
+		}
+	}
+}
+
+static void
+ntb_rxq_release(struct ntb_rx_queue *rxq)
+{
+	if (!rxq) {
+		NTB_LOG(ERR, "Pointer to rxq is NULL");
+		return;
+	}
+
+	ntb_rxq_release_mbufs(rxq);
+
+	rte_free(rxq->sw_ring);
+	rte_free(rxq);
+}
+
+static int
+ntb_rxq_setup(struct rte_rawdev *dev,
+	      uint16_t qp_id,
+	      rte_rawdev_obj_t queue_conf)
+{
+	struct ntb_queue_conf *rxq_conf = queue_conf;
+	struct ntb_hw *hw = dev->dev_private;
+	struct ntb_rx_queue *rxq;
+
+	/* Allocate the rx queue data structure */
+	rxq = rte_zmalloc_socket("ntb rx queue",
+				 sizeof(struct ntb_rx_queue),
+				 RTE_CACHE_LINE_SIZE,
+				 dev->socket_id);
+	if (!rxq) {
+		NTB_LOG(ERR, "Failed to allocate memory for "
+			    "rx queue data structure.");
+		return -ENOMEM;
+	}
+
+	if (rxq_conf->rx_mp == NULL) {
+		NTB_LOG(ERR, "Invalid null mempool pointer.");
+		rte_free(rxq);
+		return -EINVAL;
+	}
+	rxq->nb_rx_desc = rxq_conf->nb_desc;
+	rxq->mpool = rxq_conf->rx_mp;
+	rxq->port_id = dev->dev_id;
+	rxq->queue_id = qp_id;
+	rxq->hw = hw;
+
+	/* Allocate the software ring. */
+	rxq->sw_ring =
+		rte_zmalloc_socket("ntb rx sw ring",
+				   sizeof(struct ntb_rx_entry) *
+				   rxq->nb_rx_desc,
+				   RTE_CACHE_LINE_SIZE,
+				   dev->socket_id);
+	if (!rxq->sw_ring) {
+		ntb_rxq_release(rxq);
+		NTB_LOG(ERR, "Failed to allocate memory for SW ring");
+		return -ENOMEM;
+	}
+
+	hw->rx_queues[qp_id] = rxq;
+
+	return 0;
+}
+
+static void
+ntb_txq_release_mbufs(struct ntb_tx_queue *q)
+{
+	int i;
+
+	if (!q || !q->sw_ring) {
+		NTB_LOG(ERR, "Pointer to txq or sw_ring is NULL");
+		return;
+	}
+
+	for (i = 0; i < q->nb_tx_desc; i++) {
+		if (q->sw_ring[i].mbuf) {
+			rte_pktmbuf_free_seg(q->sw_ring[i].mbuf);
+			q->sw_ring[i].mbuf = NULL;
+		}
+	}
+}
+
+static void
+ntb_txq_release(struct ntb_tx_queue *txq)
 {
+	if (!txq) {
+		NTB_LOG(ERR, "Pointer to txq is NULL");
+		return;
+	}
+
+	ntb_txq_release_mbufs(txq);
+
+	rte_free(txq->sw_ring);
+	rte_free(txq);
 }
 
 static int
-ntb_queue_setup(struct rte_rawdev *dev __rte_unused,
-		uint16_t queue_id __rte_unused,
-		rte_rawdev_obj_t queue_conf __rte_unused)
+ntb_txq_setup(struct rte_rawdev *dev,
+	      uint16_t qp_id,
+	      rte_rawdev_obj_t queue_conf)
 {
+	struct ntb_queue_conf *txq_conf = queue_conf;
+	struct ntb_hw *hw = dev->dev_private;
+	struct ntb_tx_queue *txq;
+	uint16_t i, prev;
+
+	/* Allocate the TX queue data structure. */
+	txq = rte_zmalloc_socket("ntb tx queue",
+				  sizeof(struct ntb_tx_queue),
+				  RTE_CACHE_LINE_SIZE,
+				  dev->socket_id);
+	if (!txq) {
+		NTB_LOG(ERR, "Failed to allocate memory for "
+			    "tx queue structure");
+		return -ENOMEM;
+	}
+
+	txq->nb_tx_desc = txq_conf->nb_desc;
+	txq->port_id = dev->dev_id;
+	txq->queue_id = qp_id;
+	txq->hw = hw;
+
+	/* Allocate software ring */
+	txq->sw_ring =
+		rte_zmalloc_socket("ntb tx sw ring",
+				   sizeof(struct ntb_tx_entry) *
+				   txq->nb_tx_desc,
+				   RTE_CACHE_LINE_SIZE,
+				   dev->socket_id);
+	if (!txq->sw_ring) {
+		ntb_txq_release(txq);
+		NTB_LOG(ERR, "Failed to allocate memory for SW TX ring");
+		return -ENOMEM;
+	}
+
+	prev = txq->nb_tx_desc - 1;
+	for (i = 0; i < txq->nb_tx_desc; i++) {
+		txq->sw_ring[i].mbuf = NULL;
+		txq->sw_ring[i].last_id = i;
+		txq->sw_ring[prev].next_id = i;
+		prev = i;
+	}
+
+	txq->tx_free_thresh = txq_conf->tx_free_thresh ?
+			      txq_conf->tx_free_thresh :
+			      NTB_DFLT_TX_FREE_THRESH;
+	if (txq->tx_free_thresh >= txq->nb_tx_desc - 3) {
+		NTB_LOG(ERR, "tx_free_thresh must be less than nb_desc - 3. "
+			"(tx_free_thresh=%u qp_id=%u)", txq->tx_free_thresh,
+			qp_id);
+		ntb_txq_release(txq);
+		return -EINVAL;
+	}
+
+	hw->tx_queues[qp_id] = txq;
+
 	return 0;
 }
 
+static int
+ntb_queue_setup(struct rte_rawdev *dev,
+		uint16_t queue_id,
+		rte_rawdev_obj_t queue_conf)
+{
+	struct ntb_hw *hw = dev->dev_private;
+	int ret;
+
+	if (queue_id >= hw->queue_pairs)
+		return -EINVAL;
+
+	ret = ntb_txq_setup(dev, queue_id, queue_conf);
+	if (ret < 0)
+		return ret;
+
+	ret = ntb_rxq_setup(dev, queue_id, queue_conf);
+
+	return ret;
+}
+
 static int
-ntb_queue_release(struct rte_rawdev *dev __rte_unused,
-		  uint16_t queue_id __rte_unused)
+ntb_queue_release(struct rte_rawdev *dev, uint16_t queue_id)
 {
+	struct ntb_hw *hw = dev->dev_private;
+	struct ntb_tx_queue *txq;
+	struct ntb_rx_queue *rxq;
+
+	if (queue_id >= hw->queue_pairs)
+		return -EINVAL;
+
+	txq = hw->tx_queues[queue_id];
+	rxq = hw->rx_queues[queue_id];
+	ntb_txq_release(txq);
+	ntb_rxq_release(rxq);
+
 	return 0;
 }
 
@@ -234,6 +470,77 @@  ntb_queue_count(struct rte_rawdev *dev)
 	return hw->queue_pairs;
 }
 
+static int
+ntb_queue_init(struct rte_rawdev *dev, uint16_t qp_id)
+{
+	struct ntb_hw *hw = dev->dev_private;
+	struct ntb_rx_queue *rxq = hw->rx_queues[qp_id];
+	struct ntb_tx_queue *txq = hw->tx_queues[qp_id];
+	volatile struct ntb_header *local_hdr;
+	struct ntb_header *remote_hdr;
+	uint16_t q_size = hw->queue_size;
+	uint32_t hdr_offset;
+	void *bar_addr;
+	uint16_t i;
+
+	if (hw->ntb_ops->get_peer_mw_addr == NULL) {
+		NTB_LOG(ERR, "Failed to get mapped peer addr.");
+		return -EINVAL;
+	}
+
+	/* Put queue info into the start of shared memory. */
+	hdr_offset = hw->hdr_size_per_queue * qp_id;
+	local_hdr = (volatile struct ntb_header *)
+		    ((size_t)hw->mz[0]->addr + hdr_offset);
+	bar_addr = (*hw->ntb_ops->get_peer_mw_addr)(dev, 0);
+	if (bar_addr == NULL)
+		return -EINVAL;
+	remote_hdr = (struct ntb_header *)
+		     ((size_t)bar_addr + hdr_offset);
+
+	/* rxq init. */
+	rxq->rx_desc_ring = (struct ntb_desc *)
+			    (&remote_hdr->desc_ring);
+	rxq->rx_used_ring = (volatile struct ntb_used *)
+			    (&local_hdr->desc_ring[q_size]);
+	rxq->avail_cnt = &remote_hdr->avail_cnt;
+	rxq->used_cnt = &local_hdr->used_cnt;
+
+	for (i = 0; i < rxq->nb_rx_desc - 1; i++) {
+		struct rte_mbuf *mbuf = rte_mbuf_raw_alloc(rxq->mpool);
+		if (unlikely(!mbuf)) {
+			NTB_LOG(ERR, "Failed to allocate mbuf for RX");
+			return -ENOMEM;
+		}
+		mbuf->port = dev->dev_id;
+
+		rxq->sw_ring[i].mbuf = mbuf;
+
+		rxq->rx_desc_ring[i].addr = rte_pktmbuf_mtod(mbuf, size_t);
+		rxq->rx_desc_ring[i].len = mbuf->buf_len - RTE_PKTMBUF_HEADROOM;
+	}
+	rte_wmb();
+	*rxq->avail_cnt = rxq->nb_rx_desc - 1;
+	rxq->last_avail = rxq->nb_rx_desc - 1;
+	rxq->last_used = 0;
+
+	/* txq init */
+	txq->tx_desc_ring = (volatile struct ntb_desc *)
+			    (&local_hdr->desc_ring);
+	txq->tx_used_ring = (struct ntb_used *)
+			    (&remote_hdr->desc_ring[q_size]);
+	txq->avail_cnt = &local_hdr->avail_cnt;
+	txq->used_cnt = &remote_hdr->used_cnt;
+
+	rte_wmb();
+	*txq->used_cnt = 0;
+	txq->last_used = 0;
+	txq->last_avail = 0;
+	txq->nb_tx_free = txq->nb_tx_desc - 1;
+
+	return 0;
+}
+
 static int
 ntb_enqueue_bufs(struct rte_rawdev *dev,
 		 struct rte_rawdev_buf **buffers,
@@ -278,58 +585,51 @@  static void
 ntb_dev_info_get(struct rte_rawdev *dev, rte_rawdev_obj_t dev_info)
 {
 	struct ntb_hw *hw = dev->dev_private;
-	struct ntb_attr *ntb_attrs = dev_info;
+	struct ntb_dev_info *info = dev_info;
 
-	strncpy(ntb_attrs[NTB_TOPO_ID].name, NTB_TOPO_NAME, NTB_ATTR_NAME_LEN);
-	switch (hw->topo) {
-	case NTB_TOPO_B2B_DSD:
-		strncpy(ntb_attrs[NTB_TOPO_ID].value, "B2B DSD",
-			NTB_ATTR_VAL_LEN);
-		break;
-	case NTB_TOPO_B2B_USD:
-		strncpy(ntb_attrs[NTB_TOPO_ID].value, "B2B USD",
-			NTB_ATTR_VAL_LEN);
-		break;
-	default:
-		strncpy(ntb_attrs[NTB_TOPO_ID].value, "Unsupported",
-			NTB_ATTR_VAL_LEN);
-	}
+	info->mw_cnt = hw->mw_cnt;
+	info->mw_size = hw->mw_size;
 
-	strncpy(ntb_attrs[NTB_LINK_STATUS_ID].name, NTB_LINK_STATUS_NAME,
-		NTB_ATTR_NAME_LEN);
-	snprintf(ntb_attrs[NTB_LINK_STATUS_ID].value, NTB_ATTR_VAL_LEN,
-		 "%d", hw->link_status);
-
-	strncpy(ntb_attrs[NTB_SPEED_ID].name, NTB_SPEED_NAME,
-		NTB_ATTR_NAME_LEN);
-	snprintf(ntb_attrs[NTB_SPEED_ID].value, NTB_ATTR_VAL_LEN,
-		 "%d", hw->link_speed);
-
-	strncpy(ntb_attrs[NTB_WIDTH_ID].name, NTB_WIDTH_NAME,
-		NTB_ATTR_NAME_LEN);
-	snprintf(ntb_attrs[NTB_WIDTH_ID].value, NTB_ATTR_VAL_LEN,
-		 "%d", hw->link_width);
-
-	strncpy(ntb_attrs[NTB_MW_CNT_ID].name, NTB_MW_CNT_NAME,
-		NTB_ATTR_NAME_LEN);
-	snprintf(ntb_attrs[NTB_MW_CNT_ID].value, NTB_ATTR_VAL_LEN,
-		 "%d", hw->mw_cnt);
+	/**
+	 * Intel hardware requires that mapped memory base address should be
+	 * aligned with EMBARSZ and needs continuous memzone.
+	 */
+	info->mw_size_align = (uint8_t)(hw->pci_dev->id.vendor_id ==
+					NTB_INTEL_VENDOR_ID);
 
-	strncpy(ntb_attrs[NTB_DB_CNT_ID].name, NTB_DB_CNT_NAME,
-		NTB_ATTR_NAME_LEN);
-	snprintf(ntb_attrs[NTB_DB_CNT_ID].value, NTB_ATTR_VAL_LEN,
-		 "%d", hw->db_cnt);
+	if (!hw->queue_size || !hw->queue_pairs) {
+		NTB_LOG(ERR, "No queue size and queue num assigned.");
+		return;
+	}
 
-	strncpy(ntb_attrs[NTB_SPAD_CNT_ID].name, NTB_SPAD_CNT_NAME,
-		NTB_ATTR_NAME_LEN);
-	snprintf(ntb_attrs[NTB_SPAD_CNT_ID].value, NTB_ATTR_VAL_LEN,
-		 "%d", hw->spad_cnt);
+	hw->hdr_size_per_queue = RTE_ALIGN(sizeof(struct ntb_header) +
+				hw->queue_size * sizeof(struct ntb_desc) +
+				hw->queue_size * sizeof(struct ntb_used),
+				RTE_CACHE_LINE_SIZE);
+	info->ntb_hdr_size = hw->hdr_size_per_queue * hw->queue_pairs;
 }
 
 static int
-ntb_dev_configure(const struct rte_rawdev *dev __rte_unused,
-		  rte_rawdev_obj_t config __rte_unused)
+ntb_dev_configure(const struct rte_rawdev *dev, rte_rawdev_obj_t config)
 {
+	struct ntb_dev_config *conf = config;
+	struct ntb_hw *hw = dev->dev_private;
+	int ret;
+
+	hw->queue_pairs	= conf->num_queues;
+	hw->queue_size = conf->queue_size;
+	hw->used_mw_num = conf->mz_num;
+	hw->mz = conf->mz_list;
+	hw->rx_queues = rte_zmalloc("ntb_rx_queues",
+			sizeof(struct ntb_rx_queue *) * hw->queue_pairs, 0);
+	hw->tx_queues = rte_zmalloc("ntb_tx_queues",
+			sizeof(struct ntb_tx_queue *) * hw->queue_pairs, 0);
+
+	/* Start handshake with the peer. */
+	ret = ntb_handshake_work(dev);
+	if (ret < 0)
+		return ret;
+
 	return 0;
 }
 
@@ -337,21 +637,52 @@  static int
 ntb_dev_start(struct rte_rawdev *dev)
 {
 	struct ntb_hw *hw = dev->dev_private;
-	int ret, i;
+	uint32_t peer_base_l, peer_val;
+	uint64_t peer_base_h;
+	uint32_t i;
+	int ret;
 
-	/* TODO: init queues and start queues. */
+	if (!hw->link_status || !hw->peer_dev_up)
+		return -EINVAL;
 
-	/* Map memory of bar_size to remote. */
-	hw->mz = rte_zmalloc("struct rte_memzone *",
-			     hw->mw_cnt * sizeof(struct rte_memzone *), 0);
-	for (i = 0; i < hw->mw_cnt; i++) {
-		ret = ntb_set_mw(dev, i, hw->mw_size[i]);
+	for (i = 0; i < hw->queue_pairs; i++) {
+		ret = ntb_queue_init(dev, i);
 		if (ret) {
-			NTB_LOG(ERR, "Fail to set mw.");
+			NTB_LOG(ERR, "Failed to init queue.");
 			return ret;
 		}
 	}
 
+	hw->peer_mw_base = rte_zmalloc("ntb_peer_mw_base", hw->mw_cnt *
+					sizeof(uint64_t), 0);
+
+	if (hw->ntb_ops->spad_read == NULL)
+		return -ENOTSUP;
+
+	peer_val = (*hw->ntb_ops->spad_read)(dev, SPAD_Q_SZ, 0);
+	if (peer_val != hw->queue_size) {
+		NTB_LOG(ERR, "Inconsistent queue size! (local: %u peer: %u)",
+			hw->queue_size, peer_val);
+		return -EINVAL;
+	}
+
+	peer_val = (*hw->ntb_ops->spad_read)(dev, SPAD_NUM_QPS, 0);
+	if (peer_val != hw->queue_pairs) {
+		NTB_LOG(ERR, "Inconsistent number of queues! (local: %u peer:"
+			" %u)", hw->queue_pairs, peer_val);
+		return -EINVAL;
+	}
+
+	hw->peer_used_mws = (*hw->ntb_ops->spad_read)(dev, SPAD_USED_MWS, 0);
+
+	for (i = 0; i < hw->peer_used_mws; i++) {
+		peer_base_h = (*hw->ntb_ops->spad_read)(dev,
+				SPAD_MW0_BA_H + 2 * i, 0);
+		peer_base_l = (*hw->ntb_ops->spad_read)(dev,
+				SPAD_MW0_BA_L + 2 * i, 0);
+		hw->peer_mw_base[i] = (peer_base_h << 32) + peer_base_l;
+	}
+
 	dev->started = 1;
 
 	return 0;
@@ -361,10 +692,10 @@  static void
 ntb_dev_stop(struct rte_rawdev *dev)
 {
 	struct ntb_hw *hw = dev->dev_private;
+	struct ntb_rx_queue *rxq;
+	struct ntb_tx_queue *txq;
 	uint32_t time_out;
-	int status;
-
-	/* TODO: stop rx/tx queues. */
+	int status, i;
 
 	if (!hw->peer_dev_up)
 		goto clean;
@@ -405,6 +736,13 @@  ntb_dev_stop(struct rte_rawdev *dev)
 	if (status)
 		NTB_LOG(ERR, "Failed to clear doorbells.");
 
+	for (i = 0; i < hw->queue_pairs; i++) {
+		rxq = hw->rx_queues[i];
+		txq = hw->tx_queues[i];
+		ntb_rxq_release_mbufs(rxq);
+		ntb_txq_release_mbufs(txq);
+	}
+
 	dev->started = 0;
 }
 
@@ -413,12 +751,15 @@  ntb_dev_close(struct rte_rawdev *dev)
 {
 	struct ntb_hw *hw = dev->dev_private;
 	struct rte_intr_handle *intr_handle;
-	int ret = 0;
+	int i;
 
 	if (dev->started)
 		ntb_dev_stop(dev);
 
-	/* TODO: free queues. */
+	/* free queues */
+	for (i = 0; i < hw->queue_pairs; i++)
+		ntb_queue_release(dev, i);
+	hw->queue_pairs = 0;
 
 	intr_handle = &hw->pci_dev->intr_handle;
 	/* Clean datapath event and vec mapping */
@@ -434,7 +775,7 @@  ntb_dev_close(struct rte_rawdev *dev)
 	rte_intr_callback_unregister(intr_handle,
 				     ntb_dev_intr_handler, dev);
 
-	return ret;
+	return 0;
 }
 
 static int
@@ -445,7 +786,7 @@  ntb_dev_reset(struct rte_rawdev *rawdev __rte_unused)
 
 static int
 ntb_attr_set(struct rte_rawdev *dev, const char *attr_name,
-				 uint64_t attr_value)
+	     uint64_t attr_value)
 {
 	struct ntb_hw *hw;
 	int index;
@@ -463,7 +804,21 @@  ntb_attr_set(struct rte_rawdev *dev, const char *attr_name,
 		index = atoi(&attr_name[NTB_SPAD_USER_LEN]);
 		(*hw->ntb_ops->spad_write)(dev, hw->spad_user_list[index],
 					   1, attr_value);
-		NTB_LOG(INFO, "Set attribute (%s) Value (%" PRIu64 ")",
+		NTB_LOG(DEBUG, "Set attribute (%s) Value (%" PRIu64 ")",
+			attr_name, attr_value);
+		return 0;
+	}
+
+	if (!strncmp(attr_name, NTB_QUEUE_SZ_NAME, NTB_ATTR_NAME_LEN)) {
+		hw->queue_size = attr_value;
+		NTB_LOG(DEBUG, "Set attribute (%s) Value (%" PRIu64 ")",
+			attr_name, attr_value);
+		return 0;
+	}
+
+	if (!strncmp(attr_name, NTB_QUEUE_NUM_NAME, NTB_ATTR_NAME_LEN)) {
+		hw->queue_pairs = attr_value;
+		NTB_LOG(DEBUG, "Set attribute (%s) Value (%" PRIu64 ")",
 			attr_name, attr_value);
 		return 0;
 	}
@@ -475,7 +830,7 @@  ntb_attr_set(struct rte_rawdev *dev, const char *attr_name,
 
 static int
 ntb_attr_get(struct rte_rawdev *dev, const char *attr_name,
-				 uint64_t *attr_value)
+	     uint64_t *attr_value)
 {
 	struct ntb_hw *hw;
 	int index;
@@ -489,49 +844,50 @@  ntb_attr_get(struct rte_rawdev *dev, const char *attr_name,
 
 	if (!strncmp(attr_name, NTB_TOPO_NAME, NTB_ATTR_NAME_LEN)) {
 		*attr_value = hw->topo;
-		NTB_LOG(INFO, "Attribute (%s) Value (%" PRIu64 ")",
+		NTB_LOG(DEBUG, "Attribute (%s) Value (%" PRIu64 ")",
 			attr_name, *attr_value);
 		return 0;
 	}
 
 	if (!strncmp(attr_name, NTB_LINK_STATUS_NAME, NTB_ATTR_NAME_LEN)) {
-		*attr_value = hw->link_status;
-		NTB_LOG(INFO, "Attribute (%s) Value (%" PRIu64 ")",
+		/* hw->link_status only indicates hw link status. */
+		*attr_value = hw->link_status && hw->peer_dev_up;
+		NTB_LOG(DEBUG, "Attribute (%s) Value (%" PRIu64 ")",
 			attr_name, *attr_value);
 		return 0;
 	}
 
 	if (!strncmp(attr_name, NTB_SPEED_NAME, NTB_ATTR_NAME_LEN)) {
 		*attr_value = hw->link_speed;
-		NTB_LOG(INFO, "Attribute (%s) Value (%" PRIu64 ")",
+		NTB_LOG(DEBUG, "Attribute (%s) Value (%" PRIu64 ")",
 			attr_name, *attr_value);
 		return 0;
 	}
 
 	if (!strncmp(attr_name, NTB_WIDTH_NAME, NTB_ATTR_NAME_LEN)) {
 		*attr_value = hw->link_width;
-		NTB_LOG(INFO, "Attribute (%s) Value (%" PRIu64 ")",
+		NTB_LOG(DEBUG, "Attribute (%s) Value (%" PRIu64 ")",
 			attr_name, *attr_value);
 		return 0;
 	}
 
 	if (!strncmp(attr_name, NTB_MW_CNT_NAME, NTB_ATTR_NAME_LEN)) {
 		*attr_value = hw->mw_cnt;
-		NTB_LOG(INFO, "Attribute (%s) Value (%" PRIu64 ")",
+		NTB_LOG(DEBUG, "Attribute (%s) Value (%" PRIu64 ")",
 			attr_name, *attr_value);
 		return 0;
 	}
 
 	if (!strncmp(attr_name, NTB_DB_CNT_NAME, NTB_ATTR_NAME_LEN)) {
 		*attr_value = hw->db_cnt;
-		NTB_LOG(INFO, "Attribute (%s) Value (%" PRIu64 ")",
+		NTB_LOG(DEBUG, "Attribute (%s) Value (%" PRIu64 ")",
 			attr_name, *attr_value);
 		return 0;
 	}
 
 	if (!strncmp(attr_name, NTB_SPAD_CNT_NAME, NTB_ATTR_NAME_LEN)) {
 		*attr_value = hw->spad_cnt;
-		NTB_LOG(INFO, "Attribute (%s) Value (%" PRIu64 ")",
+		NTB_LOG(DEBUG, "Attribute (%s) Value (%" PRIu64 ")",
 			attr_name, *attr_value);
 		return 0;
 	}
@@ -542,7 +898,7 @@  ntb_attr_get(struct rte_rawdev *dev, const char *attr_name,
 		index = atoi(&attr_name[NTB_SPAD_USER_LEN]);
 		*attr_value = (*hw->ntb_ops->spad_read)(dev,
 				hw->spad_user_list[index], 0);
-		NTB_LOG(INFO, "Attribute (%s) Value (%" PRIu64 ")",
+		NTB_LOG(DEBUG, "Attribute (%s) Value (%" PRIu64 ")",
 			attr_name, *attr_value);
 		return 0;
 	}
@@ -585,6 +941,7 @@  ntb_xstats_reset(struct rte_rawdev *dev __rte_unused,
 	return 0;
 }
 
 static const struct rte_rawdev_ops ntb_ops = {
 	.dev_info_get         = ntb_dev_info_get,
 	.dev_configure        = ntb_dev_configure,
@@ -615,7 +972,6 @@  ntb_init_hw(struct rte_rawdev *dev, struct rte_pci_device *pci_dev)
 {
 	struct ntb_hw *hw = dev->dev_private;
 	struct rte_intr_handle *intr_handle;
-	uint32_t val;
 	int ret, i;
 
 	hw->pci_dev = pci_dev;
@@ -688,45 +1044,6 @@  ntb_init_hw(struct rte_rawdev *dev, struct rte_pci_device *pci_dev)
 	/* enable uio intr after callback register */
 	rte_intr_enable(intr_handle);
 
-	if (hw->ntb_ops->spad_write == NULL) {
-		NTB_LOG(ERR, "Scratchpad is not supported.");
-		return -ENOTSUP;
-	}
-	/* Tell peer the mw_cnt of local side. */
-	ret = (*hw->ntb_ops->spad_write)(dev, SPAD_NUM_MWS, 1, hw->mw_cnt);
-	if (ret) {
-		NTB_LOG(ERR, "Failed to tell peer mw count.");
-		return ret;
-	}
-
-	/* Tell peer each mw size on local side. */
-	for (i = 0; i < hw->mw_cnt; i++) {
-		NTB_LOG(DEBUG, "Local %u mw size: 0x%"PRIx64"", i,
-				hw->mw_size[i]);
-		val = hw->mw_size[i] >> 32;
-		ret = (*hw->ntb_ops->spad_write)
-				(dev, SPAD_MW0_SZ_H + 2 * i, 1, val);
-		if (ret) {
-			NTB_LOG(ERR, "Failed to tell peer mw size.");
-			return ret;
-		}
-
-		val = hw->mw_size[i];
-		ret = (*hw->ntb_ops->spad_write)
-				(dev, SPAD_MW0_SZ_L + 2 * i, 1, val);
-		if (ret) {
-			NTB_LOG(ERR, "Failed to tell peer mw size.");
-			return ret;
-		}
-	}
-
-	/* Ring doorbell 0 to tell peer the device is ready. */
-	ret = (*hw->ntb_ops->peer_db_set)(dev, 0);
-	if (ret) {
-		NTB_LOG(ERR, "Failed to tell peer device is probed.");
-		return ret;
-	}
-
 	return ret;
 }
 
@@ -839,5 +1156,5 @@  RTE_INIT(ntb_init_log)
 {
 	ntb_logtype = rte_log_register("pmd.raw.ntb");
 	if (ntb_logtype >= 0)
-		rte_log_set_level(ntb_logtype, RTE_LOG_DEBUG);
+		rte_log_set_level(ntb_logtype, RTE_LOG_INFO);
 }
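The handshake moves 64-bit quantities (mw sizes, mw base addresses)
through pairs of 32-bit scratchpads. A sketch of helpers the driver
could factor out (hypothetical, not in the patch; they follow the
spad_write/spad_read op signatures this series uses):

#include <rte_rawdev_pmd.h>
#include "ntb.h"

static int
ntb_spad_write_u64(const struct rte_rawdev *dev, int spad_h, int spad_l,
		   uint64_t val)
{
	struct ntb_hw *hw = dev->dev_private;
	int ret;

	/* Peer flag is 1: write into the peer's scratchpads. */
	ret = (*hw->ntb_ops->spad_write)(dev, spad_h, 1,
					 (uint32_t)(val >> 32));
	if (ret < 0)
		return ret;
	return (*hw->ntb_ops->spad_write)(dev, spad_l, 1, (uint32_t)val);
}

static uint64_t
ntb_spad_read_u64(const struct rte_rawdev *dev, int spad_h, int spad_l)
{
	struct ntb_hw *hw = dev->dev_private;
	uint64_t hi = (*hw->ntb_ops->spad_read)(dev, spad_h, 0);
	uint64_t lo = (*hw->ntb_ops->spad_read)(dev, spad_l, 0);

	return (hi << 32) | lo;
}

For example, ntb_spad_read_u64(dev, SPAD_MW0_BA_H, SPAD_MW0_BA_L) matches
the per-window reads in ntb_dev_start().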
diff --git a/drivers/raw/ntb/ntb.h b/drivers/raw/ntb/ntb.h
index d355231b0..0ad20aed3 100644
--- a/drivers/raw/ntb/ntb.h
+++ b/drivers/raw/ntb/ntb.h
@@ -2,8 +2,8 @@ 
  * Copyright(c) 2019 Intel Corporation.
  */
 
-#ifndef _NTB_RAWDEV_H_
-#define _NTB_RAWDEV_H_
+#ifndef _NTB_H_
+#define _NTB_H_
 
 #include <stdbool.h>
 
@@ -19,38 +19,13 @@  extern int ntb_logtype;
 /* Device IDs */
 #define NTB_INTEL_DEV_ID_B2B_SKX    0x201C
 
-#define NTB_TOPO_NAME               "topo"
-#define NTB_LINK_STATUS_NAME        "link_status"
-#define NTB_SPEED_NAME              "speed"
-#define NTB_WIDTH_NAME              "width"
-#define NTB_MW_CNT_NAME             "mw_count"
-#define NTB_DB_CNT_NAME             "db_count"
-#define NTB_SPAD_CNT_NAME           "spad_count"
 /* Reserved to app to use. */
 #define NTB_SPAD_USER               "spad_user_"
 #define NTB_SPAD_USER_LEN           (sizeof(NTB_SPAD_USER) - 1)
-#define NTB_SPAD_USER_MAX_NUM       10
+#define NTB_SPAD_USER_MAX_NUM       4
 #define NTB_ATTR_NAME_LEN           30
-#define NTB_ATTR_VAL_LEN            30
-#define NTB_ATTR_MAX                20
-
-/* NTB Attributes */
-struct ntb_attr {
-	/**< Name of the attribute */
-	char name[NTB_ATTR_NAME_LEN];
-	/**< Value or reference of value of attribute */
-	char value[NTB_ATTR_NAME_LEN];
-};
 
-enum ntb_attr_idx {
-	NTB_TOPO_ID = 0,
-	NTB_LINK_STATUS_ID,
-	NTB_SPEED_ID,
-	NTB_WIDTH_ID,
-	NTB_MW_CNT_ID,
-	NTB_DB_CNT_ID,
-	NTB_SPAD_CNT_ID,
-};
+#define NTB_DFLT_TX_FREE_THRESH     256
 
 enum ntb_topo {
 	NTB_TOPO_NONE = 0,
@@ -87,10 +62,15 @@  enum ntb_spad_idx {
 	SPAD_NUM_MWS = 1,
 	SPAD_NUM_QPS,
 	SPAD_Q_SZ,
+	SPAD_USED_MWS,
 	SPAD_MW0_SZ_H,
 	SPAD_MW0_SZ_L,
 	SPAD_MW1_SZ_H,
 	SPAD_MW1_SZ_L,
+	SPAD_MW0_BA_H,
+	SPAD_MW0_BA_L,
+	SPAD_MW1_BA_H,
+	SPAD_MW1_BA_L,
 };
 
 /**
@@ -110,26 +90,97 @@  enum ntb_spad_idx {
  * @vector_bind: Bind vector source [intr] to msix vector [msix].
  */
 struct ntb_dev_ops {
-	int (*ntb_dev_init)(struct rte_rawdev *dev);
-	void *(*get_peer_mw_addr)(struct rte_rawdev *dev, int mw_idx);
-	int (*mw_set_trans)(struct rte_rawdev *dev, int mw_idx,
+	int (*ntb_dev_init)(const struct rte_rawdev *dev);
+	void *(*get_peer_mw_addr)(const struct rte_rawdev *dev, int mw_idx);
+	int (*mw_set_trans)(const struct rte_rawdev *dev, int mw_idx,
 			    uint64_t addr, uint64_t size);
-	int (*get_link_status)(struct rte_rawdev *dev);
-	int (*set_link)(struct rte_rawdev *dev, bool up);
-	uint32_t (*spad_read)(struct rte_rawdev *dev, int spad, bool peer);
-	int (*spad_write)(struct rte_rawdev *dev, int spad,
+	int (*get_link_status)(const struct rte_rawdev *dev);
+	int (*set_link)(const struct rte_rawdev *dev, bool up);
+	uint32_t (*spad_read)(const struct rte_rawdev *dev, int spad,
+			      bool peer);
+	int (*spad_write)(const struct rte_rawdev *dev, int spad,
 			  bool peer, uint32_t spad_v);
-	uint64_t (*db_read)(struct rte_rawdev *dev);
-	int (*db_clear)(struct rte_rawdev *dev, uint64_t db_bits);
-	int (*db_set_mask)(struct rte_rawdev *dev, uint64_t db_mask);
-	int (*peer_db_set)(struct rte_rawdev *dev, uint8_t db_bit);
-	int (*vector_bind)(struct rte_rawdev *dev, uint8_t intr, uint8_t msix);
+	uint64_t (*db_read)(const struct rte_rawdev *dev);
+	int (*db_clear)(const struct rte_rawdev *dev, uint64_t db_bits);
+	int (*db_set_mask)(const struct rte_rawdev *dev, uint64_t db_mask);
+	int (*peer_db_set)(const struct rte_rawdev *dev, uint8_t db_bit);
+	int (*vector_bind)(const struct rte_rawdev *dev, uint8_t intr,
+			   uint8_t msix);
+};
+
+struct ntb_desc {
+	uint64_t addr; /* buffer addr */
+	uint16_t len;  /* buffer length */
+	uint16_t rsv1;
+	uint32_t rsv2;
+};
+
+struct ntb_used {
+	uint16_t len;     /* buffer length */
+#define NTB_FLAG_EOP    1 /* end of packet */
+	uint16_t flags;   /* flags */
+};
+
+struct ntb_rx_entry {
+	struct rte_mbuf *mbuf;
+};
+
+struct ntb_rx_queue {
+	struct ntb_desc *rx_desc_ring;
+	volatile struct ntb_used *rx_used_ring;
+	uint16_t *avail_cnt;
+	volatile uint16_t *used_cnt;
+	uint16_t last_avail;
+	uint16_t last_used;
+	uint16_t nb_rx_desc;
+
+	uint16_t rx_free_thresh;
+
+	struct rte_mempool *mpool; /**< mempool for mbuf allocation */
+	struct ntb_rx_entry *sw_ring;
+
+	uint16_t queue_id;         /**< DPDK queue index. */
+	uint16_t port_id;          /**< Device port identifier. */
+
+	struct ntb_hw *hw;
+};
+
+struct ntb_tx_entry {
+	struct rte_mbuf *mbuf;
+	uint16_t next_id;
+	uint16_t last_id;
+};
+
+struct ntb_tx_queue {
+	volatile struct ntb_desc *tx_desc_ring;
+	struct ntb_used *tx_used_ring;
+	volatile uint16_t *avail_cnt;
+	uint16_t *used_cnt;
+	uint16_t last_avail;          /**< Next descriptor to be freed. */
+	uint16_t last_used;           /**< Next descriptor to be sent. */
+	uint16_t nb_tx_desc;
+
+	/**< Total number of TX descriptors ready to be allocated. */
+	uint16_t nb_tx_free;
+	uint16_t tx_free_thresh;
+
+	struct ntb_tx_entry *sw_ring;
+
+	uint16_t queue_id;            /**< DPDK queue index. */
+	uint16_t port_id;             /**< Device port identifier. */
+
+	struct ntb_hw *hw;
+};
+
+struct ntb_header {
+	uint16_t avail_cnt __rte_cache_aligned;
+	uint16_t used_cnt __rte_cache_aligned;
+	struct ntb_desc desc_ring[] __rte_cache_aligned;
 };
 
 /* ntb private data. */
 struct ntb_hw {
 	uint8_t mw_cnt;
-	uint8_t peer_mw_cnt;
 	uint8_t db_cnt;
 	uint8_t spad_cnt;
 
@@ -147,18 +198,26 @@  struct ntb_hw {
 	struct rte_pci_device *pci_dev;
 	char *hw_addr;
 
-	uint64_t *mw_size;
-	uint64_t *peer_mw_size;
 	uint8_t peer_dev_up;
+	uint64_t *mw_size;
+	/* remote mem base addr */
+	uint64_t *peer_mw_base;
 
 	uint16_t queue_pairs;
 	uint16_t queue_size;
+	uint32_t hdr_size_per_queue;
+
+	struct ntb_rx_queue **rx_queues;
+	struct ntb_tx_queue **tx_queues;
 
-	/**< mem zone to populate RX ring. */
+	/* memzone to populate RX ring. */
 	const struct rte_memzone **mz;
+	uint8_t used_mw_num;
+
+	uint8_t peer_used_mws;
 
 	/* Reserve several spad for app to use. */
 	int spad_user_list[NTB_SPAD_USER_MAX_NUM];
 };
 
-#endif /* _NTB_RAWDEV_H_ */
+#endif /* _NTB_H_ */
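For reference, the per-queue shared-memory region in mw 0 is laid out
exactly as hw->hdr_size_per_queue computes it. A small standalone check
of the offsets ntb_queue_init() relies on (illustrative sketch only):

#include <stddef.h>
#include <stdio.h>
#include <rte_common.h>
#include "ntb.h"

static void
ntb_print_queue_layout(uint16_t queue_size)
{
	size_t desc_off = offsetof(struct ntb_header, desc_ring);
	/* The used ring starts right after queue_size descriptors:
	 * ntb_queue_init() casts &hdr->desc_ring[queue_size] to
	 * struct ntb_used *.
	 */
	size_t used_off = desc_off + queue_size * sizeof(struct ntb_desc);
	size_t per_queue = RTE_ALIGN(sizeof(struct ntb_header) +
				     queue_size * sizeof(struct ntb_desc) +
				     queue_size * sizeof(struct ntb_used),
				     RTE_CACHE_LINE_SIZE);

	printf("desc ring @%zu, used ring @%zu, %zu bytes per queue\n",
	       desc_off, used_off, per_queue);
}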
diff --git a/drivers/raw/ntb/ntb_hw_intel.c b/drivers/raw/ntb/ntb_hw_intel.c
index 21eaa8511..0e73f1609 100644
--- a/drivers/raw/ntb/ntb_hw_intel.c
+++ b/drivers/raw/ntb/ntb_hw_intel.c
@@ -26,7 +26,7 @@  static enum xeon_ntb_bar intel_ntb_bar[] = {
 };
 
 static int
-intel_ntb_dev_init(struct rte_rawdev *dev)
+intel_ntb_dev_init(const struct rte_rawdev *dev)
 {
 	struct ntb_hw *hw = dev->dev_private;
 	uint8_t reg_val, bar;
@@ -77,7 +77,7 @@  intel_ntb_dev_init(struct rte_rawdev *dev)
 	hw->db_cnt = XEON_DB_COUNT;
 	hw->spad_cnt = XEON_SPAD_COUNT;
 
-	hw->mw_size = rte_zmalloc("uint64_t",
+	hw->mw_size = rte_zmalloc("ntb_mw_size",
 				  hw->mw_cnt * sizeof(uint64_t), 0);
 	for (i = 0; i < hw->mw_cnt; i++) {
 		bar = intel_ntb_bar[i];
@@ -94,7 +94,7 @@  intel_ntb_dev_init(struct rte_rawdev *dev)
 }
 
 static void *
-intel_ntb_get_peer_mw_addr(struct rte_rawdev *dev, int mw_idx)
+intel_ntb_get_peer_mw_addr(const struct rte_rawdev *dev, int mw_idx)
 {
 	struct ntb_hw *hw = dev->dev_private;
 	uint8_t bar;
@@ -116,7 +116,7 @@  intel_ntb_get_peer_mw_addr(struct rte_rawdev *dev, int mw_idx)
 }
 
 static int
-intel_ntb_mw_set_trans(struct rte_rawdev *dev, int mw_idx,
+intel_ntb_mw_set_trans(const struct rte_rawdev *dev, int mw_idx,
 		       uint64_t addr, uint64_t size)
 {
 	struct ntb_hw *hw = dev->dev_private;
@@ -163,7 +163,7 @@  intel_ntb_mw_set_trans(struct rte_rawdev *dev, int mw_idx,
 }
 
 static int
-intel_ntb_get_link_status(struct rte_rawdev *dev)
+intel_ntb_get_link_status(const struct rte_rawdev *dev)
 {
 	struct ntb_hw *hw = dev->dev_private;
 	uint16_t reg_val;
@@ -195,7 +195,7 @@  intel_ntb_get_link_status(struct rte_rawdev *dev)
 }
 
 static int
-intel_ntb_set_link(struct rte_rawdev *dev, bool up)
+intel_ntb_set_link(const struct rte_rawdev *dev, bool up)
 {
 	struct ntb_hw *hw = dev->dev_private;
 	uint32_t ntb_ctrl, reg_off;
@@ -221,7 +221,7 @@  intel_ntb_set_link(struct rte_rawdev *dev, bool up)
 }
 
 static uint32_t
-intel_ntb_spad_read(struct rte_rawdev *dev, int spad, bool peer)
+intel_ntb_spad_read(const struct rte_rawdev *dev, int spad, bool peer)
 {
 	struct ntb_hw *hw = dev->dev_private;
 	uint32_t spad_v, reg_off;
@@ -241,7 +241,7 @@  intel_ntb_spad_read(struct rte_rawdev *dev, int spad, bool peer)
 }
 
 static int
-intel_ntb_spad_write(struct rte_rawdev *dev, int spad,
+intel_ntb_spad_write(const struct rte_rawdev *dev, int spad,
 		     bool peer, uint32_t spad_v)
 {
 	struct ntb_hw *hw = dev->dev_private;
@@ -263,7 +263,7 @@  intel_ntb_spad_write(struct rte_rawdev *dev, int spad,
 }
 
 static uint64_t
-intel_ntb_db_read(struct rte_rawdev *dev)
+intel_ntb_db_read(const struct rte_rawdev *dev)
 {
 	struct ntb_hw *hw = dev->dev_private;
 	uint64_t db_off, db_bits;
@@ -278,7 +278,7 @@  intel_ntb_db_read(struct rte_rawdev *dev)
 }
 
 static int
-intel_ntb_db_clear(struct rte_rawdev *dev, uint64_t db_bits)
+intel_ntb_db_clear(const struct rte_rawdev *dev, uint64_t db_bits)
 {
 	struct ntb_hw *hw = dev->dev_private;
 	uint64_t db_off;
@@ -293,7 +293,7 @@  intel_ntb_db_clear(struct rte_rawdev *dev, uint64_t db_bits)
 }
 
 static int
-intel_ntb_db_set_mask(struct rte_rawdev *dev, uint64_t db_mask)
+intel_ntb_db_set_mask(const struct rte_rawdev *dev, uint64_t db_mask)
 {
 	struct ntb_hw *hw = dev->dev_private;
 	uint64_t db_m_off;
@@ -312,7 +312,7 @@  intel_ntb_db_set_mask(struct rte_rawdev *dev, uint64_t db_mask)
 }
 
 static int
-intel_ntb_peer_db_set(struct rte_rawdev *dev, uint8_t db_idx)
+intel_ntb_peer_db_set(const struct rte_rawdev *dev, uint8_t db_idx)
 {
 	struct ntb_hw *hw = dev->dev_private;
 	uint32_t db_off;
@@ -332,7 +332,7 @@  intel_ntb_peer_db_set(struct rte_rawdev *dev, uint8_t db_idx)
 }
 
 static int
-intel_ntb_vector_bind(struct rte_rawdev *dev, uint8_t intr, uint8_t msix)
+intel_ntb_vector_bind(const struct rte_rawdev *dev, uint8_t intr, uint8_t msix)
 {
 	struct ntb_hw *hw = dev->dev_private;
 	uint8_t reg_off;
diff --git a/drivers/raw/ntb/rte_pmd_ntb.h b/drivers/raw/ntb/rte_pmd_ntb.h
new file mode 100644
index 000000000..6591ce793
--- /dev/null
+++ b/drivers/raw/ntb/rte_pmd_ntb.h
@@ -0,0 +1,43 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2019 Intel Corporation.
+ */
+
+#ifndef _RTE_PMD_NTB_H_
+#define _RTE_PMD_NTB_H_
+
+/* App needs to set/get these attrs */
+#define NTB_QUEUE_SZ_NAME           "queue_size"
+#define NTB_QUEUE_NUM_NAME          "queue_num"
+#define NTB_TOPO_NAME               "topo"
+#define NTB_LINK_STATUS_NAME        "link_status"
+#define NTB_SPEED_NAME              "speed"
+#define NTB_WIDTH_NAME              "width"
+#define NTB_MW_CNT_NAME             "mw_count"
+#define NTB_DB_CNT_NAME             "db_count"
+#define NTB_SPAD_CNT_NAME           "spad_count"
+
+#define NTB_MAX_DESC_SIZE           1024
+#define NTB_MIN_DESC_SIZE           64
+
+struct ntb_dev_info {
+	uint32_t ntb_hdr_size;
+	/**< Whether the memzone needs to be aligned to the mw size. */
+	uint8_t mw_size_align;
+	uint8_t mw_cnt;
+	uint64_t *mw_size;
+};
+
+struct ntb_dev_config {
+	uint16_t num_queues;
+	uint16_t queue_size;
+	uint8_t mz_num;
+	const struct rte_memzone **mz_list;
+};
+
+struct ntb_queue_conf {
+	uint16_t nb_desc;
+	uint16_t tx_free_thresh;
+	struct rte_mempool *rx_mp;
+};
+
+#endif /* _RTE_PMD_NTB_H_ */
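Applications read these attributes back through the generic rawdev
attribute API; for instance (sketch, with error handling trimmed):

#include <rte_rawdev.h>
#include "rte_pmd_ntb.h"

static int
ntb_link_is_up(uint16_t dev_id)
{
	uint64_t link = 0;

	if (rte_rawdev_get_attr(dev_id, NTB_LINK_STATUS_NAME, &link) < 0)
		return 0;
	/* With this patch, link_status also folds in peer_dev_up. */
	return link == 1;
}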