[RFC,v3,5/6] net/af_xdp: enable zero copy

Message ID 20180816144321.17719-6-qi.z.zhang@intel.com (mailing list archive)
State Superseded, archived
Delegated to: Ferruh Yigit
Headers
Series [RFC,v3,1/6] net/af_xdp: new PMD driver |

Checks

Context Check Description
ci/checkpatch warning coding style issues
ci/Intel-compilation fail Compilation issues

Commit Message

Qi Zhang Aug. 16, 2018, 2:43 p.m. UTC
  Try to check if external mempool (from rx_queue_setup) is fit for
af_xdp, if it is, it will be registered to af_xdp socket directly and
there will be no packet data copy on Rx and Tx.

Signed-off-by: Qi Zhang <qi.z.zhang@intel.com>
---
 drivers/net/af_xdp/rte_eth_af_xdp.c | 158 +++++++++++++++++++++++++-----------
 1 file changed, 112 insertions(+), 46 deletions(-)
  

Patch

diff --git a/drivers/net/af_xdp/rte_eth_af_xdp.c b/drivers/net/af_xdp/rte_eth_af_xdp.c
index 69bc38536..c78c66a8c 100644
--- a/drivers/net/af_xdp/rte_eth_af_xdp.c
+++ b/drivers/net/af_xdp/rte_eth_af_xdp.c
@@ -73,6 +73,7 @@  struct xdp_umem {
 	struct xdp_umem_uqueue fq;
 	struct xdp_umem_uqueue cq;
 	struct rte_mempool *mb_pool; /* be used to manage the buffer */
+	uint8_t zc;
 	int fd;
 };
 
@@ -258,6 +259,7 @@  struct pkt_rx_queue {
 	unsigned long rx_dropped;
 
 	struct pkt_tx_queue *pair;
+	uint8_t zc;
 };
 
 struct pkt_tx_queue {
@@ -366,20 +368,24 @@  eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 		char *pkt;
 		uint64_t addr = descs[i].addr;
 
-		mbuf = rte_pktmbuf_alloc(rxq->mb_pool);
-		rte_pktmbuf_pkt_len(mbuf) =
-			rte_pktmbuf_data_len(mbuf) =
-			descs[i].len;
-		if (mbuf) {
-			pkt = get_pkt_data(rxq->umem, addr);
-			memcpy(rte_pktmbuf_mtod(mbuf, void *),
-			       pkt, descs[i].len);
-			rx_bytes += descs[i].len;
-			bufs[count++] = mbuf;
+		if (!rxq->zc) {
+			mbuf = rte_pktmbuf_alloc(rxq->mb_pool);
+			rte_pktmbuf_pkt_len(mbuf) =
+				rte_pktmbuf_data_len(mbuf) =
+				descs[i].len;
+			if (mbuf) {
+				pkt = get_pkt_data(rxq->umem, addr);
+				memcpy(rte_pktmbuf_mtod(mbuf, void *),
+					pkt, descs[i].len);
+					rx_bytes += descs[i].len;
+					bufs[count++] = mbuf;
+			} else {
+				dropped++;
+			}
+			rte_pktmbuf_free(addr_to_mbuf(rxq->umem, addr));
 		} else {
-			dropped++;
+			bufs[count++] = addr_to_mbuf(rxq->umem, addr);
 		}
-		rte_pktmbuf_free(addr_to_mbuf(rxq->umem, addr));
 	}
 
 	rxq->rx_pkts += (rcvd - dropped);
@@ -425,14 +431,17 @@  static uint16_t
 eth_af_xdp_tx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 {
 	struct pkt_tx_queue *txq = queue;
+	struct xdp_umem *umem = txq->pair->umem;
 	struct xdp_uqueue *uq = &txq->tx;
 	struct xdp_umem_uqueue *cq = &txq->pair->umem->cq;
+	struct rte_mempool *mp = umem->mb_pool;
 	struct rte_mbuf *mbuf;
 	struct xdp_desc descs[ETH_AF_XDP_TX_BATCH_SIZE];
 	uint64_t addrs[ETH_AF_XDP_TX_BATCH_SIZE];
 	struct rte_mbuf *bufs_to_fill[ETH_AF_XDP_TX_BATCH_SIZE];
+	struct rte_mbuf *bufs_to_free[ETH_AF_XDP_TX_BATCH_SIZE];
 	unsigned long tx_bytes = 0;
-	int i, valid, n;
+	int i, valid, n, free, fill;
 
 	nb_pkts = nb_pkts < ETH_AF_XDP_TX_BATCH_SIZE ?
 		  nb_pkts : ETH_AF_XDP_TX_BATCH_SIZE;
@@ -446,39 +455,57 @@  eth_af_xdp_tx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 	}
 
 	valid = 0;
+	free = 0;
+	fill = 0;
 	for (i = 0; i < nb_pkts; i++) {
-		char *pkt;
-		unsigned int buf_len =
-			ETH_AF_XDP_FRAME_SIZE - ETH_AF_XDP_DATA_HEADROOM;
 		mbuf = bufs[i];
-		if (mbuf->pkt_len <= buf_len) {
-			bufs_to_fill[valid] =
-				rte_pktmbuf_alloc(txq->pair->umem->mb_pool);
-			if (!bufs_to_fill[valid])
-				break;
-			descs[valid].addr =
-				mbuf_to_addr(txq->pair->umem,
-						bufs_to_fill[valid]);
+		/* mbuf is in shared mempool, zero copy */
+		if (txq->pair->zc && bufs[i]->pool == mp) {
+			descs[valid].addr = mbuf_to_addr(umem, mbuf);
 			descs[valid].len = mbuf->pkt_len;
 			descs[valid].options = 0;
-			pkt = get_pkt_data(txq->pair->umem, descs[valid].addr);
-			memcpy(pkt, rte_pktmbuf_mtod(mbuf, void *),
-			       descs[i].len);
 			valid++;
 			tx_bytes += mbuf->pkt_len;
+		} else {
+			char *pkt;
+			unsigned int buf_len =
+				ETH_AF_XDP_FRAME_SIZE -
+				ETH_AF_XDP_DATA_HEADROOM;
+			if (mbuf->pkt_len <= buf_len) {
+
+				bufs_to_fill[fill] = rte_pktmbuf_alloc(mp);
+				if (bufs_to_fill[fill] == NULL) {
+					bufs_to_free[free++] = mbuf;
+					continue;
+				}
+
+				descs[valid].addr =
+					mbuf_to_addr(umem, bufs_to_fill[fill]);
+				fill++;
+				descs[valid].len = mbuf->pkt_len;
+				descs[valid].options = 0;
+				pkt = get_pkt_data(umem, descs[valid].addr);
+				memcpy(pkt, rte_pktmbuf_mtod(mbuf, void *),
+					descs[i].len);
+				valid++;
+				tx_bytes += mbuf->pkt_len;
+			}
+			bufs_to_free[free++] = mbuf;
 		}
 	}
 
 	if (xq_enq(uq, descs, valid)) {
-		for (i = 0; i < valid; i++)
+		/* if failed, all tmp mbufs need to be free */
+		for (i = 0; i < fill; i++)
 			rte_pktmbuf_free(bufs_to_fill[i]);
 		nb_pkts = 0;
 		valid = 0;
 		tx_bytes = 0;
 	} else {
+		/* if passed, original mbuf need to be free */
+		for (i = 0; i < free; i++)
+			rte_pktmbuf_free(bufs_to_free[i]);
 		kick_tx(txq);
-		for (i = 0; i < nb_pkts; i++)
-			rte_pktmbuf_free(bufs[i]);
 	}
 
 	txq->err_pkts += (nb_pkts - valid);
@@ -641,7 +668,7 @@  static inline uint64_t get_len(struct rte_mempool *mp)
 	return (uint64_t)(memhdr->len);
 }
 
-static struct xdp_umem *xdp_umem_configure(int sfd)
+static struct xdp_umem *xdp_umem_configure(int sfd, struct rte_mempool *mb_pool)
 {
 	int fq_size = ETH_AF_XDP_FQ_NUM_DESCS;
 	int cq_size = ETH_AF_XDP_CQ_NUM_DESCS;
@@ -655,18 +682,24 @@  static struct xdp_umem *xdp_umem_configure(int sfd)
 	if (!umem)
 		return NULL;
 
-	snprintf(pool_name, 0x100, "%s_%d", "af_xdp_ring", sfd);
-	umem->mb_pool = rte_pktmbuf_pool_create_with_flags(
-			pool_name, ETH_AF_XDP_NUM_BUFFERS,
-			250, 0,
-			ETH_AF_XDP_FRAME_SIZE - ETH_AF_XDP_MBUF_OVERHEAD,
-			MEMPOOL_F_NO_SPREAD | MEMPOOL_F_PAGE_ALIGN,
-			SOCKET_ID_ANY);
-
-	if (!umem->mb_pool || umem->mb_pool->nb_mem_chunks != 1) {
-		RTE_LOG(ERR, PMD,
-			"Failed to create rte_mempool\n");
-		goto err;
+	if (!mb_pool) {
+		snprintf(pool_name, 0x100, "%s_%d", "af_xdp_ring", sfd);
+		umem->mb_pool = rte_pktmbuf_pool_create_with_flags(
+				pool_name, ETH_AF_XDP_NUM_BUFFERS,
+				250, 0,
+				ETH_AF_XDP_FRAME_SIZE -
+				ETH_AF_XDP_MBUF_OVERHEAD,
+				MEMPOOL_F_NO_SPREAD | MEMPOOL_F_PAGE_ALIGN,
+				SOCKET_ID_ANY);
+
+		if (!umem->mb_pool || umem->mb_pool->nb_mem_chunks != 1) {
+			RTE_LOG(ERR, PMD,
+				"Failed to create rte_mempool\n");
+			goto err;
+		}
+	} else {
+		umem->mb_pool = mb_pool;
+		umem->zc = 1;
 	}
 
 	mr.addr = get_base_addr(umem->mb_pool);
@@ -753,9 +786,34 @@  static struct xdp_umem *xdp_umem_configure(int sfd)
 
 }
 
+static uint8_t
+check_mempool_zc(struct rte_mempool *mp)
+{
+	RTE_ASSERT(mp);
+
+	/* must continues */
+	if (mp->nb_mem_chunks > 1)
+		return 0;
+
+	/* check header size */
+	if (mp->header_size != RTE_CACHE_LINE_SIZE)
+		return 0;
+
+	/* check base address */
+	if ((uint64_t)get_base_addr(mp) % getpagesize() != 0)
+		return 0;
+
+	/* check chunk size */
+	if ((mp->elt_size + mp->header_size + mp->trailer_size) %
+		ETH_AF_XDP_FRAME_SIZE != 0)
+		return 0;
+
+	return 1;
+}
+
 static int
 xsk_configure(struct pkt_rx_queue *rxq, int ring_size,
-		struct xdp_umem *umem)
+		struct xdp_umem *umem, struct rte_mempool *mb_pool)
 {
 	struct pkt_tx_queue *txq = rxq->pair;
 	struct xdp_mmap_offsets off;
@@ -767,7 +825,8 @@  xsk_configure(struct pkt_rx_queue *rxq, int ring_size,
 		return -1;
 
 	if (!umem) {
-		rxq->umem = xdp_umem_configure(rxq->xsk_fd);
+		mb_pool = check_mempool_zc(mb_pool) ? mb_pool : NULL;
+		rxq->umem = xdp_umem_configure(rxq->xsk_fd, mb_pool);
 		if (!rxq->umem)
 			goto err;
 		new_umem = 1;
@@ -918,7 +977,7 @@  eth_rx_queue_setup(struct rte_eth_dev *dev,
 
 	rxq->mb_pool = mb_pool;
 
-	if (xsk_configure(rxq, nb_rx_desc, internals->umem_share)) {
+	if (xsk_configure(rxq, nb_rx_desc, internals->umem_share, mb_pool)) {
 		RTE_LOG(ERR, PMD,
 			"Failed to configure xdp socket\n");
 		return -EINVAL;
@@ -945,6 +1004,13 @@  eth_rx_queue_setup(struct rte_eth_dev *dev,
 	if (!internals->umem_share)
 		internals->umem_share = rxq->umem;
 
+	if (mb_pool == internals->umem_share->mb_pool)
+		rxq->zc = internals->umem_share->zc;
+
+	if (rxq->zc)
+		RTE_LOG(INFO, PMD,
+			"zero copy enabled on rx queue %d\n", rx_queue_id);
+
 	internals->umem_share_count++;
 	map_fd = bpf_map_get_fd_by_id(internals->xsk_map_id);