[RFC,v2] vhost: add support async dequeue for packed ring

Message ID 20210914094143.18116-1-cheng1.jiang@intel.com (mailing list archive)
State Changes Requested, archived
Delegated to: Maxime Coquelin
Series [RFC,v2] vhost: add support async dequeue for packed ring

Checks

Context               Check    Description
ci/checkpatch         success  coding style OK
ci/Intel-compilation  warning  apply issues

Commit Message

Jiang, Cheng1 Sept. 14, 2021, 9:41 a.m. UTC
  This patch implements the asynchronous dequeue data path for the packed ring.

Signed-off-by: Cheng Jiang <cheng1.jiang@intel.com>
---
It's based on these 2 patches:
1. vhost: remove copy threshold for async vhost
http://patches.dpdk.org/project/dpdk/patch/1629463466-450012-1-git-send-email-jiayu.hu@intel.com/
2. vhost: support async dequeue for split ring
http://patches.dpdk.org/project/dpdk/patch/20210906204837.112466-2-wenwux.ma@intel.com/

v2:
 * fixed some issues
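
For reference, a minimal polling sketch of how an application might consume this data path, assuming the rte_vhost_async_try_dequeue_burst() prototype proposed in the split-ring dequeue patch listed above; the header names, burst size and the free-only processing below are illustrative placeholders, not part of this patch:

#include <rte_mbuf.h>
#include <rte_mempool.h>
#include <rte_vhost.h>
#include <rte_vhost_async.h>

#define BURST_SZ 32

static void
poll_async_dequeue(int vid, uint16_t queue_id, struct rte_mempool *mbuf_pool)
{
	struct rte_mbuf *pkts[BURST_SZ];
	int nr_inflight;
	uint16_t i, nr_recv;

	do {
		/* submit new DMA copies and harvest the ones already completed */
		nr_recv = rte_vhost_async_try_dequeue_burst(vid, queue_id,
				mbuf_pool, pkts, BURST_SZ, &nr_inflight);

		/* hand completed mbufs to the application (freed here as a stand-in) */
		for (i = 0; i < nr_recv; i++)
			rte_pktmbuf_free(pkts[i]);
	} while (nr_recv > 0 || nr_inflight > 0);
}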

 lib/vhost/virtio_net.c | 325 ++++++++++++++++++++++++++++++++++++++---
 1 file changed, 302 insertions(+), 23 deletions(-)
  

Patch

diff --git a/lib/vhost/virtio_net.c b/lib/vhost/virtio_net.c
index e0159b53e3..9a842ce8f4 100644
--- a/lib/vhost/virtio_net.c
+++ b/lib/vhost/virtio_net.c
@@ -1654,7 +1654,7 @@  virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 }
 
 static __rte_always_inline void
-vhost_update_used_packed(struct vhost_virtqueue *vq,
+vhost_enqueue_update_used_packed(struct vhost_virtqueue *vq,
 			struct vring_used_elem_packed *shadow_ring,
 			uint16_t count)
 {
@@ -1970,22 +1970,66 @@  write_back_completed_descs_split(struct vhost_virtqueue *vq, uint16_t n_descs)
 	} while (nr_left > 0);
 }
 
+static __rte_always_inline void
+vhost_dequeue_update_used_packed(struct vhost_virtqueue *vq,
+			struct vring_used_elem_packed *shadow_ring,
+			uint16_t count)
+{
+	uint16_t i;
+	uint16_t flags;
+	uint16_t head_idx = vq->last_used_idx;
+	uint16_t head_flags = 0;
+
+	for (i = 0; i < count; i++)
+		vq->desc_packed[vq->last_used_idx + i].id = shadow_ring[i].id;
+
+	/* The ordering for storing desc flags needs to be enforced. */
+	rte_atomic_thread_fence(__ATOMIC_RELEASE);
+
+	for (i = 0; i < count; i++) {
+		flags = vq->desc_packed[vq->last_used_idx].flags;
+		if (vq->used_wrap_counter) {
+			flags |= VRING_DESC_F_USED;
+			flags |= VRING_DESC_F_AVAIL;
+		} else {
+			flags &= ~VRING_DESC_F_USED;
+			flags &= ~VRING_DESC_F_AVAIL;
+		}
+
+		if (i > 0)
+			vq->desc_packed[vq->last_used_idx].flags = flags;
+		else
+			head_flags = flags;
+
+		vq_inc_last_used_packed(vq, 1);
+	}
+
+	vq->desc_packed[head_idx].flags = head_flags;
+}
+
 static __rte_always_inline void
 write_back_completed_descs_packed(struct vhost_virtqueue *vq,
-				uint16_t n_buffers)
+				uint16_t n_buffers, bool is_txq)
 {
 	uint16_t nr_left = n_buffers;
 	uint16_t from, to;
+	void (*update_used_packed)(struct vhost_virtqueue *vq,
+				struct vring_used_elem_packed *shadow_ring, uint16_t count);
+
+	if (is_txq)
+		update_used_packed = vhost_enqueue_update_used_packed;
+	else
+		update_used_packed = vhost_dequeue_update_used_packed;
 
 	do {
 		from = vq->last_async_buffer_idx_packed;
 		to = (from + nr_left) % vq->size;
 		if (to > from) {
-			vhost_update_used_packed(vq, vq->async_buffers_packed + from, to - from);
+			update_used_packed(vq, vq->async_buffers_packed + from, to - from);
 			vq->last_async_buffer_idx_packed += nr_left;
 			nr_left = 0;
 		} else {
-			vhost_update_used_packed(vq, vq->async_buffers_packed + from,
+			update_used_packed(vq, vq->async_buffers_packed + from,
 				vq->size - from);
 			vq->last_async_buffer_idx_packed = 0;
 			nr_left -= vq->size - from;
@@ -2049,7 +2093,7 @@  vhost_poll_enqueue_completed(struct virtio_net *dev, uint16_t queue_id,
 
 	if (likely(vq->enabled && vq->access_ok)) {
 		if (vq_is_packed(dev)) {
-			write_back_completed_descs_packed(vq, n_buffers);
+			write_back_completed_descs_packed(vq, n_buffers, 1);
 
 			vhost_vring_call_packed(dev, vq);
 		} else {
@@ -3328,7 +3372,7 @@  async_desc_to_mbuf(struct virtio_net *dev,
 }
 
 static __rte_always_inline uint16_t
-async_poll_dequeue_completed_split(struct virtio_net *dev,
+async_poll_dequeue_completed(struct virtio_net *dev,
 		struct vhost_virtqueue *vq, uint16_t queue_id,
 		struct rte_mbuf **pkts, uint16_t count, bool legacy_ol_flags)
 {
@@ -3336,7 +3380,7 @@  async_poll_dequeue_completed_split(struct virtio_net *dev,
 	uint16_t start_idx, pkt_idx, from;
 	struct async_inflight_info *pkts_info;
 
-	pkt_idx = vq->async_pkts_idx & (vq->size - 1);
+	pkt_idx = vq->async_pkts_idx % vq->size;
 	pkts_info = vq->async_pkts_info;
 	start_idx = virtio_dev_rx_async_get_info_idx(pkt_idx, vq->size,
 			vq->async_pkts_inflight_n);
@@ -3360,7 +3404,7 @@  async_poll_dequeue_completed_split(struct virtio_net *dev,
 	n_pkts_put = RTE_MIN(count, n_pkts_cpl);
 
 	for (pkt_idx = 0; pkt_idx < n_pkts_put; pkt_idx++) {
-		from = (start_idx + pkt_idx) & (vq->size - 1);
+		from = (start_idx + pkt_idx) % vq->size;
 		pkts[pkt_idx] = pkts_info[from].mbuf;
 
 		if (pkts_info[from].nethdr.valid) {
@@ -3370,9 +3414,14 @@  async_poll_dequeue_completed_split(struct virtio_net *dev,
 	}
 
 	/* write back completed descs to used ring and update used idx */
-	write_back_completed_descs_split(vq, n_pkts_put);
-	__atomic_add_fetch(&vq->used->idx, n_pkts_put, __ATOMIC_RELEASE);
-	vhost_vring_call_split(dev, vq);
+	if (vq_is_packed(dev)) {
+		write_back_completed_descs_packed(vq, n_pkts_put, 0);
+		vhost_vring_call_packed(dev, vq);
+	} else {
+		write_back_completed_descs_split(vq, n_pkts_put);
+		__atomic_add_fetch(&vq->used->idx, n_pkts_put, __ATOMIC_RELEASE);
+		vhost_vring_call_split(dev, vq);
+	}
 
 	vq->async_last_pkts_n = n_pkts_cpl - n_pkts_put;
 	vq->async_pkts_inflight_n -= n_pkts_put;
@@ -3554,10 +3603,9 @@  virtio_dev_tx_async_split(struct virtio_net *dev,
 	vq->async_pkts_idx += pkt_idx;
 
 out:
-	if (vq->async_pkts_inflight_n > 0) {
-		nr_done_pkts = async_poll_dequeue_completed_split(dev, vq,
-					queue_id, pkts, count, legacy_ol_flags);
-	}
+	if (vq->async_pkts_inflight_n > 0)
+		nr_done_pkts = async_poll_dequeue_completed(dev, vq, queue_id, pkts,
+					count, legacy_ol_flags);
 
 	return nr_done_pkts;
 }
@@ -3584,6 +3632,230 @@  virtio_dev_tx_async_split_compliant(struct virtio_net *dev,
 				pkts, count, false);
 }
 
+static __rte_always_inline void
+vhost_async_shadow_dequeue_single_packed(struct vhost_virtqueue *vq,
+				   uint16_t buf_id)
+{
+	uint16_t idx = vq->async_buffer_idx_packed;
+
+	vq->async_buffers_packed[idx].id  = buf_id;
+	vq->async_buffers_packed[idx].len = 0;
+
+	vq->async_buffer_idx_packed++;
+	if (vq->async_buffer_idx_packed >= vq->size)
+		vq->async_buffer_idx_packed -= vq->size;
+
+}
+
+static __rte_always_inline int
+virtio_dev_tx_async_single_packed(struct virtio_net *dev,
+			struct vhost_virtqueue *vq,
+			struct rte_mempool *mbuf_pool,
+			struct rte_mbuf *pkts,
+			struct iovec *src_iovec, struct iovec *dst_iovec,
+			struct rte_vhost_iov_iter *src_it,
+			struct rte_vhost_iov_iter *dst_it,
+			struct async_nethdr *nethdr,
+			int nr_iovec)
+{
+	int err;
+	uint16_t buf_id, desc_count = 0;
+	uint16_t nr_vec = 0;
+	uint32_t buf_len;
+	struct buf_vector buf_vec[BUF_VECTOR_MAX];
+	static bool allocerr_warned;
+
+	if (unlikely(fill_vec_buf_packed(dev, vq,
+					 vq->last_avail_idx, &desc_count,
+					 buf_vec, &nr_vec,
+					 &buf_id, &buf_len,
+					 VHOST_ACCESS_RO) < 0))
+		return -1;
+
+	if (unlikely(virtio_dev_pktmbuf_prep(dev, pkts, buf_len))) {
+		if (!allocerr_warned) {
+			VHOST_LOG_DATA(ERR,
+				"Failed mbuf alloc of size %d from %s on %s.\n",
+				buf_len, mbuf_pool->name, dev->ifname);
+			allocerr_warned = true;
+		}
+		return -1;
+	}
+
+	err = async_desc_to_mbuf(dev, buf_vec, nr_vec, pkts, mbuf_pool,
+				src_iovec, dst_iovec, src_it, dst_it, nethdr, nr_iovec);
+	if (unlikely(err)) {
+		rte_pktmbuf_free(pkts);
+		if (!allocerr_warned) {
+			VHOST_LOG_DATA(ERR,
+				"Failed to copy desc to mbuf on %s.\n",
+				dev->ifname);
+			allocerr_warned = true;
+		}
+		return -1;
+	}
+
+	/* update async shadow packed ring */
+	vhost_async_shadow_dequeue_single_packed(vq, buf_id);
+
+	return err;
+}
+
+static __rte_always_inline uint16_t
+virtio_dev_tx_async_packed(struct virtio_net *dev,
+		struct vhost_virtqueue *vq, uint16_t queue_id,
+		struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts,
+		uint16_t count, bool legacy_ol_flags)
+{
+	uint16_t pkt_idx;
+	uint16_t nr_async_burst = 0;
+	uint16_t slot_idx = 0;
+	uint16_t nr_done_pkts = 0;
+	uint16_t pkt_err = 0;
+	uint16_t iovec_idx = 0, it_idx = 0;
+	struct rte_vhost_iov_iter *it_pool = vq->it_pool;
+	struct iovec *vec_pool = vq->vec_pool;
+	struct iovec *src_iovec = vec_pool;
+	struct iovec *dst_iovec = vec_pool + (VHOST_MAX_ASYNC_VEC >> 1);
+	struct rte_vhost_async_desc tdes[MAX_PKT_BURST];
+	struct async_inflight_info *pkts_info = vq->async_pkts_info;
+	struct rte_mbuf *pkts_prealloc[MAX_PKT_BURST];
+
+	VHOST_LOG_DATA(DEBUG, "(%d) about to dequeue %u buffers\n",
+			dev->vid, count);
+
+	if (rte_pktmbuf_alloc_bulk(mbuf_pool, pkts_prealloc, count))
+		goto out;
+
+	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
+		struct rte_mbuf *pkt = pkts_prealloc[pkt_idx];
+
+		rte_prefetch0(&vq->desc_packed[vq->last_avail_idx]);
+
+		slot_idx = (vq->async_pkts_idx + pkt_idx) % vq->size;
+		if (unlikely(virtio_dev_tx_async_single_packed(dev, vq, mbuf_pool, pkt,
+				&src_iovec[iovec_idx], &dst_iovec[iovec_idx], &it_pool[it_idx],
+				&it_pool[it_idx + 1], &pkts_info[slot_idx].nethdr,
+				(VHOST_MAX_ASYNC_VEC >> 1) - iovec_idx))) {
+			rte_pktmbuf_free_bulk(&pkts_prealloc[pkt_idx], count - pkt_idx);
+			break;
+		}
+
+		async_fill_desc(&tdes[nr_async_burst], &it_pool[it_idx], &it_pool[it_idx + 1]);
+		pkts_info[slot_idx].mbuf = pkt;
+		nr_async_burst++;
+
+		iovec_idx += it_pool[it_idx].nr_segs;
+		it_idx += 2;
+
+		vq_inc_last_avail_packed(vq, 1);
+
+		if (unlikely((nr_async_burst >= VHOST_ASYNC_BATCH_THRESHOLD) ||
+				((VHOST_MAX_ASYNC_VEC >> 1) - iovec_idx < BUF_VECTOR_MAX))) {
+			uint16_t nr_pkts;
+			int32_t ret;
+
+			ret = vq->async_ops.transfer_data(dev->vid, queue_id,
+					tdes, 0, nr_async_burst);
+			if (unlikely(ret < 0)) {
+				VHOST_LOG_DATA(ERR, "(%d) async channel submit error\n", dev->vid);
+				ret = 0;
+			}
+			nr_pkts = ret;
+
+			vq->async_pkts_inflight_n += nr_pkts;
+			it_idx = 0;
+			iovec_idx = 0;
+
+			if (unlikely(nr_pkts < nr_async_burst)) {
+				pkt_err = nr_async_burst - nr_pkts;
+				nr_async_burst = 0;
+				pkt_idx++;
+				break;
+			}
+			nr_async_burst = 0;
+		}
+	}
+
+	if (nr_async_burst) {
+		uint16_t nr_pkts;
+		int32_t ret;
+
+		ret = vq->async_ops.transfer_data(dev->vid, queue_id, tdes, 0, nr_async_burst);
+		if (unlikely(ret < 0)) {
+			VHOST_LOG_DATA(ERR, "(%d) async channel submit error\n", dev->vid);
+			ret = 0;
+		}
+		nr_pkts = ret;
+
+		vq->async_pkts_inflight_n += nr_pkts;
+
+		if (unlikely(nr_pkts < nr_async_burst))
+			pkt_err = nr_async_burst - nr_pkts;
+	}
+
+	if (unlikely(pkt_err)) {
+		uint16_t nr_err_dma = pkt_err;
+
+		pkt_idx -= nr_err_dma;
+
+		/**
+		 * recover DMA-copy related structures and free pktmbufs
+		 * for DMA-error pkts.
+		 */
+		if (vq->async_buffer_idx_packed >= nr_err_dma)
+			vq->async_buffer_idx_packed -= nr_err_dma;
+		else
+			vq->async_buffer_idx_packed += vq->size - nr_err_dma;
+
+		while (nr_err_dma-- > 0) {
+			rte_pktmbuf_free(pkts_info[slot_idx % vq->size].mbuf);
+			slot_idx--;
+		}
+
+		/* recover available ring */
+		if (vq->last_avail_idx >= pkt_err) {
+			vq->last_avail_idx -= pkt_err;
+		} else {
+			vq->last_avail_idx += vq->size - pkt_err;
+			vq->avail_wrap_counter ^= 1;
+		}
+	}
+
+	vq->async_pkts_idx += pkt_idx;
+	if (vq->async_pkts_idx >= vq->size)
+		vq->async_pkts_idx -= vq->size;
+
+out:
+	if (vq->async_pkts_inflight_n > 0)
+		nr_done_pkts = async_poll_dequeue_completed(dev, vq,
+					queue_id, pkts, count, legacy_ol_flags);
+
+	return nr_done_pkts;
+}
+
+__rte_noinline
+static uint16_t
+virtio_dev_tx_async_packed_legacy(struct virtio_net *dev,
+		struct vhost_virtqueue *vq, uint16_t queue_id,
+		struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts,
+		uint16_t count)
+{
+	return virtio_dev_tx_async_packed(dev, vq, queue_id, mbuf_pool,
+				pkts, count, true);
+}
+
+__rte_noinline
+static uint16_t
+virtio_dev_tx_async_packed_compliant(struct virtio_net *dev,
+		struct vhost_virtqueue *vq, uint16_t queue_id,
+		struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts,
+		uint16_t count)
+{
+	return virtio_dev_tx_async_packed(dev, vq, queue_id, mbuf_pool,
+				pkts, count, false);
+}
+
 uint16_t
 rte_vhost_async_try_dequeue_burst(int vid, uint16_t queue_id,
 	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count,
@@ -3669,15 +3941,22 @@  rte_vhost_async_try_dequeue_burst(int vid, uint16_t queue_id,
 		count -= 1;
 	}
 
-	if (unlikely(vq_is_packed(dev)))
-		return 0;
+	if (vq_is_packed(dev)) {
+		if (dev->flags & VIRTIO_DEV_LEGACY_OL_FLAGS)
+			count = virtio_dev_tx_async_packed_legacy(dev, vq, queue_id,
+					mbuf_pool, pkts, count);
+		else
+			count = virtio_dev_tx_async_packed_compliant(dev, vq, queue_id,
+					mbuf_pool, pkts, count);
+	} else {
+		if (dev->flags & VIRTIO_DEV_LEGACY_OL_FLAGS)
+			count = virtio_dev_tx_async_split_legacy(dev, vq, queue_id,
+					mbuf_pool, pkts, count);
+		else
+			count = virtio_dev_tx_async_split_compliant(dev, vq, queue_id,
+					mbuf_pool, pkts, count);
+	}
 
-	if (dev->flags & VIRTIO_DEV_LEGACY_OL_FLAGS)
-		count = virtio_dev_tx_async_split_legacy(dev, vq, queue_id,
-				mbuf_pool, pkts, count);
-	else
-		count = virtio_dev_tx_async_split_compliant(dev, vq, queue_id,
-				mbuf_pool, pkts, count);
 
 out:
 	*nr_inflight = vq->async_pkts_inflight_n;