[v3,10/15] vhost: optimize enqueue function of packed ring

Message ID 20190925171329.63734-11-yong.liu@intel.com (mailing list archive)
State Superseded, archived
Delegated to: Maxime Coquelin
Series vhost packed ring performance optimization

Checks

Context               Check     Description
ci/checkpatch         success   coding style OK
ci/Intel-compilation  fail      Compilation issues

Commit Message

Marvin Liu Sept. 25, 2019, 5:13 p.m. UTC
  Optimize the vhost enqueue datapath by splitting it into separate functions.
Packets that fit into a single descriptor are handled by the batched path,
while the rest are handled one by one as before.

Signed-off-by: Marvin Liu <yong.liu@intel.com>
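
For reference, the batched path in this series only applies when each of the
next PACKED_BATCH_SIZE packets fits into a single available descriptor. A
rough, hypothetical sketch of that eligibility check is shown below
(rx_batch_fits is not a function from the patch, and the real
virtio_dev_rx_batch_packed also validates availability flags and wrap
counters before copying):

    /*
     * Illustrative sketch only, not the body of virtio_dev_rx_batch_packed:
     * check whether the next PACKED_BATCH_SIZE packets each fit into one
     * descriptor starting at last_avail_idx.
     */
    static inline int
    rx_batch_fits(struct virtio_net *dev, struct vhost_virtqueue *vq,
    	      struct rte_mbuf **pkts)
    {
    	uint16_t avail_idx = vq->last_avail_idx;
    	uint16_t i;

    	/* Do not start a batch that would run past the end of the ring. */
    	if (unlikely(avail_idx + PACKED_BATCH_SIZE > vq->size))
    		return -1;

    	for (i = 0; i < PACKED_BATCH_SIZE; i++) {
    		/* Packet plus virtio-net header must fit in one descriptor. */
    		if (vq->desc_packed[avail_idx + i].len <
    				pkts[i]->pkt_len + dev->vhost_hlen)
    			return -1;
    	}

    	return 0;
    }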
  

Comments

Tiwei Bie Sept. 26, 2019, 5:56 a.m. UTC | #1
On Thu, Sep 26, 2019 at 01:13:24AM +0800, Marvin Liu wrote:
>  static __rte_noinline uint32_t
>  virtio_dev_rx_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
>  	struct rte_mbuf **pkts, uint32_t count)
>  {
>  	uint32_t pkt_idx = 0;
> -	uint16_t num_buffers;
> -	struct buf_vector buf_vec[BUF_VECTOR_MAX];
> -
> -	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
> -		uint32_t pkt_len = pkts[pkt_idx]->pkt_len + dev->vhost_hlen;
> -		uint16_t nr_vec = 0;
> -		uint16_t nr_descs = 0;
> -
> -		if (unlikely(reserve_avail_buf_packed(dev, vq,
> -						pkt_len, buf_vec, &nr_vec,
> -						&num_buffers, &nr_descs) < 0)) {
> -			VHOST_LOG_DEBUG(VHOST_DATA,
> -				"(%d) failed to get enough desc from vring\n",
> -				dev->vid);
> -			vq->shadow_used_idx -= num_buffers;
> -			break;
> +	uint32_t remained = count;
> +
> +	do {
> +		rte_prefetch0(&vq->desc_packed[vq->last_avail_idx &
> +			(vq->size - 1)]);

You can't assume packed ring size is a power of 2.
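
One possible fix, assuming last_avail_idx is always kept within
[0, vq->size) by the existing wrap-around handling, is to drop the mask and
prefetch the descriptor at that index directly (a sketch, not necessarily the
final fix):

    /* last_avail_idx is kept in [0, vq->size) by the wrap-around handling,
     * so it can index desc_packed directly without a power-of-2 mask.
     */
    rte_prefetch0(&vq->desc_packed[vq->last_avail_idx]);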
  

Patch

diff --git a/lib/librte_vhost/virtio_net.c b/lib/librte_vhost/virtio_net.c
index 1b0fa2c64..c485e7f49 100644
--- a/lib/librte_vhost/virtio_net.c
+++ b/lib/librte_vhost/virtio_net.c
@@ -753,64 +753,6 @@  fill_vec_buf_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
 	return 0;
 }
 
-/*
- * Returns -1 on fail, 0 on success
- */
-static inline int
-reserve_avail_buf_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
-				uint32_t size, struct buf_vector *buf_vec,
-				uint16_t *nr_vec, uint16_t *num_buffers,
-				uint16_t *nr_descs)
-{
-	uint16_t avail_idx;
-	uint16_t vec_idx = 0;
-	uint16_t max_tries, tries = 0;
-
-	uint16_t buf_id = 0;
-	uint32_t len = 0;
-	uint16_t desc_count;
-
-	*num_buffers = 0;
-	avail_idx = vq->last_avail_idx;
-
-	if (rxvq_is_mergeable(dev))
-		max_tries = vq->size - 1;
-	else
-		max_tries = 1;
-
-	while (size > 0) {
-		/*
-		 * if we tried all available ring items, and still
-		 * can't get enough buf, it means something abnormal
-		 * happened.
-		 */
-		if (unlikely(++tries > max_tries))
-			return -1;
-
-		if (unlikely(fill_vec_buf_packed(dev, vq,
-						avail_idx, &desc_count,
-						buf_vec, &vec_idx,
-						&buf_id, &len,
-						VHOST_ACCESS_RW) < 0))
-			return -1;
-
-		len = RTE_MIN(len, size);
-		update_shadow_used_ring_packed(vq, buf_id, len, desc_count);
-		size -= len;
-
-		avail_idx += desc_count;
-		if (avail_idx >= vq->size)
-			avail_idx -= vq->size;
-
-		*nr_descs += desc_count;
-		*num_buffers += 1;
-	}
-
-	*nr_vec = vec_idx;
-
-	return 0;
-}
-
 static __rte_noinline void
 copy_vnet_hdr_to_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
 		struct buf_vector *buf_vec,
@@ -1089,7 +1031,7 @@  virtio_dev_rx_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
 	return pkt_idx;
 }
 
-static __rte_unused int
+static __rte_always_inline int
 virtio_dev_rx_batch_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
 	 struct rte_mbuf **pkts)
 {
@@ -1176,7 +1118,7 @@  virtio_dev_rx_batch_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
 	return 0;
 }
 
-static __rte_unused int16_t
+static __rte_always_inline int16_t
 virtio_dev_rx_single_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
 	struct rte_mbuf *pkt)
 {
@@ -1205,52 +1147,36 @@  virtio_dev_rx_single_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
 	return 0;
 }
 
-
 static __rte_noinline uint32_t
 virtio_dev_rx_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
 	struct rte_mbuf **pkts, uint32_t count)
 {
 	uint32_t pkt_idx = 0;
-	uint16_t num_buffers;
-	struct buf_vector buf_vec[BUF_VECTOR_MAX];
-
-	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
-		uint32_t pkt_len = pkts[pkt_idx]->pkt_len + dev->vhost_hlen;
-		uint16_t nr_vec = 0;
-		uint16_t nr_descs = 0;
-
-		if (unlikely(reserve_avail_buf_packed(dev, vq,
-						pkt_len, buf_vec, &nr_vec,
-						&num_buffers, &nr_descs) < 0)) {
-			VHOST_LOG_DEBUG(VHOST_DATA,
-				"(%d) failed to get enough desc from vring\n",
-				dev->vid);
-			vq->shadow_used_idx -= num_buffers;
-			break;
+	uint32_t remained = count;
+
+	do {
+		rte_prefetch0(&vq->desc_packed[vq->last_avail_idx &
+			(vq->size - 1)]);
+		if (remained >= PACKED_BATCH_SIZE) {
+			if (!virtio_dev_rx_batch_packed(dev, vq, pkts)) {
+				pkt_idx += PACKED_BATCH_SIZE;
+				remained -= PACKED_BATCH_SIZE;
+				continue;
+			}
 		}
 
-		VHOST_LOG_DEBUG(VHOST_DATA, "(%d) current index %d | end index %d\n",
-			dev->vid, vq->last_avail_idx,
-			vq->last_avail_idx + num_buffers);
-
-		if (copy_mbuf_to_desc(dev, vq, pkts[pkt_idx],
-						buf_vec, nr_vec,
-						num_buffers) < 0) {
-			vq->shadow_used_idx -= num_buffers;
+		if (virtio_dev_rx_single_packed(dev, vq, pkts[pkt_idx]))
 			break;
-		}
 
-		vq->last_avail_idx += nr_descs;
-		if (vq->last_avail_idx >= vq->size) {
-			vq->last_avail_idx -= vq->size;
-			vq->avail_wrap_counter ^= 1;
-		}
-	}
-
-	do_data_copy_enqueue(dev, vq);
+		pkt_idx++;
+		remained--;
+	} while (pkt_idx < count);
 
-	if (likely(vq->shadow_used_idx)) {
-		flush_enqueue_shadow_used_ring_packed(dev, vq);
+	if (pkt_idx) {
+		if (vq->shadow_used_idx) {
+			do_data_copy_enqueue(dev, vq);
+			flush_enqueue_shadow_used_ring_packed(dev, vq);
+		}
 		vhost_vring_call_packed(dev, vq);
 	}