[dpdk-dev,1/2] ethdev: add buffered tx api

Message ID 1452869038-9140-2-git-send-email-tomaszx.kulasek@intel.com (mailing list archive)
State Superseded, archived
Delegated to: Thomas Monjalon
Headers

Commit Message

Tomasz Kulasek Jan. 15, 2016, 2:43 p.m. UTC
  Many sample apps include internal buffering for single-packet-at-a-time
operation. Since this is such a common paradigm, this functionality is
better suited to being inside the core ethdev API.
The new APIs in the ethdev library are:
* rte_eth_tx_buffer - buffer up a single packet for future transmission
* rte_eth_tx_buffer_flush - flush any unsent buffered packets
* rte_eth_tx_buffer_set_err_callback - set up a callback to be called in
  case transmitting a buffered burst fails. By default, we just free the
  unsent packets.

As well as these, an additional reference callback is provided, which
frees the packets (as the default callback does), as well as updating a
user-provided counter, so that the number of dropped packets can be
tracked.

Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
Signed-off-by: Tomasz Kulasek <tomaszx.kulasek@intel.com>
---
 config/common_bsdapp                   |    1 +
 config/common_linuxapp                 |    1 +
 lib/librte_ether/rte_ethdev.c          |   63 +++++++++-
 lib/librte_ether/rte_ethdev.h          |  211 +++++++++++++++++++++++++++++++-
 lib/librte_ether/rte_ether_version.map |    8 ++
 5 files changed, 279 insertions(+), 5 deletions(-)
  

Comments

Stephen Hemminger Jan. 15, 2016, 6:13 p.m. UTC | #1
On Fri, 15 Jan 2016 15:43:57 +0100
Tomasz Kulasek <tomaszx.kulasek@intel.com> wrote:

>  static int
>  rte_eth_dev_tx_queue_config(struct rte_eth_dev *dev, uint16_t nb_queues)
>  {
>  	uint16_t old_nb_queues = dev->data->nb_tx_queues;
>  	void **txq;
> +	struct rte_eth_dev_tx_buffer *new_bufs;
>  	unsigned i;
>  
>  	if (dev->data->tx_queues == NULL) { /* first time configuration */
> @@ -841,17 +872,40 @@ rte_eth_dev_tx_queue_config(struct rte_eth_dev *dev, uint16_t nb_queues)
>  			dev->data->nb_tx_queues = 0;
>  			return -(ENOMEM);
>  		}
> +
> +		dev->data->txq_bufs = rte_zmalloc("ethdev->txq_bufs",
> +				sizeof(*dev->data->txq_bufs) * nb_queues, 0);

You should use zmalloc_socket and put the buffering on the same numa
node as the device?
  
Stephen Hemminger Jan. 15, 2016, 6:14 p.m. UTC | #2
On Fri, 15 Jan 2016 15:43:57 +0100
Tomasz Kulasek <tomaszx.kulasek@intel.com> wrote:

> +			return -(ENOMEM);

Please don't put () around args to return, it is a BSD stylism
  
Ananyev, Konstantin Jan. 15, 2016, 6:44 p.m. UTC | #3
Hi Tomasz,

>  static int
>  rte_eth_dev_tx_queue_config(struct rte_eth_dev *dev, uint16_t nb_queues)
>  {
>  	uint16_t old_nb_queues = dev->data->nb_tx_queues;
>  	void **txq;
> +	struct rte_eth_dev_tx_buffer *new_bufs;
>  	unsigned i;
> 
>  	if (dev->data->tx_queues == NULL) { /* first time configuration */
> @@ -841,17 +872,40 @@ rte_eth_dev_tx_queue_config(struct rte_eth_dev *dev, uint16_t nb_queues)
>  			dev->data->nb_tx_queues = 0;
>  			return -(ENOMEM);
>  		}
> +
> +		dev->data->txq_bufs = rte_zmalloc("ethdev->txq_bufs",
> +				sizeof(*dev->data->txq_bufs) * nb_queues, 0);
> +		if (dev->data->txq_bufs == NULL) {
> +			dev->data->nb_tx_queues = 0;
> +			rte_free(dev->data->tx_queues);
> +			return -(ENOMEM);
> +		}
> +
>  	} else { /* re-configure */
> +
> +		/* flush the packets queued for all queues*/
> +		for (i = 0; i < old_nb_queues; i++)
> +			rte_eth_tx_buffer_flush(dev->data->port_id, i);
> +

I don't think it is safe to call tx_burst() at queue config stage.
Instead you need to flush (or just empty) your txq_bufs at tx_queue_stop stage.

>  		RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->tx_queue_release, -ENOTSUP);
> 
> +		/* get new buffer space first, but keep old space around */
> +		new_bufs = rte_zmalloc("ethdev->txq_bufs",
> +				sizeof(*dev->data->txq_bufs) * nb_queues, 0);
> +		if (new_bufs == NULL)
> +			return -(ENOMEM);
> +


Why not to allocate space for txq_bufs together with tx_queues (as one chunk for both)?
As I understand there is always one to one mapping between them anyway.
Would simplify things a bit.
Or even introduce a new struct to group all related tx queue info together
struct rte_eth_txq_data {
	void *queue; /*actual pmd  queue*/
	struct rte_eth_dev_tx_buffer buf;
	uint8_t state;
}
And use it inside struct rte_eth_dev_data?
Would probably give a better data locality.

>  		txq = dev->data->tx_queues;
> 
>  		for (i = nb_queues; i < old_nb_queues; i++)
>  			(*dev->dev_ops->tx_queue_release)(txq[i]);
>  		txq = rte_realloc(txq, sizeof(txq[0]) * nb_queues,
>  				  RTE_CACHE_LINE_SIZE);
> -		if (txq == NULL)
> -			return -ENOMEM;
> +		if (txq == NULL) {
> +			rte_free(new_bufs);
> +			return -(ENOMEM);
> +		}
> +
>  		if (nb_queues > old_nb_queues) {
>  			uint16_t new_qs = nb_queues - old_nb_queues;
> 
> @@ -861,6 +915,9 @@ rte_eth_dev_tx_queue_config(struct rte_eth_dev *dev, uint16_t nb_queues)
> 
>  		dev->data->tx_queues = txq;
> 
> +		/* now replace old buffers with new */
> +		rte_free(dev->data->txq_bufs);
> +		dev->data->txq_bufs = new_bufs;
>  	}
>  	dev->data->nb_tx_queues = nb_queues;
>  	return 0;
> diff --git a/lib/librte_ether/rte_ethdev.h b/lib/librte_ether/rte_ethdev.h
> index bada8ad..23faa6a 100644
> --- a/lib/librte_ether/rte_ethdev.h
> +++ b/lib/librte_ether/rte_ethdev.h
> @@ -1,7 +1,7 @@
>  /*-
>   *   BSD LICENSE
>   *
> - *   Copyright(c) 2010-2015 Intel Corporation. All rights reserved.
> + *   Copyright(c) 2010-2016 Intel Corporation. All rights reserved.
>   *   All rights reserved.
>   *
>   *   Redistribution and use in source and binary forms, with or without
> @@ -182,6 +182,7 @@ extern "C" {
>  #include <rte_pci.h>
>  #include <rte_dev.h>
>  #include <rte_devargs.h>
> +#include <rte_branch_prediction.h>
>  #include "rte_ether.h"
>  #include "rte_eth_ctrl.h"
>  #include "rte_dev_info.h"
> @@ -1519,6 +1520,34 @@ enum rte_eth_dev_type {
>  	RTE_ETH_DEV_MAX		/**< max value of this enum */
>  };
> 
> +typedef void (*buffer_tx_error_fn)(struct rte_mbuf **unsent, uint16_t count,
> +		void *userdata);
> +
> +/**
> + * @internal
> + * Structure used to buffer packets for future TX
> + * Used by APIs rte_eth_tx_buffer and rte_eth_tx_buffer_flush
> + */
> +struct rte_eth_dev_tx_buffer {
> +	struct rte_mbuf *pkts[RTE_ETHDEV_TX_BUFSIZE];

I think it is better to make size of pkts[] configurable at runtime.
There are a lot of different usage scenarios - hard to predict what would be an
optimal buffer size for all cases.  

> +	unsigned nb_pkts;
> +	uint64_t errors;
> +	/**< Total number of queue packets to sent that are dropped. */
> +};
> +
> +/**
> + * @internal
> + * Structure to hold a callback to be used on error when a tx_buffer_flush
> + * call fails to send all packets.
> + * This needs to be a separate structure, as it must go in the ethdev structure
> + * rather than ethdev_data, due to the use of a function pointer, which is not
> + * multi-process safe.
> + */
> +struct rte_eth_dev_tx_buffer_err_cb {
> +	buffer_tx_error_fn flush_cb; /* callback for when tx_burst fails */
> +	void *userdata;              /* userdata for callback */
> +};
> +
>  /**
>   * @internal
>   * The generic data structure associated with each ethernet device.
> @@ -1550,6 +1579,9 @@ struct rte_eth_dev {
>  	struct rte_eth_rxtx_callback *pre_tx_burst_cbs[RTE_MAX_QUEUES_PER_PORT];
>  	uint8_t attached; /**< Flag indicating the port is attached */
>  	enum rte_eth_dev_type dev_type; /**< Flag indicating the device type */
> +
> +	/** Callbacks to be used on a tx_buffer_flush error */
> +	struct rte_eth_dev_tx_buffer_err_cb tx_buf_err_cb[RTE_MAX_QUEUES_PER_PORT];
>  };
> 
>  struct rte_eth_dev_sriov {
> @@ -1610,6 +1642,8 @@ struct rte_eth_dev_data {
>  	enum rte_kernel_driver kdrv;    /**< Kernel driver passthrough */
>  	int numa_node;  /**< NUMA node connection */
>  	const char *drv_name;   /**< Driver name */
> +	struct rte_eth_dev_tx_buffer *txq_bufs;
> +	/**< space to allow buffered transmits */
>  };
> 
>  /** Device supports hotplug detach */
> @@ -2661,8 +2695,181 @@ rte_eth_tx_burst(uint8_t port_id, uint16_t queue_id,
>  }
> 
>  /**
> - * The eth device event type for interrupt, and maybe others in the future.
> + * Buffer a single packet for future transmission on a port and queue
> + *
> + * This function takes a single mbuf/packet and buffers it for later
> + * transmission on the particular port and queue specified. Once the buffer is
> + * full of packets, an attempt will be made to transmit all the buffered
> + * packets. In case of error, where not all packets can be transmitted, a
> + * callback is called with the unsent packets as a parameter. If no callback
> + * is explicitly set up, the unsent packets are just freed back to the owning
> + * mempool. The function returns the number of packets actually sent i.e.
> + * 0 if no buffer flush occurred, otherwise the number of packets successfully
> + * flushed
> + *
> + * @param port_id
> + *   The port identifier of the Ethernet device.
> + * @param queue_id
> + *   The index of the transmit queue through which output packets must be
> + *   sent.
> + *   The value must be in the range [0, nb_tx_queue - 1] previously supplied
> + *   to rte_eth_dev_configure().
> + * @param tx_pkt
> + *   Pointer to the packet mbuf to be sent.
> + * @return
> + *   0 = packet has been buffered for later transmission
> + *   N > 0 = packet has been buffered, and the buffer was subsequently flushed,
> + *     causing N packets to be sent, and the error callback to be called for
> + *     the rest.
> + */
> +static inline uint16_t __attribute__((always_inline))
> +rte_eth_tx_buffer(uint8_t port_id, uint16_t queue_id, struct rte_mbuf *tx_pkt)
> +{
> +	struct rte_eth_dev *dev = &rte_eth_devices[port_id];
> +	struct rte_eth_dev_tx_buffer *qbuf = &dev->data->txq_bufs[queue_id];
> +	uint16_t i;
> +
> +	qbuf->pkts[qbuf->nb_pkts++] = tx_pkt;
> +	if (qbuf->nb_pkts < RTE_ETHDEV_TX_BUFSIZE)
> +		return 0;
> +

Probably just call rte_eth_tx_buffer_flush() here to avoid duplication.

> +	const uint16_t sent = rte_eth_tx_burst(port_id, queue_id, qbuf->pkts,
> +			RTE_ETHDEV_TX_BUFSIZE);
> +
> +	qbuf->nb_pkts = 0;
> +
> +	/* All packets sent, or to be dealt with by callback below */
> +	if (unlikely(sent != RTE_ETHDEV_TX_BUFSIZE)) {
> +		if (dev->tx_buf_err_cb[queue_id].flush_cb)
> +			dev->tx_buf_err_cb[queue_id].flush_cb(&qbuf->pkts[sent],
> +					RTE_ETHDEV_TX_BUFSIZE - sent,
> +					dev->tx_buf_err_cb[queue_id].userdata);
> +		else {
> +			qbuf->errors += RTE_ETHDEV_TX_BUFSIZE - sent;
> +			for (i = sent; i < RTE_ETHDEV_TX_BUFSIZE; i++)
> +				rte_pktmbuf_free(qbuf->pkts[i]);
> +		}
> +	}
> +
> +	return sent;
> +}
> +
> +/**
> + * Send any packets queued up for transmission on a port and HW queue
> + *
> + * This causes an explicit flush of packets previously buffered via the
> + * rte_eth_tx_buffer() function. It returns the number of packets successfully
> + * sent to the NIC, and calls the error callback for any unsent packets. Unless
> + * explicitly set up otherwise, the default callback simply frees the unsent
> + * packets back to the owning mempool.
> + *
> + * @param port_id
> + *   The port identifier of the Ethernet device.
> + * @param queue_id
> + *   The index of the transmit queue through which output packets must be
> + *   sent.
> + *   The value must be in the range [0, nb_tx_queue - 1] previously supplied
> + *   to rte_eth_dev_configure().
> + * @return
> + *   The number of packets successfully sent to the Ethernet device. The error
> + *   callback is called for any packets which could not be sent.
> + */
> +static inline uint16_t
> +rte_eth_tx_buffer_flush(uint8_t port_id, uint16_t queue_id)
> +{
> +	uint16_t i;
> +	struct rte_eth_dev *dev = &rte_eth_devices[port_id];
> +	struct rte_eth_dev_tx_buffer *qbuf = &dev->data->txq_bufs[queue_id];
> +
> +	if (qbuf->nb_pkts == 0)
> +		return 0;
> +
> +	const uint16_t to_send = qbuf->nb_pkts;
> +
> +	const uint16_t sent = rte_eth_tx_burst(port_id, queue_id, qbuf->pkts,
> +			to_send);

Try to avoid defining variables in the middle of the code block.
Again no much value in having these 2 above variables as 'const'.
Konstantin

> +
> +	qbuf->nb_pkts = 0;
> +
> +	/* All packets sent, or to be dealt with by callback below */
> +	if (unlikely(sent != to_send)) {
> +		if (dev->tx_buf_err_cb[queue_id].flush_cb)
> +			dev->tx_buf_err_cb[queue_id].flush_cb(&qbuf->pkts[sent],
> +					to_send - sent,
> +					dev->tx_buf_err_cb[queue_id].userdata);
> +		else {
> +			qbuf->errors += to_send - sent;
> +			for (i = sent; i < to_send; i++)
> +				rte_pktmbuf_free(qbuf->pkts[i]);
> +		}
> +	}
> +
> +	return sent;
> +}
> +
  
Tomasz Kulasek Feb. 2, 2016, 10 a.m. UTC | #4
Hi Konstantin,

> -----Original Message-----
> From: Ananyev, Konstantin
> Sent: Friday, January 15, 2016 19:45
> To: Kulasek, TomaszX; dev@dpdk.org
> Subject: RE: [dpdk-dev] [PATCH 1/2] ethdev: add buffered tx api
> 
> Hi Tomasz,
> 
> >
> > +		/* get new buffer space first, but keep old space around */
> > +		new_bufs = rte_zmalloc("ethdev->txq_bufs",
> > +				sizeof(*dev->data->txq_bufs) * nb_queues, 0);
> > +		if (new_bufs == NULL)
> > +			return -(ENOMEM);
> > +
> 
> Why not to allocate space for txq_bufs together with tx_queues (as one
> chunk for both)?
> As I understand there is always one to one mapping between them anyway.
> Would simplify things a bit.
> Or even introduce a new struct to group with all related tx queue info
> togetehr struct rte_eth_txq_data {
> 	void *queue; /*actual pmd  queue*/
> 	struct rte_eth_dev_tx_buffer buf;
> 	uint8_t state;
> }
> And use it inside struct rte_eth_dev_data?
> Would probably give a better data locality.
> 

Introducing such a struct will require a huge rework of pmd drivers. I don't think it's worth it just for this one feature. 


> > +/**
> > + * @internal
> > + * Structure used to buffer packets for future TX
> > + * Used by APIs rte_eth_tx_buffer and rte_eth_tx_buffer_flush  */
> > +struct rte_eth_dev_tx_buffer {
> > +	struct rte_mbuf *pkts[RTE_ETHDEV_TX_BUFSIZE];
> 
> I think it is better to make size of pkts[] configurable at runtime.
> There are a lot of different usage scenarios - hard to predict what would
> be an optimal buffer size for all cases.
> 

This buffer is allocated in eth_dev shared memory, so there are two scenarios:
1) We have a preallocated buffer with maximal size, and then we can set threshold level without restarting device, or
2) We need to set its size before starting device.

Second one is better, I think.

Tomasz
  
Ananyev, Konstantin Feb. 2, 2016, 1:49 p.m. UTC | #5
Hi Tomasz,

> -----Original Message-----
> From: Kulasek, TomaszX
> Sent: Tuesday, February 02, 2016 10:01 AM
> To: Ananyev, Konstantin; dev@dpdk.org
> Subject: RE: [dpdk-dev] [PATCH 1/2] ethdev: add buffered tx api
> 
> Hi Konstantin,
> 
> > -----Original Message-----
> > From: Ananyev, Konstantin
> > Sent: Friday, January 15, 2016 19:45
> > To: Kulasek, TomaszX; dev@dpdk.org
> > Subject: RE: [dpdk-dev] [PATCH 1/2] ethdev: add buffered tx api
> >
> > Hi Tomasz,
> >
> > >
> > > +		/* get new buffer space first, but keep old space around */
> > > +		new_bufs = rte_zmalloc("ethdev->txq_bufs",
> > > +				sizeof(*dev->data->txq_bufs) * nb_queues, 0);
> > > +		if (new_bufs == NULL)
> > > +			return -(ENOMEM);
> > > +
> >
> > Why not to allocate space for txq_bufs together with tx_queues (as one
> > chunk for both)?
> > As I understand there is always one to one mapping between them anyway.
> > Would simplify things a bit.
> > Or even introduce a new struct to group with all related tx queue info
> > togetehr struct rte_eth_txq_data {
> > 	void *queue; /*actual pmd  queue*/
> > 	struct rte_eth_dev_tx_buffer buf;
> > 	uint8_t state;
> > }
> > And use it inside struct rte_eth_dev_data?
> > Would probably give a better data locality.
> >
> 
> Introducing such a struct will require a huge rework of pmd drivers. I don't think it's worth only for this one feature.

Why not?
Things are getting more and more messy here: now we have a separate array of pointer to queues,
Separate array of queue states, you are going to add separate array of tx buffers.
For me it seems logical to unite all these 3 fields into one sub-struct. 

> 
> 
> > > +/**
> > > + * @internal
> > > + * Structure used to buffer packets for future TX
> > > + * Used by APIs rte_eth_tx_buffer and rte_eth_tx_buffer_flush  */
> > > +struct rte_eth_dev_tx_buffer {
> > > +	struct rte_mbuf *pkts[RTE_ETHDEV_TX_BUFSIZE];
> >
> > I think it is better to make size of pkts[] configurable at runtime.
> > There are a lot of different usage scenarios - hard to predict what would
> > be an optimal buffer size for all cases.
> >
> 
> This buffer is allocated in eth_dev shared memory, so there are two scenarios:
> 1) We have prealocated buffer with maximal size, and then we can set threshold level without restarting device, or
> 2) We need to set its size before starting device.

> 
> Second one is better, I think.

Yep, I was thinking about 2) too.
Might be an extra parameter in struct rte_eth_txconf.

> 
> Tomasz
  
Tomasz Kulasek Feb. 9, 2016, 5:02 p.m. UTC | #6
> -----Original Message-----
> From: Ananyev, Konstantin
> Sent: Tuesday, February 2, 2016 14:50
> To: Kulasek, TomaszX <tomaszx.kulasek@intel.com>; dev@dpdk.org
> Subject: RE: [dpdk-dev] [PATCH 1/2] ethdev: add buffered tx api
> 
> Hi Tomasz,
> 
> > -----Original Message-----
> > From: Kulasek, TomaszX
> > Sent: Tuesday, February 02, 2016 10:01 AM
> > To: Ananyev, Konstantin; dev@dpdk.org
> > Subject: RE: [dpdk-dev] [PATCH 1/2] ethdev: add buffered tx api
> >
> > Hi Konstantin,
> >
> > > -----Original Message-----
> > > From: Ananyev, Konstantin
> > > Sent: Friday, January 15, 2016 19:45
> > > To: Kulasek, TomaszX; dev@dpdk.org
> > > Subject: RE: [dpdk-dev] [PATCH 1/2] ethdev: add buffered tx api
> > >
> > > Hi Tomasz,
> > >
> > > >
> > > > +		/* get new buffer space first, but keep old space around
> */
> > > > +		new_bufs = rte_zmalloc("ethdev->txq_bufs",
> > > > +				sizeof(*dev->data->txq_bufs) * nb_queues, 0);
> > > > +		if (new_bufs == NULL)
> > > > +			return -(ENOMEM);
> > > > +
> > >
> > > Why not to allocate space for txq_bufs together with tx_queues (as
> > > one chunk for both)?
> > > As I understand there is always one to one mapping between them
> anyway.
> > > Would simplify things a bit.
> > > Or even introduce a new struct to group with all related tx queue
> > > info togetehr struct rte_eth_txq_data {
> > > 	void *queue; /*actual pmd  queue*/
> > > 	struct rte_eth_dev_tx_buffer buf;
> > > 	uint8_t state;
> > > }
> > > And use it inside struct rte_eth_dev_data?
> > > Would probably give a better data locality.
> > >
> >
> > Introducing such a struct will require a huge rework of pmd drivers. I
> don't think it's worth only for this one feature.
> 
> Why not?
> Things are getting more and more messy here: now we have a separate array
> of pointer to queues, Separate array of queue states, you are going to add
> separate array of tx buffers.
> For me it seems logical to unite all these 3 fields into one sub-struct.
> 

I agree with you, and probably such a work will be nice also for rx queues, but these two changes impacts on another part of dpdk. While buffered tx API is more client application helper.

For me these two things are different features and should be made separately because:
1) They are independent and can be done separately,
2) They can (and should) be reviewed, tested and approved separately,
3) They are addressed to another type of people (tx buffering to application developers, rte_eth_dev_data to pmd developers), so another people can be interested in having (or not) one or second feature

Even for bug tracking it will be cleaner to separate these two things. And yes, it is logical to unite it, maybe also for rx queues, but should be discussed separately.

I've made a prototype with this rework, and the impact on the code not related to this particular feature is too wide and strong to join them. I would rather provide it as an independent patch for further discussion only on it, if needed.

> >
> >
> > > > +/**
> > > > + * @internal
> > > > + * Structure used to buffer packets for future TX
> > > > + * Used by APIs rte_eth_tx_buffer and rte_eth_tx_buffer_flush  */
> > > > +struct rte_eth_dev_tx_buffer {
> > > > +	struct rte_mbuf *pkts[RTE_ETHDEV_TX_BUFSIZE];
> > >
> > > I think it is better to make size of pkts[] configurable at runtime.
> > > There are a lot of different usage scenarios - hard to predict what
> > > would be an optimal buffer size for all cases.
> > >
> >
> > This buffer is allocated in eth_dev shared memory, so there are two
> scenarios:
> > 1) We have prealocated buffer with maximal size, and then we can set
> > threshold level without restarting device, or
> > 2) We need to set its size before starting device.
> 
> >
> > Second one is better, I think.
> 
> Yep, I was thinking about 2) too.
> Might be an extra parameter in struct rte_eth_txconf.
> 

Struct rte_eth_txconf is passed to ethdev after rte_eth_dev_tx_queue_config, so we don't know its value when buffers are allocated.
I'm looking for another solution.

> >
> > Tomasz
  
Ananyev, Konstantin Feb. 9, 2016, 11:56 p.m. UTC | #7
> -----Original Message-----
> From: Kulasek, TomaszX
> Sent: Tuesday, February 09, 2016 5:03 PM
> To: Ananyev, Konstantin; dev@dpdk.org
> Subject: RE: [dpdk-dev] [PATCH 1/2] ethdev: add buffered tx api
> 
> 
> 
> > -----Original Message-----
> > From: Ananyev, Konstantin
> > Sent: Tuesday, February 2, 2016 14:50
> > To: Kulasek, TomaszX <tomaszx.kulasek@intel.com>; dev@dpdk.org
> > Subject: RE: [dpdk-dev] [PATCH 1/2] ethdev: add buffered tx api
> >
> > Hi Tomasz,
> >
> > > -----Original Message-----
> > > From: Kulasek, TomaszX
> > > Sent: Tuesday, February 02, 2016 10:01 AM
> > > To: Ananyev, Konstantin; dev@dpdk.org
> > > Subject: RE: [dpdk-dev] [PATCH 1/2] ethdev: add buffered tx api
> > >
> > > Hi Konstantin,
> > >
> > > > -----Original Message-----
> > > > From: Ananyev, Konstantin
> > > > Sent: Friday, January 15, 2016 19:45
> > > > To: Kulasek, TomaszX; dev@dpdk.org
> > > > Subject: RE: [dpdk-dev] [PATCH 1/2] ethdev: add buffered tx api
> > > >
> > > > Hi Tomasz,
> > > >
> > > > >
> > > > > +		/* get new buffer space first, but keep old space around
> > */
> > > > > +		new_bufs = rte_zmalloc("ethdev->txq_bufs",
> > > > > +				sizeof(*dev->data->txq_bufs) * nb_queues, 0);
> > > > > +		if (new_bufs == NULL)
> > > > > +			return -(ENOMEM);
> > > > > +
> > > >
> > > > Why not to allocate space for txq_bufs together with tx_queues (as
> > > > one chunk for both)?
> > > > As I understand there is always one to one mapping between them
> > anyway.
> > > > Would simplify things a bit.
> > > > Or even introduce a new struct to group with all related tx queue
> > > > info togetehr struct rte_eth_txq_data {
> > > > 	void *queue; /*actual pmd  queue*/
> > > > 	struct rte_eth_dev_tx_buffer buf;
> > > > 	uint8_t state;
> > > > }
> > > > And use it inside struct rte_eth_dev_data?
> > > > Would probably give a better data locality.
> > > >
> > >
> > > Introducing such a struct will require a huge rework of pmd drivers. I
> > don't think it's worth only for this one feature.
> >
> > Why not?
> > Things are getting more and more messy here: now we have a separate array
> > of pointer to queues, Separate array of queue states, you are going to add
> > separate array of tx buffers.
> > For me it seems logical to unite all these 3 fields into one sub-struct.
> >
> 
> I agree with you, and probably such a work will be nice also for rx queues, but these two changes impacts on another part of dpdk.
> While buffered tx API is more client application helper.
> 
> For me these two thinks are different features and should be made separately because:
> 1) They are independent and can be done separately,
> 2) They can (and should) be reviewed, tested and approved separately,
> 3) They are addressed to another type of people (tx buffering to application developers, rte_eth_dev_data to pmd developers), so
> another people can be interested in having (or not) one or second feature

Such division seems a bit artificial to me :)
> You are making changes in rte_ethdev.[c,h]  - I think that field regrouping would make code cleaner and easier to read/maintain.

> 
> Even for bug tracking it will be cleaner to separate these two things. And yes, it is logical to unite it, maybe also for rx queues, but
> should be discussed separately.
> 
> I've made a prototype with this rework, and the impact on the code not related to this particular feature is too wide and strong to join
> them. I would rather to provide it as independent patch for further discussion only on it, if needed.

Sure, separate patch is fine.
Why not to submit it as extra one is the series?


> 
> > >
> > >
> > > > > +/**
> > > > > + * @internal
> > > > > + * Structure used to buffer packets for future TX
> > > > > + * Used by APIs rte_eth_tx_buffer and rte_eth_tx_buffer_flush  */
> > > > > +struct rte_eth_dev_tx_buffer {
> > > > > +	struct rte_mbuf *pkts[RTE_ETHDEV_TX_BUFSIZE];
> > > >
> > > > I think it is better to make size of pkts[] configurable at runtime.
> > > > There are a lot of different usage scenarios - hard to predict what
> > > > would be an optimal buffer size for all cases.
> > > >
> > >
> > > This buffer is allocated in eth_dev shared memory, so there are two
> > scenarios:
> > > 1) We have prealocated buffer with maximal size, and then we can set
> > > threshold level without restarting device, or
> > > 2) We need to set its size before starting device.
> >
> > >
> > > Second one is better, I think.
> >
> > Yep, I was thinking about 2) too.
> > Might be an extra parameter in struct rte_eth_txconf.
> >
> 
> Struct rte_eth_txconf is passed to ethdev after rte_eth_dev_tx_queue_config, so we don't know its value when buffers are
> allocated.

Ok, and why allocation of the tx buffer can't be done at rte_eth_tx_queue_setup()? 

Actually just thought why not to let rte_eth_tx_buffer() to accept struct rte_eth_dev_tx_buffer * as a parameter:
+static inline int __attribute__((always_inline))
+rte_eth_tx_buffer(uint8_t port_id, uint16_t queue_id,  accept struct rte_eth_dev_tx_buffer * txb, struct rte_mbuf *tx_pkt)
?

In that case we don't need to make any changes at rte_ethdev.[h,c] to alloc/free/maintain tx_buffer inside each queue...
It all will be upper layer responsibility.
So no need to modify existing rte_ethdev structures/code.
Again, no need for error callback - caller would check return value and decide what to do with unsent packets in the tx_buffer.

Konstantin

> I'm looking for another solution.
> 
> > >
> > > Tomasz
  
Ananyev, Konstantin Feb. 12, 2016, 11:44 a.m. UTC | #8
> 
> > -----Original Message-----
> > From: Kulasek, TomaszX
> > Sent: Tuesday, February 09, 2016 5:03 PM
> > To: Ananyev, Konstantin; dev@dpdk.org
> > Subject: RE: [dpdk-dev] [PATCH 1/2] ethdev: add buffered tx api
> >
> >
> >
> > > -----Original Message-----
> > > From: Ananyev, Konstantin
> > > Sent: Tuesday, February 2, 2016 14:50
> > > To: Kulasek, TomaszX <tomaszx.kulasek@intel.com>; dev@dpdk.org
> > > Subject: RE: [dpdk-dev] [PATCH 1/2] ethdev: add buffered tx api
> > >
> > > Hi Tomasz,
> > >
> > > > -----Original Message-----
> > > > From: Kulasek, TomaszX
> > > > Sent: Tuesday, February 02, 2016 10:01 AM
> > > > To: Ananyev, Konstantin; dev@dpdk.org
> > > > Subject: RE: [dpdk-dev] [PATCH 1/2] ethdev: add buffered tx api
> > > >
> > > > Hi Konstantin,
> > > >
> > > > > -----Original Message-----
> > > > > From: Ananyev, Konstantin
> > > > > Sent: Friday, January 15, 2016 19:45
> > > > > To: Kulasek, TomaszX; dev@dpdk.org
> > > > > Subject: RE: [dpdk-dev] [PATCH 1/2] ethdev: add buffered tx api
> > > > >
> > > > > Hi Tomasz,
> > > > >
> > > > > >
> > > > > > +		/* get new buffer space first, but keep old space around
> > > */
> > > > > > +		new_bufs = rte_zmalloc("ethdev->txq_bufs",
> > > > > > +				sizeof(*dev->data->txq_bufs) * nb_queues, 0);
> > > > > > +		if (new_bufs == NULL)
> > > > > > +			return -(ENOMEM);
> > > > > > +
> > > > >
> > > > > Why not to allocate space for txq_bufs together with tx_queues (as
> > > > > one chunk for both)?
> > > > > As I understand there is always one to one mapping between them
> > > anyway.
> > > > > Would simplify things a bit.
> > > > > Or even introduce a new struct to group with all related tx queue
> > > > > info togetehr struct rte_eth_txq_data {
> > > > > 	void *queue; /*actual pmd  queue*/
> > > > > 	struct rte_eth_dev_tx_buffer buf;
> > > > > 	uint8_t state;
> > > > > }
> > > > > And use it inside struct rte_eth_dev_data?
> > > > > Would probably give a better data locality.
> > > > >
> > > >
> > > > Introducing such a struct will require a huge rework of pmd drivers. I
> > > don't think it's worth only for this one feature.
> > >
> > > Why not?
> > > Things are getting more and more messy here: now we have a separate array
> > > of pointer to queues, Separate array of queue states, you are going to add
> > > separate array of tx buffers.
> > > For me it seems logical to unite all these 3 fields into one sub-struct.
> > >
> >
> > I agree with you, and probably such a work will be nice also for rx queues, but these two changes impacts on another part of dpdk.
> > While buffered tx API is more client application helper.
> >
> > For me these two thinks are different features and should be made separately because:
> > 1) They are independent and can be done separately,
> > 2) They can (and should) be reviewed, tested and approved separately,
> > 3) They are addressed to another type of people (tx buffering to application developers, rte_eth_dev_data to pmd developers), so
> > another people can be interested in having (or not) one or second feature
> 
> Such division seems a bit artificial to me :)
> You are making changes in rte_ethdev.[c,h]  - I think that filed regrouping would make code cleaner and easier to read/maintain.
> 
> >
> > Even for bug tracking it will be cleaner to separate these two things. And yes, it is logical to unite it, maybe also for rx queues, but
> > should be discussed separately.
> >
> > I've made a prototype with this rework, and the impact on the code not related to this particular feature is too wide and strong to
> join
> > them. I would rather to provide it as independent patch for further discussion only on it, if needed.
> 
> Sure, separate patch is fine.
> Why not to submit it as extra one is the series?
> 
> 
> >
> > > >
> > > >
> > > > > > +/**
> > > > > > + * @internal
> > > > > > + * Structure used to buffer packets for future TX
> > > > > > + * Used by APIs rte_eth_tx_buffer and rte_eth_tx_buffer_flush  */
> > > > > > +struct rte_eth_dev_tx_buffer {
> > > > > > +	struct rte_mbuf *pkts[RTE_ETHDEV_TX_BUFSIZE];
> > > > >
> > > > > I think it is better to make size of pkts[] configurable at runtime.
> > > > > There are a lot of different usage scenarios - hard to predict what
> > > > > would be an optimal buffer size for all cases.
> > > > >
> > > >
> > > > This buffer is allocated in eth_dev shared memory, so there are two
> > > scenarios:
> > > > 1) We have prealocated buffer with maximal size, and then we can set
> > > > threshold level without restarting device, or
> > > > 2) We need to set its size before starting device.
> > >
> > > >
> > > > Second one is better, I think.
> > >
> > > Yep, I was thinking about 2) too.
> > > Might be an extra parameter in struct rte_eth_txconf.
> > >
> >
> > Struct rte_eth_txconf is passed to ethdev after rte_eth_dev_tx_queue_config, so we don't know its value when buffers are
> > allocated.
> 
> Ok, and why allocation of the tx buffer can't be done at rte_eth_tx_queue_setup()?
> 
> Actually just thought why not to let rte_eth_tx_buffer() to accept struct rte_eth_dev_tx_buffer * as a parameter:
> +static inline int __attribute__((always_inline))
> +rte_eth_tx_buffer(uint8_t port_id, uint16_t queue_id,  accept struct rte_eth_dev_tx_buffer * txb, struct rte_mbuf *tx_pkt)
> ?
> 
> In that case we don't need to make any changes at rte_ethdev.[h,c] to alloc/free/maintain tx_buffer inside each queue...
> It all will be upper layer responsibility.
> So no need to modify existing rte_ethdev structures/code.
> Again, no need for error callback - caller would check return value and decide what to do with unsent packets in the tx_buffer.
> 

Just to summarise why I think it is better to have tx buffering managed on the app level:

1. avoid any ABI change.
2. Avoid extra changes in rte_ethdev.c: tx_queue_setup/tx_queue_stop.
3. Provides much more flexibility to the user:
   a) where to allocate space for tx_buffer (stack, heap, hugepages, etc).
   b) user can mix and match plain tx_burst() and   tx_buffer/tx_buffer_flush()
        in any way he feels is appropriate.
   c) user can change the size of tx_buffer without stop/re-config/start queue:
        just allocate new larger(smaller) tx_buffer & copy contents to the new one.
   d) user can preserve buffered packets through a device restart cycle:
        i.e. if, say, a TX hang happened and the user has to do dev_stop/dev_start -
        contents of tx_buffer will stay unchanged and its contents could be
        (re-)transmitted after device is up again, or  through different port/queue if needed.
 
As a drawback, as mentioned - tx error handling becomes less transparent...
But we can add an error handling routine and its user-provided parameter
into struct rte_eth_dev_tx_buffer, something like that:

+struct rte_eth_dev_tx_buffer {
+	buffer_tx_error_fn cbfn;
+	void *userdata;
+	unsigned nb_pkts;
+	uint64_t errors;
+	/**< Total number of queue packets to sent that are dropped. */
+	struct rte_mbuf *pkts[];
+};

Konstantin
  
Ivan Boule Feb. 12, 2016, 4:40 p.m. UTC | #9
On 02/12/2016 12:44 PM, Ananyev, Konstantin wrote:
>
>>
>>> -----Original Message-----
...
>>
>> In that case we don't need to make any changes at rte_ethdev.[h,c] to alloc/free/maintain tx_buffer inside each queue...
>> It all will be upper layer responsibility.
>> So no need to modify existing rte_ethdev structures/code.
>> Again, no need for error callback - caller would check return value and decide what to do with unsent packets in the tx_buffer.
>>
>
> Just to summarise why I think it is better to have tx buffering managed on the app level:
>
> 1. avoid any ABI change.
> 2. Avoid extra changes in rte_ethdev.c: tx_queue_setup/tx_queue_stop.
> 3. Provides much more flexibility to the user:
>     a) where to allocate space for tx_buffer (stack, heap, hugepages, etc).
>     b) user can mix and match plain tx_burst() and   tx_buffer/tx_buffer_flush()
>          in any way he fills it appropriate.
>     c) user can change the size of tx_buffer without stop/re-config/start queue:
>          just allocate new larger(smaller) tx_buffer & copy contents to the new one.
>     d) user can preserve buffered packets through device restart circle:
>          i.e if let say TX hang happened, and user has to do dev_stop/dev_start -
>          contents of tx_buffer will stay unchanged and its contents could be
>          (re-)transmitted after device is up again, or  through different port/queue if needed.
>
> As a drawbacks mentioned - tx error handling becomes less transparent...
> But we can add error handling routine and it's user provided parameter
> into struct rte_eth_dev_tx_buffer', something like that:
>
> +struct rte_eth_dev_tx_buffer {
> +	buffer_tx_error_fn cbfn;
> +	void *userdata;
> +	unsigned nb_pkts;
> +	uint64_t errors;
> +	/**< Total number of queue packets to sent that are dropped. */
> +	struct rte_mbuf *pkts[];
> +};
>
> Konstantin
>

Just to reinforce Konstantin's comments.
As a very basic - not to say fundamental - rule, one should avoid adding 
in the PMD RX/TX API any extra processing that can be handled at a 
higher level.
The only and self-sufficient reason is that we must avoid impacting 
performances on the critical path, in particular for those - usually the 
majority of - applications that do not need such extra operations, or 
better implement them at upper level.

Maybe in a not so far future will come a proposal for forking a new open 
source fast-dpdk project aiming at providing API simplicity, 
zero-overhead, modular design, and all those nice properties that every 
one claims to seek :-)

Ivan
  
Bruce Richardson Feb. 12, 2016, 5:33 p.m. UTC | #10
On Fri, Feb 12, 2016 at 05:40:02PM +0100, Ivan Boule wrote:
> On 02/12/2016 12:44 PM, Ananyev, Konstantin wrote:
> >
> >>
> >>>-----Original Message-----
> ...
> >>
> >>In that case we don't need to make any changes at rte_ethdev.[h,c] to alloc/free/maintain tx_buffer inside each queue...
> >>It all will be upper layer responsibility.
> >>So no need to modify existing rte_ethdev structures/code.
> >>Again, no need for error callback - caller would check return value and decide what to do with unsent packets in the tx_buffer.
> >>
> >
> >Just to summarise why I think it is better to have tx buffering managed on the app level:
> >
> >1. avoid any ABI change.
> >2. Avoid extra changes in rte_ethdev.c: tx_queue_setup/tx_queue_stop.
> >3. Provides much more flexibility to the user:
> >    a) where to allocate space for tx_buffer (stack, heap, hugepages, etc).
> >    b) user can mix and match plain tx_burst() and   tx_buffer/tx_buffer_flush()
> >         in any way he fills it appropriate.
> >    c) user can change the size of tx_buffer without stop/re-config/start queue:
> >         just allocate new larger(smaller) tx_buffer & copy contents to the new one.
> >    d) user can preserve buffered packets through device restart circle:
> >         i.e if let say TX hang happened, and user has to do dev_stop/dev_start -
> >         contents of tx_buffer will stay unchanged and its contents could be
> >         (re-)transmitted after device is up again, or  through different port/queue if needed.
> >
> >As a drawbacks mentioned - tx error handling becomes less transparent...
> >But we can add error handling routine and it's user provided parameter
> >into struct rte_eth_dev_tx_buffer', something like that:
> >
> >+struct rte_eth_dev_tx_buffer {
> >+	buffer_tx_error_fn cbfn;
> >+	void *userdata;
> >+	unsigned nb_pkts;
> >+	uint64_t errors;
> >+	/**< Total number of queue packets to sent that are dropped. */
> >+	struct rte_mbuf *pkts[];
> >+};
> >
> >Konstantin
> >
> 
> Just to enforce Konstantin's comments.
> As a very basic - not to say fundamental - rule, one should avoid adding in
> the PMD RX/TX API any extra processing that can be handled at a higher
> level.
> The only and self-sufficient reason is that we must avoid impacting
> performances on the critical path, in particular for those - usually the
> majority of - applications that do not need such extra operations, or better
> implement them at upper level.
> 
> Maybe in a not so far future will come a proposal for forking a new open
> source fast-dpdk project aiming at providing API simplicity, zero-overhead,
> modular design, and all those nice properties that every one claims to seek
> :-)
> 
> Ivan
> 
Hi Ivan,

I completely agree with your comments. However, none of the proposals for TX 
buffering would impact the existing fast-path processing paths. They simply add
an optional buffering layer above it - as is done by a very large number of our
sample apps. The point of this patchset is to reduce or eliminate this duplication
of code by centralising it in the libs.

Of the different ways of doing this proposed, my slight preference is for the
original one due to the simplicity of the APIs it provides, but the advantages
in flexibility provided by Konstantin's proposals may outweigh the additional 
"ugliness" in the APIs.

Regards,
/Bruce
  

Patch

diff --git a/config/common_bsdapp b/config/common_bsdapp
index ed7c31c..8a2e4c5 100644
--- a/config/common_bsdapp
+++ b/config/common_bsdapp
@@ -148,6 +148,7 @@  CONFIG_RTE_MAX_QUEUES_PER_PORT=1024
 CONFIG_RTE_LIBRTE_IEEE1588=n
 CONFIG_RTE_ETHDEV_QUEUE_STAT_CNTRS=16
 CONFIG_RTE_ETHDEV_RXTX_CALLBACKS=y
+CONFIG_RTE_ETHDEV_TX_BUFSIZE=32
 
 #
 # Support NIC bypass logic
diff --git a/config/common_linuxapp b/config/common_linuxapp
index 74bc515..6229cab 100644
--- a/config/common_linuxapp
+++ b/config/common_linuxapp
@@ -146,6 +146,7 @@  CONFIG_RTE_MAX_QUEUES_PER_PORT=1024
 CONFIG_RTE_LIBRTE_IEEE1588=n
 CONFIG_RTE_ETHDEV_QUEUE_STAT_CNTRS=16
 CONFIG_RTE_ETHDEV_RXTX_CALLBACKS=y
+CONFIG_RTE_ETHDEV_TX_BUFSIZE=32
 
 #
 # Support NIC bypass logic
diff --git a/lib/librte_ether/rte_ethdev.c b/lib/librte_ether/rte_ethdev.c
index ed971b4..27dac1b 100644
--- a/lib/librte_ether/rte_ethdev.c
+++ b/lib/librte_ether/rte_ethdev.c
@@ -1,7 +1,7 @@ 
 /*-
  *   BSD LICENSE
  *
- *   Copyright(c) 2010-2015 Intel Corporation. All rights reserved.
+ *   Copyright(c) 2010-2016 Intel Corporation. All rights reserved.
  *   All rights reserved.
  *
  *   Redistribution and use in source and binary forms, with or without
@@ -826,11 +826,42 @@  rte_eth_dev_tx_queue_stop(uint8_t port_id, uint16_t tx_queue_id)
 
 }
 
+void
+rte_eth_count_unsent_packet_callback(struct rte_mbuf **pkts, uint16_t unsent,
+		void *userdata)
+{
+	unsigned long *count = userdata;
+	unsigned i;
+
+	for (i = 0; i < unsent; i++)
+		rte_pktmbuf_free(pkts[i]);
+
+	*count += unsent;
+}
+
+int
+rte_eth_tx_buffer_set_err_callback(uint8_t port_id, uint16_t queue_id,
+		buffer_tx_error_fn cbfn, void *userdata)
+{
+	struct rte_eth_dev *dev = &rte_eth_devices[port_id];
+
+	if (!rte_eth_dev_is_valid_port(port_id) ||
+			queue_id >= dev->data->nb_tx_queues) {
+		rte_errno = EINVAL;
+		return -1;
+	}
+
+	dev->tx_buf_err_cb[queue_id].userdata = userdata;
+	dev->tx_buf_err_cb[queue_id].flush_cb = cbfn;
+	return 0;
+}
+
 static int
 rte_eth_dev_tx_queue_config(struct rte_eth_dev *dev, uint16_t nb_queues)
 {
 	uint16_t old_nb_queues = dev->data->nb_tx_queues;
 	void **txq;
+	struct rte_eth_dev_tx_buffer *new_bufs;
 	unsigned i;
 
 	if (dev->data->tx_queues == NULL) { /* first time configuration */
@@ -841,17 +872,40 @@  rte_eth_dev_tx_queue_config(struct rte_eth_dev *dev, uint16_t nb_queues)
 			dev->data->nb_tx_queues = 0;
 			return -(ENOMEM);
 		}
+
+		dev->data->txq_bufs = rte_zmalloc("ethdev->txq_bufs",
+				sizeof(*dev->data->txq_bufs) * nb_queues, 0);
+		if (dev->data->txq_bufs == NULL) {
+			dev->data->nb_tx_queues = 0;
+			rte_free(dev->data->tx_queues);
+			return -(ENOMEM);
+		}
+
 	} else { /* re-configure */
+
+		/* flush the packets queued for all queues*/
+		for (i = 0; i < old_nb_queues; i++)
+			rte_eth_tx_buffer_flush(dev->data->port_id, i);
+
 		RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->tx_queue_release, -ENOTSUP);
 
+		/* get new buffer space first, but keep old space around */
+		new_bufs = rte_zmalloc("ethdev->txq_bufs",
+				sizeof(*dev->data->txq_bufs) * nb_queues, 0);
+		if (new_bufs == NULL)
+			return -(ENOMEM);
+
 		txq = dev->data->tx_queues;
 
 		for (i = nb_queues; i < old_nb_queues; i++)
 			(*dev->dev_ops->tx_queue_release)(txq[i]);
 		txq = rte_realloc(txq, sizeof(txq[0]) * nb_queues,
 				  RTE_CACHE_LINE_SIZE);
-		if (txq == NULL)
-			return -ENOMEM;
+		if (txq == NULL) {
+			rte_free(new_bufs);
+			return -(ENOMEM);
+		}
+
 		if (nb_queues > old_nb_queues) {
 			uint16_t new_qs = nb_queues - old_nb_queues;
 
@@ -861,6 +915,9 @@  rte_eth_dev_tx_queue_config(struct rte_eth_dev *dev, uint16_t nb_queues)
 
 		dev->data->tx_queues = txq;
 
+		/* now replace old buffers with new */
+		rte_free(dev->data->txq_bufs);
+		dev->data->txq_bufs = new_bufs;
 	}
 	dev->data->nb_tx_queues = nb_queues;
 	return 0;
diff --git a/lib/librte_ether/rte_ethdev.h b/lib/librte_ether/rte_ethdev.h
index bada8ad..23faa6a 100644
--- a/lib/librte_ether/rte_ethdev.h
+++ b/lib/librte_ether/rte_ethdev.h
@@ -1,7 +1,7 @@ 
 /*-
  *   BSD LICENSE
  *
- *   Copyright(c) 2010-2015 Intel Corporation. All rights reserved.
+ *   Copyright(c) 2010-2016 Intel Corporation. All rights reserved.
  *   All rights reserved.
  *
  *   Redistribution and use in source and binary forms, with or without
@@ -182,6 +182,7 @@  extern "C" {
 #include <rte_pci.h>
 #include <rte_dev.h>
 #include <rte_devargs.h>
+#include <rte_branch_prediction.h>
 #include "rte_ether.h"
 #include "rte_eth_ctrl.h"
 #include "rte_dev_info.h"
@@ -1519,6 +1520,34 @@  enum rte_eth_dev_type {
 	RTE_ETH_DEV_MAX		/**< max value of this enum */
 };
 
+typedef void (*buffer_tx_error_fn)(struct rte_mbuf **unsent, uint16_t count,
+		void *userdata);
+
+/**
+ * @internal
+ * Structure used to buffer packets for future TX
+ * Used by APIs rte_eth_tx_buffer and rte_eth_tx_buffer_flush
+ */
+struct rte_eth_dev_tx_buffer {
+	struct rte_mbuf *pkts[RTE_ETHDEV_TX_BUFSIZE];
+	unsigned nb_pkts;
+	uint64_t errors;
+	/**< Total number of queue packets to sent that are dropped. */
+};
+
+/**
+ * @internal
+ * Structure to hold a callback to be used on error when a tx_buffer_flush
+ * call fails to send all packets.
+ * This needs to be a separate structure, as it must go in the ethdev structure
+ * rather than ethdev_data, due to the use of a function pointer, which is not
+ * multi-process safe.
+ */
+struct rte_eth_dev_tx_buffer_err_cb {
+	buffer_tx_error_fn flush_cb; /* callback for when tx_burst fails */
+	void *userdata;              /* userdata for callback */
+};
+
 /**
  * @internal
  * The generic data structure associated with each ethernet device.
@@ -1550,6 +1579,9 @@  struct rte_eth_dev {
 	struct rte_eth_rxtx_callback *pre_tx_burst_cbs[RTE_MAX_QUEUES_PER_PORT];
 	uint8_t attached; /**< Flag indicating the port is attached */
 	enum rte_eth_dev_type dev_type; /**< Flag indicating the device type */
+
+	/** Callbacks to be used on a tx_buffer_flush error */
+	struct rte_eth_dev_tx_buffer_err_cb tx_buf_err_cb[RTE_MAX_QUEUES_PER_PORT];
 };
 
 struct rte_eth_dev_sriov {
@@ -1610,6 +1642,8 @@  struct rte_eth_dev_data {
 	enum rte_kernel_driver kdrv;    /**< Kernel driver passthrough */
 	int numa_node;  /**< NUMA node connection */
 	const char *drv_name;   /**< Driver name */
+	struct rte_eth_dev_tx_buffer *txq_bufs;
+	/**< space to allow buffered transmits */
 };
 
 /** Device supports hotplug detach */
@@ -2661,8 +2695,181 @@  rte_eth_tx_burst(uint8_t port_id, uint16_t queue_id,
 }
 
 /**
- * The eth device event type for interrupt, and maybe others in the future.
+ * Buffer a single packet for future transmission on a port and queue
+ *
+ * This function takes a single mbuf/packet and buffers it for later
+ * transmission on the particular port and queue specified. Once the buffer is
+ * full of packets, an attempt will be made to transmit all the buffered
+ * packets. In case of error, where not all packets can be transmitted, a
+ * callback is called with the unsent packets as a parameter. If no callback
+ * is explicitly set up, the unsent packets are just freed back to the owning
+ * mempool. The function returns the number of packets actually sent i.e.
+ * 0 if no buffer flush occurred, otherwise the number of packets successfully
+ * flushed
+ *
+ * @param port_id
+ *   The port identifier of the Ethernet device.
+ * @param queue_id
+ *   The index of the transmit queue through which output packets must be
+ *   sent.
+ *   The value must be in the range [0, nb_tx_queue - 1] previously supplied
+ *   to rte_eth_dev_configure().
+ * @param tx_pkt
+ *   Pointer to the packet mbuf to be sent.
+ * @return
+ *   0 = packet has been buffered for later transmission
+ *   N > 0 = packet has been buffered, and the buffer was subsequently flushed,
+ *     causing N packets to be sent, and the error callback to be called for
+ *     the rest.
+ */
+static inline uint16_t __attribute__((always_inline))
+rte_eth_tx_buffer(uint8_t port_id, uint16_t queue_id, struct rte_mbuf *tx_pkt)
+{
+	struct rte_eth_dev *dev = &rte_eth_devices[port_id];
+	struct rte_eth_dev_tx_buffer *qbuf = &dev->data->txq_bufs[queue_id];
+	uint16_t i;
+
+	qbuf->pkts[qbuf->nb_pkts++] = tx_pkt;
+	if (qbuf->nb_pkts < RTE_ETHDEV_TX_BUFSIZE)
+		return 0;
+
+	const uint16_t sent = rte_eth_tx_burst(port_id, queue_id, qbuf->pkts,
+			RTE_ETHDEV_TX_BUFSIZE);
+
+	qbuf->nb_pkts = 0;
+
+	/* All packets sent, or to be dealt with by callback below */
+	if (unlikely(sent != RTE_ETHDEV_TX_BUFSIZE)) {
+		if (dev->tx_buf_err_cb[queue_id].flush_cb)
+			dev->tx_buf_err_cb[queue_id].flush_cb(&qbuf->pkts[sent],
+					RTE_ETHDEV_TX_BUFSIZE - sent,
+					dev->tx_buf_err_cb[queue_id].userdata);
+		else {
+			qbuf->errors += RTE_ETHDEV_TX_BUFSIZE - sent;
+			for (i = sent; i < RTE_ETHDEV_TX_BUFSIZE; i++)
+				rte_pktmbuf_free(qbuf->pkts[i]);
+		}
+	}
+
+	return sent;
+}
+
+/**
+ * Send any packets queued up for transmission on a port and HW queue
+ *
+ * This causes an explicit flush of packets previously buffered via the
+ * rte_eth_tx_buffer() function. It returns the number of packets successfully
+ * sent to the NIC, and calls the error callback for any unsent packets. Unless
+ * explicitly set up otherwise, the default callback simply frees the unsent
+ * packets back to the owning mempool.
+ *
+ * @param port_id
+ *   The port identifier of the Ethernet device.
+ * @param queue_id
+ *   The index of the transmit queue through which output packets must be
+ *   sent.
+ *   The value must be in the range [0, nb_tx_queue - 1] previously supplied
+ *   to rte_eth_dev_configure().
+ * @return
+ *   The number of packets successfully sent to the Ethernet device. The error
+ *   callback is called for any packets which could not be sent.
+ */
+static inline uint16_t
+rte_eth_tx_buffer_flush(uint8_t port_id, uint16_t queue_id)
+{
+	uint16_t i;
+	struct rte_eth_dev *dev = &rte_eth_devices[port_id];
+	struct rte_eth_dev_tx_buffer *qbuf = &dev->data->txq_bufs[queue_id];
+
+	if (qbuf->nb_pkts == 0)
+		return 0;
+
+	const uint16_t to_send = qbuf->nb_pkts;
+
+	const uint16_t sent = rte_eth_tx_burst(port_id, queue_id, qbuf->pkts,
+			to_send);
+
+	qbuf->nb_pkts = 0;
+
+	/* All packets sent, or to be dealt with by callback below */
+	if (unlikely(sent != to_send)) {
+		if (dev->tx_buf_err_cb[queue_id].flush_cb)
+			dev->tx_buf_err_cb[queue_id].flush_cb(&qbuf->pkts[sent],
+					to_send - sent,
+					dev->tx_buf_err_cb[queue_id].userdata);
+		else {
+			qbuf->errors += to_send - sent;
+			for (i = sent; i < to_send; i++)
+				rte_pktmbuf_free(qbuf->pkts[i]);
+		}
+	}
+
+	return sent;
+}
+
+/**
+ * Configure a callback for buffered packets which cannot be sent
+ *
+ * Register a specific callback to be called when an attempt is made to send
+ * all packets buffered on an ethernet port, but not all packets can
+ * successfully be sent. The callback registered here will be called only
+ * from calls to rte_eth_tx_buffer() and rte_eth_tx_buffer_flush() APIs.
+ * The default callback configured for each queue by default just frees the
+ * packets back to the calling mempool. If additional behaviour is required,
+ * for example, to count dropped packets, or to retry transmission of packets
+ * which cannot be sent, this function should be used to register a suitable
+ * callback function to implement the desired behaviour.
+ * The example callback "rte_eth_count_unsent_packet_callback()" is also
+ * provided as reference.
+ *
+ * @param port_id
+ *   The port identifier of the Ethernet device.
+ * @param queue_id
+ *   The index of the transmit queue through which output packets must be
+ *   sent.
+ *   The value must be in the range [0, nb_tx_queue - 1] previously supplied
+ *   to rte_eth_dev_configure().
+ * @param cbfn
+ *   The function to be used as the callback.
+ * @param userdata
+ *   Arbitrary parameter to be passed to the callback function
+ * @return
+ *   0 on success, or -1 on error with rte_errno set appropriately
  */
+int
+rte_eth_tx_buffer_set_err_callback(uint8_t port_id, uint16_t queue_id,
+		buffer_tx_error_fn cbfn, void *userdata);
+
+/**
+ * Callback function for tracking unsent buffered packets.
+ *
+ * This function can be passed to rte_eth_tx_buffer_set_err_callback() to
+ * adjust the default behaviour when buffered packets cannot be sent. This
+ * function drops any unsent packets, but also updates a user-supplied counter
+ * to track the overall number of packets dropped. The counter should be an
+ * unsigned long variable.
+ *
+ * NOTE: this function should not be called directly, instead it should be used
+ *       as a callback for packet buffering.
+ *
+ * NOTE: when configuring this function as a callback with
+ *       rte_eth_tx_buffer_set_err_callback(), the final, userdata parameter
+ *       should point to an unsigned long value.
+ *
+ * @param pkts
+ *   The previously buffered packets which could not be sent
+ * @param unsent
+ *   The number of unsent packets in the pkts array
+ * @param userdata
+ *   Pointer to an unsigned long value, which will be incremented by unsent
+ */
+void
+rte_eth_count_unsent_packet_callback(struct rte_mbuf **pkts, uint16_t unsent,
+		void *userdata);
+
+/**
+* The eth device event type for interrupt, and maybe others in the future.
+*/
 enum rte_eth_event_type {
 	RTE_ETH_EVENT_UNKNOWN,  /**< unknown event type */
 	RTE_ETH_EVENT_INTR_LSC, /**< lsc interrupt event */
diff --git a/lib/librte_ether/rte_ether_version.map b/lib/librte_ether/rte_ether_version.map
index d8db24d..c2019d6 100644
--- a/lib/librte_ether/rte_ether_version.map
+++ b/lib/librte_ether/rte_ether_version.map
@@ -117,3 +117,11 @@  DPDK_2.2 {
 
 	local: *;
 };
+
+DPDK_2.3 {
+	global:
+
+	rte_eth_count_unsent_packet_callback;
+	rte_eth_tx_buffer_set_err_callback;
+
+} DPDK_2.2;