[v8,06/13] vhost: add packed ring batch dequeue
Commit Message
Add a batch dequeue function for the packed ring, mirroring the batch
enqueue function. The batch dequeue function does not support chained
descriptors; those are handled by the single-packet dequeue function.
Signed-off-by: Marvin Liu <yong.liu@intel.com>
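
For context, here is a minimal sketch of how a batch path like this is
typically dispatched alongside a single-packet fallback. It is
illustrative only: virtio_dev_tx_single_packed_sketch() is a
hypothetical stand-in for the single-packet dequeue function mentioned
above, and only virtio_dev_tx_batch_packed() and PACKED_BATCH_SIZE come
from this patch. Such a dispatch loop presumably arrives later in the
series, which would explain why the new function is marked __rte_unused
at this point.

/*
 * Illustrative dispatch loop (not part of this patch); assumes the
 * lib/librte_vhost internal context. The batch path rejects
 * descriptors carrying VRING_DESC_F_NEXT or VRING_DESC_F_INDIRECT,
 * so a chained packet makes virtio_dev_tx_batch_packed() return -1
 * and the caller falls back to the single-packet path.
 */
static uint16_t
virtio_dev_tx_packed_sketch(struct virtio_net *dev,
			    struct vhost_virtqueue *vq,
			    struct rte_mempool *mbuf_pool,
			    struct rte_mbuf **pkts, uint32_t count)
{
	uint32_t pkt_idx = 0;

	while (pkt_idx < count) {
		/* try to dequeue a full cache line of descriptors */
		if (count - pkt_idx >= PACKED_BATCH_SIZE &&
		    !virtio_dev_tx_batch_packed(dev, vq, mbuf_pool,
						&pkts[pkt_idx])) {
			pkt_idx += PACKED_BATCH_SIZE;
			continue;
		}
		/* hypothetical single-packet fallback, handles chains */
		if (virtio_dev_tx_single_packed_sketch(dev, vq, mbuf_pool,
						       &pkts[pkt_idx]))
			break;
		pkt_idx++;
	}

	return pkt_idx;
}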
Comments
On 10/22/19 12:08 AM, Marvin Liu wrote:
> Add a batch dequeue function for the packed ring, mirroring the batch
> enqueue function. The batch dequeue function does not support chained
> descriptors; those are handled by the single-packet dequeue function.
>
> Signed-off-by: Marvin Liu <yong.liu@intel.com>
Reviewed-by: Maxime Coquelin <maxime.coquelin@redhat.com>
Thanks,
Maxime
p.s. It would be better if you could generate the series with the
diffstat provided in every patch (which is the default behavior of git
format-patch).
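
For readers new to packed rings: the flags check in
vhost_reserve_avail_batch_packed() below implements the virtio 1.1
availability rule, under which a descriptor belongs to the device when
its AVAIL bit matches the driver's wrap counter while its USED bit does
not. A minimal sketch of that rule (helper name hypothetical, flag
definitions from the virtio ring headers):

#include <stdbool.h>
#include <stdint.h>

/* A packed-ring descriptor is available when AVAIL == wrap counter
 * and USED != wrap counter; both bits flip meaning on each ring
 * wrap-around, which is what the wrap counter tracks. */
static inline bool
desc_is_avail_sketch(uint16_t flags, bool wrap_counter)
{
	bool avail = !!(flags & VRING_DESC_F_AVAIL);
	bool used = !!(flags & VRING_DESC_F_USED);

	return avail == wrap_counter && used != wrap_counter;
}

The batch check additionally rejects VRING_DESC_F_NEXT and
VRING_DESC_F_INDIRECT (via PACKED_DESC_SINGLE_DEQUEUE_FLAG), since
chained and indirect descriptors are left to the single-packet path.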
diff --git a/lib/librte_vhost/vhost.h b/lib/librte_vhost/vhost.h
index a2b9221e0..67724c342 100644
--- a/lib/librte_vhost/vhost.h
+++ b/lib/librte_vhost/vhost.h
@@ -39,6 +39,9 @@
 
 #define VHOST_LOG_CACHE_NR 32
 
+#define PACKED_DESC_SINGLE_DEQUEUE_FLAG (VRING_DESC_F_NEXT | \
+					 VRING_DESC_F_INDIRECT)
+
 #define PACKED_BATCH_SIZE (RTE_CACHE_LINE_SIZE / \
 			   sizeof(struct vring_packed_desc))
 #define PACKED_BATCH_MASK (PACKED_BATCH_SIZE - 1)
diff --git a/lib/librte_vhost/virtio_net.c b/lib/librte_vhost/virtio_net.c
index 87f2ae49e..76435204f 100644
--- a/lib/librte_vhost/virtio_net.c
+++ b/lib/librte_vhost/virtio_net.c
@@ -1635,6 +1635,117 @@ virtio_dev_tx_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
 	return i;
 }
 
+static __rte_always_inline int
+vhost_reserve_avail_batch_packed(struct virtio_net *dev,
+				 struct vhost_virtqueue *vq,
+				 struct rte_mempool *mbuf_pool,
+				 struct rte_mbuf **pkts,
+				 uint16_t avail_idx,
+				 uintptr_t *desc_addrs,
+				 uint16_t *ids)
+{
+	bool wrap = vq->avail_wrap_counter;
+	struct vring_packed_desc *descs = vq->desc_packed;
+	struct virtio_net_hdr *hdr;
+	uint64_t lens[PACKED_BATCH_SIZE];
+	uint64_t buf_lens[PACKED_BATCH_SIZE];
+	uint32_t buf_offset = dev->vhost_hlen;
+	uint16_t flags, i;
+
+	if (unlikely(avail_idx & PACKED_BATCH_MASK))
+		return -1;
+	if (unlikely((avail_idx + PACKED_BATCH_SIZE) > vq->size))
+		return -1;
+
+	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
+		flags = descs[avail_idx + i].flags;
+		if (unlikely((wrap != !!(flags & VRING_DESC_F_AVAIL)) ||
+			     (wrap == !!(flags & VRING_DESC_F_USED)) ||
+			     (flags & PACKED_DESC_SINGLE_DEQUEUE_FLAG)))
+			return -1;
+	}
+
+	rte_smp_rmb();
+
+	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
+		lens[i] = descs[avail_idx + i].len;
+
+	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
+		desc_addrs[i] = vhost_iova_to_vva(dev, vq,
+						  descs[avail_idx + i].addr,
+						  &lens[i], VHOST_ACCESS_RW);
+	}
+
+	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
+		if (unlikely((lens[i] != descs[avail_idx + i].len)))
+			return -1;
+	}
+
+	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
+		pkts[i] = virtio_dev_pktmbuf_alloc(dev, mbuf_pool, lens[i]);
+		if (!pkts[i])
+			goto free_buf;
+	}
+
+	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
+		buf_lens[i] = pkts[i]->buf_len - pkts[i]->data_off;
+
+	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
+		if (unlikely(buf_lens[i] < (lens[i] - buf_offset)))
+			goto free_buf;
+	}
+
+	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
+		pkts[i]->pkt_len = descs[avail_idx + i].len - buf_offset;
+		pkts[i]->data_len = pkts[i]->pkt_len;
+		ids[i] = descs[avail_idx + i].id;
+	}
+
+	if (virtio_net_with_host_offload(dev)) {
+		vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
+			hdr = (struct virtio_net_hdr *)(desc_addrs[i]);
+			vhost_dequeue_offload(hdr, pkts[i]);
+		}
+	}
+
+	return 0;
+
+free_buf:
+	for (i = 0; i < PACKED_BATCH_SIZE; i++)
+		rte_pktmbuf_free(pkts[i]);
+
+	return -1;
+}
+
+static __rte_unused int
+virtio_dev_tx_batch_packed(struct virtio_net *dev,
+			   struct vhost_virtqueue *vq,
+			   struct rte_mempool *mbuf_pool,
+			   struct rte_mbuf **pkts)
+{
+	uint16_t avail_idx = vq->last_avail_idx;
+	uint32_t buf_offset = dev->vhost_hlen;
+	uintptr_t desc_addrs[PACKED_BATCH_SIZE];
+	uint16_t ids[PACKED_BATCH_SIZE];
+	uint16_t i;
+
+	if (vhost_reserve_avail_batch_packed(dev, vq, mbuf_pool, pkts,
+					     avail_idx, desc_addrs, ids))
+		return -1;
+
+	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
+		rte_prefetch0((void *)(uintptr_t)desc_addrs[i]);
+
+	vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
+		rte_memcpy(rte_pktmbuf_mtod_offset(pkts[i], void *, 0),
+			   (void *)(uintptr_t)(desc_addrs[i] + buf_offset),
+			   pkts[i]->pkt_len);
+
+	vq_inc_last_avail_packed(vq, PACKED_BATCH_SIZE);
+
+	return 0;
+}
+
 static __rte_always_inline int
 vhost_dequeue_single_packed(struct virtio_net *dev,
 			    struct vhost_virtqueue *vq,
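
A note on sizing: assuming the usual 16-byte packed-descriptor layout
and a 64-byte RTE_CACHE_LINE_SIZE (both assumptions, not spelled out in
this patch), PACKED_BATCH_SIZE works out to 4, so one batch covers
exactly one cache line of descriptors. A compile-time sketch of that
arithmetic:

/* Sizing sketch; the struct mirrors the packed-descriptor layout and
 * the 64-byte cache line is an assumption, not taken from the patch. */
#include <assert.h>
#include <stdint.h>

struct vring_packed_desc_sketch {
	uint64_t addr;		/* buffer address */
	uint32_t len;		/* buffer length */
	uint16_t id;		/* buffer id */
	uint16_t flags;		/* AVAIL/USED/NEXT/INDIRECT/... */
};

static_assert(sizeof(struct vring_packed_desc_sketch) == 16,
	      "packed descriptor is 16 bytes");
static_assert(64 / sizeof(struct vring_packed_desc_sketch) == 4,
	      "a 64-byte cache line holds 4 descriptors");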