[dpdk-dev,v2,5/5] lib_vhost: Add support copying scattered mbuf to vring

Message ID 1432826207-8428-6-git-send-email-changchun.ouyang@intel.com (mailing list archive)
State Superseded, archived
Headers

Commit Message

Ouyang Changchun May 28, 2015, 3:16 p.m. UTC
  Add support for copying a scattered mbuf to the vring, which is done by
virtio_dev_scatter_rx(), and check the 'next' pointer in the mbuf on the fly
to select the suitable function to receive packets.

Signed-off-by: Changchun Ouyang <changchun.ouyang@intel.com>
---
 lib/librte_vhost/vhost_rxtx.c | 116 +++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 115 insertions(+), 1 deletion(-)
  

Comments

Huawei Xie May 31, 2015, 9:10 a.m. UTC | #1
virtio_dev_rx, scatter_rx and the mergeable rx path should be merged, and the
code could be much simpler, unless there is a special performance consideration.
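
One possible direction (a hypothetical sketch, not code from this patch or the
DPDK tree) is to factor the descriptor-reservation loop that all three
variants open-code into a shared helper:

	/*
	 * Hypothetical shared reservation helper: reserve up to 'n' entries
	 * of the used ring and return how many were actually reserved.
	 * All three RX variants could call this instead of open-coding
	 * the cmpset loop.
	 */
	static inline uint16_t
	reserve_used_entries(struct vhost_virtqueue *vq, uint16_t n,
			uint16_t *res_base)
	{
		uint16_t base, avail_idx, free_entries;

		do {
			base = vq->last_used_idx_res;
			avail_idx = *((volatile uint16_t *)&vq->avail->idx);
			free_entries = (uint16_t)(avail_idx - base);
			if (free_entries == 0)
				return 0;
			if (n > free_entries)
				n = free_entries;
		} while (rte_atomic16_cmpset(&vq->last_used_idx_res,
						base, (uint16_t)(base + n)) == 0);

		*res_base = base;
		return n;
	}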


On 5/28/2015 11:17 PM, Ouyang, Changchun wrote:
> Add support copying scattered mbuf to vring which is done by dev_scatter_rx,
> and check the 'next' pointer in mbuf on the fly to select suitable function to rx packets.
>
> Signed-off-by: Changchun Ouyang <changchun.ouyang@intel.com>
> ---
>  lib/librte_vhost/vhost_rxtx.c | 116 +++++++++++++++++++++++++++++++++++++++++-
>  1 file changed, 115 insertions(+), 1 deletion(-)
>
> diff --git a/lib/librte_vhost/vhost_rxtx.c b/lib/librte_vhost/vhost_rxtx.c
> index bb56ae1..3086bb4 100644
> --- a/lib/librte_vhost/vhost_rxtx.c
> +++ b/lib/librte_vhost/vhost_rxtx.c
> @@ -46,7 +46,8 @@
>   * This function adds buffers to the virtio devices RX virtqueue. Buffers can
>   * be received from the physical port or from another virtio device. A packet
>   * count is returned to indicate the number of packets that are succesfully
> - * added to the RX queue. This function works when mergeable is disabled.
> + * added to the RX queue. This function works when mergeable is disabled and
> + * the mbuf is not scattered.
>   */
>  static inline uint32_t __attribute__((always_inline))
>  virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
> @@ -447,6 +448,103 @@ fill_buf_vec(struct vhost_virtqueue *vq, uint16_t id, uint32_t *vec_idx)
>  }
>  
>  /*
> + * This function works for scatter-gather RX.
> + */
> +static inline uint32_t __attribute__((always_inline))
> +virtio_dev_scatter_rx(struct virtio_net *dev, uint16_t queue_id,
> +	struct rte_mbuf **pkts, uint32_t count)
> +{
> +	struct vhost_virtqueue *vq;
> +	uint32_t pkt_idx = 0, entry_success = 0;
> +	uint16_t avail_idx;
> +	uint16_t res_base_idx, res_end_idx;
> +	uint8_t success = 0;
> +
> +	LOG_DEBUG(VHOST_DATA, "(%"PRIu64") virtio_dev_scatter_rx()\n",
> +		dev->device_fh);
use __func__
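i.e., something like:

	LOG_DEBUG(VHOST_DATA, "(%"PRIu64") %s()\n",
		dev->device_fh, __func__);

so the log string no longer has to be kept in sync with the function name.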
> +	if (unlikely(queue_id != VIRTIO_RXQ))
> +		LOG_DEBUG(VHOST_DATA, "mq isn't supported in this version.\n");
> +
> +	vq = dev->virtqueue[VIRTIO_RXQ];
> +	count = RTE_MIN((uint32_t)MAX_PKT_BURST, count);
> +
> +	if (count == 0)
> +		return 0;
> +
> +	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
> +		uint32_t secure_len = 0;
> +		uint32_t vec_idx = 0;
> +		uint32_t pkt_len = pkts[pkt_idx]->pkt_len + vq->vhost_hlen;
> +
> +		do {
> +			/*
> +			 * As many data cores may want access to available
> +			 * buffers, they need to be reserved.
> +			 */
> +			res_base_idx = vq->last_used_idx_res;
> +			avail_idx = *((volatile uint16_t *)&vq->avail->idx);
> +
> +			if (unlikely(res_base_idx == avail_idx)) {
> +				LOG_DEBUG(VHOST_DATA,
> +					"(%"PRIu64") Failed "
> +					"to get enough desc from "
> +					"vring\n",
> +					dev->device_fh);
> +				return pkt_idx;
> +			} else {
> +				uint16_t wrapped_idx =
> +					(res_base_idx) & (vq->size - 1);
> +				uint32_t idx = vq->avail->ring[wrapped_idx];
> +
> +				update_secure_len(vq, idx, &secure_len);
> +			}
> +
> +			if (pkt_len > secure_len) {
> +				LOG_DEBUG(VHOST_DATA,
> +					"(%"PRIu64") Failed "
> +					"to get enough desc from "
> +					"vring\n",
> +					dev->device_fh);
> +				return pkt_idx;
> +			}
The behavior here is totally different from virtio_dev_rx and
virtio_dev_merge_rx; I think all three should behave in the same way.
virtio_dev_rx updates used->len to zero, while this one returns immediately.

Besides, with this implementation, if the caller retransmits the
mbuf (whose pkt_len is larger than secure_len), it will enter an endless loop.
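
For example, a common retry pattern on the caller's side (hypothetical caller
code, not part of this patch) makes no progress once the head of the remaining
burst is an oversized mbuf, because scatter_rx keeps returning without
consuming it:

	/* Hypothetical caller: retry until the whole burst is enqueued. */
	uint16_t sent = 0;

	while (sent < count) {
		/* For the oversized head mbuf this returns 0 every time,
		 * so 'sent' never advances. */
		sent += rte_vhost_enqueue_burst(dev, VIRTIO_RXQ,
						&pkts[sent], count - sent);
	}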

> +
> +			/* vq->last_used_idx_res is atomically updated. */
> +			success = rte_atomic16_cmpset(&vq->last_used_idx_res,
> +							res_base_idx,
> +							res_base_idx + 1);
> +		} while (success == 0);

Here the behavior becomes different again in reserving vring entries.
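
For comparison, virtio_dev_rx reserves a whole batch of entries with a single
cmpset, roughly like this (paraphrasing the existing code, details elided):

	do {
		res_base_idx = vq->last_used_idx_res;
		avail_idx = *((volatile uint16_t *)&vq->avail->idx);
		free_entries = (uint16_t)(avail_idx - res_base_idx);
		count = RTE_MIN(count, (uint32_t)free_entries);
		if (count == 0)
			return 0;
		res_end_idx = (uint16_t)(res_base_idx + count);
		/* Reserve 'count' entries in one shot. */
		success = rte_atomic16_cmpset(&vq->last_used_idx_res,
				res_base_idx, res_end_idx);
	} while (unlikely(success == 0));

while the scatter path above reserves exactly one entry per packet inside the
per-packet loop.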

> +
> +		fill_buf_vec(vq, res_base_idx, &vec_idx);
> +
> +		res_end_idx = res_base_idx + 1;
> +
> +		entry_success = copy_from_mbuf_to_vring(dev, res_base_idx,
> +			res_end_idx, pkts[pkt_idx]);
> +
> +		rte_compiler_barrier();
> +
> +		/*
> +		 * Wait until it's our turn to add our buffer
> +		 * to the used ring.
> +		 */
> +		while (unlikely(vq->last_used_idx != res_base_idx))
> +			rte_pause();
> +
> +		*(volatile uint16_t *)&vq->used->idx += entry_success;
> +		vq->last_used_idx = res_end_idx;
> +
> +		/* flush used->idx update before we read avail->flags. */
> +		rte_mb();
> +
> +		/* Kick the guest if necessary. */
> +		if (!(vq->avail->flags & VRING_AVAIL_F_NO_INTERRUPT))
> +			eventfd_write((int)vq->callfd, 1);
> +	}
> +
> +	return count;
> +}
> +
> +/*
>   * This function works for mergeable RX.
>   */
>  static inline uint32_t __attribute__((always_inline))
> @@ -545,12 +643,28 @@ virtio_dev_merge_rx(struct virtio_net *dev, uint16_t queue_id,
>  	return count;
>  }
>  
> +/*
> + * Return 1 if any mbuf is scattered, otherwise return 0.
> + */
> +static inline uint32_t __attribute__((always_inline))
> +check_scatter(struct rte_mbuf **pkts, uint16_t count)
> +{
> +	uint32_t i;
> +	for (i = 0; i < count; i++) {
> +		if (pkts[i]->next != NULL)
> +			return 1;
> +	}
> +	return 0;
> +}
> +
>  uint16_t
>  rte_vhost_enqueue_burst(struct virtio_net *dev, uint16_t queue_id,
>  	struct rte_mbuf **pkts, uint16_t count)
>  {
>  	if (unlikely(dev->features & (1 << VIRTIO_NET_F_MRG_RXBUF)))
>  		return virtio_dev_merge_rx(dev, queue_id, pkts, count);
> +	else if (unlikely(check_scatter(pkts, count) == 1))
> +		return virtio_dev_scatter_rx(dev, queue_id, pkts, count);
>  	else
>  		return virtio_dev_rx(dev, queue_id, pkts, count);
>  }
  
Ouyang Changchun May 31, 2015, 1:07 p.m. UTC | #2
> -----Original Message-----
> From: Xie, Huawei
> Sent: Sunday, May 31, 2015 5:11 PM
> To: Ouyang, Changchun; dev@dpdk.org
> Cc: Cao, Waterman
> Subject: Re: [PATCH v2 5/5] lib_vhost: Add support copying scattered mbuf
> to vring
> 
> virtio_dev_rx, scatter_rx and the mergeable rx path should be merged, and the
> code could be much simpler, unless there is a special performance consideration.
>
Then, any specific suggestion on how to merge them?
I do consider the performance impact here, so I think it deserves three
implementations for the different cases.
 
> 
> On 5/28/2015 11:17 PM, Ouyang, Changchun wrote:
> > Add support copying scattered mbuf to vring which is done by
> > dev_scatter_rx, and check the 'next' pointer in mbuf on the fly to select
> suitable function to rx packets.
> >
> > Signed-off-by: Changchun Ouyang <changchun.ouyang@intel.com>
> > ---
> >  lib/librte_vhost/vhost_rxtx.c | 116
> > +++++++++++++++++++++++++++++++++++++++++-
> >  1 file changed, 115 insertions(+), 1 deletion(-)
> >
> > diff --git a/lib/librte_vhost/vhost_rxtx.c
> > b/lib/librte_vhost/vhost_rxtx.c index bb56ae1..3086bb4 100644
> > --- a/lib/librte_vhost/vhost_rxtx.c
> > +++ b/lib/librte_vhost/vhost_rxtx.c
> > @@ -46,7 +46,8 @@
> >   * This function adds buffers to the virtio devices RX virtqueue. Buffers can
> >   * be received from the physical port or from another virtio device. A
> packet
> >   * count is returned to indicate the number of packets that are
> > succesfully
> > - * added to the RX queue. This function works when mergeable is disabled.
> > + * added to the RX queue. This function works when mergeable is
> > + disabled and
> > + * the mbuf is not scattered.
> >   */
> >  static inline uint32_t __attribute__((always_inline))
> > virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id, @@ -447,6
> > +448,103 @@ fill_buf_vec(struct vhost_virtqueue *vq, uint16_t id,
> > uint32_t *vec_idx)  }
> >
> >  /*
> > + * This function works for scatter-gather RX.
> > + */
> > +static inline uint32_t __attribute__((always_inline))
> > +virtio_dev_scatter_rx(struct virtio_net *dev, uint16_t queue_id,
> > +	struct rte_mbuf **pkts, uint32_t count) {
> > +	struct vhost_virtqueue *vq;
> > +	uint32_t pkt_idx = 0, entry_success = 0;
> > +	uint16_t avail_idx;
> > +	uint16_t res_base_idx, res_end_idx;
> > +	uint8_t success = 0;
> > +
> > +	LOG_DEBUG(VHOST_DATA, "(%"PRIu64") virtio_dev_scatter_rx()\n",
> > +		dev->device_fh);
> use __func__
> > +	if (unlikely(queue_id != VIRTIO_RXQ))
> > +		LOG_DEBUG(VHOST_DATA, "mq isn't supported in this
> version.\n");
> > +
> > +	vq = dev->virtqueue[VIRTIO_RXQ];
> > +	count = RTE_MIN((uint32_t)MAX_PKT_BURST, count);
> > +
> > +	if (count == 0)
> > +		return 0;
> > +
> > +	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
> > +		uint32_t secure_len = 0;
> > +		uint32_t vec_idx = 0;
> > +		uint32_t pkt_len = pkts[pkt_idx]->pkt_len + vq->vhost_hlen;
> > +
> > +		do {
> > +			/*
> > +			 * As many data cores may want access to available
> > +			 * buffers, they need to be reserved.
> > +			 */
> > +			res_base_idx = vq->last_used_idx_res;
> > +			avail_idx = *((volatile uint16_t *)&vq->avail->idx);
> > +
> > +			if (unlikely(res_base_idx == avail_idx)) {
> > +				LOG_DEBUG(VHOST_DATA,
> > +					"(%"PRIu64") Failed "
> > +					"to get enough desc from "
> > +					"vring\n",
> > +					dev->device_fh);
> > +				return pkt_idx;
> > +			} else {
> > +				uint16_t wrapped_idx =
> > +					(res_base_idx) & (vq->size - 1);
> > +				uint32_t idx = vq->avail->ring[wrapped_idx];
> > +
> > +				update_secure_len(vq, idx, &secure_len);
> > +			}
> > +
> > +			if (pkt_len > secure_len) {
> > +				LOG_DEBUG(VHOST_DATA,
> > +					"(%"PRIu64") Failed "
> > +					"to get enough desc from "
> > +					"vring\n",
> > +					dev->device_fh);
> > +				return pkt_idx;
> > +			}
> The behavior here is totally different from virtio_dev_rx and
> virtio_dev_merge_rx; I think all three should behave in the same way.
Why do they have to work in the same way?

> virtio_dev_rx updates used->len to zero, while this one returns immediately.
> 
Yes, for uncompleted packets; I think this comes from your comment about
dropping the packet if the room is not big enough to contain the whole packet.

> Besides, with this implementation, if the caller retransmits the mbuf (whose
> pkt_len is larger than secure_len), it will enter an endless loop.
Why would the caller retransmit the mbuf? I think that is the caller's bug, and the endless loop just surfaces that issue in the caller.

> 
> > +
> > +			/* vq->last_used_idx_res is atomically updated. */
> > +			success = rte_atomic16_cmpset(&vq-
> >last_used_idx_res,
> > +							res_base_idx,
> > +							res_base_idx + 1);
> > +		} while (success == 0);
> 
> Here the behavior becomes different again in reserving vring entries.
> 
> > +
> > +		fill_buf_vec(vq, res_base_idx, &vec_idx);
> > +
> > +		res_end_idx = res_base_idx + 1;
> > +
> > +		entry_success = copy_from_mbuf_to_vring(dev,
> res_base_idx,
> > +			res_end_idx, pkts[pkt_idx]);
> > +
> > +		rte_compiler_barrier();
> > +
> > +		/*
> > +		 * Wait until it's our turn to add our buffer
> > +		 * to the used ring.
> > +		 */
> > +		while (unlikely(vq->last_used_idx != res_base_idx))
> > +			rte_pause();
> > +
> > +		*(volatile uint16_t *)&vq->used->idx += entry_success;
> > +		vq->last_used_idx = res_end_idx;
> > +
> > +		/* flush used->idx update before we read avail->flags. */
> > +		rte_mb();
> > +
> > +		/* Kick the guest if necessary. */
> > +		if (!(vq->avail->flags & VRING_AVAIL_F_NO_INTERRUPT))
> > +			eventfd_write((int)vq->callfd, 1);
> > +	}
> > +
> > +	return count;
> > +}
> > +
> > +/*
> >   * This function works for mergeable RX.
> >   */
> >  static inline uint32_t __attribute__((always_inline)) @@ -545,12
> > +643,28 @@ virtio_dev_merge_rx(struct virtio_net *dev, uint16_t
> queue_id,
> >  	return count;
> >  }
> >
> > +/*
> > + * Return 1 if any mbuf is scattered, otherwise return 0.
> > + */
> > +static inline uint32_t __attribute__((always_inline))
> > +check_scatter(struct rte_mbuf **pkts, uint16_t count) {
> > +	uint32_t i;
> > +	for (i = 0; i < count; i++) {
> > +		if (pkts[i]->next != NULL)
> > +			return 1;
> > +	}
> > +	return 0;
> > +}
> > +
> >  uint16_t
> >  rte_vhost_enqueue_burst(struct virtio_net *dev, uint16_t queue_id,
> >  	struct rte_mbuf **pkts, uint16_t count)  {
> >  	if (unlikely(dev->features & (1 << VIRTIO_NET_F_MRG_RXBUF)))
> >  		return virtio_dev_merge_rx(dev, queue_id, pkts, count);
> > +	else if (unlikely(check_scatter(pkts, count) == 1))
> > +		return virtio_dev_scatter_rx(dev, queue_id, pkts, count);
> >  	else
> >  		return virtio_dev_rx(dev, queue_id, pkts, count);  }
  

Patch

diff --git a/lib/librte_vhost/vhost_rxtx.c b/lib/librte_vhost/vhost_rxtx.c
index bb56ae1..3086bb4 100644
--- a/lib/librte_vhost/vhost_rxtx.c
+++ b/lib/librte_vhost/vhost_rxtx.c
@@ -46,7 +46,8 @@ 
  * This function adds buffers to the virtio devices RX virtqueue. Buffers can
  * be received from the physical port or from another virtio device. A packet
  * count is returned to indicate the number of packets that are succesfully
- * added to the RX queue. This function works when mergeable is disabled.
+ * added to the RX queue. This function works when mergeable is disabled and
+ * the mbuf is not scattered.
  */
 static inline uint32_t __attribute__((always_inline))
 virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
@@ -447,6 +448,103 @@  fill_buf_vec(struct vhost_virtqueue *vq, uint16_t id, uint32_t *vec_idx)
 }
 
 /*
+ * This function works for scatter-gather RX.
+ */
+static inline uint32_t __attribute__((always_inline))
+virtio_dev_scatter_rx(struct virtio_net *dev, uint16_t queue_id,
+	struct rte_mbuf **pkts, uint32_t count)
+{
+	struct vhost_virtqueue *vq;
+	uint32_t pkt_idx = 0, entry_success = 0;
+	uint16_t avail_idx;
+	uint16_t res_base_idx, res_end_idx;
+	uint8_t success = 0;
+
+	LOG_DEBUG(VHOST_DATA, "(%"PRIu64") virtio_dev_scatter_rx()\n",
+		dev->device_fh);
+	if (unlikely(queue_id != VIRTIO_RXQ))
+		LOG_DEBUG(VHOST_DATA, "mq isn't supported in this version.\n");
+
+	vq = dev->virtqueue[VIRTIO_RXQ];
+	count = RTE_MIN((uint32_t)MAX_PKT_BURST, count);
+
+	if (count == 0)
+		return 0;
+
+	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
+		uint32_t secure_len = 0;
+		uint32_t vec_idx = 0;
+		uint32_t pkt_len = pkts[pkt_idx]->pkt_len + vq->vhost_hlen;
+
+		do {
+			/*
+			 * As many data cores may want access to available
+			 * buffers, they need to be reserved.
+			 */
+			res_base_idx = vq->last_used_idx_res;
+			avail_idx = *((volatile uint16_t *)&vq->avail->idx);
+
+			if (unlikely(res_base_idx == avail_idx)) {
+				LOG_DEBUG(VHOST_DATA,
+					"(%"PRIu64") Failed "
+					"to get enough desc from "
+					"vring\n",
+					dev->device_fh);
+				return pkt_idx;
+			} else {
+				uint16_t wrapped_idx =
+					(res_base_idx) & (vq->size - 1);
+				uint32_t idx = vq->avail->ring[wrapped_idx];
+
+				update_secure_len(vq, idx, &secure_len);
+			}
+
+			if (pkt_len > secure_len) {
+				LOG_DEBUG(VHOST_DATA,
+					"(%"PRIu64") Failed "
+					"to get enough desc from "
+					"vring\n",
+					dev->device_fh);
+				return pkt_idx;
+			}
+
+			/* vq->last_used_idx_res is atomically updated. */
+			success = rte_atomic16_cmpset(&vq->last_used_idx_res,
+							res_base_idx,
+							res_base_idx + 1);
+		} while (success == 0);
+
+		fill_buf_vec(vq, res_base_idx, &vec_idx);
+
+		res_end_idx = res_base_idx + 1;
+
+		entry_success = copy_from_mbuf_to_vring(dev, res_base_idx,
+			res_end_idx, pkts[pkt_idx]);
+
+		rte_compiler_barrier();
+
+		/*
+		 * Wait until it's our turn to add our buffer
+		 * to the used ring.
+		 */
+		while (unlikely(vq->last_used_idx != res_base_idx))
+			rte_pause();
+
+		*(volatile uint16_t *)&vq->used->idx += entry_success;
+		vq->last_used_idx = res_end_idx;
+
+		/* flush used->idx update before we read avail->flags. */
+		rte_mb();
+
+		/* Kick the guest if necessary. */
+		if (!(vq->avail->flags & VRING_AVAIL_F_NO_INTERRUPT))
+			eventfd_write((int)vq->callfd, 1);
+	}
+
+	return count;
+}
+
+/*
  * This function works for mergeable RX.
  */
 static inline uint32_t __attribute__((always_inline))
@@ -545,12 +643,28 @@  virtio_dev_merge_rx(struct virtio_net *dev, uint16_t queue_id,
 	return count;
 }
 
+/*
+ * Return 1 if any mbuf is scattered, otherwise return 0.
+ */
+static inline uint32_t __attribute__((always_inline))
+check_scatter(struct rte_mbuf **pkts, uint16_t count)
+{
+	uint32_t i;
+	for (i = 0; i < count; i++) {
+		if (pkts[i]->next != NULL)
+			return 1;
+	}
+	return 0;
+}
+
 uint16_t
 rte_vhost_enqueue_burst(struct virtio_net *dev, uint16_t queue_id,
 	struct rte_mbuf **pkts, uint16_t count)
 {
 	if (unlikely(dev->features & (1 << VIRTIO_NET_F_MRG_RXBUF)))
 		return virtio_dev_merge_rx(dev, queue_id, pkts, count);
+	else if (unlikely(check_scatter(pkts, count) == 1))
+		return virtio_dev_scatter_rx(dev, queue_id, pkts, count);
 	else
 		return virtio_dev_rx(dev, queue_id, pkts, count);
 }
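
For context, a minimal, hypothetical call site for the burst API this patch
extends (variable names are illustrative; dropping instead of retrying follows
the endless-loop concern raised in the review above):

	/* Pull a burst from a NIC queue and push it into the guest RX vring;
	 * the library picks the linear, scattered or mergeable path itself. */
	uint16_t rx_count = rte_eth_rx_burst(port_id, 0, pkts, MAX_PKT_BURST);
	uint16_t enq = rte_vhost_enqueue_burst(dev, VIRTIO_RXQ, pkts, rx_count);

	/* Free what the vring could not take rather than retrying forever. */
	for (uint16_t i = enq; i < rx_count; i++)
		rte_pktmbuf_free(pkts[i]);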