[dpdk-dev,3/5] virtio: use indirect ring elements

Message ID 1445231772-17467-4-git-send-email-stephen@networkplumber.org (mailing list archive)
State Changes Requested, archived
Headers

Commit Message

Stephen Hemminger Oct. 19, 2015, 5:16 a.m. UTC
  The virtio ring in QEMU/KVM is usually limited to 256 entries
and the normal way that virtio driver was queuing mbufs required
nsegs + 1 ring elements. By using the indirect ring element feature
if available, each packet will take only one ring slot even for
multi-segment packets.

Signed-off-by: Stephen Hemminger <stephen@networkplumber.org>
---
 drivers/net/virtio/virtio_ethdev.c | 38 +++++++++++++++++------
 drivers/net/virtio/virtio_ethdev.h |  3 +-
 drivers/net/virtio/virtio_rxtx.c   | 62 +++++++++++++++++++++++++++-----------
 drivers/net/virtio/virtqueue.h     | 19 ++++++++++++
 4 files changed, 94 insertions(+), 28 deletions(-)
  

Comments

Huawei Xie Oct. 19, 2015, 1:19 p.m. UTC | #1
On 10/19/2015 1:16 PM, Stephen Hemminger wrote:
> The virtio ring in QEMU/KVM is usually limited to 256 entries
> and the normal way that virtio driver was queuing mbufs required
> nsegs + 1 ring elements. By using the indirect ring element feature
> if available, each packet will take only one ring slot even for
> multi-segment packets.

[...]


>  static int
> -virtqueue_enqueue_xmit(struct virtqueue *txvq, struct rte_mbuf *cookie)
> +virtqueue_enqueue_xmit(struct virtqueue *txvq, struct rte_mbuf *cookie,
> +		       int use_indirect)
>  {
>  	struct vq_desc_extra *dxp;
>  	struct vring_desc *start_dp;
>  	uint16_t seg_num = cookie->nb_segs;
> -	uint16_t needed = 1 + seg_num;
> +	uint16_t needed = use_indirect ? 1 : 1 + seg_num;
>  	uint16_t head_idx, idx;
> -	uint16_t head_size = txvq->hw->vtnet_hdr_size;
> +	unsigned long offs;
>  
>  	if (unlikely(txvq->vq_free_cnt == 0))
>  		return -ENOSPC;
> @@ -220,12 +221,29 @@ virtqueue_enqueue_xmit(struct virtqueue *txvq, struct rte_mbuf *cookie)
>  	dxp = &txvq->vq_descx[idx];
>  	dxp->cookie = (void *)cookie;
>  	dxp->ndescs = needed;
> -
>  	start_dp = txvq->vq_ring.desc;
> -	start_dp[idx].addr =
> -		txvq->virtio_net_hdr_mem + idx * head_size;
> -	start_dp[idx].len = (uint32_t)head_size;
> -	start_dp[idx].flags = VRING_DESC_F_NEXT;
> +
> +	if (use_indirect) {
> +		struct virtio_tx_region *txr
> +			= txvq->virtio_net_hdr_mz->addr;
> +
> +		offs = idx * sizeof(struct virtio_tx_region)
> +			+ offsetof(struct virtio_tx_region, tx_indir);
> +
> +		start_dp[idx].addr  = txvq->virtio_net_hdr_mem + offs;
> +		start_dp[idx].len   = (seg_num + 1) * sizeof(struct vring_desc);
a. In indirect mode, as we always use one descriptor, could we allocate
one fixed descriptor as what i did in RX/TX ring layout optimization? :).
b. If not a, we could cache the descriptor, avoid update unless the
fields are different. In current implementation of free desc list, we
could make them always use the same tx desc for the same ring slot. I am
to submit a patch for normal rx path.
c. Could we initialize the length of all tx descriptors to be
VIRTIO_MAX_INDIRECT * sizeof(struct vring_desc)? Is maximum length ok
here? Does the spec require that the length field reflects the length of
real used descs, as we already have the next field to indicate the last
descriptor.
 

> +		start_dp[idx].flags = VRING_DESC_F_INDIRECT;
> +
> +		start_dp = txr[idx].tx_indir;
> +		idx = 0;
> +	} else {
> +		offs = idx * sizeof(struct virtio_tx_region)
> +			+ offsetof(struct virtio_tx_region, tx_hdr);
> +
> +		start_dp[idx].addr  = txvq->virtio_net_hdr_mem + offs;
> +		start_dp[idx].len   = txvq->hw->vtnet_hdr_size;
> +		start_dp[idx].flags = VRING_DESC_F_NEXT;
> +	}
>  
>  	for (; ((seg_num > 0) && (cookie != NULL)); seg_num--) {
>  		idx = start_dp[idx].next;
This looks weird to me. Why skip the first user provided descriptor?
idx = 0
idx = start_dp[idx].next
start_dp[idx].addr = ...

> @@ -236,7 +254,12 @@ virtqueue_enqueue_xmit(struct virtqueue *txvq, struct rte_mbuf *cookie)
>  	}
>  
>  	start_dp[idx].flags &= ~VRING_DESC_F_NEXT;
> -	idx = start_dp[idx].next;
> +
> +	if (use_indirect)
> +		idx = txvq->vq_ring.desc[head_idx].next;
> +	else
> +		idx = start_dp[idx].next;
> +
>  	txvq->vq_desc_head_idx = idx;
>  	if (txvq->vq_desc_head_idx == VQ_RING_DESC_CHAIN_END)
>  		txvq->vq_desc_tail_idx = idx;
> @@ -261,7 +284,7 @@ static void
>  virtio_dev_vring_start(struct virtqueue *vq, int queue_type)
>  {
>  	struct rte_mbuf *m;
> -	int i, nbufs, error, size = vq->vq_nentries;
> +	int nbufs, error, size = vq->vq_nentries;
>  	struct vring *vr = &vq->vq_ring;
>  	uint8_t *ring_mem = vq->vq_ring_virt_mem;
>  
> @@ -279,10 +302,7 @@ virtio_dev_vring_start(struct virtqueue *vq, int queue_type)
>  	vq->vq_free_cnt = vq->vq_nentries;
>  	memset(vq->vq_descx, 0, sizeof(struct vq_desc_extra) * vq->vq_nentries);
>  
> -	/* Chain all the descriptors in the ring with an END */
> -	for (i = 0; i < size - 1; i++)
> -		vr->desc[i].next = (uint16_t)(i + 1);
> -	vr->desc[i].next = VQ_RING_DESC_CHAIN_END;
> +	vring_desc_init(vr->desc, size);
>  
>  	/*
>  	 * Disable device(host) interrupting guest
> @@ -760,7 +780,15 @@ virtio_xmit_pkts(void *tx_queue, struct rte_mbuf **tx_pkts, uint16_t nb_pkts)
>  
>  	for (nb_tx = 0; nb_tx < nb_pkts; nb_tx++) {
>  		struct rte_mbuf *txm = tx_pkts[nb_tx];
> -		int need = txm->nb_segs - txvq->vq_free_cnt + 1;
> +		int use_indirect, slots, need;
> +
> +		use_indirect = vtpci_with_feature(txvq->hw,
> +						  VIRTIO_RING_F_INDIRECT_DESC)
> +			&& (txm->nb_segs < VIRTIO_MAX_TX_INDIRECT);
> +
> +		/* How many ring entries are needed to this Tx? */
"how many descs" is more accurate , at least to me, ring entries/slots
means entries/slots in avail ring.
If it is OK, s/slots/descs/ as well.

> +		slots = use_indirect ? 1 : 1 + txm->nb_segs;
> +		need = slots - txvq->vq_free_cnt;
>  
>  		/* Positive value indicates it need free vring descriptors */
>  		if (need > 0) {
> @@ -769,7 +797,7 @@ virtio_xmit_pkts(void *tx_queue, struct rte_mbuf **tx_pkts, uint16_t nb_pkts)
>  			need = RTE_MIN(need, (int)nb_used);
>  
>  			virtio_xmit_cleanup(txvq, need);
> -			need = txm->nb_segs - txvq->vq_free_cnt + 1;
> +			need = slots - txvq->vq_free_cnt;
>  			if (unlikely(need > 0)) {
>  				PMD_TX_LOG(ERR,
>  					   "No free tx descriptors to transmit");
> @@ -787,7 +815,7 @@ virtio_xmit_pkts(void *tx_queue, struct rte_mbuf **tx_pkts, uint16_t nb_pkts)
>  		}
>  
>  		/* Enqueue Packet buffers */
> -		error = virtqueue_enqueue_xmit(txvq, txm);
> +		error = virtqueue_enqueue_xmit(txvq, txm, use_indirect);
>  		if (unlikely(error)) {
>  			if (error == ENOSPC)
>  				PMD_TX_LOG(ERR, "virtqueue_enqueue Free count = 0");
> diff --git a/drivers/net/virtio/virtqueue.h b/drivers/net/virtio/virtqueue.h
> index 7789411..fe3fa66 100644
> --- a/drivers/net/virtio/virtqueue.h
> +++ b/drivers/net/virtio/virtqueue.h
> @@ -237,6 +237,25 @@ struct virtio_net_hdr_mrg_rxbuf {
>  	uint16_t num_buffers; /**< Number of merged rx buffers */
>  };
>  
> +/* Region reserved to allow for transmit header and indirect ring */
> +#define VIRTIO_MAX_TX_INDIRECT 8
> +struct virtio_tx_region {
> +	struct virtio_net_hdr_mrg_rxbuf tx_hdr;
Any reason to use merge-able header here?
> +	struct vring_desc tx_indir[VIRTIO_MAX_TX_INDIRECT]
> +			   __attribute__((__aligned__(16)));
WARNING: __aligned(size) is preferred over __attribute__((aligned(size)))
[...]
  
Stephen Hemminger Oct. 19, 2015, 3:47 p.m. UTC | #2
On Mon, 19 Oct 2015 13:19:50 +0000
"Xie, Huawei" <huawei.xie@intel.com> wrote:

> >  static int
> > -virtqueue_enqueue_xmit(struct virtqueue *txvq, struct rte_mbuf *cookie)
> > +virtqueue_enqueue_xmit(struct virtqueue *txvq, struct rte_mbuf *cookie,
> > +		       int use_indirect)
> >  {
> >  	struct vq_desc_extra *dxp;
> >  	struct vring_desc *start_dp;
> >  	uint16_t seg_num = cookie->nb_segs;
> > -	uint16_t needed = 1 + seg_num;
> > +	uint16_t needed = use_indirect ? 1 : 1 + seg_num;
> >  	uint16_t head_idx, idx;
> > -	uint16_t head_size = txvq->hw->vtnet_hdr_size;
> > +	unsigned long offs;
> >  
> >  	if (unlikely(txvq->vq_free_cnt == 0))
> >  		return -ENOSPC;
> > @@ -220,12 +221,29 @@ virtqueue_enqueue_xmit(struct virtqueue *txvq, struct rte_mbuf *cookie)
> >  	dxp = &txvq->vq_descx[idx];
> >  	dxp->cookie = (void *)cookie;
> >  	dxp->ndescs = needed;
> > -
> >  	start_dp = txvq->vq_ring.desc;
> > -	start_dp[idx].addr =
> > -		txvq->virtio_net_hdr_mem + idx * head_size;
> > -	start_dp[idx].len = (uint32_t)head_size;
> > -	start_dp[idx].flags = VRING_DESC_F_NEXT;
> > +
> > +	if (use_indirect) {
> > +		struct virtio_tx_region *txr
> > +			= txvq->virtio_net_hdr_mz->addr;
> > +
> > +		offs = idx * sizeof(struct virtio_tx_region)
> > +			+ offsetof(struct virtio_tx_region, tx_indir);
> > +
> > +		start_dp[idx].addr  = txvq->virtio_net_hdr_mem + offs;
> > +		start_dp[idx].len   = (seg_num + 1) * sizeof(struct vring_desc);  
> a. In indirect mode, as we always use one descriptor, could we allocate
> one fixed descriptor as what i did in RX/TX ring layout optimization? :).

The code can not assume all packets will be in indirect mode. If using
any_layout, then some packets will use that. Also if you give a packet
where nb_segs is very large, then it falls back to old mode.
Also some hosts (like vhost) don't support indirect.

> b. If not a, we could cache the descriptor, avoid update unless the
> fields are different. In current implementation of free desc list, we
> could make them always use the same tx desc for the same ring slot. I am
> to submit a patch for normal rx path.

See above

> c. Could we initialize the length of all tx descriptors to be
> VIRTIO_MAX_INDIRECT * sizeof(struct vring_desc)? Is maximum length ok
> here? Does the spec require that the length field reflects the length of
> real used descs, as we already have the next field to indicate the last
> descriptor.

The largest VIRTIO_MAX_INDIRECT possible is very large 4K


> 
> > +		start_dp[idx].flags = VRING_DESC_F_INDIRECT;
> > +
> > +		start_dp = txr[idx].tx_indir;
> > +		idx = 0;
> > +	} else {
> > +		offs = idx * sizeof(struct virtio_tx_region)
> > +			+ offsetof(struct virtio_tx_region, tx_hdr);
> > +
> > +		start_dp[idx].addr  = txvq->virtio_net_hdr_mem + offs;
> > +		start_dp[idx].len   = txvq->hw->vtnet_hdr_size;
> > +		start_dp[idx].flags = VRING_DESC_F_NEXT;
> > +	}
> >  
> >  	for (; ((seg_num > 0) && (cookie != NULL)); seg_num--) {
> >  		idx = start_dp[idx].next;  
> This looks weird to me. Why skip the first user provided descriptor?
> idx = 0
> idx = start_dp[idx].next
> start_dp[idx].addr = ...

The first descriptor (0) is initialized once to point to the static
all zeros tx header. Then code skips to second entry to initailize the
first data block.

> 
> > @@ -236,7 +254,12 @@ virtqueue_enqueue_xmit(struct virtqueue *txvq, struct rte_mbuf *cookie)
> >  	}
> >  
> >  	start_dp[idx].flags &= ~VRING_DESC_F_NEXT;
> > -	idx = start_dp[idx].next;
> > +
> > +	if (use_indirect)
> > +		idx = txvq->vq_ring.desc[head_idx].next;
> > +	else
> > +		idx = start_dp[idx].next;
> > +
> >  	txvq->vq_desc_head_idx = idx;
> >  	if (txvq->vq_desc_head_idx == VQ_RING_DESC_CHAIN_END)
> >  		txvq->vq_desc_tail_idx = idx;
> > @@ -261,7 +284,7 @@ static void
> >  virtio_dev_vring_start(struct virtqueue *vq, int queue_type)
> >  {
> >  	struct rte_mbuf *m;
> > -	int i, nbufs, error, size = vq->vq_nentries;
> > +	int nbufs, error, size = vq->vq_nentries;
> >  	struct vring *vr = &vq->vq_ring;
> >  	uint8_t *ring_mem = vq->vq_ring_virt_mem;
> >  
> > @@ -279,10 +302,7 @@ virtio_dev_vring_start(struct virtqueue *vq, int queue_type)
> >  	vq->vq_free_cnt = vq->vq_nentries;
> >  	memset(vq->vq_descx, 0, sizeof(struct vq_desc_extra) * vq->vq_nentries);
> >  
> > -	/* Chain all the descriptors in the ring with an END */
> > -	for (i = 0; i < size - 1; i++)
> > -		vr->desc[i].next = (uint16_t)(i + 1);
> > -	vr->desc[i].next = VQ_RING_DESC_CHAIN_END;
> > +	vring_desc_init(vr->desc, size);
> >  
> >  	/*
> >  	 * Disable device(host) interrupting guest
> > @@ -760,7 +780,15 @@ virtio_xmit_pkts(void *tx_queue, struct rte_mbuf **tx_pkts, uint16_t nb_pkts)
> >  
> >  	for (nb_tx = 0; nb_tx < nb_pkts; nb_tx++) {
> >  		struct rte_mbuf *txm = tx_pkts[nb_tx];
> > -		int need = txm->nb_segs - txvq->vq_free_cnt + 1;
> > +		int use_indirect, slots, need;
> > +
> > +		use_indirect = vtpci_with_feature(txvq->hw,
> > +						  VIRTIO_RING_F_INDIRECT_DESC)
> > +			&& (txm->nb_segs < VIRTIO_MAX_TX_INDIRECT);
> > +
> > +		/* How many ring entries are needed to this Tx? */  
> "how many descs" is more accurate , at least to me, ring entries/slots
> means entries/slots in avail ring.
> If it is OK, s/slots/descs/ as well.

The virtio spec doesn't use the words descriptors. that is more an Intel
driver terminolgy.

> 
> > +		slots = use_indirect ? 1 : 1 + txm->nb_segs;
> > +		need = slots - txvq->vq_free_cnt;
> >  
> >  		/* Positive value indicates it need free vring descriptors */
> >  		if (need > 0) {
> > @@ -769,7 +797,7 @@ virtio_xmit_pkts(void *tx_queue, struct rte_mbuf **tx_pkts, uint16_t nb_pkts)
> >  			need = RTE_MIN(need, (int)nb_used);
> >  
> >  			virtio_xmit_cleanup(txvq, need);
> > -			need = txm->nb_segs - txvq->vq_free_cnt + 1;
> > +			need = slots - txvq->vq_free_cnt;
> >  			if (unlikely(need > 0)) {
> >  				PMD_TX_LOG(ERR,
> >  					   "No free tx descriptors to transmit");
> > @@ -787,7 +815,7 @@ virtio_xmit_pkts(void *tx_queue, struct rte_mbuf **tx_pkts, uint16_t nb_pkts)
> >  		}
> >  
> >  		/* Enqueue Packet buffers */
> > -		error = virtqueue_enqueue_xmit(txvq, txm);
> > +		error = virtqueue_enqueue_xmit(txvq, txm, use_indirect);
> >  		if (unlikely(error)) {
> >  			if (error == ENOSPC)
> >  				PMD_TX_LOG(ERR, "virtqueue_enqueue Free count = 0");
> > diff --git a/drivers/net/virtio/virtqueue.h b/drivers/net/virtio/virtqueue.h
> > index 7789411..fe3fa66 100644
> > --- a/drivers/net/virtio/virtqueue.h
> > +++ b/drivers/net/virtio/virtqueue.h
> > @@ -237,6 +237,25 @@ struct virtio_net_hdr_mrg_rxbuf {
> >  	uint16_t num_buffers; /**< Number of merged rx buffers */
> >  };
> >  
> > +/* Region reserved to allow for transmit header and indirect ring */
> > +#define VIRTIO_MAX_TX_INDIRECT 8
> > +struct virtio_tx_region {
> > +	struct virtio_net_hdr_mrg_rxbuf tx_hdr;  
> Any reason to use merge-able header here?
> > +	struct vring_desc tx_indir[VIRTIO_MAX_TX_INDIRECT]
> > +			   __attribute__((__aligned__(16)));  
> WARNING: __aligned(size) is preferred over __attribute__((aligned(size)))
> [...]
  
Huawei Xie Oct. 19, 2015, 4:18 p.m. UTC | #3
On 10/19/2015 11:47 PM, Stephen Hemminger wrote:
> On Mon, 19 Oct 2015 13:19:50 +0000
> "Xie, Huawei" <huawei.xie@intel.com> wrote:
>
>>>  static int
>>> -virtqueue_enqueue_xmit(struct virtqueue *txvq, struct rte_mbuf *cookie)
>>> +virtqueue_enqueue_xmit(struct virtqueue *txvq, struct rte_mbuf *cookie,
>>> +		       int use_indirect)
>>>  {
>>>  	struct vq_desc_extra *dxp;
>>>  	struct vring_desc *start_dp;
>>>  	uint16_t seg_num = cookie->nb_segs;
>>> -	uint16_t needed = 1 + seg_num;
>>> +	uint16_t needed = use_indirect ? 1 : 1 + seg_num;
>>>  	uint16_t head_idx, idx;
>>> -	uint16_t head_size = txvq->hw->vtnet_hdr_size;
>>> +	unsigned long offs;
>>>  
>>>  	if (unlikely(txvq->vq_free_cnt == 0))
>>>  		return -ENOSPC;
>>> @@ -220,12 +221,29 @@ virtqueue_enqueue_xmit(struct virtqueue *txvq, struct rte_mbuf *cookie)
>>>  	dxp = &txvq->vq_descx[idx];
>>>  	dxp->cookie = (void *)cookie;
>>>  	dxp->ndescs = needed;
>>> -
>>>  	start_dp = txvq->vq_ring.desc;
>>> -	start_dp[idx].addr =
>>> -		txvq->virtio_net_hdr_mem + idx * head_size;
>>> -	start_dp[idx].len = (uint32_t)head_size;
>>> -	start_dp[idx].flags = VRING_DESC_F_NEXT;
>>> +
>>> +	if (use_indirect) {
>>> +		struct virtio_tx_region *txr
>>> +			= txvq->virtio_net_hdr_mz->addr;
>>> +
>>> +		offs = idx * sizeof(struct virtio_tx_region)
>>> +			+ offsetof(struct virtio_tx_region, tx_indir);
>>> +
>>> +		start_dp[idx].addr  = txvq->virtio_net_hdr_mem + offs;
>>> +		start_dp[idx].len   = (seg_num + 1) * sizeof(struct vring_desc);  
>> a. In indirect mode, as we always use one descriptor, could we allocate
>> one fixed descriptor as what i did in RX/TX ring layout optimization? :).
> The code can not assume all packets will be in indirect mode. If using
> any_layout, then some packets will use that. Also if you give a packet
> where nb_segs is very large, then it falls back to old mode.
> Also some hosts (like vhost) don't support indirect.
Agree.
With always one descriptor, it is ok.
For the packets with more than VIRTIO_MAX_INDIRECT segs, we fall to old
mode.
>
>> b. If not a, we could cache the descriptor, avoid update unless the
>> fields are different. In current implementation of free desc list, we
>> could make them always use the same tx desc for the same ring slot. I am
>> to submit a patch for normal rx path.
> See above
I don't mean using fixed descriptors. Even in general implementation,
the desc allocated for the ring entry would be normally the same.
So we cache the descriptor id(in the case only one desc is used) last
allocated for each avail ring entry , compare, if the same, skip the store.
This introduces some overhead, but considering vhost has to fetch this
L1M cache line from virtio's processing core, it might be worth.
Mst has posted a patch for virtio's testcase.
Maybe it makes more sense for RX as currently we always uses one descriptor.
If you mean the "only one descriptor" again, skip this comment.
>
>> c. Could we initialize the length of all tx descriptors to be
>> VIRTIO_MAX_INDIRECT * sizeof(struct vring_desc)? Is maximum length ok
>> here? Does the spec require that the length field reflects the length of
>> real used descs, as we already have the next field to indicate the last
>> descriptor.
> The largest VIRTIO_MAX_INDIRECT possible is very large 4K
instead of

start_dp[idx].len   = (seg_num + 1) * sizeof(struct vring_desc)
Is it ok to use
start_dp[idx].len   = 4096 * sizeof(struct vring_desc)?

>
>
>>> +		start_dp[idx].flags = VRING_DESC_F_INDIRECT;
>>> +
>>> +		start_dp = txr[idx].tx_indir;
>>> +		idx = 0;
>>> +	} else {
>>> +		offs = idx * sizeof(struct virtio_tx_region)
>>> +			+ offsetof(struct virtio_tx_region, tx_hdr);
>>> +
>>> +		start_dp[idx].addr  = txvq->virtio_net_hdr_mem + offs;
>>> +		start_dp[idx].len   = txvq->hw->vtnet_hdr_size;
>>> +		start_dp[idx].flags = VRING_DESC_F_NEXT;
>>> +	}
>>>  
>>>  	for (; ((seg_num > 0) && (cookie != NULL)); seg_num--) {
>>>  		idx = start_dp[idx].next;  
>> This looks weird to me. Why skip the first user provided descriptor?
>> idx = 0
>> idx = start_dp[idx].next
>> start_dp[idx].addr = ...
> The first descriptor (0) is initialized once to point to the static
> all zeros tx header. Then code skips to second entry to initailize the
> first data block.
Ah, forget the header.
>>> @@ -236,7 +254,12 @@ virtqueue_enqueue_xmit(struct virtqueue *txvq, struct rte_mbuf *cookie)
>>>  	}
>>>  
>>>  	start_dp[idx].flags &= ~VRING_DESC_F_NEXT;
>>> -	idx = start_dp[idx].next;
>>> +
>>> +	if (use_indirect)
>>> +		idx = txvq->vq_ring.desc[head_idx].next;
>>> +	else
>>> +		idx = start_dp[idx].next;
>>> +
>>>  	txvq->vq_desc_head_idx = idx;
>>>  	if (txvq->vq_desc_head_idx == VQ_RING_DESC_CHAIN_END)
>>>  		txvq->vq_desc_tail_idx = idx;
>>> @@ -261,7 +284,7 @@ static void
>>>  virtio_dev_vring_start(struct virtqueue *vq, int queue_type)
>>>  {
>>>  	struct rte_mbuf *m;
>>> -	int i, nbufs, error, size = vq->vq_nentries;
>>> +	int nbufs, error, size = vq->vq_nentries;
>>>  	struct vring *vr = &vq->vq_ring;
>>>  	uint8_t *ring_mem = vq->vq_ring_virt_mem;
>>>  
>>> @@ -279,10 +302,7 @@ virtio_dev_vring_start(struct virtqueue *vq, int queue_type)
>>>  	vq->vq_free_cnt = vq->vq_nentries;
>>>  	memset(vq->vq_descx, 0, sizeof(struct vq_desc_extra) * vq->vq_nentries);
>>>  
>>> -	/* Chain all the descriptors in the ring with an END */
>>> -	for (i = 0; i < size - 1; i++)
>>> -		vr->desc[i].next = (uint16_t)(i + 1);
>>> -	vr->desc[i].next = VQ_RING_DESC_CHAIN_END;
>>> +	vring_desc_init(vr->desc, size);
>>>  
>>>  	/*
>>>  	 * Disable device(host) interrupting guest
>>> @@ -760,7 +780,15 @@ virtio_xmit_pkts(void *tx_queue, struct rte_mbuf **tx_pkts, uint16_t nb_pkts)
>>>  
>>>  	for (nb_tx = 0; nb_tx < nb_pkts; nb_tx++) {
>>>  		struct rte_mbuf *txm = tx_pkts[nb_tx];
>>> -		int need = txm->nb_segs - txvq->vq_free_cnt + 1;
>>> +		int use_indirect, slots, need;
>>> +
>>> +		use_indirect = vtpci_with_feature(txvq->hw,
>>> +						  VIRTIO_RING_F_INDIRECT_DESC)
>>> +			&& (txm->nb_segs < VIRTIO_MAX_TX_INDIRECT);
>>> +
>>> +		/* How many ring entries are needed to this Tx? */  
>> "how many descs" is more accurate , at least to me, ring entries/slots
>> means entries/slots in avail ring.
>> If it is OK, s/slots/descs/ as well.
> The virtio spec doesn't use the words descriptors. that is more an Intel
> driver terminolgy.
>
>>> +		slots = use_indirect ? 1 : 1 + txm->nb_segs;
>>> +		need = slots - txvq->vq_free_cnt;
>>>  
>>>  		/* Positive value indicates it need free vring descriptors */
>>>  		if (need > 0) {
>>> @@ -769,7 +797,7 @@ virtio_xmit_pkts(void *tx_queue, struct rte_mbuf **tx_pkts, uint16_t nb_pkts)
>>>  			need = RTE_MIN(need, (int)nb_used);
>>>  
>>>  			virtio_xmit_cleanup(txvq, need);
>>> -			need = txm->nb_segs - txvq->vq_free_cnt + 1;
>>> +			need = slots - txvq->vq_free_cnt;
>>>  			if (unlikely(need > 0)) {
>>>  				PMD_TX_LOG(ERR,
>>>  					   "No free tx descriptors to transmit");
>>> @@ -787,7 +815,7 @@ virtio_xmit_pkts(void *tx_queue, struct rte_mbuf **tx_pkts, uint16_t nb_pkts)
>>>  		}
>>>  
>>>  		/* Enqueue Packet buffers */
>>> -		error = virtqueue_enqueue_xmit(txvq, txm);
>>> +		error = virtqueue_enqueue_xmit(txvq, txm, use_indirect);
>>>  		if (unlikely(error)) {
>>>  			if (error == ENOSPC)
>>>  				PMD_TX_LOG(ERR, "virtqueue_enqueue Free count = 0");
>>> diff --git a/drivers/net/virtio/virtqueue.h b/drivers/net/virtio/virtqueue.h
>>> index 7789411..fe3fa66 100644
>>> --- a/drivers/net/virtio/virtqueue.h
>>> +++ b/drivers/net/virtio/virtqueue.h
>>> @@ -237,6 +237,25 @@ struct virtio_net_hdr_mrg_rxbuf {
>>>  	uint16_t num_buffers; /**< Number of merged rx buffers */
>>>  };
>>>  
>>> +/* Region reserved to allow for transmit header and indirect ring */
>>> +#define VIRTIO_MAX_TX_INDIRECT 8
>>> +struct virtio_tx_region {
>>> +	struct virtio_net_hdr_mrg_rxbuf tx_hdr;  
>> Any reason to use merge-able header here?
>>> +	struct vring_desc tx_indir[VIRTIO_MAX_TX_INDIRECT]
>>> +			   __attribute__((__aligned__(16)));  
>> WARNING: __aligned(size) is preferred over __attribute__((aligned(size)))
>> [...]
>
  
Thomas Monjalon Oct. 30, 2015, 6:01 p.m. UTC | #4
Sorry there is a gcc error.

2015-10-18 22:16, Stephen Hemminger:
> +		for (i = 0; i < vq_size; i++) {

virtio_ethdev.c:386:17: error: comparison between signed and unsigned integer expressions
  

Patch

diff --git a/drivers/net/virtio/virtio_ethdev.c b/drivers/net/virtio/virtio_ethdev.c
index 465d3cd..cfce4f0 100644
--- a/drivers/net/virtio/virtio_ethdev.c
+++ b/drivers/net/virtio/virtio_ethdev.c
@@ -357,27 +357,45 @@  int virtio_dev_queue_setup(struct rte_eth_dev *dev,
 	vq->virtio_net_hdr_mem = 0;
 
 	if (queue_type == VTNET_TQ) {
+		const struct rte_memzone *hdr_mz;
+		struct virtio_tx_region *txr;
+		int i;
+
 		/*
 		 * For each xmit packet, allocate a virtio_net_hdr
+		 * and indirect ring elements
 		 */
 		snprintf(vq_name, sizeof(vq_name), "port%d_tvq%d_hdrzone",
 			dev->data->port_id, queue_idx);
-		vq->virtio_net_hdr_mz = rte_memzone_reserve_aligned(vq_name,
-			vq_size * hw->vtnet_hdr_size,
+		hdr_mz = rte_memzone_reserve_aligned(vq_name,
+			vq_size * sizeof(*txr),
 			socket_id, 0, RTE_CACHE_LINE_SIZE);
-		if (vq->virtio_net_hdr_mz == NULL) {
+		if (hdr_mz == NULL) {
 			if (rte_errno == EEXIST)
-				vq->virtio_net_hdr_mz =
-					rte_memzone_lookup(vq_name);
-			if (vq->virtio_net_hdr_mz == NULL) {
+				hdr_mz = rte_memzone_lookup(vq_name);
+			if (hdr_mz == NULL) {
 				rte_free(vq);
 				return -ENOMEM;
 			}
 		}
-		vq->virtio_net_hdr_mem =
-			vq->virtio_net_hdr_mz->phys_addr;
-		memset(vq->virtio_net_hdr_mz->addr, 0,
-			vq_size * hw->vtnet_hdr_size);
+		vq->virtio_net_hdr_mz = hdr_mz;
+		vq->virtio_net_hdr_mem = hdr_mz->phys_addr;
+
+		txr = hdr_mz->addr;
+		memset(txr, 0, vq_size * sizeof(*txr));
+		for (i = 0; i < vq_size; i++) {
+			struct vring_desc *start_dp = txr[i].tx_indir;
+
+			vring_desc_init(start_dp, VIRTIO_MAX_INDIRECT);
+
+			/* first indirect descriptor is always the tx header */
+			start_dp->addr = vq->virtio_net_hdr_mem
+				+ i * sizeof(*txr)
+				+ offsetof(struct virtio_tx_region, tx_hdr);
+
+			start_dp->len = vq->hw->vtnet_hdr_size;
+			start_dp->flags = VRING_DESC_F_NEXT;
+		}
 	} else if (queue_type == VTNET_CQ) {
 		/* Allocate a page for control vq command, data and status */
 		snprintf(vq_name, sizeof(vq_name), "port%d_cvq_hdrzone",
diff --git a/drivers/net/virtio/virtio_ethdev.h b/drivers/net/virtio/virtio_ethdev.h
index 9026d42..07a9265 100644
--- a/drivers/net/virtio/virtio_ethdev.h
+++ b/drivers/net/virtio/virtio_ethdev.h
@@ -64,7 +64,8 @@ 
 	 1u << VIRTIO_NET_F_CTRL_VQ	  |	\
 	 1u << VIRTIO_NET_F_CTRL_RX	  |	\
 	 1u << VIRTIO_NET_F_CTRL_VLAN	  |	\
-	 1u << VIRTIO_NET_F_MRG_RXBUF)
+	 1u << VIRTIO_NET_F_MRG_RXBUF     |	\
+	 1u << VIRTIO_RING_F_INDIRECT_DESC)
 
 /*
  * CQ function prototype
diff --git a/drivers/net/virtio/virtio_rxtx.c b/drivers/net/virtio/virtio_rxtx.c
index dbe6665..f68ab8f 100644
--- a/drivers/net/virtio/virtio_rxtx.c
+++ b/drivers/net/virtio/virtio_rxtx.c
@@ -199,14 +199,15 @@  virtqueue_enqueue_recv_refill(struct virtqueue *vq, struct rte_mbuf *cookie)
 }
 
 static int
-virtqueue_enqueue_xmit(struct virtqueue *txvq, struct rte_mbuf *cookie)
+virtqueue_enqueue_xmit(struct virtqueue *txvq, struct rte_mbuf *cookie,
+		       int use_indirect)
 {
 	struct vq_desc_extra *dxp;
 	struct vring_desc *start_dp;
 	uint16_t seg_num = cookie->nb_segs;
-	uint16_t needed = 1 + seg_num;
+	uint16_t needed = use_indirect ? 1 : 1 + seg_num;
 	uint16_t head_idx, idx;
-	uint16_t head_size = txvq->hw->vtnet_hdr_size;
+	unsigned long offs;
 
 	if (unlikely(txvq->vq_free_cnt == 0))
 		return -ENOSPC;
@@ -220,12 +221,29 @@  virtqueue_enqueue_xmit(struct virtqueue *txvq, struct rte_mbuf *cookie)
 	dxp = &txvq->vq_descx[idx];
 	dxp->cookie = (void *)cookie;
 	dxp->ndescs = needed;
-
 	start_dp = txvq->vq_ring.desc;
-	start_dp[idx].addr =
-		txvq->virtio_net_hdr_mem + idx * head_size;
-	start_dp[idx].len = (uint32_t)head_size;
-	start_dp[idx].flags = VRING_DESC_F_NEXT;
+
+	if (use_indirect) {
+		struct virtio_tx_region *txr
+			= txvq->virtio_net_hdr_mz->addr;
+
+		offs = idx * sizeof(struct virtio_tx_region)
+			+ offsetof(struct virtio_tx_region, tx_indir);
+
+		start_dp[idx].addr  = txvq->virtio_net_hdr_mem + offs;
+		start_dp[idx].len   = (seg_num + 1) * sizeof(struct vring_desc);
+		start_dp[idx].flags = VRING_DESC_F_INDIRECT;
+
+		start_dp = txr[idx].tx_indir;
+		idx = 0;
+	} else {
+		offs = idx * sizeof(struct virtio_tx_region)
+			+ offsetof(struct virtio_tx_region, tx_hdr);
+
+		start_dp[idx].addr  = txvq->virtio_net_hdr_mem + offs;
+		start_dp[idx].len   = txvq->hw->vtnet_hdr_size;
+		start_dp[idx].flags = VRING_DESC_F_NEXT;
+	}
 
 	for (; ((seg_num > 0) && (cookie != NULL)); seg_num--) {
 		idx = start_dp[idx].next;
@@ -236,7 +254,12 @@  virtqueue_enqueue_xmit(struct virtqueue *txvq, struct rte_mbuf *cookie)
 	}
 
 	start_dp[idx].flags &= ~VRING_DESC_F_NEXT;
-	idx = start_dp[idx].next;
+
+	if (use_indirect)
+		idx = txvq->vq_ring.desc[head_idx].next;
+	else
+		idx = start_dp[idx].next;
+
 	txvq->vq_desc_head_idx = idx;
 	if (txvq->vq_desc_head_idx == VQ_RING_DESC_CHAIN_END)
 		txvq->vq_desc_tail_idx = idx;
@@ -261,7 +284,7 @@  static void
 virtio_dev_vring_start(struct virtqueue *vq, int queue_type)
 {
 	struct rte_mbuf *m;
-	int i, nbufs, error, size = vq->vq_nentries;
+	int nbufs, error, size = vq->vq_nentries;
 	struct vring *vr = &vq->vq_ring;
 	uint8_t *ring_mem = vq->vq_ring_virt_mem;
 
@@ -279,10 +302,7 @@  virtio_dev_vring_start(struct virtqueue *vq, int queue_type)
 	vq->vq_free_cnt = vq->vq_nentries;
 	memset(vq->vq_descx, 0, sizeof(struct vq_desc_extra) * vq->vq_nentries);
 
-	/* Chain all the descriptors in the ring with an END */
-	for (i = 0; i < size - 1; i++)
-		vr->desc[i].next = (uint16_t)(i + 1);
-	vr->desc[i].next = VQ_RING_DESC_CHAIN_END;
+	vring_desc_init(vr->desc, size);
 
 	/*
 	 * Disable device(host) interrupting guest
@@ -760,7 +780,15 @@  virtio_xmit_pkts(void *tx_queue, struct rte_mbuf **tx_pkts, uint16_t nb_pkts)
 
 	for (nb_tx = 0; nb_tx < nb_pkts; nb_tx++) {
 		struct rte_mbuf *txm = tx_pkts[nb_tx];
-		int need = txm->nb_segs - txvq->vq_free_cnt + 1;
+		int use_indirect, slots, need;
+
+		use_indirect = vtpci_with_feature(txvq->hw,
+						  VIRTIO_RING_F_INDIRECT_DESC)
+			&& (txm->nb_segs < VIRTIO_MAX_TX_INDIRECT);
+
+		/* How many ring entries are needed to this Tx? */
+		slots = use_indirect ? 1 : 1 + txm->nb_segs;
+		need = slots - txvq->vq_free_cnt;
 
 		/* Positive value indicates it need free vring descriptors */
 		if (need > 0) {
@@ -769,7 +797,7 @@  virtio_xmit_pkts(void *tx_queue, struct rte_mbuf **tx_pkts, uint16_t nb_pkts)
 			need = RTE_MIN(need, (int)nb_used);
 
 			virtio_xmit_cleanup(txvq, need);
-			need = txm->nb_segs - txvq->vq_free_cnt + 1;
+			need = slots - txvq->vq_free_cnt;
 			if (unlikely(need > 0)) {
 				PMD_TX_LOG(ERR,
 					   "No free tx descriptors to transmit");
@@ -787,7 +815,7 @@  virtio_xmit_pkts(void *tx_queue, struct rte_mbuf **tx_pkts, uint16_t nb_pkts)
 		}
 
 		/* Enqueue Packet buffers */
-		error = virtqueue_enqueue_xmit(txvq, txm);
+		error = virtqueue_enqueue_xmit(txvq, txm, use_indirect);
 		if (unlikely(error)) {
 			if (error == ENOSPC)
 				PMD_TX_LOG(ERR, "virtqueue_enqueue Free count = 0");
diff --git a/drivers/net/virtio/virtqueue.h b/drivers/net/virtio/virtqueue.h
index 7789411..fe3fa66 100644
--- a/drivers/net/virtio/virtqueue.h
+++ b/drivers/net/virtio/virtqueue.h
@@ -237,6 +237,25 @@  struct virtio_net_hdr_mrg_rxbuf {
 	uint16_t num_buffers; /**< Number of merged rx buffers */
 };
 
+/* Region reserved to allow for transmit header and indirect ring */
+#define VIRTIO_MAX_TX_INDIRECT 8
+struct virtio_tx_region {
+	struct virtio_net_hdr_mrg_rxbuf tx_hdr;
+	struct vring_desc tx_indir[VIRTIO_MAX_TX_INDIRECT]
+			   __attribute__((__aligned__(16)));
+};
+
+/* Chain all the descriptors in the ring with an END */
+static inline void
+vring_desc_init(struct vring_desc *dp, uint16_t n)
+{
+	uint16_t i;
+
+	for (i = 0; i < n - 1; i++)
+		dp[i].next = (uint16_t)(i + 1);
+	dp[i].next = VQ_RING_DESC_CHAIN_END;
+}
+
 /**
  * Tell the backend not to interrupt us.
  */