[dpdk-dev,6/8] bond: handle slaves with fewer queues than bonding device

Message ID 1449249260-15165-7-git-send-email-stephen@networkplumber.org (mailing list archive)
State Superseded, archived
Delegated to: Bruce Richardson
Headers

Commit Message

Stephen Hemminger Dec. 4, 2015, 5:14 p.m. UTC
  From: Eric Kinzie <ekinzie@brocade.com>

In the event that the bonding device has a greater number of tx and/or rx
queues than the slave being added, track the queue limits of the slave.
On receive, ignore queue identifiers beyond what the slave interface
can support.  During transmit, pick a different queue id to use if the
intended queue is not available on the slave.

Signed-off-by: Eric Kinzie <ekinzie@brocade.com>
Signed-off-by: Stephen Hemminger <stephen@networkplumber.org>
---
 drivers/net/bonding/rte_eth_bond_api.c     |   6 +-
 drivers/net/bonding/rte_eth_bond_pmd.c     | 141 +++++++++++++++++++++++++----
 drivers/net/bonding/rte_eth_bond_private.h |   5 +-
 3 files changed, 129 insertions(+), 23 deletions(-)
  

Comments

Andriy Berestovskyy Dec. 4, 2015, 6:36 p.m. UTC | #1
Hi guys,
I'm not quite sure we can support fewer TX queues on a slave that easily:

> queue_id = bond_slave_txqid(internals, i, bd_tx_q->queue_id);
> num_tx_slave = rte_eth_tx_burst(slaves[i], queue_id,
>      slave_bufs[i], slave_nb_pkts[i]);

It seems that two different lcores might end up writing to the same
slave queue at the same time, doesn't it?

Regards,
Andriy

On Fri, Dec 4, 2015 at 6:14 PM, Stephen Hemminger
<stephen@networkplumber.org> wrote:
> From: Eric Kinzie <ekinzie@brocade.com>
>
> In the event that the bonding device has a greater number of tx and/or rx
> queues than the slave being added, track the queue limits of the slave.
> On receive, ignore queue identifiers beyond what the slave interface
> can support.  During transmit, pick a different queue id to use if the
> intended queue is not available on the slave.
>
> Signed-off-by: Eric Kinzie <ekinzie@brocade.com>
> Signed-off-by: Stephen Hemminger <stephen@networkplumber.org>
> ---
>  drivers/net/bonding/rte_eth_bond_api.c     |   6 +-
>  drivers/net/bonding/rte_eth_bond_pmd.c     | 141 +++++++++++++++++++++++++----
>  drivers/net/bonding/rte_eth_bond_private.h |   5 +-
>  3 files changed, 129 insertions(+), 23 deletions(-)
>
> diff --git a/drivers/net/bonding/rte_eth_bond_api.c b/drivers/net/bonding/rte_eth_bond_api.c
> index 630a461..64058ff 100644
> --- a/drivers/net/bonding/rte_eth_bond_api.c
> +++ b/drivers/net/bonding/rte_eth_bond_api.c
> @@ -340,11 +340,11 @@ __eth_bond_slave_add_lock_free(uint8_t bonded_port_id, uint8_t slave_port_id)
>
>         slave_eth_dev = &rte_eth_devices[slave_port_id];
>
> -       /* Add slave details to bonded device */
> -       slave_add(internals, slave_eth_dev);
> -
>         rte_eth_dev_info_get(slave_port_id, &dev_info);
>
> +       /* Add slave details to bonded device */
> +       slave_add(internals, slave_eth_dev, &dev_info);
> +
>         /* We need to store slaves reta_size to be able to synchronize RETA for all
>          * slave devices even if its sizes are different.
>          */
> diff --git a/drivers/net/bonding/rte_eth_bond_pmd.c b/drivers/net/bonding/rte_eth_bond_pmd.c
> index 77582dd..868e66b 100644
> --- a/drivers/net/bonding/rte_eth_bond_pmd.c
> +++ b/drivers/net/bonding/rte_eth_bond_pmd.c
> @@ -76,6 +76,47 @@ get_vlan_offset(struct ether_hdr *eth_hdr, uint16_t *proto)
>         return vlan_offset;
>  }
>
> +static uint8_t
> +bond_active_slaves_by_rxqid(struct bond_dev_private *internals, int queue_id,
> +               uint8_t slaves[])
> +{
> +       struct bond_slave_details *slave_details;
> +       uint8_t num_of_slaves;
> +       uint8_t i = 0;
> +
> +       num_of_slaves = internals->active_slave_count;
> +       memcpy(slaves, internals->active_slaves,
> +                       sizeof(internals->active_slaves[0]) * num_of_slaves);
> +
> +       if (num_of_slaves < 1 || internals->kvlist)
> +               return num_of_slaves;
> +
> +       /* remove slaves that don't have a queue numbered "queue_id" */
> +       while (i < num_of_slaves) {
> +               slave_details = &internals->slaves[i];
> +               if (unlikely(queue_id >= slave_details->nb_rx_queues)) {
> +                       slaves[i] = slaves[num_of_slaves-1];
> +                       num_of_slaves--;
> +               } else
> +                       i++;
> +       }
> +
> +       return num_of_slaves;
> +}
> +
> +static int
> +bond_slave_txqid(struct bond_dev_private *internals, uint8_t slave_id,
> +               int queue_id)
> +{
> +       struct bond_slave_details *slave_details;
> +
> +       if (internals->kvlist)
> +               return queue_id;
> +
> +       slave_details = &internals->slaves[slave_id];
> +       return queue_id % slave_details->nb_tx_queues;
> +}
> +
>  static uint16_t
>  bond_ethdev_rx_burst(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
>  {
> @@ -83,6 +124,8 @@ bond_ethdev_rx_burst(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
>
>         uint16_t num_rx_slave = 0;
>         uint16_t num_rx_total = 0;
> +       uint8_t slaves[RTE_MAX_ETHPORTS];
> +       uint8_t num_of_slaves;
>
>         int i;
>
> @@ -91,11 +134,13 @@ bond_ethdev_rx_burst(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
>
>         internals = bd_rx_q->dev_private;
>
> +       num_of_slaves = bond_active_slaves_by_rxqid(internals, bd_rx_q->queue_id,
> +                                                   slaves);
>
> -       for (i = 0; i < internals->active_slave_count && nb_pkts; i++) {
> +       for (i = 0; i < num_of_slaves && nb_pkts; i++) {
>                 /* Offset of pointer to *bufs increases as packets are received
>                  * from other slaves */
> -               num_rx_slave = rte_eth_rx_burst(internals->active_slaves[i],
> +               num_rx_slave = rte_eth_rx_burst(slaves[i],
>                                 bd_rx_q->queue_id, bufs + num_rx_total, nb_pkts);
>                 if (num_rx_slave) {
>                         num_rx_total += num_rx_slave;
> @@ -117,8 +162,13 @@ bond_ethdev_rx_burst_active_backup(void *queue, struct rte_mbuf **bufs,
>
>         internals = bd_rx_q->dev_private;
>
> -       return rte_eth_rx_burst(internals->current_primary_port,
> -                       bd_rx_q->queue_id, bufs, nb_pkts);
> +       uint8_t active_slave = internals->current_primary_port;
> +       struct rte_eth_dev *dev = &rte_eth_devices[active_slave];
> +
> +       if (bd_rx_q->queue_id >= dev->data->nb_rx_queues)
> +               return 0;
> +
> +       return rte_eth_rx_burst(active_slave, bd_rx_q->queue_id, bufs, nb_pkts);
>  }
>
>  static uint16_t
> @@ -144,9 +194,9 @@ bond_ethdev_rx_burst_8023ad(void *queue, struct rte_mbuf **bufs,
>         rte_eth_macaddr_get(internals->port_id, &bond_mac);
>         /* Copy slave list to protect against slave up/down changes during tx
>          * bursting */
> -       slave_count = internals->active_slave_count;
> -       memcpy(slaves, internals->active_slaves,
> -                       sizeof(internals->active_slaves[0]) * slave_count);
> +
> +       slave_count = bond_active_slaves_by_rxqid(internals, bd_rx_q->queue_id,
> +                                                 slaves);
>
>         for (i = 0; i < slave_count && num_rx_total < nb_pkts; i++) {
>                 j = num_rx_total;
> @@ -401,6 +451,7 @@ bond_ethdev_tx_burst_round_robin(void *queue, struct rte_mbuf **bufs,
>
>         static int slave_idx = 0;
>         int i, cslave_idx = 0, tx_fail_total = 0;
> +       int queue_id;
>
>         bd_tx_q = (struct bond_tx_queue *)queue;
>         internals = bd_tx_q->dev_private;
> @@ -427,7 +478,9 @@ bond_ethdev_tx_burst_round_robin(void *queue, struct rte_mbuf **bufs,
>         /* Send packet burst on each slave device */
>         for (i = 0; i < num_of_slaves; i++) {
>                 if (slave_nb_pkts[i] > 0) {
> -                       num_tx_slave = rte_eth_tx_burst(slaves[i], bd_tx_q->queue_id,
> +                       queue_id = bond_slave_txqid(internals, i,
> +                                                   bd_tx_q->queue_id);
> +                       num_tx_slave = rte_eth_tx_burst(slaves[i], queue_id,
>                                         slave_bufs[i], slave_nb_pkts[i]);
>
>                         /* if tx burst fails move packets to end of bufs */
> @@ -453,14 +506,27 @@ bond_ethdev_tx_burst_active_backup(void *queue,
>  {
>         struct bond_dev_private *internals;
>         struct bond_tx_queue *bd_tx_q;
> +       int queue_id;
> +       int i;
> +       uint8_t num_of_slaves;
> +       uint8_t slaves[RTE_MAX_ETHPORTS];
>
>         bd_tx_q = (struct bond_tx_queue *)queue;
>         internals = bd_tx_q->dev_private;
>
> -       if (internals->active_slave_count < 1)
> +       num_of_slaves = internals->active_slave_count;
> +       memcpy(slaves, internals->active_slaves,
> +                       sizeof(internals->active_slaves[0]) * num_of_slaves);
> +
> +       if (num_of_slaves < 1)
>                 return 0;
>
> -       return rte_eth_tx_burst(internals->current_primary_port, bd_tx_q->queue_id,
> +       for (i = 0; i < num_of_slaves; i++)
> +               if (slaves[i] == internals->current_primary_port)
> +                       break;
> +
> +       queue_id = bond_slave_txqid(internals, i, bd_tx_q->queue_id);
> +       return rte_eth_tx_burst(internals->current_primary_port, queue_id,
>                         bufs, nb_pkts);
>  }
>
> @@ -696,6 +762,7 @@ bond_ethdev_tx_burst_tlb(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
>         struct ether_hdr *ether_hdr;
>         struct ether_addr primary_slave_addr;
>         struct ether_addr active_slave_addr;
> +       int queue_id;
>
>         if (num_of_slaves < 1)
>                 return num_tx_total;
> @@ -725,7 +792,8 @@ bond_ethdev_tx_burst_tlb(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
>  #endif
>                 }
>
> -               num_tx_total += rte_eth_tx_burst(slaves[i], bd_tx_q->queue_id,
> +               queue_id = bond_slave_txqid(internals, i, bd_tx_q->queue_id);
> +               num_tx_total += rte_eth_tx_burst(slaves[i], queue_id,
>                                 bufs + num_tx_total, nb_pkts - num_tx_total);
>
>                 if (num_tx_total == nb_pkts)
> @@ -903,6 +971,7 @@ bond_ethdev_tx_burst_balance(void *queue, struct rte_mbuf **bufs,
>         uint16_t num_tx_total = 0, num_tx_slave = 0, tx_fail_total = 0;
>
>         int i, op_slave_id;
> +       int queue_id;
>
>         struct rte_mbuf *slave_bufs[RTE_MAX_ETHPORTS][nb_pkts];
>         uint16_t slave_nb_pkts[RTE_MAX_ETHPORTS] = { 0 };
> @@ -931,7 +1000,9 @@ bond_ethdev_tx_burst_balance(void *queue, struct rte_mbuf **bufs,
>         /* Send packet burst on each slave device */
>         for (i = 0; i < num_of_slaves; i++) {
>                 if (slave_nb_pkts[i] > 0) {
> -                       num_tx_slave = rte_eth_tx_burst(slaves[i], bd_tx_q->queue_id,
> +                       queue_id = bond_slave_txqid(internals, i,
> +                                                   bd_tx_q->queue_id);
> +                       num_tx_slave = rte_eth_tx_burst(slaves[i], queue_id,
>                                         slave_bufs[i], slave_nb_pkts[i]);
>
>                         /* if tx burst fails move packets to end of bufs */
> @@ -977,6 +1048,8 @@ bond_ethdev_tx_burst_8023ad(void *queue, struct rte_mbuf **bufs,
>         /* Slow packets placed in each slave */
>         uint8_t slave_slow_nb_pkts[RTE_MAX_ETHPORTS] = { 0 };
>
> +       int queue_id;
> +
>         bd_tx_q = (struct bond_tx_queue *)queue;
>         internals = bd_tx_q->dev_private;
>
> @@ -1022,7 +1095,8 @@ bond_ethdev_tx_burst_8023ad(void *queue, struct rte_mbuf **bufs,
>                 if (slave_nb_pkts[i] == 0)
>                         continue;
>
> -               num_tx_slave = rte_eth_tx_burst(slaves[i], bd_tx_q->queue_id,
> +               queue_id = bond_slave_txqid(internals, i, bd_tx_q->queue_id);
> +               num_tx_slave = rte_eth_tx_burst(slaves[i], queue_id,
>                                 slave_bufs[i], slave_nb_pkts[i]);
>
>                 /* If tx burst fails drop slow packets */
> @@ -1057,6 +1131,7 @@ bond_ethdev_tx_burst_broadcast(void *queue, struct rte_mbuf **bufs,
>
>         int slave_tx_total[RTE_MAX_ETHPORTS];
>         int i, most_successful_tx_slave = -1;
> +       int queue_id;
>
>         bd_tx_q = (struct bond_tx_queue *)queue;
>         internals = bd_tx_q->dev_private;
> @@ -1076,7 +1151,8 @@ bond_ethdev_tx_burst_broadcast(void *queue, struct rte_mbuf **bufs,
>
>         /* Transmit burst on each active slave */
>         for (i = 0; i < num_of_slaves; i++) {
> -               slave_tx_total[i] = rte_eth_tx_burst(slaves[i], bd_tx_q->queue_id,
> +               queue_id = bond_slave_txqid(internals, i, bd_tx_q->queue_id);
> +               slave_tx_total[i] = rte_eth_tx_burst(slaves[i], queue_id,
>                                         bufs, nb_pkts);
>
>                 if (unlikely(slave_tx_total[i] < nb_pkts))
> @@ -1298,9 +1374,22 @@ int
>  slave_configure(struct rte_eth_dev *bonded_eth_dev,
>                 struct rte_eth_dev *slave_eth_dev)
>  {
> +       struct bond_dev_private *internals;
>         struct bond_rx_queue *bd_rx_q;
>         struct bond_tx_queue *bd_tx_q;
> +       int slave_id;
> +
> +       internals = bonded_eth_dev->data->dev_private;
>
> +       for (slave_id = 0; slave_id < internals->slave_count; slave_id++)
> +               if (internals->slaves[slave_id].port_id ==
> +                   slave_eth_dev->data->port_id)
> +                       break;
> +
> +       RTE_VERIFY(slave_id != internals->slave_count);
> +
> +       uint16_t nb_rx_queues = internals->slaves[slave_id].nb_rx_queues;
> +       uint16_t nb_tx_queues = internals->slaves[slave_id].nb_tx_queues;
>         int errval;
>         uint16_t q_id;
>
> @@ -1331,8 +1420,7 @@ slave_configure(struct rte_eth_dev *bonded_eth_dev,
>
>         /* Configure device */
>         errval = rte_eth_dev_configure(slave_eth_dev->data->port_id,
> -                       bonded_eth_dev->data->nb_rx_queues,
> -                       bonded_eth_dev->data->nb_tx_queues,
> +                       nb_rx_queues, nb_tx_queues,
>                         &(slave_eth_dev->data->dev_conf));
>         if (errval != 0) {
>                 RTE_BOND_LOG(ERR, "Cannot configure slave device: port %u , err (%d)",
> @@ -1343,7 +1431,7 @@ slave_configure(struct rte_eth_dev *bonded_eth_dev,
>         /* Setup Rx Queues */
>         /* Use existing queues, if any */
>         for (q_id = slave_eth_dev->data->nb_rx_queues;
> -            q_id < bonded_eth_dev->data->nb_rx_queues; q_id++) {
> +            q_id < nb_rx_queues ; q_id++) {
>                 bd_rx_q = (struct bond_rx_queue *)bonded_eth_dev->data->rx_queues[q_id];
>
>                 errval = rte_eth_rx_queue_setup(slave_eth_dev->data->port_id, q_id,
> @@ -1361,7 +1449,7 @@ slave_configure(struct rte_eth_dev *bonded_eth_dev,
>         /* Setup Tx Queues */
>         /* Use existing queues, if any */
>         for (q_id = slave_eth_dev->data->nb_tx_queues;
> -            q_id < bonded_eth_dev->data->nb_tx_queues; q_id++) {
> +            q_id < nb_tx_queues ; q_id++) {
>                 bd_tx_q = (struct bond_tx_queue *)bonded_eth_dev->data->tx_queues[q_id];
>
>                 errval = rte_eth_tx_queue_setup(slave_eth_dev->data->port_id, q_id,
> @@ -1440,7 +1528,8 @@ bond_ethdev_slave_link_status_change_monitor(void *cb_arg);
>
>  void
>  slave_add(struct bond_dev_private *internals,
> -               struct rte_eth_dev *slave_eth_dev)
> +               struct rte_eth_dev *slave_eth_dev,
> +               const struct rte_eth_dev_info *slave_dev_info)
>  {
>         struct bond_slave_details *slave_details =
>                         &internals->slaves[internals->slave_count];
> @@ -1448,6 +1537,20 @@ slave_add(struct bond_dev_private *internals,
>         slave_details->port_id = slave_eth_dev->data->port_id;
>         slave_details->last_link_status = 0;
>
> +       uint16_t bond_nb_rx_queues =
> +               rte_eth_devices[internals->port_id].data->nb_rx_queues;
> +       uint16_t bond_nb_tx_queues =
> +               rte_eth_devices[internals->port_id].data->nb_tx_queues;
> +
> +       slave_details->nb_rx_queues =
> +               bond_nb_rx_queues > slave_dev_info->max_rx_queues
> +               ? slave_dev_info->max_rx_queues
> +               : bond_nb_rx_queues;
> +       slave_details->nb_tx_queues =
> +               bond_nb_tx_queues > slave_dev_info->max_tx_queues
> +               ? slave_dev_info->max_tx_queues
> +               : bond_nb_tx_queues;
> +
>         /* If slave device doesn't support interrupts then we need to enabled
>          * polling to monitor link status */
>         if (!(slave_eth_dev->data->dev_flags & RTE_PCI_DRV_INTR_LSC)) {
> diff --git a/drivers/net/bonding/rte_eth_bond_private.h b/drivers/net/bonding/rte_eth_bond_private.h
> index 6c47a29..02f6de1 100644
> --- a/drivers/net/bonding/rte_eth_bond_private.h
> +++ b/drivers/net/bonding/rte_eth_bond_private.h
> @@ -101,6 +101,8 @@ struct bond_slave_details {
>         uint8_t link_status_poll_enabled;
>         uint8_t link_status_wait_to_complete;
>         uint8_t last_link_status;
> +       uint16_t nb_rx_queues;
> +       uint16_t nb_tx_queues;
>         /**< Port Id of slave eth_dev */
>         struct ether_addr persisted_mac_addr;
>
> @@ -240,7 +242,8 @@ slave_remove(struct bond_dev_private *internals,
>
>  void
>  slave_add(struct bond_dev_private *internals,
> -               struct rte_eth_dev *slave_eth_dev);
> +               struct rte_eth_dev *slave_eth_dev,
> +               const struct rte_eth_dev_info *slave_dev_info);
>
>  uint16_t
>  xmit_l2_hash(const struct rte_mbuf *buf, uint8_t slave_count);
> --
> 2.1.4
>
  
Eric Kinzie Dec. 4, 2015, 7:18 p.m. UTC | #2
On Fri Dec 04 19:36:09 +0100 2015, Andriy Berestovskyy wrote:
> Hi guys,
> I'm not quite sure we can support fewer TX queues on a slave that easily:
> 
> > queue_id = bond_slave_txqid(internals, i, bd_tx_q->queue_id);
> > num_tx_slave = rte_eth_tx_burst(slaves[i], queue_id,
> >      slave_bufs[i], slave_nb_pkts[i]);
> 
> It seems that two different lcores might end up writing to the same
> slave queue at the same time, doesn't it?
> 
> Regards,
> Andriy

Andriy, I think you're probably right about this.  Perhaps it should
instead refuse to add, or refuse to activate, a slave with too few
tx queues.  We could probably fix this with another layer of buffering
so that an lcore with a valid tx queue could pick up the mbufs later,
but that doesn't seem very appealing.

Eric


> On Fri, Dec 4, 2015 at 6:14 PM, Stephen Hemminger
> <stephen@networkplumber.org> wrote:
> > From: Eric Kinzie <ekinzie@brocade.com>
> >
> > In the event that the bonding device has a greater number of tx and/or rx
> > queues than the slave being added, track the queue limits of the slave.
> > On receive, ignore queue identifiers beyond what the slave interface
> > can support.  During transmit, pick a different queue id to use if the
> > intended queue is not available on the slave.
> >
> > Signed-off-by: Eric Kinzie <ekinzie@brocade.com>
> > Signed-off-by: Stephen Hemminger <stephen@networkplumber.org>
> > ---
> >  drivers/net/bonding/rte_eth_bond_api.c     |   6 +-
> >  drivers/net/bonding/rte_eth_bond_pmd.c     | 141 +++++++++++++++++++++++++----
> >  drivers/net/bonding/rte_eth_bond_private.h |   5 +-
> >  3 files changed, 129 insertions(+), 23 deletions(-)
> >
> > diff --git a/drivers/net/bonding/rte_eth_bond_api.c b/drivers/net/bonding/rte_eth_bond_api.c
> > index 630a461..64058ff 100644
> > --- a/drivers/net/bonding/rte_eth_bond_api.c
> > +++ b/drivers/net/bonding/rte_eth_bond_api.c
> > @@ -340,11 +340,11 @@ __eth_bond_slave_add_lock_free(uint8_t bonded_port_id, uint8_t slave_port_id)
> >
> >         slave_eth_dev = &rte_eth_devices[slave_port_id];
> >
> > -       /* Add slave details to bonded device */
> > -       slave_add(internals, slave_eth_dev);
> > -
> >         rte_eth_dev_info_get(slave_port_id, &dev_info);
> >
> > +       /* Add slave details to bonded device */
> > +       slave_add(internals, slave_eth_dev, &dev_info);
> > +
> >         /* We need to store slaves reta_size to be able to synchronize RETA for all
> >          * slave devices even if its sizes are different.
> >          */
> > diff --git a/drivers/net/bonding/rte_eth_bond_pmd.c b/drivers/net/bonding/rte_eth_bond_pmd.c
> > index 77582dd..868e66b 100644
> > --- a/drivers/net/bonding/rte_eth_bond_pmd.c
> > +++ b/drivers/net/bonding/rte_eth_bond_pmd.c
> > @@ -76,6 +76,47 @@ get_vlan_offset(struct ether_hdr *eth_hdr, uint16_t *proto)
> >         return vlan_offset;
> >  }
> >
> > +static uint8_t
> > +bond_active_slaves_by_rxqid(struct bond_dev_private *internals, int queue_id,
> > +               uint8_t slaves[])
> > +{
> > +       struct bond_slave_details *slave_details;
> > +       uint8_t num_of_slaves;
> > +       uint8_t i = 0;
> > +
> > +       num_of_slaves = internals->active_slave_count;
> > +       memcpy(slaves, internals->active_slaves,
> > +                       sizeof(internals->active_slaves[0]) * num_of_slaves);
> > +
> > +       if (num_of_slaves < 1 || internals->kvlist)
> > +               return num_of_slaves;
> > +
> > +       /* remove slaves that don't have a queue numbered "queue_id" */
> > +       while (i < num_of_slaves) {
> > +               slave_details = &internals->slaves[i];
> > +               if (unlikely(queue_id >= slave_details->nb_rx_queues)) {
> > +                       slaves[i] = slaves[num_of_slaves-1];
> > +                       num_of_slaves--;
> > +               } else
> > +                       i++;
> > +       }
> > +
> > +       return num_of_slaves;
> > +}
> > +
> > +static int
> > +bond_slave_txqid(struct bond_dev_private *internals, uint8_t slave_id,
> > +               int queue_id)
> > +{
> > +       struct bond_slave_details *slave_details;
> > +
> > +       if (internals->kvlist)
> > +               return queue_id;
> > +
> > +       slave_details = &internals->slaves[slave_id];
> > +       return queue_id % slave_details->nb_tx_queues;
> > +}
> > +
> >  static uint16_t
> >  bond_ethdev_rx_burst(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
> >  {
> > @@ -83,6 +124,8 @@ bond_ethdev_rx_burst(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
> >
> >         uint16_t num_rx_slave = 0;
> >         uint16_t num_rx_total = 0;
> > +       uint8_t slaves[RTE_MAX_ETHPORTS];
> > +       uint8_t num_of_slaves;
> >
> >         int i;
> >
> > @@ -91,11 +134,13 @@ bond_ethdev_rx_burst(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
> >
> >         internals = bd_rx_q->dev_private;
> >
> > +       num_of_slaves = bond_active_slaves_by_rxqid(internals, bd_rx_q->queue_id,
> > +                                                   slaves);
> >
> > -       for (i = 0; i < internals->active_slave_count && nb_pkts; i++) {
> > +       for (i = 0; i < num_of_slaves && nb_pkts; i++) {
> >                 /* Offset of pointer to *bufs increases as packets are received
> >                  * from other slaves */
> > -               num_rx_slave = rte_eth_rx_burst(internals->active_slaves[i],
> > +               num_rx_slave = rte_eth_rx_burst(slaves[i],
> >                                 bd_rx_q->queue_id, bufs + num_rx_total, nb_pkts);
> >                 if (num_rx_slave) {
> >                         num_rx_total += num_rx_slave;
> > @@ -117,8 +162,13 @@ bond_ethdev_rx_burst_active_backup(void *queue, struct rte_mbuf **bufs,
> >
> >         internals = bd_rx_q->dev_private;
> >
> > -       return rte_eth_rx_burst(internals->current_primary_port,
> > -                       bd_rx_q->queue_id, bufs, nb_pkts);
> > +       uint8_t active_slave = internals->current_primary_port;
> > +       struct rte_eth_dev *dev = &rte_eth_devices[active_slave];
> > +
> > +       if (bd_rx_q->queue_id >= dev->data->nb_rx_queues)
> > +               return 0;
> > +
> > +       return rte_eth_rx_burst(active_slave, bd_rx_q->queue_id, bufs, nb_pkts);
> >  }
> >
> >  static uint16_t
> > @@ -144,9 +194,9 @@ bond_ethdev_rx_burst_8023ad(void *queue, struct rte_mbuf **bufs,
> >         rte_eth_macaddr_get(internals->port_id, &bond_mac);
> >         /* Copy slave list to protect against slave up/down changes during tx
> >          * bursting */
> > -       slave_count = internals->active_slave_count;
> > -       memcpy(slaves, internals->active_slaves,
> > -                       sizeof(internals->active_slaves[0]) * slave_count);
> > +
> > +       slave_count = bond_active_slaves_by_rxqid(internals, bd_rx_q->queue_id,
> > +                                                 slaves);
> >
> >         for (i = 0; i < slave_count && num_rx_total < nb_pkts; i++) {
> >                 j = num_rx_total;
> > @@ -401,6 +451,7 @@ bond_ethdev_tx_burst_round_robin(void *queue, struct rte_mbuf **bufs,
> >
> >         static int slave_idx = 0;
> >         int i, cslave_idx = 0, tx_fail_total = 0;
> > +       int queue_id;
> >
> >         bd_tx_q = (struct bond_tx_queue *)queue;
> >         internals = bd_tx_q->dev_private;
> > @@ -427,7 +478,9 @@ bond_ethdev_tx_burst_round_robin(void *queue, struct rte_mbuf **bufs,
> >         /* Send packet burst on each slave device */
> >         for (i = 0; i < num_of_slaves; i++) {
> >                 if (slave_nb_pkts[i] > 0) {
> > -                       num_tx_slave = rte_eth_tx_burst(slaves[i], bd_tx_q->queue_id,
> > +                       queue_id = bond_slave_txqid(internals, i,
> > +                                                   bd_tx_q->queue_id);
> > +                       num_tx_slave = rte_eth_tx_burst(slaves[i], queue_id,
> >                                         slave_bufs[i], slave_nb_pkts[i]);
> >
> >                         /* if tx burst fails move packets to end of bufs */
> > @@ -453,14 +506,27 @@ bond_ethdev_tx_burst_active_backup(void *queue,
> >  {
> >         struct bond_dev_private *internals;
> >         struct bond_tx_queue *bd_tx_q;
> > +       int queue_id;
> > +       int i;
> > +       uint8_t num_of_slaves;
> > +       uint8_t slaves[RTE_MAX_ETHPORTS];
> >
> >         bd_tx_q = (struct bond_tx_queue *)queue;
> >         internals = bd_tx_q->dev_private;
> >
> > -       if (internals->active_slave_count < 1)
> > +       num_of_slaves = internals->active_slave_count;
> > +       memcpy(slaves, internals->active_slaves,
> > +                       sizeof(internals->active_slaves[0]) * num_of_slaves);
> > +
> > +       if (num_of_slaves < 1)
> >                 return 0;
> >
> > -       return rte_eth_tx_burst(internals->current_primary_port, bd_tx_q->queue_id,
> > +       for (i = 0; i < num_of_slaves; i++)
> > +               if (slaves[i] == internals->current_primary_port)
> > +                       break;
> > +
> > +       queue_id = bond_slave_txqid(internals, i, bd_tx_q->queue_id);
> > +       return rte_eth_tx_burst(internals->current_primary_port, queue_id,
> >                         bufs, nb_pkts);
> >  }
> >
> > @@ -696,6 +762,7 @@ bond_ethdev_tx_burst_tlb(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
> >         struct ether_hdr *ether_hdr;
> >         struct ether_addr primary_slave_addr;
> >         struct ether_addr active_slave_addr;
> > +       int queue_id;
> >
> >         if (num_of_slaves < 1)
> >                 return num_tx_total;
> > @@ -725,7 +792,8 @@ bond_ethdev_tx_burst_tlb(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
> >  #endif
> >                 }
> >
> > -               num_tx_total += rte_eth_tx_burst(slaves[i], bd_tx_q->queue_id,
> > +               queue_id = bond_slave_txqid(internals, i, bd_tx_q->queue_id);
> > +               num_tx_total += rte_eth_tx_burst(slaves[i], queue_id,
> >                                 bufs + num_tx_total, nb_pkts - num_tx_total);
> >
> >                 if (num_tx_total == nb_pkts)
> > @@ -903,6 +971,7 @@ bond_ethdev_tx_burst_balance(void *queue, struct rte_mbuf **bufs,
> >         uint16_t num_tx_total = 0, num_tx_slave = 0, tx_fail_total = 0;
> >
> >         int i, op_slave_id;
> > +       int queue_id;
> >
> >         struct rte_mbuf *slave_bufs[RTE_MAX_ETHPORTS][nb_pkts];
> >         uint16_t slave_nb_pkts[RTE_MAX_ETHPORTS] = { 0 };
> > @@ -931,7 +1000,9 @@ bond_ethdev_tx_burst_balance(void *queue, struct rte_mbuf **bufs,
> >         /* Send packet burst on each slave device */
> >         for (i = 0; i < num_of_slaves; i++) {
> >                 if (slave_nb_pkts[i] > 0) {
> > -                       num_tx_slave = rte_eth_tx_burst(slaves[i], bd_tx_q->queue_id,
> > +                       queue_id = bond_slave_txqid(internals, i,
> > +                                                   bd_tx_q->queue_id);
> > +                       num_tx_slave = rte_eth_tx_burst(slaves[i], queue_id,
> >                                         slave_bufs[i], slave_nb_pkts[i]);
> >
> >                         /* if tx burst fails move packets to end of bufs */
> > @@ -977,6 +1048,8 @@ bond_ethdev_tx_burst_8023ad(void *queue, struct rte_mbuf **bufs,
> >         /* Slow packets placed in each slave */
> >         uint8_t slave_slow_nb_pkts[RTE_MAX_ETHPORTS] = { 0 };
> >
> > +       int queue_id;
> > +
> >         bd_tx_q = (struct bond_tx_queue *)queue;
> >         internals = bd_tx_q->dev_private;
> >
> > @@ -1022,7 +1095,8 @@ bond_ethdev_tx_burst_8023ad(void *queue, struct rte_mbuf **bufs,
> >                 if (slave_nb_pkts[i] == 0)
> >                         continue;
> >
> > -               num_tx_slave = rte_eth_tx_burst(slaves[i], bd_tx_q->queue_id,
> > +               queue_id = bond_slave_txqid(internals, i, bd_tx_q->queue_id);
> > +               num_tx_slave = rte_eth_tx_burst(slaves[i], queue_id,
> >                                 slave_bufs[i], slave_nb_pkts[i]);
> >
> >                 /* If tx burst fails drop slow packets */
> > @@ -1057,6 +1131,7 @@ bond_ethdev_tx_burst_broadcast(void *queue, struct rte_mbuf **bufs,
> >
> >         int slave_tx_total[RTE_MAX_ETHPORTS];
> >         int i, most_successful_tx_slave = -1;
> > +       int queue_id;
> >
> >         bd_tx_q = (struct bond_tx_queue *)queue;
> >         internals = bd_tx_q->dev_private;
> > @@ -1076,7 +1151,8 @@ bond_ethdev_tx_burst_broadcast(void *queue, struct rte_mbuf **bufs,
> >
> >         /* Transmit burst on each active slave */
> >         for (i = 0; i < num_of_slaves; i++) {
> > -               slave_tx_total[i] = rte_eth_tx_burst(slaves[i], bd_tx_q->queue_id,
> > +               queue_id = bond_slave_txqid(internals, i, bd_tx_q->queue_id);
> > +               slave_tx_total[i] = rte_eth_tx_burst(slaves[i], queue_id,
> >                                         bufs, nb_pkts);
> >
> >                 if (unlikely(slave_tx_total[i] < nb_pkts))
> > @@ -1298,9 +1374,22 @@ int
> >  slave_configure(struct rte_eth_dev *bonded_eth_dev,
> >                 struct rte_eth_dev *slave_eth_dev)
> >  {
> > +       struct bond_dev_private *internals;
> >         struct bond_rx_queue *bd_rx_q;
> >         struct bond_tx_queue *bd_tx_q;
> > +       int slave_id;
> > +
> > +       internals = bonded_eth_dev->data->dev_private;
> >
> > +       for (slave_id = 0; slave_id < internals->slave_count; slave_id++)
> > +               if (internals->slaves[slave_id].port_id ==
> > +                   slave_eth_dev->data->port_id)
> > +                       break;
> > +
> > +       RTE_VERIFY(slave_id != internals->slave_count);
> > +
> > +       uint16_t nb_rx_queues = internals->slaves[slave_id].nb_rx_queues;
> > +       uint16_t nb_tx_queues = internals->slaves[slave_id].nb_tx_queues;
> >         int errval;
> >         uint16_t q_id;
> >
> > @@ -1331,8 +1420,7 @@ slave_configure(struct rte_eth_dev *bonded_eth_dev,
> >
> >         /* Configure device */
> >         errval = rte_eth_dev_configure(slave_eth_dev->data->port_id,
> > -                       bonded_eth_dev->data->nb_rx_queues,
> > -                       bonded_eth_dev->data->nb_tx_queues,
> > +                       nb_rx_queues, nb_tx_queues,
> >                         &(slave_eth_dev->data->dev_conf));
> >         if (errval != 0) {
> >                 RTE_BOND_LOG(ERR, "Cannot configure slave device: port %u , err (%d)",
> > @@ -1343,7 +1431,7 @@ slave_configure(struct rte_eth_dev *bonded_eth_dev,
> >         /* Setup Rx Queues */
> >         /* Use existing queues, if any */
> >         for (q_id = slave_eth_dev->data->nb_rx_queues;
> > -            q_id < bonded_eth_dev->data->nb_rx_queues; q_id++) {
> > +            q_id < nb_rx_queues ; q_id++) {
> >                 bd_rx_q = (struct bond_rx_queue *)bonded_eth_dev->data->rx_queues[q_id];
> >
> >                 errval = rte_eth_rx_queue_setup(slave_eth_dev->data->port_id, q_id,
> > @@ -1361,7 +1449,7 @@ slave_configure(struct rte_eth_dev *bonded_eth_dev,
> >         /* Setup Tx Queues */
> >         /* Use existing queues, if any */
> >         for (q_id = slave_eth_dev->data->nb_tx_queues;
> > -            q_id < bonded_eth_dev->data->nb_tx_queues; q_id++) {
> > +            q_id < nb_tx_queues ; q_id++) {
> >                 bd_tx_q = (struct bond_tx_queue *)bonded_eth_dev->data->tx_queues[q_id];
> >
> >                 errval = rte_eth_tx_queue_setup(slave_eth_dev->data->port_id, q_id,
> > @@ -1440,7 +1528,8 @@ bond_ethdev_slave_link_status_change_monitor(void *cb_arg);
> >
> >  void
> >  slave_add(struct bond_dev_private *internals,
> > -               struct rte_eth_dev *slave_eth_dev)
> > +               struct rte_eth_dev *slave_eth_dev,
> > +               const struct rte_eth_dev_info *slave_dev_info)
> >  {
> >         struct bond_slave_details *slave_details =
> >                         &internals->slaves[internals->slave_count];
> > @@ -1448,6 +1537,20 @@ slave_add(struct bond_dev_private *internals,
> >         slave_details->port_id = slave_eth_dev->data->port_id;
> >         slave_details->last_link_status = 0;
> >
> > +       uint16_t bond_nb_rx_queues =
> > +               rte_eth_devices[internals->port_id].data->nb_rx_queues;
> > +       uint16_t bond_nb_tx_queues =
> > +               rte_eth_devices[internals->port_id].data->nb_tx_queues;
> > +
> > +       slave_details->nb_rx_queues =
> > +               bond_nb_rx_queues > slave_dev_info->max_rx_queues
> > +               ? slave_dev_info->max_rx_queues
> > +               : bond_nb_rx_queues;
> > +       slave_details->nb_tx_queues =
> > +               bond_nb_tx_queues > slave_dev_info->max_tx_queues
> > +               ? slave_dev_info->max_tx_queues
> > +               : bond_nb_tx_queues;
> > +
> >         /* If slave device doesn't support interrupts then we need to enabled
> >          * polling to monitor link status */
> >         if (!(slave_eth_dev->data->dev_flags & RTE_PCI_DRV_INTR_LSC)) {
> > diff --git a/drivers/net/bonding/rte_eth_bond_private.h b/drivers/net/bonding/rte_eth_bond_private.h
> > index 6c47a29..02f6de1 100644
> > --- a/drivers/net/bonding/rte_eth_bond_private.h
> > +++ b/drivers/net/bonding/rte_eth_bond_private.h
> > @@ -101,6 +101,8 @@ struct bond_slave_details {
> >         uint8_t link_status_poll_enabled;
> >         uint8_t link_status_wait_to_complete;
> >         uint8_t last_link_status;
> > +       uint16_t nb_rx_queues;
> > +       uint16_t nb_tx_queues;
> >         /**< Port Id of slave eth_dev */
> >         struct ether_addr persisted_mac_addr;
> >
> > @@ -240,7 +242,8 @@ slave_remove(struct bond_dev_private *internals,
> >
> >  void
> >  slave_add(struct bond_dev_private *internals,
> > -               struct rte_eth_dev *slave_eth_dev);
> > +               struct rte_eth_dev *slave_eth_dev,
> > +               const struct rte_eth_dev_info *slave_dev_info);
> >
> >  uint16_t
> >  xmit_l2_hash(const struct rte_mbuf *buf, uint8_t slave_count);
> > --
> > 2.1.4
> >
> 
> 
> 
> -- 
> Andriy Berestovskyy
  
Doherty, Declan Jan. 5, 2016, 1:46 p.m. UTC | #3
On 04/12/15 19:18, Eric Kinzie wrote:
> On Fri Dec 04 19:36:09 +0100 2015, Andriy Berestovskyy wrote:
>> Hi guys,
>> I'm not quite sure if we can support less TX queues on a slave that easy:
>>
>>> queue_id = bond_slave_txqid(internals, i, bd_tx_q->queue_id);
>>> num_tx_slave = rte_eth_tx_burst(slaves[i], queue_id,
>>>       slave_bufs[i], slave_nb_pkts[i]);
>>
>> It seems that two different lcores might end up writing to the same
>> slave queue at the same time, isn't it?
>>
>> Regards,
>> Andriy
>
> Andriy, I think you're probably right about this.  Perhaps it should
> instead refuse to add or refuse to activate a slave with too few
> tx queues.  Could probably fix this with another layer of buffering
> so that an lcore with a valid tx queue could pick up the mbufs later,
> but this doesn't seem very appealing.
>
> Eric
>
>
>> On Fri, Dec 4, 2015 at 6:14 PM, Stephen Hemminger
>> <stephen@networkplumber.org> wrote:
>>> From: Eric Kinzie <ekinzie@brocade.com>
>>>
>>> In the event that the bonding device has a greater number of tx and/or rx
>>> queues than the slave being added, track the queue limits of the slave.
>>> On receive, ignore queue identifiers beyond what the slave interface
>>> can support.  During transmit, pick a different queue id to use if the
>>> intended queue is not available on the slave.
>>>
>>> Signed-off-by: Eric Kinzie <ekinzie@brocade.com>
>>> Signed-off-by: Stephen Hemminger <stephen@networkplumber.org>
>>> ---
...


I don't think there is any straightforward way of supporting slaves with 
different numbers of queues, the initial library was written with the 
assumption that the number of tx/rx queues would always be the same on 
each slave. This is why, when a slave is added to a bonded device we 
reconfigure the queues. For features like RSS we have to have the same 
number of rx queues otherwise the flow distribution to an application 
could change in the case of a fail over event. Also by supporting 
different numbers of queues between slaves we would no longer be 
supporting the standard behavior of ethdevs in DPDK where we expect that 
by using different queues we don't require locking to be thread safe.
  
Stephen Hemminger Jan. 5, 2016, 3:31 p.m. UTC | #4
A common usage scenario is to bond a vnic like virtio which typically has
only a single rx queue with a VF device that has multiple receive queues.
This is done to do live migration
On Jan 5, 2016 05:47, "Declan Doherty" <declan.doherty@intel.com> wrote:

> On 04/12/15 19:18, Eric Kinzie wrote:
>
>> On Fri Dec 04 19:36:09 +0100 2015, Andriy Berestovskyy wrote:
>>
>>> Hi guys,
>>> I'm not quite sure if we can support less TX queues on a slave that easy:
>>>
>>> queue_id = bond_slave_txqid(internals, i, bd_tx_q->queue_id);
>>>> num_tx_slave = rte_eth_tx_burst(slaves[i], queue_id,
>>>>       slave_bufs[i], slave_nb_pkts[i]);
>>>>
>>>
>>> It seems that two different lcores might end up writing to the same
>>> slave queue at the same time, isn't it?
>>>
>>> Regards,
>>> Andriy
>>>
>>
>> Andriy, I think you're probably right about this.  Perhaps it should
>> instead refuse to add or refuse to activate a slave with too few
>> tx queues.  Could probably fix this with another layer of buffering
>> so that an lcore with a valid tx queue could pick up the mbufs later,
>> but this doesn't seem very appealing.
>>
>> Eric
>>
>>
>> On Fri, Dec 4, 2015 at 6:14 PM, Stephen Hemminger
>>> <stephen@networkplumber.org> wrote:
>>>
>>>> From: Eric Kinzie <ekinzie@brocade.com>
>>>>
>>>> In the event that the bonding device has a greater number of tx and/or
>>>> rx
>>>> queues than the slave being added, track the queue limits of the slave.
>>>> On receive, ignore queue identifiers beyond what the slave interface
>>>> can support.  During transmit, pick a different queue id to use if the
>>>> intended queue is not available on the slave.
>>>>
>>>> Signed-off-by: Eric Kinzie <ekinzie@brocade.com>
>>>> Signed-off-by: Stephen Hemminger <stephen@networkplumber.org>
>>>> ---
>>>>
>>> ...
>
>
> I don't think there is any straightforward way of supporting slaves with
> different numbers of queues, the initial library was written with the
> assumption that the number of tx/rx queues would always be the same on each
> slave. This is why, when a slave is added to a bonded device we reconfigure
> the queues. For features like RSS we have to have the same number of rx
> queues otherwise the flow distribution to an application could change in
> the case of a fail over event. Also by supporting different numbers of
> queues between slaves we would no longer be supporting the standard
> behavior of ethdevs in DPDK where we expect that by using different queues
> we don't require locking to be thread safe.
>
>
>
  
Bruce Richardson Feb. 3, 2016, 11:28 a.m. UTC | #5
On Fri, Dec 04, 2015 at 02:18:34PM -0500, Eric Kinzie wrote:
> On Fri Dec 04 19:36:09 +0100 2015, Andriy Berestovskyy wrote:
> > Hi guys,
> > I'm not quite sure if we can support less TX queues on a slave that easy:
> > 
> > > queue_id = bond_slave_txqid(internals, i, bd_tx_q->queue_id);
> > > num_tx_slave = rte_eth_tx_burst(slaves[i], queue_id,
> > >      slave_bufs[i], slave_nb_pkts[i]);
> > 
> > It seems that two different lcores might end up writing to the same
> > slave queue at the same time, isn't it?
> > 
> > Regards,
> > Andriy
> 
> Andriy, I think you're probably right about this.  Perhaps it should
> instead refuse to add or refuse to activate a slave with too few
> tx queues.  Could probably fix this with another layer of buffering
> so that an lcore with a valid tx queue could pick up the mbufs later,
> but this doesn't seem very appealing.
> 
> Eric
>
Hi Eric, Stephen, Declan,

all patches of the set apart from this one and the next (nos 6 & 7) have no
comments and have been acked. Is there a resolution on these two patches, so they
can be acked and merged?

Regards,
/Bruce
  
Doherty, Declan Feb. 3, 2016, 3:17 p.m. UTC | #6
On 03/02/16 11:28, Bruce Richardson wrote:
> On Fri, Dec 04, 2015 at 02:18:34PM -0500, Eric Kinzie wrote:
>> On Fri Dec 04 19:36:09 +0100 2015, Andriy Berestovskyy wrote:
>>> Hi guys,
>>> I'm not quite sure if we can support less TX queues on a slave that easy:
>>>
>>>> queue_id = bond_slave_txqid(internals, i, bd_tx_q->queue_id);
>>>> num_tx_slave = rte_eth_tx_burst(slaves[i], queue_id,
>>>>       slave_bufs[i], slave_nb_pkts[i]);
>>>
>>> It seems that two different lcores might end up writing to the same
>>> slave queue at the same time, isn't it?
>>>
>>> Regards,
>>> Andriy
>>
>> Andriy, I think you're probably right about this.  Perhaps it should
>> instead refuse to add or refuse to activate a slave with too few
>> tx queues.  Could probably fix this with another layer of buffering
>> so that an lcore with a valid tx queue could pick up the mbufs later,
>> but this doesn't seem very appealing.
>>
>> Eric
>>
> Hi Eric, Stephen, Declan,
>
> all patches of the set apart from this one and the next (nos 6 & 7) have no
> comments and have been acked. Is there a resolution on these two patches, so they
> can be acked and merged?
>
> Regards,
> /Bruce
>


Hey Bruce, Eric, Stephen, sorry about leaving this patchset hanging 
around. Can you apply patches 1-5 & patch 8 in this patch set. I've 
reviewed and acked all of those patches and I believe they are good to 
go. I need to give further feedback on patches 6 and 7, as I would like 
to avoid bringing further rte_ring buffering into the bonded device if 
possible and I think this should be possible but I haven't had time to 
prototype any alternatives but that shouldn't stop the other patches 
being applied.

Thanks
Declan
  
Thomas Monjalon Feb. 3, 2016, 3:21 p.m. UTC | #7
2016-02-03 15:17, Declan Doherty:
> On 03/02/16 11:28, Bruce Richardson wrote:
> > Hi Eric, Stephen, Declan,
> >
> > all patches of the set apart from this one and the next (nos 6 & 7) have no
> > comments and have been acked. Is there a resolution on these two patches, so they
> > can be acked and merged?
> 
> Hey Bruce, Eric, Stephen, sorry about leaving this patchset hanging 
> around. Can you apply patches 1-5 & patch 8 in this patch set. I've 
> reviewed and acked all of those patches and I believe they are good to
> go. I need to give further feedback on patches 6 and 7, as I would like
> to avoid bringing further rte_ring buffering into the bonded device if
> possible and I think this should be possible but I haven't had time to 
> prototype any alternatives but that shouldn't stop the other patches 
> being applied.

Picking some patches in a series makes tracking confusing.
The better solution is to re-send the series with only the desired patches.
When re-sending, do not forget to embed the acks from the previous version, thanks.
  
Iremonger, Bernard Feb. 18, 2016, 10:26 a.m. UTC | #8
Hi Stephen,

> -----Original Message-----
> From: dev [mailto:dev-bounces@dpdk.org] On Behalf Of Thomas Monjalon
> Sent: Wednesday, February 3, 2016 3:21 PM
> To: Doherty, Declan <declan.doherty@intel.com>
> Cc: dev@dpdk.org
> Subject: Re: [dpdk-dev] [PATCH 6/8] bond: handle slaves with fewer queues
> than bonding device
> 
> 2016-02-03 15:17, Declan Doherty:
> > On 03/02/16 11:28, Bruce Richardson wrote:
> > > Hi Eric, Stephen, Declan,
> > >
> > > all patches of the set apart from this one and the next (nos 6 & 7)
> > > have no comments and have been acked. Is there a resolution on these
> > > two patches, so they can be acked and merged?
> >
> > Hey Bruce, Eric, Stephen, sorry about leaving this patchset hanging
> > around. Can you apply patches 1-5 & patch 8 in this patch set. I've
> > reviewed and acked all of those patches and I believe they are good
> > tof go. I need to give further feedback on patches 6 and 7, as I would
> > like to avoid bring further rte_ring buffering into the bonded device
> > if possible and I think this should be possible but I haven't had time
> > to prototype any alternatives but that shouldn't stop the other
> > patches being applied.
> 
> Picking some patches in a series makes tracking confusing.
> The better solution is to re-send the series with only the desired patches.
> When re-sending, do not forget to embed the acks from the previous
> version, thanks.

Could you send a V2 of this patch set without patches 6 and 7.
The other 6 patches have been acked  already.

Regards,

Bernard.
  
Eric Kinzie Feb. 19, 2016, 7:17 p.m. UTC | #9
These are bug fixes and some small enhancements to allow bonding
to work with external control (teamd). Please consider integrating
these into DPDK 2.2

Changes in v2:
- remove "bond: handle slaves with fewer queues than bonding device"
- remove "bond: per-slave intermediate rx ring"

Eric Kinzie (6):
  bond: use existing enslaved device queues
  bond mode 4: copy entire config structure
  bond mode 4: do not ignore multicast
  bond mode 4: allow external state machine
  bond: active slaves with no primary
  bond: do not activate slave twice

 app/test/test_link_bonding_mode4.c                |    7 +-
 drivers/net/bonding/rte_eth_bond_8023ad.c         |  174 +++++++++++++++++++++
 drivers/net/bonding/rte_eth_bond_8023ad.h         |   44 ++++++
 drivers/net/bonding/rte_eth_bond_8023ad_private.h |    2 +
 drivers/net/bonding/rte_eth_bond_api.c            |   13 +-
 drivers/net/bonding/rte_eth_bond_pmd.c            |    9 +-
 drivers/net/bonding/rte_eth_bond_version.map      |    6 +
 7 files changed, 249 insertions(+), 6 deletions(-)
  

Patch

diff --git a/drivers/net/bonding/rte_eth_bond_api.c b/drivers/net/bonding/rte_eth_bond_api.c
index 630a461..64058ff 100644
--- a/drivers/net/bonding/rte_eth_bond_api.c
+++ b/drivers/net/bonding/rte_eth_bond_api.c
@@ -340,11 +340,11 @@  __eth_bond_slave_add_lock_free(uint8_t bonded_port_id, uint8_t slave_port_id)
 
 	slave_eth_dev = &rte_eth_devices[slave_port_id];
 
-	/* Add slave details to bonded device */
-	slave_add(internals, slave_eth_dev);
-
 	rte_eth_dev_info_get(slave_port_id, &dev_info);
 
+	/* Add slave details to bonded device */
+	slave_add(internals, slave_eth_dev, &dev_info);
+
 	/* We need to store slaves reta_size to be able to synchronize RETA for all
 	 * slave devices even if its sizes are different.
 	 */
diff --git a/drivers/net/bonding/rte_eth_bond_pmd.c b/drivers/net/bonding/rte_eth_bond_pmd.c
index 77582dd..868e66b 100644
--- a/drivers/net/bonding/rte_eth_bond_pmd.c
+++ b/drivers/net/bonding/rte_eth_bond_pmd.c
@@ -76,6 +76,47 @@  get_vlan_offset(struct ether_hdr *eth_hdr, uint16_t *proto)
 	return vlan_offset;
 }
 
+static uint8_t
+bond_active_slaves_by_rxqid(struct bond_dev_private *internals, int queue_id,
+		uint8_t slaves[])
+{
+	struct bond_slave_details *slave_details;
+	uint8_t num_of_slaves;
+	uint8_t i = 0;
+
+	num_of_slaves = internals->active_slave_count;
+	memcpy(slaves, internals->active_slaves,
+			sizeof(internals->active_slaves[0]) * num_of_slaves);
+
+	if (num_of_slaves < 1 || internals->kvlist)
+		return num_of_slaves;
+
+	/* remove slaves that don't have a queue numbered "queue_id" */
+	while (i < num_of_slaves) {
+		slave_details = &internals->slaves[i];
+		if (unlikely(queue_id >= slave_details->nb_rx_queues)) {
+			slaves[i] = slaves[num_of_slaves-1];
+			num_of_slaves--;
+		} else
+			i++;
+	}
+
+	return num_of_slaves;
+}
+
+static int
+bond_slave_txqid(struct bond_dev_private *internals, uint8_t slave_id,
+		int queue_id)
+{
+	struct bond_slave_details *slave_details;
+
+	if (internals->kvlist)
+		return queue_id;
+
+	slave_details = &internals->slaves[slave_id];
+	return queue_id % slave_details->nb_tx_queues;
+}
+
 static uint16_t
 bond_ethdev_rx_burst(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 {
@@ -83,6 +124,8 @@  bond_ethdev_rx_burst(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 
 	uint16_t num_rx_slave = 0;
 	uint16_t num_rx_total = 0;
+	uint8_t slaves[RTE_MAX_ETHPORTS];
+	uint8_t num_of_slaves;
 
 	int i;
 
@@ -91,11 +134,13 @@  bond_ethdev_rx_burst(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 
 	internals = bd_rx_q->dev_private;
 
+	num_of_slaves = bond_active_slaves_by_rxqid(internals, bd_rx_q->queue_id,
+						    slaves);
 
-	for (i = 0; i < internals->active_slave_count && nb_pkts; i++) {
+	for (i = 0; i < num_of_slaves && nb_pkts; i++) {
 		/* Offset of pointer to *bufs increases as packets are received
 		 * from other slaves */
-		num_rx_slave = rte_eth_rx_burst(internals->active_slaves[i],
+		num_rx_slave = rte_eth_rx_burst(slaves[i],
 				bd_rx_q->queue_id, bufs + num_rx_total, nb_pkts);
 		if (num_rx_slave) {
 			num_rx_total += num_rx_slave;
@@ -117,8 +162,13 @@  bond_ethdev_rx_burst_active_backup(void *queue, struct rte_mbuf **bufs,
 
 	internals = bd_rx_q->dev_private;
 
-	return rte_eth_rx_burst(internals->current_primary_port,
-			bd_rx_q->queue_id, bufs, nb_pkts);
+	uint8_t active_slave = internals->current_primary_port;
+	struct rte_eth_dev *dev = &rte_eth_devices[active_slave];
+
+	if (bd_rx_q->queue_id >= dev->data->nb_rx_queues)
+		return 0;
+
+	return rte_eth_rx_burst(active_slave, bd_rx_q->queue_id, bufs, nb_pkts);
 }
 
 static uint16_t
@@ -144,9 +194,9 @@  bond_ethdev_rx_burst_8023ad(void *queue, struct rte_mbuf **bufs,
 	rte_eth_macaddr_get(internals->port_id, &bond_mac);
 	/* Copy slave list to protect against slave up/down changes during tx
 	 * bursting */
-	slave_count = internals->active_slave_count;
-	memcpy(slaves, internals->active_slaves,
-			sizeof(internals->active_slaves[0]) * slave_count);
+
+	slave_count = bond_active_slaves_by_rxqid(internals, bd_rx_q->queue_id,
+						  slaves);
 
 	for (i = 0; i < slave_count && num_rx_total < nb_pkts; i++) {
 		j = num_rx_total;
@@ -401,6 +451,7 @@  bond_ethdev_tx_burst_round_robin(void *queue, struct rte_mbuf **bufs,
 
 	static int slave_idx = 0;
 	int i, cslave_idx = 0, tx_fail_total = 0;
+	int queue_id;
 
 	bd_tx_q = (struct bond_tx_queue *)queue;
 	internals = bd_tx_q->dev_private;
@@ -427,7 +478,9 @@  bond_ethdev_tx_burst_round_robin(void *queue, struct rte_mbuf **bufs,
 	/* Send packet burst on each slave device */
 	for (i = 0; i < num_of_slaves; i++) {
 		if (slave_nb_pkts[i] > 0) {
-			num_tx_slave = rte_eth_tx_burst(slaves[i], bd_tx_q->queue_id,
+			queue_id = bond_slave_txqid(internals, i,
+						    bd_tx_q->queue_id);
+			num_tx_slave = rte_eth_tx_burst(slaves[i], queue_id,
 					slave_bufs[i], slave_nb_pkts[i]);
 
 			/* if tx burst fails move packets to end of bufs */
@@ -453,14 +506,27 @@  bond_ethdev_tx_burst_active_backup(void *queue,
 {
 	struct bond_dev_private *internals;
 	struct bond_tx_queue *bd_tx_q;
+	int queue_id;
+	int i;
+	uint8_t num_of_slaves;
+	uint8_t slaves[RTE_MAX_ETHPORTS];
 
 	bd_tx_q = (struct bond_tx_queue *)queue;
 	internals = bd_tx_q->dev_private;
 
-	if (internals->active_slave_count < 1)
+	num_of_slaves = internals->active_slave_count;
+	memcpy(slaves, internals->active_slaves,
+			sizeof(internals->active_slaves[0]) * num_of_slaves);
+
+	if (num_of_slaves < 1)
 		return 0;
 
-	return rte_eth_tx_burst(internals->current_primary_port, bd_tx_q->queue_id,
+	for (i = 0; i < num_of_slaves; i++)
+		if (slaves[i] == internals->current_primary_port)
+			break;
+
+	queue_id = bond_slave_txqid(internals, i, bd_tx_q->queue_id);
+	return rte_eth_tx_burst(internals->current_primary_port, queue_id,
 			bufs, nb_pkts);
 }
 
@@ -696,6 +762,7 @@  bond_ethdev_tx_burst_tlb(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 	struct ether_hdr *ether_hdr;
 	struct ether_addr primary_slave_addr;
 	struct ether_addr active_slave_addr;
+	int queue_id;
 
 	if (num_of_slaves < 1)
 		return num_tx_total;
@@ -725,7 +792,8 @@  bond_ethdev_tx_burst_tlb(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 #endif
 		}
 
-		num_tx_total += rte_eth_tx_burst(slaves[i], bd_tx_q->queue_id,
+		queue_id = bond_slave_txqid(internals, i, bd_tx_q->queue_id);
+		num_tx_total += rte_eth_tx_burst(slaves[i], queue_id,
 				bufs + num_tx_total, nb_pkts - num_tx_total);
 
 		if (num_tx_total == nb_pkts)
@@ -903,6 +971,7 @@  bond_ethdev_tx_burst_balance(void *queue, struct rte_mbuf **bufs,
 	uint16_t num_tx_total = 0, num_tx_slave = 0, tx_fail_total = 0;
 
 	int i, op_slave_id;
+	int queue_id;
 
 	struct rte_mbuf *slave_bufs[RTE_MAX_ETHPORTS][nb_pkts];
 	uint16_t slave_nb_pkts[RTE_MAX_ETHPORTS] = { 0 };
@@ -931,7 +1000,9 @@  bond_ethdev_tx_burst_balance(void *queue, struct rte_mbuf **bufs,
 	/* Send packet burst on each slave device */
 	for (i = 0; i < num_of_slaves; i++) {
 		if (slave_nb_pkts[i] > 0) {
-			num_tx_slave = rte_eth_tx_burst(slaves[i], bd_tx_q->queue_id,
+			queue_id = bond_slave_txqid(internals, i,
+						    bd_tx_q->queue_id);
+			num_tx_slave = rte_eth_tx_burst(slaves[i], queue_id,
 					slave_bufs[i], slave_nb_pkts[i]);
 
 			/* if tx burst fails move packets to end of bufs */
@@ -977,6 +1048,8 @@  bond_ethdev_tx_burst_8023ad(void *queue, struct rte_mbuf **bufs,
 	/* Slow packets placed in each slave */
 	uint8_t slave_slow_nb_pkts[RTE_MAX_ETHPORTS] = { 0 };
 
+	int queue_id;
+
 	bd_tx_q = (struct bond_tx_queue *)queue;
 	internals = bd_tx_q->dev_private;
 
@@ -1022,7 +1095,8 @@  bond_ethdev_tx_burst_8023ad(void *queue, struct rte_mbuf **bufs,
 		if (slave_nb_pkts[i] == 0)
 			continue;
 
-		num_tx_slave = rte_eth_tx_burst(slaves[i], bd_tx_q->queue_id,
+		queue_id = bond_slave_txqid(internals, i, bd_tx_q->queue_id);
+		num_tx_slave = rte_eth_tx_burst(slaves[i], queue_id,
 				slave_bufs[i], slave_nb_pkts[i]);
 
 		/* If tx burst fails drop slow packets */
@@ -1057,6 +1131,7 @@  bond_ethdev_tx_burst_broadcast(void *queue, struct rte_mbuf **bufs,
 
 	int slave_tx_total[RTE_MAX_ETHPORTS];
 	int i, most_successful_tx_slave = -1;
+	int queue_id;
 
 	bd_tx_q = (struct bond_tx_queue *)queue;
 	internals = bd_tx_q->dev_private;
@@ -1076,7 +1151,8 @@  bond_ethdev_tx_burst_broadcast(void *queue, struct rte_mbuf **bufs,
 
 	/* Transmit burst on each active slave */
 	for (i = 0; i < num_of_slaves; i++) {
-		slave_tx_total[i] = rte_eth_tx_burst(slaves[i], bd_tx_q->queue_id,
+		queue_id = bond_slave_txqid(internals, i, bd_tx_q->queue_id);
+		slave_tx_total[i] = rte_eth_tx_burst(slaves[i], queue_id,
 					bufs, nb_pkts);
 
 		if (unlikely(slave_tx_total[i] < nb_pkts))
@@ -1298,9 +1374,22 @@  int
 slave_configure(struct rte_eth_dev *bonded_eth_dev,
 		struct rte_eth_dev *slave_eth_dev)
 {
+	struct bond_dev_private *internals;
 	struct bond_rx_queue *bd_rx_q;
 	struct bond_tx_queue *bd_tx_q;
+	int slave_id;
+
+	internals = bonded_eth_dev->data->dev_private;
 
+	for (slave_id = 0; slave_id < internals->slave_count; slave_id++)
+		if (internals->slaves[slave_id].port_id ==
+		    slave_eth_dev->data->port_id)
+			break;
+
+	RTE_VERIFY(slave_id != internals->slave_count);
+
+	uint16_t nb_rx_queues = internals->slaves[slave_id].nb_rx_queues;
+	uint16_t nb_tx_queues = internals->slaves[slave_id].nb_tx_queues;
 	int errval;
 	uint16_t q_id;
 
@@ -1331,8 +1420,7 @@  slave_configure(struct rte_eth_dev *bonded_eth_dev,
 
 	/* Configure device */
 	errval = rte_eth_dev_configure(slave_eth_dev->data->port_id,
-			bonded_eth_dev->data->nb_rx_queues,
-			bonded_eth_dev->data->nb_tx_queues,
+			nb_rx_queues, nb_tx_queues,
 			&(slave_eth_dev->data->dev_conf));
 	if (errval != 0) {
 		RTE_BOND_LOG(ERR, "Cannot configure slave device: port %u , err (%d)",
@@ -1343,7 +1431,7 @@  slave_configure(struct rte_eth_dev *bonded_eth_dev,
 	/* Setup Rx Queues */
 	/* Use existing queues, if any */
 	for (q_id = slave_eth_dev->data->nb_rx_queues;
-	     q_id < bonded_eth_dev->data->nb_rx_queues; q_id++) {
+	     q_id < nb_rx_queues ; q_id++) {
 		bd_rx_q = (struct bond_rx_queue *)bonded_eth_dev->data->rx_queues[q_id];
 
 		errval = rte_eth_rx_queue_setup(slave_eth_dev->data->port_id, q_id,
@@ -1361,7 +1449,7 @@  slave_configure(struct rte_eth_dev *bonded_eth_dev,
 	/* Setup Tx Queues */
 	/* Use existing queues, if any */
 	for (q_id = slave_eth_dev->data->nb_tx_queues;
-	     q_id < bonded_eth_dev->data->nb_tx_queues; q_id++) {
+	     q_id < nb_tx_queues ; q_id++) {
 		bd_tx_q = (struct bond_tx_queue *)bonded_eth_dev->data->tx_queues[q_id];
 
 		errval = rte_eth_tx_queue_setup(slave_eth_dev->data->port_id, q_id,
@@ -1440,7 +1528,8 @@  bond_ethdev_slave_link_status_change_monitor(void *cb_arg);
 
 void
 slave_add(struct bond_dev_private *internals,
-		struct rte_eth_dev *slave_eth_dev)
+		struct rte_eth_dev *slave_eth_dev,
+		const struct rte_eth_dev_info *slave_dev_info)
 {
 	struct bond_slave_details *slave_details =
 			&internals->slaves[internals->slave_count];
@@ -1448,6 +1537,20 @@  slave_add(struct bond_dev_private *internals,
 	slave_details->port_id = slave_eth_dev->data->port_id;
 	slave_details->last_link_status = 0;
 
+	uint16_t bond_nb_rx_queues =
+		rte_eth_devices[internals->port_id].data->nb_rx_queues;
+	uint16_t bond_nb_tx_queues =
+		rte_eth_devices[internals->port_id].data->nb_tx_queues;
+
+	slave_details->nb_rx_queues =
+		bond_nb_rx_queues > slave_dev_info->max_rx_queues
+		? slave_dev_info->max_rx_queues
+		: bond_nb_rx_queues;
+	slave_details->nb_tx_queues =
+		bond_nb_tx_queues > slave_dev_info->max_tx_queues
+		? slave_dev_info->max_tx_queues
+		: bond_nb_tx_queues;
+
 	/* If slave device doesn't support interrupts then we need to enabled
 	 * polling to monitor link status */
 	if (!(slave_eth_dev->data->dev_flags & RTE_PCI_DRV_INTR_LSC)) {
diff --git a/drivers/net/bonding/rte_eth_bond_private.h b/drivers/net/bonding/rte_eth_bond_private.h
index 6c47a29..02f6de1 100644
--- a/drivers/net/bonding/rte_eth_bond_private.h
+++ b/drivers/net/bonding/rte_eth_bond_private.h
@@ -101,6 +101,8 @@  struct bond_slave_details {
 	uint8_t link_status_poll_enabled;
 	uint8_t link_status_wait_to_complete;
 	uint8_t last_link_status;
+	uint16_t nb_rx_queues;
+	uint16_t nb_tx_queues;
 	/**< Port Id of slave eth_dev */
 	struct ether_addr persisted_mac_addr;
 
@@ -240,7 +242,8 @@  slave_remove(struct bond_dev_private *internals,
 
 void
 slave_add(struct bond_dev_private *internals,
-		struct rte_eth_dev *slave_eth_dev);
+		struct rte_eth_dev *slave_eth_dev,
+		const struct rte_eth_dev_info *slave_dev_info);
 
 uint16_t
 xmit_l2_hash(const struct rte_mbuf *buf, uint8_t slave_count);