[v4,12/14] vhost: optimize dequeue function of packed ring

Message ID 20191009133849.69002-13-yong.liu@intel.com (mailing list archive)
State Superseded, archived
Delegated to: Maxime Coquelin
Series vhost packed ring performance optimization

Checks

Context               Check     Description
ci/checkpatch         success   coding style OK
ci/Intel-compilation  success   Compilation OK

Commit Message

Marvin Liu Oct. 9, 2019, 1:38 p.m. UTC
Optimize the vhost device dequeue datapath by splitting it into separate
functions. Non-chained, direct descriptors will be handled in batches;
all others will be handled one by one as before.

Signed-off-by: Marvin Liu <yong.liu@intel.com>
  

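For reference, the control flow introduced by this patch is: attempt a full
batch whenever at least PACKED_BATCH_SIZE descriptors remain, otherwise fall
back to dequeuing a single descriptor, and stop the burst when the ring runs
dry. The standalone C sketch below mirrors only that loop shape;
try_dequeue_batch(), dequeue_single(), BATCH_SIZE and the toy avail counter
are hypothetical stand-ins, not the vhost helpers from the patch.

/* Minimal sketch of the batch/single split in the new dequeue loop.
 * All names here are illustrative stand-ins, not DPDK APIs.
 */
#include <stdint.h>
#include <stdio.h>

#define BATCH_SIZE 4u

/* Pretend 10 descriptors are available; a batch only succeeds while
 * at least BATCH_SIZE of them remain.
 */
static uint32_t avail = 10;

static int try_dequeue_batch(void)
{
	if (avail < BATCH_SIZE)
		return -1;	/* fall back to the scalar path */
	avail -= BATCH_SIZE;
	return 0;
}

static int dequeue_single(void)
{
	if (avail == 0)
		return -1;	/* ring empty, stop the burst */
	avail -= 1;
	return 0;
}

static uint32_t dequeue_burst(uint32_t count)
{
	uint32_t pkt_idx = 0;
	uint32_t remained = count;

	do {
		/* Fast path: handle BATCH_SIZE descriptors at once. */
		if (remained >= BATCH_SIZE && !try_dequeue_batch()) {
			pkt_idx += BATCH_SIZE;
			remained -= BATCH_SIZE;
			continue;
		}
		/* Slow path: one descriptor at a time, as before. */
		if (dequeue_single())
			break;
		pkt_idx++;
		remained--;
	} while (remained);

	return pkt_idx;
}

int main(void)
{
	/* With 10 descriptors available and a burst of 16 requested,
	 * this prints "dequeued 10 packets".
	 */
	printf("dequeued %u packets\n", (unsigned int)dequeue_burst(16));
	return 0;
}
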
Patch

diff --git a/lib/librte_vhost/virtio_net.c b/lib/librte_vhost/virtio_net.c
index deb9d0e39..56c2080fb 100644
--- a/lib/librte_vhost/virtio_net.c
+++ b/lib/librte_vhost/virtio_net.c
@@ -184,17 +184,6 @@  flush_dequeue_shadow_used_ring_packed(struct virtio_net *dev,
 	vhost_log_cache_sync(dev, vq);
 }
 
-static __rte_always_inline void
-update_shadow_used_ring_packed(struct vhost_virtqueue *vq,
-			 uint16_t desc_idx, uint32_t len, uint16_t count)
-{
-	uint16_t i = vq->shadow_used_idx++;
-
-	vq->shadow_used_packed[i].id  = desc_idx;
-	vq->shadow_used_packed[i].len = len;
-	vq->shadow_used_packed[i].count = count;
-}
-
 static __rte_always_inline void
 flush_used_batch_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
 	uint64_t *lens, uint16_t *ids, uint16_t flags)
@@ -378,7 +367,7 @@  flush_enqueue_packed(struct virtio_net *dev,
 	}
 }
 
-static __rte_unused void
+static __rte_always_inline void
 flush_dequeue_packed(struct virtio_net *dev, struct vhost_virtqueue *vq)
 {
 	if (!vq->shadow_used_idx)
@@ -1784,7 +1773,7 @@  vhost_dequeue_batch_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
 	return -1;
 }
 
-static __rte_unused int
+static __rte_always_inline int
 virtio_dev_tx_batch_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
 	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts)
 {
@@ -1859,7 +1848,7 @@  vhost_dequeue_single_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
 	return 0;
 }
 
-static __rte_unused int
+static __rte_always_inline int
 virtio_dev_tx_single_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
 	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts)
 {
@@ -1881,7 +1870,7 @@  virtio_dev_tx_single_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
 	return 0;
 }
 
-static __rte_unused int
+static __rte_always_inline int
 virtio_dev_tx_batch_packed_zmbuf(struct virtio_net *dev,
 					struct vhost_virtqueue *vq,
 					struct rte_mempool *mbuf_pool,
@@ -1940,7 +1929,7 @@  virtio_dev_tx_batch_packed_zmbuf(struct virtio_net *dev,
 	return -1;
 }
 
-static __rte_unused int
+static __rte_always_inline int
 virtio_dev_tx_single_packed_zmbuf(struct virtio_net *dev,
 	struct vhost_virtqueue *vq, struct rte_mempool *mbuf_pool,
 	struct rte_mbuf **pkts)
@@ -2017,118 +2006,73 @@  free_zmbuf(struct vhost_virtqueue *vq)
 }
 
 static __rte_noinline uint16_t
-virtio_dev_tx_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
-	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count)
+virtio_dev_tx_packed_zmbuf(struct virtio_net *dev, struct vhost_virtqueue *vq,
+	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint32_t count)
 {
-	uint16_t i;
-
-	if (unlikely(dev->dequeue_zero_copy)) {
-		struct zcopy_mbuf *zmbuf, *next;
-
-		for (zmbuf = TAILQ_FIRST(&vq->zmbuf_list);
-		     zmbuf != NULL; zmbuf = next) {
-			next = TAILQ_NEXT(zmbuf, next);
+	uint32_t pkt_idx = 0;
+	uint32_t remained = count;
 
-			if (mbuf_is_consumed(zmbuf->mbuf)) {
-				update_shadow_used_ring_packed(vq,
-						zmbuf->desc_idx,
-						0,
-						zmbuf->desc_count);
+	free_zmbuf(vq);
 
-				TAILQ_REMOVE(&vq->zmbuf_list, zmbuf, next);
-				restore_mbuf(zmbuf->mbuf);
-				rte_pktmbuf_free(zmbuf->mbuf);
-				put_zmbuf(zmbuf);
-				vq->nr_zmbuf -= 1;
+	do {
+		if (remained >= PACKED_BATCH_SIZE) {
+			if (virtio_dev_tx_batch_packed_zmbuf(dev, vq,
+							     mbuf_pool,
+							     &pkts[pkt_idx])) {
+				pkt_idx += PACKED_BATCH_SIZE;
+				remained -= PACKED_BATCH_SIZE;
+				continue;
 			}
 		}
+		if (virtio_dev_tx_single_packed_zmbuf(dev, vq, mbuf_pool,
+						      &pkts[pkt_idx]))
+			break;
 
-		if (likely(vq->shadow_used_idx)) {
-			flush_dequeue_shadow_used_ring_packed(dev, vq);
-			vhost_vring_call_packed(dev, vq);
-		}
-	}
-
-	VHOST_LOG_DEBUG(VHOST_DATA, "(%d) %s\n", dev->vid, __func__);
+		pkt_idx++;
+		remained--;
+	} while (remained);
 
-	count = RTE_MIN(count, MAX_PKT_BURST);
-	VHOST_LOG_DEBUG(VHOST_DATA, "(%d) about to dequeue %u buffers\n",
-			dev->vid, count);
+	if (pkt_idx)
+		vhost_vring_call_packed(dev, vq);
 
-	for (i = 0; i < count; i++) {
-		struct buf_vector buf_vec[BUF_VECTOR_MAX];
-		uint16_t buf_id;
-		uint32_t dummy_len;
-		uint16_t desc_count, nr_vec = 0;
-		int err;
+	return pkt_idx;
+}
 
-		if (unlikely(fill_vec_buf_packed(dev, vq,
-						vq->last_avail_idx, &desc_count,
-						buf_vec, &nr_vec,
-						&buf_id, &dummy_len,
-						VHOST_ACCESS_RO) < 0))
-			break;
+static __rte_noinline uint16_t
+virtio_dev_tx_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
+	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint32_t count)
+{
+	uint32_t pkt_idx = 0;
+	uint32_t remained = count;
 
-		if (likely(dev->dequeue_zero_copy == 0))
-			update_shadow_used_ring_packed(vq, buf_id, 0,
-					desc_count);
+	do {
+		rte_prefetch0(&vq->desc_packed[vq->last_avail_idx]);
 
-		pkts[i] = rte_pktmbuf_alloc(mbuf_pool);
-		if (unlikely(pkts[i] == NULL)) {
-			RTE_LOG(ERR, VHOST_DATA,
-				"Failed to allocate memory for mbuf.\n");
-			break;
+		if (remained >= PACKED_BATCH_SIZE) {
+			if (!virtio_dev_tx_batch_packed(dev, vq, mbuf_pool,
+							&pkts[pkt_idx])) {
+				flush_dequeue_packed(dev, vq);
+				pkt_idx += PACKED_BATCH_SIZE;
+				remained -= PACKED_BATCH_SIZE;
+				continue;
+			}
 		}
 
-		err = copy_desc_to_mbuf(dev, vq, buf_vec, nr_vec, pkts[i],
-				mbuf_pool);
-		if (unlikely(err)) {
-			rte_pktmbuf_free(pkts[i]);
+		if (virtio_dev_tx_single_packed(dev, vq, mbuf_pool,
+						&pkts[pkt_idx]))
 			break;
-		}
-
-		if (unlikely(dev->dequeue_zero_copy)) {
-			struct zcopy_mbuf *zmbuf;
-
-			zmbuf = get_zmbuf(vq);
-			if (!zmbuf) {
-				rte_pktmbuf_free(pkts[i]);
-				break;
-			}
-			zmbuf->mbuf = pkts[i];
-			zmbuf->desc_idx = buf_id;
-			zmbuf->desc_count = desc_count;
 
-			/*
-			 * Pin lock the mbuf; we will check later to see
-			 * whether the mbuf is freed (when we are the last
-			 * user) or not. If that's the case, we then could
-			 * update the used ring safely.
-			 */
-			rte_mbuf_refcnt_update(pkts[i], 1);
-
-			vq->nr_zmbuf += 1;
-			TAILQ_INSERT_TAIL(&vq->zmbuf_list, zmbuf, next);
-		}
-
-		vq->last_avail_idx += desc_count;
-		if (vq->last_avail_idx >= vq->size) {
-			vq->last_avail_idx -= vq->size;
-			vq->avail_wrap_counter ^= 1;
-		}
-	}
+		pkt_idx++;
+		remained--;
+		flush_dequeue_packed(dev, vq);
+	} while (remained);
 
-	if (likely(dev->dequeue_zero_copy == 0)) {
-		do_data_copy_dequeue(vq);
-		if (unlikely(i < count))
-			vq->shadow_used_idx = i;
-		if (likely(vq->shadow_used_idx)) {
-			flush_dequeue_shadow_used_ring_packed(dev, vq);
-			vhost_vring_call_packed(dev, vq);
-		}
+	if (pkt_idx) {
+		if (vq->shadow_used_idx)
+			do_data_copy_dequeue(vq);
 	}
 
-	return i;
+	return pkt_idx;
 }
 
 uint16_t
@@ -2204,9 +2148,14 @@  rte_vhost_dequeue_burst(int vid, uint16_t queue_id,
 		count -= 1;
 	}
 
-	if (vq_is_packed(dev))
-		count = virtio_dev_tx_packed(dev, vq, mbuf_pool, pkts, count);
-	else
+	if (vq_is_packed(dev)) {
+		if (unlikely(dev->dequeue_zero_copy))
+			count = virtio_dev_tx_packed_zmbuf(dev, vq, mbuf_pool,
+							   pkts, count);
+		else
+			count = virtio_dev_tx_packed(dev, vq, mbuf_pool, pkts,
+						     count);
+	} else
 		count = virtio_dev_tx_split(dev, vq, mbuf_pool, pkts, count);
 
 out: