diff mbox series

[v5,5/7] ethdev: make fast-path functions to use new flat array

Message ID 20211007112750.25526-6-konstantin.ananyev@intel.com (mailing list archive)
State Superseded, archived
Delegated to: Ferruh Yigit
Headers show
Series hide eth dev related structures | expand

Checks

Context Check Description
ci/checkpatch warning coding style issues

Commit Message

Ananyev, Konstantin Oct. 7, 2021, 11:27 a.m. UTC
Rework fast-path ethdev functions to use rte_eth_fp_ops[].
While it is an API/ABI breakage, this change is intended to be
transparent for both users (no changes in user app is required) and
PMD developers (no changes in PMD is required).
One extra thing to note - RX/TX callback invocation will cause extra
function call with these changes. That might cause some insignificant
slowdown for code-path where RX/TX callbacks are heavily involved.

Signed-off-by: Konstantin Ananyev <konstantin.ananyev@intel.com>
---
 lib/ethdev/ethdev_private.c |  31 +++++
 lib/ethdev/rte_ethdev.h     | 242 ++++++++++++++++++++++++++----------
 lib/ethdev/version.map      |   3 +
 3 files changed, 208 insertions(+), 68 deletions(-)

Comments

Andrew Rybchenko Oct. 11, 2021, 9:02 a.m. UTC | #1
On 10/7/21 2:27 PM, Konstantin Ananyev wrote:
> Rework fast-path ethdev functions to use rte_eth_fp_ops[].
> While it is an API/ABI breakage, this change is intended to be
> transparent for both users (no changes in user app is required) and
> PMD developers (no changes in PMD is required).
> One extra thing to note - RX/TX callback invocation will cause extra
> function call with these changes. That might cause some insignificant
> slowdown for code-path where RX/TX callbacks are heavily involved.

I'm sorry for nit picking here and below:

RX -> Rx, TX -> Tx everywhere above.

> 
> Signed-off-by: Konstantin Ananyev <konstantin.ananyev@intel.com>
> ---
>  lib/ethdev/ethdev_private.c |  31 +++++
>  lib/ethdev/rte_ethdev.h     | 242 ++++++++++++++++++++++++++----------
>  lib/ethdev/version.map      |   3 +
>  3 files changed, 208 insertions(+), 68 deletions(-)
> 
> diff --git a/lib/ethdev/ethdev_private.c b/lib/ethdev/ethdev_private.c
> index 3eeda6e9f9..1222c6f84e 100644
> --- a/lib/ethdev/ethdev_private.c
> +++ b/lib/ethdev/ethdev_private.c
> @@ -226,3 +226,34 @@ eth_dev_fp_ops_setup(struct rte_eth_fp_ops *fpo,
>  	fpo->txq.data = dev->data->tx_queues;
>  	fpo->txq.clbk = (void **)(uintptr_t)dev->pre_tx_burst_cbs;
>  }
> +
> +uint16_t
> +rte_eth_call_rx_callbacks(uint16_t port_id, uint16_t queue_id,
> +	struct rte_mbuf **rx_pkts, uint16_t nb_rx, uint16_t nb_pkts,
> +	void *opaque)
> +{
> +	const struct rte_eth_rxtx_callback *cb = opaque;
> +
> +	while (cb != NULL) {
> +		nb_rx = cb->fn.rx(port_id, queue_id, rx_pkts, nb_rx,
> +				nb_pkts, cb->param);
> +		cb = cb->next;
> +	}
> +
> +	return nb_rx;
> +}
> +
> +uint16_t
> +rte_eth_call_tx_callbacks(uint16_t port_id, uint16_t queue_id,
> +	struct rte_mbuf **tx_pkts, uint16_t nb_pkts, void *opaque)
> +{
> +	const struct rte_eth_rxtx_callback *cb = opaque;
> +
> +	while (cb != NULL) {
> +		nb_pkts = cb->fn.tx(port_id, queue_id, tx_pkts, nb_pkts,
> +				cb->param);
> +		cb = cb->next;
> +	}
> +
> +	return nb_pkts;
> +}
> diff --git a/lib/ethdev/rte_ethdev.h b/lib/ethdev/rte_ethdev.h
> index cdd16d6e57..c0e1a40681 100644
> --- a/lib/ethdev/rte_ethdev.h
> +++ b/lib/ethdev/rte_ethdev.h
> @@ -4904,6 +4904,33 @@ int rte_eth_representor_info_get(uint16_t port_id,
>  
>  #include <rte_ethdev_core.h>
>  
> +/**
> + * @internal
> + * Helper routine for eth driver rx_burst API.

rx -> Rx

> + * Should be called at exit from PMD's rte_eth_rx_bulk implementation.
> + * Does necessary post-processing - invokes RX callbacks if any, etc.

RX -> Rx

> + *
> + * @param port_id
> + *  The port identifier of the Ethernet device.
> + * @param queue_id
> + *  The index of the receive queue from which to retrieve input packets.

Isn't:
The index of the queue from which packets are received from?

> + * @param rx_pkts
> + *   The address of an array of pointers to *rte_mbuf* structures that
> + *   have been retrieved from the device.
> + * @param nb_pkts

Should be @param nb_rx

> + *   The number of packets that were retrieved from the device.
> + * @param nb_pkts
> + *   The number of elements in *rx_pkts* array.

@p should be used to refer to a paramter.

The description does not help to understand why both nb_rx and
nb_pkts are necessary. Isn't nb_pkts >= nb_rx and nb_rx
sufficient?

> + * @param opaque
> + *   Opaque pointer of RX queue callback related data.

RX -> Rx

> + *
> + * @return
> + *  The number of packets effectively supplied to the *rx_pkts* array.

@p should be used to refer to a parameter.

> + */
> +uint16_t rte_eth_call_rx_callbacks(uint16_t port_id, uint16_t queue_id,
> +		struct rte_mbuf **rx_pkts, uint16_t nb_rx, uint16_t nb_pkts,
> +		void *opaque);
> +
>  /**
>   *
>   * Retrieve a burst of input packets from a receive queue of an Ethernet
> @@ -4995,23 +5022,37 @@ static inline uint16_t
>  rte_eth_rx_burst(uint16_t port_id, uint16_t queue_id,
>  		 struct rte_mbuf **rx_pkts, const uint16_t nb_pkts)
>  {
> -	struct rte_eth_dev *dev = &rte_eth_devices[port_id];
>  	uint16_t nb_rx;
> +	struct rte_eth_fp_ops *p;

p is typically a very bad name in a funcion with
many pointer variables etc. May be "fpo" as in previous
patch?

> +	void *cb, *qd;

Please, avoid variable, expecially pointers, declaration in
one line.

I'd suggest to use 'rxq' instead of 'qd'. The first paramter
of the rx_pkt_burst is 'rxq'.

Also 'cb' seems to be used under RTE_ETHDEV_RXTX_CALLBACKS
only. If so, it could be unused variable warning if
RTE_ETHDEV_RXTX_CALLBACKS is not defined.

> +
> +#ifdef RTE_ETHDEV_DEBUG_RX
> +	if (port_id >= RTE_MAX_ETHPORTS ||
> +			queue_id >= RTE_MAX_QUEUES_PER_PORT) {
> +		RTE_ETHDEV_LOG(ERR,
> +			"Invalid port_id=%u or queue_id=%u\n",
> +			port_id, queue_id);
> +		return 0;
> +	}
> +#endif
> +
> +	/* fetch pointer to queue data */
> +	p = &rte_eth_fp_ops[port_id];
> +	qd = p->rxq.data[queue_id];
>  
>  #ifdef RTE_ETHDEV_DEBUG_RX
>  	RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, 0);
> -	RTE_FUNC_PTR_OR_ERR_RET(*dev->rx_pkt_burst, 0);
>  
> -	if (queue_id >= dev->data->nb_rx_queues) {
> -		RTE_ETHDEV_LOG(ERR, "Invalid RX queue_id=%u\n", queue_id);
> +	if (qd == NULL) {
> +		RTE_ETHDEV_LOG(ERR, "Invalid RX queue_id=%u for port_id=%u\n",

RX -> Rx

> +			queue_id, port_id);
>  		return 0;
>  	}
>  #endif
> -	nb_rx = (*dev->rx_pkt_burst)(dev->data->rx_queues[queue_id],
> -				     rx_pkts, nb_pkts);
> +
> +	nb_rx = p->rx_pkt_burst(qd, rx_pkts, nb_pkts);
>  
>  #ifdef RTE_ETHDEV_RXTX_CALLBACKS
> -	struct rte_eth_rxtx_callback *cb;
>  
>  	/* __ATOMIC_RELEASE memory order was used when the
>  	 * call back was inserted into the list.
> @@ -5019,16 +5060,10 @@ rte_eth_rx_burst(uint16_t port_id, uint16_t queue_id,
>  	 * cb and cb->fn/cb->next, __ATOMIC_ACQUIRE memory order is
>  	 * not required.
>  	 */
> -	cb = __atomic_load_n(&dev->post_rx_burst_cbs[queue_id],
> -				__ATOMIC_RELAXED);
> -
> -	if (unlikely(cb != NULL)) {
> -		do {
> -			nb_rx = cb->fn.rx(port_id, queue_id, rx_pkts, nb_rx,
> -						nb_pkts, cb->param);
> -			cb = cb->next;
> -		} while (cb != NULL);
> -	}
> +	cb = __atomic_load_n((void **)&p->rxq.clbk[queue_id], __ATOMIC_RELAXED);
> +	if (unlikely(cb != NULL))
> +		nb_rx = rte_eth_call_rx_callbacks(port_id, queue_id, rx_pkts,
> +				nb_rx, nb_pkts, cb);
>  #endif
>  
>  	rte_ethdev_trace_rx_burst(port_id, queue_id, (void **)rx_pkts, nb_rx);
> @@ -5051,16 +5086,27 @@ rte_eth_rx_burst(uint16_t port_id, uint16_t queue_id,
>  static inline int
>  rte_eth_rx_queue_count(uint16_t port_id, uint16_t queue_id)
>  {
> -	struct rte_eth_dev *dev;
> +	struct rte_eth_fp_ops *p;
> +	void *qd;

p -> fpo, qd -> rxq

> +
> +	if (port_id >= RTE_MAX_ETHPORTS ||
> +			queue_id >= RTE_MAX_QUEUES_PER_PORT) {
> +		RTE_ETHDEV_LOG(ERR,
> +			"Invalid port_id=%u or queue_id=%u\n",
> +			port_id, queue_id);
> +		return -EINVAL;
> +	}
> +
> +	/* fetch pointer to queue data */
> +	p = &rte_eth_fp_ops[port_id];
> +	qd = p->rxq.data[queue_id];
>  
>  	RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, -ENODEV);
> -	dev = &rte_eth_devices[port_id];
> -	RTE_FUNC_PTR_OR_ERR_RET(*dev->rx_queue_count, -ENOTSUP);
> -	if (queue_id >= dev->data->nb_rx_queues ||
> -	    dev->data->rx_queues[queue_id] == NULL)
> +	RTE_FUNC_PTR_OR_ERR_RET(*p->rx_queue_count, -ENOTSUP);
> +	if (qd == NULL)
>  		return -EINVAL;
>  
> -	return (int)(*dev->rx_queue_count)(dev->data->rx_queues[queue_id]);
> +	return (int)(*p->rx_queue_count)(qd);
>  }
>  
>  /**@{@name Rx hardware descriptor states
> @@ -5108,21 +5154,30 @@ static inline int
>  rte_eth_rx_descriptor_status(uint16_t port_id, uint16_t queue_id,
>  	uint16_t offset)
>  {
> -	struct rte_eth_dev *dev;
> -	void *rxq;
> +	struct rte_eth_fp_ops *p;
> +	void *qd;

p -> fpo, qd -> rxq

>  
>  #ifdef RTE_ETHDEV_DEBUG_RX
> -	RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, -ENODEV);
> +	if (port_id >= RTE_MAX_ETHPORTS ||
> +			queue_id >= RTE_MAX_QUEUES_PER_PORT) {
> +		RTE_ETHDEV_LOG(ERR,
> +			"Invalid port_id=%u or queue_id=%u\n",
> +			port_id, queue_id);
> +		return -EINVAL;
> +	}
>  #endif
> -	dev = &rte_eth_devices[port_id];
> +
> +	/* fetch pointer to queue data */
> +	p = &rte_eth_fp_ops[port_id];
> +	qd = p->rxq.data[queue_id];
> +
>  #ifdef RTE_ETHDEV_DEBUG_RX
> -	if (queue_id >= dev->data->nb_rx_queues)
> +	RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, -ENODEV);
> +	if (qd == NULL)
>  		return -ENODEV;
>  #endif
> -	RTE_FUNC_PTR_OR_ERR_RET(*dev->rx_descriptor_status, -ENOTSUP);
> -	rxq = dev->data->rx_queues[queue_id];
> -
> -	return (*dev->rx_descriptor_status)(rxq, offset);
> +	RTE_FUNC_PTR_OR_ERR_RET(*p->rx_descriptor_status, -ENOTSUP);
> +	return (*p->rx_descriptor_status)(qd, offset);
>  }
>  
>  /**@{@name Tx hardware descriptor states
> @@ -5169,23 +5224,54 @@ rte_eth_rx_descriptor_status(uint16_t port_id, uint16_t queue_id,
>  static inline int rte_eth_tx_descriptor_status(uint16_t port_id,
>  	uint16_t queue_id, uint16_t offset)
>  {
> -	struct rte_eth_dev *dev;
> -	void *txq;
> +	struct rte_eth_fp_ops *p;
> +	void *qd;

p -> fpo, qd -> txq

>  
>  #ifdef RTE_ETHDEV_DEBUG_TX
> -	RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, -ENODEV);
> +	if (port_id >= RTE_MAX_ETHPORTS ||
> +			queue_id >= RTE_MAX_QUEUES_PER_PORT) {
> +		RTE_ETHDEV_LOG(ERR,
> +			"Invalid port_id=%u or queue_id=%u\n",
> +			port_id, queue_id);
> +		return -EINVAL;
> +	}
>  #endif
> -	dev = &rte_eth_devices[port_id];
> +
> +	/* fetch pointer to queue data */
> +	p = &rte_eth_fp_ops[port_id];
> +	qd = p->txq.data[queue_id];
> +
>  #ifdef RTE_ETHDEV_DEBUG_TX
> -	if (queue_id >= dev->data->nb_tx_queues)
> +	RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, -ENODEV);
> +	if (qd == NULL)
>  		return -ENODEV;
>  #endif
> -	RTE_FUNC_PTR_OR_ERR_RET(*dev->tx_descriptor_status, -ENOTSUP);
> -	txq = dev->data->tx_queues[queue_id];
> -
> -	return (*dev->tx_descriptor_status)(txq, offset);
> +	RTE_FUNC_PTR_OR_ERR_RET(*p->tx_descriptor_status, -ENOTSUP);
> +	return (*p->tx_descriptor_status)(qd, offset);
>  }
>  
> +/**
> + * @internal
> + * Helper routine for eth driver tx_burst API.
> + * Should be called before entry PMD's rte_eth_tx_bulk implementation.
> + * Does necessary pre-processing - invokes TX callbacks if any, etc.

TX -> Tx

> + *
> + * @param port_id
> + *   The port identifier of the Ethernet device.
> + * @param queue_id
> + *   The index of the transmit queue through which output packets must be
> + *   sent.
> + * @param tx_pkts
> + *   The address of an array of *nb_pkts* pointers to *rte_mbuf* structures
> + *   which contain the output packets.

*nb_pkts* -> @p nb_pkts

> + * @param nb_pkts
> + *   The maximum number of packets to transmit.
> + * @return
> + *   The number of output packets to transmit.
> + */
> +uint16_t rte_eth_call_tx_callbacks(uint16_t port_id, uint16_t queue_id,
> +	struct rte_mbuf **tx_pkts, uint16_t nb_pkts, void *opaque);
> +
>  /**
>   * Send a burst of output packets on a transmit queue of an Ethernet device.
>   *
> @@ -5256,20 +5342,34 @@ static inline uint16_t
>  rte_eth_tx_burst(uint16_t port_id, uint16_t queue_id,
>  		 struct rte_mbuf **tx_pkts, uint16_t nb_pkts)
>  {
> -	struct rte_eth_dev *dev = &rte_eth_devices[port_id];
> +	struct rte_eth_fp_ops *p;
> +	void *cb, *qd;

Same as above

> +
> +#ifdef RTE_ETHDEV_DEBUG_TX
> +	if (port_id >= RTE_MAX_ETHPORTS ||
> +			queue_id >= RTE_MAX_QUEUES_PER_PORT) {
> +		RTE_ETHDEV_LOG(ERR,
> +			"Invalid port_id=%u or queue_id=%u\n",
> +			port_id, queue_id);
> +		return 0;
> +	}
> +#endif
> +
> +	/* fetch pointer to queue data */
> +	p = &rte_eth_fp_ops[port_id];
> +	qd = p->txq.data[queue_id];
>  
>  #ifdef RTE_ETHDEV_DEBUG_TX
>  	RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, 0);
> -	RTE_FUNC_PTR_OR_ERR_RET(*dev->tx_pkt_burst, 0);
>  
> -	if (queue_id >= dev->data->nb_tx_queues) {
> -		RTE_ETHDEV_LOG(ERR, "Invalid TX queue_id=%u\n", queue_id);
> +	if (qd == NULL) {
> +		RTE_ETHDEV_LOG(ERR, "Invalid TX queue_id=%u for port_id=%u\n",

TX -> Tx

> +			queue_id, port_id);
>  		return 0;
>  	}
>  #endif
>  
>  #ifdef RTE_ETHDEV_RXTX_CALLBACKS
> -	struct rte_eth_rxtx_callback *cb;
>  
>  	/* __ATOMIC_RELEASE memory order was used when the
>  	 * call back was inserted into the list.
> @@ -5277,21 +5377,16 @@ rte_eth_tx_burst(uint16_t port_id, uint16_t queue_id,
>  	 * cb and cb->fn/cb->next, __ATOMIC_ACQUIRE memory order is
>  	 * not required.
>  	 */
> -	cb = __atomic_load_n(&dev->pre_tx_burst_cbs[queue_id],
> -				__ATOMIC_RELAXED);
> -
> -	if (unlikely(cb != NULL)) {
> -		do {
> -			nb_pkts = cb->fn.tx(port_id, queue_id, tx_pkts, nb_pkts,
> -					cb->param);
> -			cb = cb->next;
> -		} while (cb != NULL);
> -	}
> +	cb = __atomic_load_n((void **)&p->txq.clbk[queue_id], __ATOMIC_RELAXED);
> +	if (unlikely(cb != NULL))
> +		nb_pkts = rte_eth_call_tx_callbacks(port_id, queue_id, tx_pkts,
> +				nb_pkts, cb);
>  #endif
>  
> -	rte_ethdev_trace_tx_burst(port_id, queue_id, (void **)tx_pkts,
> -		nb_pkts);
> -	return (*dev->tx_pkt_burst)(dev->data->tx_queues[queue_id], tx_pkts, nb_pkts);
> +	nb_pkts = p->tx_pkt_burst(qd, tx_pkts, nb_pkts);
> +
> +	rte_ethdev_trace_tx_burst(port_id, queue_id, (void **)tx_pkts, nb_pkts);
> +	return nb_pkts;
>  }
>  
>  /**
> @@ -5354,31 +5449,42 @@ static inline uint16_t
>  rte_eth_tx_prepare(uint16_t port_id, uint16_t queue_id,
>  		struct rte_mbuf **tx_pkts, uint16_t nb_pkts)
>  {
> -	struct rte_eth_dev *dev;
> +	struct rte_eth_fp_ops *p;
> +	void *qd;

p->fpo, qd->txq

>  
>  #ifdef RTE_ETHDEV_DEBUG_TX
> -	if (!rte_eth_dev_is_valid_port(port_id)) {
> -		RTE_ETHDEV_LOG(ERR, "Invalid TX port_id=%u\n", port_id);
> +	if (port_id >= RTE_MAX_ETHPORTS ||
> +			queue_id >= RTE_MAX_QUEUES_PER_PORT) {
> +		RTE_ETHDEV_LOG(ERR,
> +			"Invalid port_id=%u or queue_id=%u\n",
> +			port_id, queue_id);
>  		rte_errno = ENODEV;
>  		return 0;
>  	}
>  #endif
>  
> -	dev = &rte_eth_devices[port_id];
> +	/* fetch pointer to queue data */
> +	p = &rte_eth_fp_ops[port_id];
> +	qd = p->txq.data[queue_id];
>  
>  #ifdef RTE_ETHDEV_DEBUG_TX
> -	if (queue_id >= dev->data->nb_tx_queues) {
> -		RTE_ETHDEV_LOG(ERR, "Invalid TX queue_id=%u\n", queue_id);
> +	if (!rte_eth_dev_is_valid_port(port_id)) {
> +		RTE_ETHDEV_LOG(ERR, "Invalid TX port_id=%u\n", port_id);

TX -> Tx

> +		rte_errno = ENODEV;
> +		return 0;
> +	}
> +	if (qd == NULL) {
> +		RTE_ETHDEV_LOG(ERR, "Invalid TX queue_id=%u for port_id=%u\n",

TX -> Tx

> +			queue_id, port_id);
>  		rte_errno = EINVAL;
>  		return 0;
>  	}
>  #endif
>  
> -	if (!dev->tx_pkt_prepare)
> +	if (!p->tx_pkt_prepare)

Please, change it to compare vs NULL since you touch the line.
Just to be consistent with DPDK coding style and lines above.

>  		return nb_pkts;
>  
> -	return (*dev->tx_pkt_prepare)(dev->data->tx_queues[queue_id],
> -			tx_pkts, nb_pkts);
> +	return p->tx_pkt_prepare(qd, tx_pkts, nb_pkts);
>  }
>  
>  #else
> diff --git a/lib/ethdev/version.map b/lib/ethdev/version.map
> index 904bce6ea1..79e62dcf61 100644
> --- a/lib/ethdev/version.map
> +++ b/lib/ethdev/version.map
> @@ -7,6 +7,8 @@ DPDK_22 {
>  	rte_eth_allmulticast_disable;
>  	rte_eth_allmulticast_enable;
>  	rte_eth_allmulticast_get;
> +	rte_eth_call_rx_callbacks;
> +	rte_eth_call_tx_callbacks;
>  	rte_eth_dev_adjust_nb_rx_tx_desc;
>  	rte_eth_dev_callback_register;
>  	rte_eth_dev_callback_unregister;
> @@ -76,6 +78,7 @@ DPDK_22 {
>  	rte_eth_find_next_of;
>  	rte_eth_find_next_owned_by;
>  	rte_eth_find_next_sibling;
> +	rte_eth_fp_ops;
>  	rte_eth_iterator_cleanup;
>  	rte_eth_iterator_init;
>  	rte_eth_iterator_next;
>
Ananyev, Konstantin Oct. 11, 2021, 3:47 p.m. UTC | #2
> 
> On 10/7/21 2:27 PM, Konstantin Ananyev wrote:
> > Rework fast-path ethdev functions to use rte_eth_fp_ops[].
> > While it is an API/ABI breakage, this change is intended to be
> > transparent for both users (no changes in user app is required) and
> > PMD developers (no changes in PMD is required).
> > One extra thing to note - RX/TX callback invocation will cause extra
> > function call with these changes. That might cause some insignificant
> > slowdown for code-path where RX/TX callbacks are heavily involved.
> 
> I'm sorry for nit picking here and below:
> 
> RX -> Rx, TX -> Tx everywhere above.
> 
> >
> > Signed-off-by: Konstantin Ananyev <konstantin.ananyev@intel.com>
> > ---
> >  lib/ethdev/ethdev_private.c |  31 +++++
> >  lib/ethdev/rte_ethdev.h     | 242 ++++++++++++++++++++++++++----------
> >  lib/ethdev/version.map      |   3 +
> >  3 files changed, 208 insertions(+), 68 deletions(-)
> >
> > diff --git a/lib/ethdev/ethdev_private.c b/lib/ethdev/ethdev_private.c
> > index 3eeda6e9f9..1222c6f84e 100644
> > --- a/lib/ethdev/ethdev_private.c
> > +++ b/lib/ethdev/ethdev_private.c
> > @@ -226,3 +226,34 @@ eth_dev_fp_ops_setup(struct rte_eth_fp_ops *fpo,
> >  	fpo->txq.data = dev->data->tx_queues;
> >  	fpo->txq.clbk = (void **)(uintptr_t)dev->pre_tx_burst_cbs;
> >  }
> > +
> > +uint16_t
> > +rte_eth_call_rx_callbacks(uint16_t port_id, uint16_t queue_id,
> > +	struct rte_mbuf **rx_pkts, uint16_t nb_rx, uint16_t nb_pkts,
> > +	void *opaque)
> > +{
> > +	const struct rte_eth_rxtx_callback *cb = opaque;
> > +
> > +	while (cb != NULL) {
> > +		nb_rx = cb->fn.rx(port_id, queue_id, rx_pkts, nb_rx,
> > +				nb_pkts, cb->param);
> > +		cb = cb->next;
> > +	}
> > +
> > +	return nb_rx;
> > +}
> > +
> > +uint16_t
> > +rte_eth_call_tx_callbacks(uint16_t port_id, uint16_t queue_id,
> > +	struct rte_mbuf **tx_pkts, uint16_t nb_pkts, void *opaque)
> > +{
> > +	const struct rte_eth_rxtx_callback *cb = opaque;
> > +
> > +	while (cb != NULL) {
> > +		nb_pkts = cb->fn.tx(port_id, queue_id, tx_pkts, nb_pkts,
> > +				cb->param);
> > +		cb = cb->next;
> > +	}
> > +
> > +	return nb_pkts;
> > +}
> > diff --git a/lib/ethdev/rte_ethdev.h b/lib/ethdev/rte_ethdev.h
> > index cdd16d6e57..c0e1a40681 100644
> > --- a/lib/ethdev/rte_ethdev.h
> > +++ b/lib/ethdev/rte_ethdev.h
> > @@ -4904,6 +4904,33 @@ int rte_eth_representor_info_get(uint16_t port_id,
> >
> >  #include <rte_ethdev_core.h>
> >
> > +/**
> > + * @internal
> > + * Helper routine for eth driver rx_burst API.
> 
> rx -> Rx
> 
> > + * Should be called at exit from PMD's rte_eth_rx_bulk implementation.
> > + * Does necessary post-processing - invokes RX callbacks if any, etc.
> 
> RX -> Rx
> 
> > + *
> > + * @param port_id
> > + *  The port identifier of the Ethernet device.
> > + * @param queue_id
> > + *  The index of the receive queue from which to retrieve input packets.
> 
> Isn't:
> The index of the queue from which packets are received from?

I copied it from comments from rte_eth_rx_burst().
I suppose it is just two ways to say the same thing.

> 
> > + * @param rx_pkts
> > + *   The address of an array of pointers to *rte_mbuf* structures that
> > + *   have been retrieved from the device.
> > + * @param nb_pkts
> 
> Should be @param nb_rx

Ack, will fix. 

> 
> > + *   The number of packets that were retrieved from the device.
> > + * @param nb_pkts
> > + *   The number of elements in *rx_pkts* array.
> 
> @p should be used to refer to a paramter.

To be more precise you are talking about:
s/*rx_pkts*/@ rx_pkts/
?

> 
> The description does not help to understand why both nb_rx and
> nb_pkts are necessary. Isn't nb_pkts >= nb_rx and nb_rx
> sufficient?

Nope,  that's for callbacks call.
Will update the comment.
 
> > + * @param opaque
> > + *   Opaque pointer of RX queue callback related data.
> 
> RX -> Rx
> 
> > + *
> > + * @return
> > + *  The number of packets effectively supplied to the *rx_pkts* array.
> 
> @p should be used to refer to a parameter.
> 
> > + */
> > +uint16_t rte_eth_call_rx_callbacks(uint16_t port_id, uint16_t queue_id,
> > +		struct rte_mbuf **rx_pkts, uint16_t nb_rx, uint16_t nb_pkts,
> > +		void *opaque);
> > +
> >  /**
> >   *
> >   * Retrieve a burst of input packets from a receive queue of an Ethernet
> > @@ -4995,23 +5022,37 @@ static inline uint16_t
> >  rte_eth_rx_burst(uint16_t port_id, uint16_t queue_id,
> >  		 struct rte_mbuf **rx_pkts, const uint16_t nb_pkts)
> >  {
> > -	struct rte_eth_dev *dev = &rte_eth_devices[port_id];
> >  	uint16_t nb_rx;
> > +	struct rte_eth_fp_ops *p;
> 
> p is typically a very bad name in a funcion with
> many pointer variables etc. May be "fpo" as in previous
> patch?
> 
> > +	void *cb, *qd;
> 
> Please, avoid variable, expecially pointers, declaration in
> one line.

Here and in other places, I think local variable names and placement,
is just a matter of personal preference.

> 
> I'd suggest to use 'rxq' instead of 'qd'. The first paramter
> of the rx_pkt_burst is 'rxq'.
> 
> Also 'cb' seems to be used under RTE_ETHDEV_RXTX_CALLBACKS
> only. If so, it could be unused variable warning if
> RTE_ETHDEV_RXTX_CALLBACKS is not defined.

Good point, will move it back under #ifdef RTE_ETHDEV_RXTX_CALLBACKS.
 
> > +
> > +#ifdef RTE_ETHDEV_DEBUG_RX
> > +	if (port_id >= RTE_MAX_ETHPORTS ||
> > +			queue_id >= RTE_MAX_QUEUES_PER_PORT) {
> > +		RTE_ETHDEV_LOG(ERR,
> > +			"Invalid port_id=%u or queue_id=%u\n",
> > +			port_id, queue_id);
> > +		return 0;
> > +	}
> > +#endif
> > +
> > +	/* fetch pointer to queue data */
> > +	p = &rte_eth_fp_ops[port_id];
> > +	qd = p->rxq.data[queue_id];
> >
> >  #ifdef RTE_ETHDEV_DEBUG_RX
> >  	RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, 0);
> > -	RTE_FUNC_PTR_OR_ERR_RET(*dev->rx_pkt_burst, 0);
> >
> > -	if (queue_id >= dev->data->nb_rx_queues) {
> > -		RTE_ETHDEV_LOG(ERR, "Invalid RX queue_id=%u\n", queue_id);
> > +	if (qd == NULL) {
> > +		RTE_ETHDEV_LOG(ERR, "Invalid RX queue_id=%u for port_id=%u\n",
> 
> RX -> Rx
> 
> > +			queue_id, port_id);
> >  		return 0;
> >  	}
> >  #endif
> > -	nb_rx = (*dev->rx_pkt_burst)(dev->data->rx_queues[queue_id],
> > -				     rx_pkts, nb_pkts);
> > +
> > +	nb_rx = p->rx_pkt_burst(qd, rx_pkts, nb_pkts);
> >
> >  #ifdef RTE_ETHDEV_RXTX_CALLBACKS
> > -	struct rte_eth_rxtx_callback *cb;
> >
> >  	/* __ATOMIC_RELEASE memory order was used when the
> >  	 * call back was inserted into the list.
> > @@ -5019,16 +5060,10 @@ rte_eth_rx_burst(uint16_t port_id, uint16_t queue_id,
> >  	 * cb and cb->fn/cb->next, __ATOMIC_ACQUIRE memory order is
> >  	 * not required.
> >  	 */
> > -	cb = __atomic_load_n(&dev->post_rx_burst_cbs[queue_id],
> > -				__ATOMIC_RELAXED);
> > -
> > -	if (unlikely(cb != NULL)) {
> > -		do {
> > -			nb_rx = cb->fn.rx(port_id, queue_id, rx_pkts, nb_rx,
> > -						nb_pkts, cb->param);
> > -			cb = cb->next;
> > -		} while (cb != NULL);
> > -	}
> > +	cb = __atomic_load_n((void **)&p->rxq.clbk[queue_id], __ATOMIC_RELAXED);
> > +	if (unlikely(cb != NULL))
> > +		nb_rx = rte_eth_call_rx_callbacks(port_id, queue_id, rx_pkts,
> > +				nb_rx, nb_pkts, cb);
> >  #endif
> >
> >  	rte_ethdev_trace_rx_burst(port_id, queue_id, (void **)rx_pkts, nb_rx);
> > @@ -5051,16 +5086,27 @@ rte_eth_rx_burst(uint16_t port_id, uint16_t queue_id,
> >  static inline int
> >  rte_eth_rx_queue_count(uint16_t port_id, uint16_t queue_id)
> >  {
> > -	struct rte_eth_dev *dev;
> > +	struct rte_eth_fp_ops *p;
> > +	void *qd;
> 
> p -> fpo, qd -> rxq
> 
> > +
> > +	if (port_id >= RTE_MAX_ETHPORTS ||
> > +			queue_id >= RTE_MAX_QUEUES_PER_PORT) {
> > +		RTE_ETHDEV_LOG(ERR,
> > +			"Invalid port_id=%u or queue_id=%u\n",
> > +			port_id, queue_id);
> > +		return -EINVAL;
> > +	}
> > +
> > +	/* fetch pointer to queue data */
> > +	p = &rte_eth_fp_ops[port_id];
> > +	qd = p->rxq.data[queue_id];
> >
> >  	RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, -ENODEV);
> > -	dev = &rte_eth_devices[port_id];
> > -	RTE_FUNC_PTR_OR_ERR_RET(*dev->rx_queue_count, -ENOTSUP);
> > -	if (queue_id >= dev->data->nb_rx_queues ||
> > -	    dev->data->rx_queues[queue_id] == NULL)
> > +	RTE_FUNC_PTR_OR_ERR_RET(*p->rx_queue_count, -ENOTSUP);
> > +	if (qd == NULL)
> >  		return -EINVAL;
> >
> > -	return (int)(*dev->rx_queue_count)(dev->data->rx_queues[queue_id]);
> > +	return (int)(*p->rx_queue_count)(qd);
> >  }
> >
> >  /**@{@name Rx hardware descriptor states
> > @@ -5108,21 +5154,30 @@ static inline int
> >  rte_eth_rx_descriptor_status(uint16_t port_id, uint16_t queue_id,
> >  	uint16_t offset)
> >  {
> > -	struct rte_eth_dev *dev;
> > -	void *rxq;
> > +	struct rte_eth_fp_ops *p;
> > +	void *qd;
> 
> p -> fpo, qd -> rxq
> 
> >
> >  #ifdef RTE_ETHDEV_DEBUG_RX
> > -	RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, -ENODEV);
> > +	if (port_id >= RTE_MAX_ETHPORTS ||
> > +			queue_id >= RTE_MAX_QUEUES_PER_PORT) {
> > +		RTE_ETHDEV_LOG(ERR,
> > +			"Invalid port_id=%u or queue_id=%u\n",
> > +			port_id, queue_id);
> > +		return -EINVAL;
> > +	}
> >  #endif
> > -	dev = &rte_eth_devices[port_id];
> > +
> > +	/* fetch pointer to queue data */
> > +	p = &rte_eth_fp_ops[port_id];
> > +	qd = p->rxq.data[queue_id];
> > +
> >  #ifdef RTE_ETHDEV_DEBUG_RX
> > -	if (queue_id >= dev->data->nb_rx_queues)
> > +	RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, -ENODEV);
> > +	if (qd == NULL)
> >  		return -ENODEV;
> >  #endif
> > -	RTE_FUNC_PTR_OR_ERR_RET(*dev->rx_descriptor_status, -ENOTSUP);
> > -	rxq = dev->data->rx_queues[queue_id];
> > -
> > -	return (*dev->rx_descriptor_status)(rxq, offset);
> > +	RTE_FUNC_PTR_OR_ERR_RET(*p->rx_descriptor_status, -ENOTSUP);
> > +	return (*p->rx_descriptor_status)(qd, offset);
> >  }
> >
> >  /**@{@name Tx hardware descriptor states
> > @@ -5169,23 +5224,54 @@ rte_eth_rx_descriptor_status(uint16_t port_id, uint16_t queue_id,
> >  static inline int rte_eth_tx_descriptor_status(uint16_t port_id,
> >  	uint16_t queue_id, uint16_t offset)
> >  {
> > -	struct rte_eth_dev *dev;
> > -	void *txq;
> > +	struct rte_eth_fp_ops *p;
> > +	void *qd;
> 
> p -> fpo, qd -> txq
> 
> >
> >  #ifdef RTE_ETHDEV_DEBUG_TX
> > -	RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, -ENODEV);
> > +	if (port_id >= RTE_MAX_ETHPORTS ||
> > +			queue_id >= RTE_MAX_QUEUES_PER_PORT) {
> > +		RTE_ETHDEV_LOG(ERR,
> > +			"Invalid port_id=%u or queue_id=%u\n",
> > +			port_id, queue_id);
> > +		return -EINVAL;
> > +	}
> >  #endif
> > -	dev = &rte_eth_devices[port_id];
> > +
> > +	/* fetch pointer to queue data */
> > +	p = &rte_eth_fp_ops[port_id];
> > +	qd = p->txq.data[queue_id];
> > +
> >  #ifdef RTE_ETHDEV_DEBUG_TX
> > -	if (queue_id >= dev->data->nb_tx_queues)
> > +	RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, -ENODEV);
> > +	if (qd == NULL)
> >  		return -ENODEV;
> >  #endif
> > -	RTE_FUNC_PTR_OR_ERR_RET(*dev->tx_descriptor_status, -ENOTSUP);
> > -	txq = dev->data->tx_queues[queue_id];
> > -
> > -	return (*dev->tx_descriptor_status)(txq, offset);
> > +	RTE_FUNC_PTR_OR_ERR_RET(*p->tx_descriptor_status, -ENOTSUP);
> > +	return (*p->tx_descriptor_status)(qd, offset);
> >  }
> >
> > +/**
> > + * @internal
> > + * Helper routine for eth driver tx_burst API.
> > + * Should be called before entry PMD's rte_eth_tx_bulk implementation.
> > + * Does necessary pre-processing - invokes TX callbacks if any, etc.
> 
> TX -> Tx
> 
> > + *
> > + * @param port_id
> > + *   The port identifier of the Ethernet device.
> > + * @param queue_id
> > + *   The index of the transmit queue through which output packets must be
> > + *   sent.
> > + * @param tx_pkts
> > + *   The address of an array of *nb_pkts* pointers to *rte_mbuf* structures
> > + *   which contain the output packets.
> 
> *nb_pkts* -> @p nb_pkts
> 
> > + * @param nb_pkts
> > + *   The maximum number of packets to transmit.
> > + * @return
> > + *   The number of output packets to transmit.
> > + */
> > +uint16_t rte_eth_call_tx_callbacks(uint16_t port_id, uint16_t queue_id,
> > +	struct rte_mbuf **tx_pkts, uint16_t nb_pkts, void *opaque);
> > +
> >  /**
> >   * Send a burst of output packets on a transmit queue of an Ethernet device.
> >   *
> > @@ -5256,20 +5342,34 @@ static inline uint16_t
> >  rte_eth_tx_burst(uint16_t port_id, uint16_t queue_id,
> >  		 struct rte_mbuf **tx_pkts, uint16_t nb_pkts)
> >  {
> > -	struct rte_eth_dev *dev = &rte_eth_devices[port_id];
> > +	struct rte_eth_fp_ops *p;
> > +	void *cb, *qd;
> 
> Same as above
> 
> > +
> > +#ifdef RTE_ETHDEV_DEBUG_TX
> > +	if (port_id >= RTE_MAX_ETHPORTS ||
> > +			queue_id >= RTE_MAX_QUEUES_PER_PORT) {
> > +		RTE_ETHDEV_LOG(ERR,
> > +			"Invalid port_id=%u or queue_id=%u\n",
> > +			port_id, queue_id);
> > +		return 0;
> > +	}
> > +#endif
> > +
> > +	/* fetch pointer to queue data */
> > +	p = &rte_eth_fp_ops[port_id];
> > +	qd = p->txq.data[queue_id];
> >
> >  #ifdef RTE_ETHDEV_DEBUG_TX
> >  	RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, 0);
> > -	RTE_FUNC_PTR_OR_ERR_RET(*dev->tx_pkt_burst, 0);
> >
> > -	if (queue_id >= dev->data->nb_tx_queues) {
> > -		RTE_ETHDEV_LOG(ERR, "Invalid TX queue_id=%u\n", queue_id);
> > +	if (qd == NULL) {
> > +		RTE_ETHDEV_LOG(ERR, "Invalid TX queue_id=%u for port_id=%u\n",
> 
> TX -> Tx
> 
> > +			queue_id, port_id);
> >  		return 0;
> >  	}
> >  #endif
> >
> >  #ifdef RTE_ETHDEV_RXTX_CALLBACKS
> > -	struct rte_eth_rxtx_callback *cb;
> >
> >  	/* __ATOMIC_RELEASE memory order was used when the
> >  	 * call back was inserted into the list.
> > @@ -5277,21 +5377,16 @@ rte_eth_tx_burst(uint16_t port_id, uint16_t queue_id,
> >  	 * cb and cb->fn/cb->next, __ATOMIC_ACQUIRE memory order is
> >  	 * not required.
> >  	 */
> > -	cb = __atomic_load_n(&dev->pre_tx_burst_cbs[queue_id],
> > -				__ATOMIC_RELAXED);
> > -
> > -	if (unlikely(cb != NULL)) {
> > -		do {
> > -			nb_pkts = cb->fn.tx(port_id, queue_id, tx_pkts, nb_pkts,
> > -					cb->param);
> > -			cb = cb->next;
> > -		} while (cb != NULL);
> > -	}
> > +	cb = __atomic_load_n((void **)&p->txq.clbk[queue_id], __ATOMIC_RELAXED);
> > +	if (unlikely(cb != NULL))
> > +		nb_pkts = rte_eth_call_tx_callbacks(port_id, queue_id, tx_pkts,
> > +				nb_pkts, cb);
> >  #endif
> >
> > -	rte_ethdev_trace_tx_burst(port_id, queue_id, (void **)tx_pkts,
> > -		nb_pkts);
> > -	return (*dev->tx_pkt_burst)(dev->data->tx_queues[queue_id], tx_pkts, nb_pkts);
> > +	nb_pkts = p->tx_pkt_burst(qd, tx_pkts, nb_pkts);
> > +
> > +	rte_ethdev_trace_tx_burst(port_id, queue_id, (void **)tx_pkts, nb_pkts);
> > +	return nb_pkts;
> >  }
> >
> >  /**
> > @@ -5354,31 +5449,42 @@ static inline uint16_t
> >  rte_eth_tx_prepare(uint16_t port_id, uint16_t queue_id,
> >  		struct rte_mbuf **tx_pkts, uint16_t nb_pkts)
> >  {
> > -	struct rte_eth_dev *dev;
> > +	struct rte_eth_fp_ops *p;
> > +	void *qd;
> 
> p->fpo, qd->txq
> 
> >
> >  #ifdef RTE_ETHDEV_DEBUG_TX
> > -	if (!rte_eth_dev_is_valid_port(port_id)) {
> > -		RTE_ETHDEV_LOG(ERR, "Invalid TX port_id=%u\n", port_id);
> > +	if (port_id >= RTE_MAX_ETHPORTS ||
> > +			queue_id >= RTE_MAX_QUEUES_PER_PORT) {
> > +		RTE_ETHDEV_LOG(ERR,
> > +			"Invalid port_id=%u or queue_id=%u\n",
> > +			port_id, queue_id);
> >  		rte_errno = ENODEV;
> >  		return 0;
> >  	}
> >  #endif
> >
> > -	dev = &rte_eth_devices[port_id];
> > +	/* fetch pointer to queue data */
> > +	p = &rte_eth_fp_ops[port_id];
> > +	qd = p->txq.data[queue_id];
> >
> >  #ifdef RTE_ETHDEV_DEBUG_TX
> > -	if (queue_id >= dev->data->nb_tx_queues) {
> > -		RTE_ETHDEV_LOG(ERR, "Invalid TX queue_id=%u\n", queue_id);
> > +	if (!rte_eth_dev_is_valid_port(port_id)) {
> > +		RTE_ETHDEV_LOG(ERR, "Invalid TX port_id=%u\n", port_id);
> 
> TX -> Tx
> 
> > +		rte_errno = ENODEV;
> > +		return 0;
> > +	}
> > +	if (qd == NULL) {
> > +		RTE_ETHDEV_LOG(ERR, "Invalid TX queue_id=%u for port_id=%u\n",
> 
> TX -> Tx
> 
> > +			queue_id, port_id);
> >  		rte_errno = EINVAL;
> >  		return 0;
> >  	}
> >  #endif
> >
> > -	if (!dev->tx_pkt_prepare)
> > +	if (!p->tx_pkt_prepare)
> 
> Please, change it to compare vs NULL since you touch the line.
> Just to be consistent with DPDK coding style and lines above.

Ok, I am also fond of explicit comparisons :) 

 
> >  		return nb_pkts;
> >
> > -	return (*dev->tx_pkt_prepare)(dev->data->tx_queues[queue_id],
> > -			tx_pkts, nb_pkts);
> > +	return p->tx_pkt_prepare(qd, tx_pkts, nb_pkts);
> >  }
> >
> >  #else
> > diff --git a/lib/ethdev/version.map b/lib/ethdev/version.map
> > index 904bce6ea1..79e62dcf61 100644
> > --- a/lib/ethdev/version.map
> > +++ b/lib/ethdev/version.map
> > @@ -7,6 +7,8 @@ DPDK_22 {
> >  	rte_eth_allmulticast_disable;
> >  	rte_eth_allmulticast_enable;
> >  	rte_eth_allmulticast_get;
> > +	rte_eth_call_rx_callbacks;
> > +	rte_eth_call_tx_callbacks;
> >  	rte_eth_dev_adjust_nb_rx_tx_desc;
> >  	rte_eth_dev_callback_register;
> >  	rte_eth_dev_callback_unregister;
> > @@ -76,6 +78,7 @@ DPDK_22 {
> >  	rte_eth_find_next_of;
> >  	rte_eth_find_next_owned_by;
> >  	rte_eth_find_next_sibling;
> > +	rte_eth_fp_ops;
> >  	rte_eth_iterator_cleanup;
> >  	rte_eth_iterator_init;
> >  	rte_eth_iterator_next;
> >
Andrew Rybchenko Oct. 11, 2021, 5:03 p.m. UTC | #3
On 10/11/21 6:47 PM, Ananyev, Konstantin wrote:
> 
>>
>> On 10/7/21 2:27 PM, Konstantin Ananyev wrote:
>>> Rework fast-path ethdev functions to use rte_eth_fp_ops[].
>>> While it is an API/ABI breakage, this change is intended to be
>>> transparent for both users (no changes in user app is required) and
>>> PMD developers (no changes in PMD is required).
>>> One extra thing to note - RX/TX callback invocation will cause extra
>>> function call with these changes. That might cause some insignificant
>>> slowdown for code-path where RX/TX callbacks are heavily involved.
>>
>> I'm sorry for nit picking here and below:
>>
>> RX -> Rx, TX -> Tx everywhere above.
>>
>>>
>>> Signed-off-by: Konstantin Ananyev <konstantin.ananyev@intel.com>
>>> ---
>>>   lib/ethdev/ethdev_private.c |  31 +++++
>>>   lib/ethdev/rte_ethdev.h     | 242 ++++++++++++++++++++++++++----------
>>>   lib/ethdev/version.map      |   3 +
>>>   3 files changed, 208 insertions(+), 68 deletions(-)
>>>
>>> diff --git a/lib/ethdev/ethdev_private.c b/lib/ethdev/ethdev_private.c
>>> index 3eeda6e9f9..1222c6f84e 100644
>>> --- a/lib/ethdev/ethdev_private.c
>>> +++ b/lib/ethdev/ethdev_private.c
>>> @@ -226,3 +226,34 @@ eth_dev_fp_ops_setup(struct rte_eth_fp_ops *fpo,
>>>   	fpo->txq.data = dev->data->tx_queues;
>>>   	fpo->txq.clbk = (void **)(uintptr_t)dev->pre_tx_burst_cbs;
>>>   }
>>> +
>>> +uint16_t
>>> +rte_eth_call_rx_callbacks(uint16_t port_id, uint16_t queue_id,
>>> +	struct rte_mbuf **rx_pkts, uint16_t nb_rx, uint16_t nb_pkts,
>>> +	void *opaque)
>>> +{
>>> +	const struct rte_eth_rxtx_callback *cb = opaque;
>>> +
>>> +	while (cb != NULL) {
>>> +		nb_rx = cb->fn.rx(port_id, queue_id, rx_pkts, nb_rx,
>>> +				nb_pkts, cb->param);
>>> +		cb = cb->next;
>>> +	}
>>> +
>>> +	return nb_rx;
>>> +}
>>> +
>>> +uint16_t
>>> +rte_eth_call_tx_callbacks(uint16_t port_id, uint16_t queue_id,
>>> +	struct rte_mbuf **tx_pkts, uint16_t nb_pkts, void *opaque)
>>> +{
>>> +	const struct rte_eth_rxtx_callback *cb = opaque;
>>> +
>>> +	while (cb != NULL) {
>>> +		nb_pkts = cb->fn.tx(port_id, queue_id, tx_pkts, nb_pkts,
>>> +				cb->param);
>>> +		cb = cb->next;
>>> +	}
>>> +
>>> +	return nb_pkts;
>>> +}
>>> diff --git a/lib/ethdev/rte_ethdev.h b/lib/ethdev/rte_ethdev.h
>>> index cdd16d6e57..c0e1a40681 100644
>>> --- a/lib/ethdev/rte_ethdev.h
>>> +++ b/lib/ethdev/rte_ethdev.h
>>> @@ -4904,6 +4904,33 @@ int rte_eth_representor_info_get(uint16_t port_id,
>>>
>>>   #include <rte_ethdev_core.h>
>>>
>>> +/**
>>> + * @internal
>>> + * Helper routine for eth driver rx_burst API.
>>
>> rx -> Rx
>>
>>> + * Should be called at exit from PMD's rte_eth_rx_bulk implementation.
>>> + * Does necessary post-processing - invokes RX callbacks if any, etc.
>>
>> RX -> Rx
>>
>>> + *
>>> + * @param port_id
>>> + *  The port identifier of the Ethernet device.
>>> + * @param queue_id
>>> + *  The index of the receive queue from which to retrieve input packets.
>>
>> Isn't:
>> The index of the queue from which packets are received from?
> 
> I copied it from comments from rte_eth_rx_burst().
> I suppose it is just two ways to say the same thing.

May be it is just my problem that I don't understand the
initial description.

> 
>>
>>> + * @param rx_pkts
>>> + *   The address of an array of pointers to *rte_mbuf* structures that
>>> + *   have been retrieved from the device.
>>> + * @param nb_pkts
>>
>> Should be @param nb_rx
> 
> Ack, will fix.
> 
>>
>>> + *   The number of packets that were retrieved from the device.
>>> + * @param nb_pkts
>>> + *   The number of elements in *rx_pkts* array.
>>
>> @p should be used to refer to a paramter.
> 
> To be more precise you are talking about:
> s/*rx_pkts*/@ rx_pkts/

s/"rx_pkts"/@p rx_pkts/

> ?
> 
>>
>> The description does not help to understand why both nb_rx and
>> nb_pkts are necessary. Isn't nb_pkts >= nb_rx and nb_rx
>> sufficient?
> 
> Nope,  that's for callbacks call.
> Will update the comment.

Thanks.

>>> + * @param opaque
>>> + *   Opaque pointer of RX queue callback related data.
>>
>> RX -> Rx
>>
>>> + *
>>> + * @return
>>> + *  The number of packets effectively supplied to the *rx_pkts* array.
>>
>> @p should be used to refer to a parameter.
>>
>>> + */
>>> +uint16_t rte_eth_call_rx_callbacks(uint16_t port_id, uint16_t queue_id,
>>> +		struct rte_mbuf **rx_pkts, uint16_t nb_rx, uint16_t nb_pkts,
>>> +		void *opaque);
>>> +
>>>   /**
>>>    *
>>>    * Retrieve a burst of input packets from a receive queue of an Ethernet
>>> @@ -4995,23 +5022,37 @@ static inline uint16_t
>>>   rte_eth_rx_burst(uint16_t port_id, uint16_t queue_id,
>>>   		 struct rte_mbuf **rx_pkts, const uint16_t nb_pkts)
>>>   {
>>> -	struct rte_eth_dev *dev = &rte_eth_devices[port_id];
>>>   	uint16_t nb_rx;
>>> +	struct rte_eth_fp_ops *p;
>>
>> p is typically a very bad name in a funcion with
>> many pointer variables etc. May be "fpo" as in previous
>> patch?
>>
>>> +	void *cb, *qd;
>>
>> Please, avoid variable, expecially pointers, declaration in
>> one line.
> 
> Here and in other places, I think local variable names and placement,
> is just a matter of personal preference.

Of course you can drop my notes as long as I'm alone.
I've started my comment from "Please" :)
May be I'm asking too much.

Also, I'm sorry, but I stricly against 'p' name since such
naming makes code harder to read.

> 
>>
>> I'd suggest to use 'rxq' instead of 'qd'. The first paramter
>> of the rx_pkt_burst is 'rxq'.
>>
>> Also 'cb' seems to be used under RTE_ETHDEV_RXTX_CALLBACKS
>> only. If so, it could be unused variable warning if
>> RTE_ETHDEV_RXTX_CALLBACKS is not defined.
> 
> Good point, will move it back under #ifdef RTE_ETHDEV_RXTX_CALLBACKS.
>   
>>> +
>>> +#ifdef RTE_ETHDEV_DEBUG_RX
>>> +	if (port_id >= RTE_MAX_ETHPORTS ||
>>> +			queue_id >= RTE_MAX_QUEUES_PER_PORT) {
>>> +		RTE_ETHDEV_LOG(ERR,
>>> +			"Invalid port_id=%u or queue_id=%u\n",
>>> +			port_id, queue_id);
>>> +		return 0;
>>> +	}
>>> +#endif
>>> +
>>> +	/* fetch pointer to queue data */
>>> +	p = &rte_eth_fp_ops[port_id];
>>> +	qd = p->rxq.data[queue_id];
>>>
>>>   #ifdef RTE_ETHDEV_DEBUG_RX
>>>   	RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, 0);
>>> -	RTE_FUNC_PTR_OR_ERR_RET(*dev->rx_pkt_burst, 0);
>>>
>>> -	if (queue_id >= dev->data->nb_rx_queues) {
>>> -		RTE_ETHDEV_LOG(ERR, "Invalid RX queue_id=%u\n", queue_id);
>>> +	if (qd == NULL) {
>>> +		RTE_ETHDEV_LOG(ERR, "Invalid RX queue_id=%u for port_id=%u\n",
>>
>> RX -> Rx
>>
>>> +			queue_id, port_id);
>>>   		return 0;
>>>   	}
>>>   #endif
>>> -	nb_rx = (*dev->rx_pkt_burst)(dev->data->rx_queues[queue_id],
>>> -				     rx_pkts, nb_pkts);
>>> +
>>> +	nb_rx = p->rx_pkt_burst(qd, rx_pkts, nb_pkts);
>>>
>>>   #ifdef RTE_ETHDEV_RXTX_CALLBACKS
>>> -	struct rte_eth_rxtx_callback *cb;
>>>
>>>   	/* __ATOMIC_RELEASE memory order was used when the
>>>   	 * call back was inserted into the list.
>>> @@ -5019,16 +5060,10 @@ rte_eth_rx_burst(uint16_t port_id, uint16_t queue_id,
>>>   	 * cb and cb->fn/cb->next, __ATOMIC_ACQUIRE memory order is
>>>   	 * not required.
>>>   	 */
>>> -	cb = __atomic_load_n(&dev->post_rx_burst_cbs[queue_id],
>>> -				__ATOMIC_RELAXED);
>>> -
>>> -	if (unlikely(cb != NULL)) {
>>> -		do {
>>> -			nb_rx = cb->fn.rx(port_id, queue_id, rx_pkts, nb_rx,
>>> -						nb_pkts, cb->param);
>>> -			cb = cb->next;
>>> -		} while (cb != NULL);
>>> -	}
>>> +	cb = __atomic_load_n((void **)&p->rxq.clbk[queue_id], __ATOMIC_RELAXED);
>>> +	if (unlikely(cb != NULL))
>>> +		nb_rx = rte_eth_call_rx_callbacks(port_id, queue_id, rx_pkts,
>>> +				nb_rx, nb_pkts, cb);
>>>   #endif
>>>
>>>   	rte_ethdev_trace_rx_burst(port_id, queue_id, (void **)rx_pkts, nb_rx);
>>> @@ -5051,16 +5086,27 @@ rte_eth_rx_burst(uint16_t port_id, uint16_t queue_id,
>>>   static inline int
>>>   rte_eth_rx_queue_count(uint16_t port_id, uint16_t queue_id)
>>>   {
>>> -	struct rte_eth_dev *dev;
>>> +	struct rte_eth_fp_ops *p;
>>> +	void *qd;
>>
>> p -> fpo, qd -> rxq
>>
>>> +
>>> +	if (port_id >= RTE_MAX_ETHPORTS ||
>>> +			queue_id >= RTE_MAX_QUEUES_PER_PORT) {
>>> +		RTE_ETHDEV_LOG(ERR,
>>> +			"Invalid port_id=%u or queue_id=%u\n",
>>> +			port_id, queue_id);
>>> +		return -EINVAL;
>>> +	}
>>> +
>>> +	/* fetch pointer to queue data */
>>> +	p = &rte_eth_fp_ops[port_id];
>>> +	qd = p->rxq.data[queue_id];
>>>
>>>   	RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, -ENODEV);
>>> -	dev = &rte_eth_devices[port_id];
>>> -	RTE_FUNC_PTR_OR_ERR_RET(*dev->rx_queue_count, -ENOTSUP);
>>> -	if (queue_id >= dev->data->nb_rx_queues ||
>>> -	    dev->data->rx_queues[queue_id] == NULL)
>>> +	RTE_FUNC_PTR_OR_ERR_RET(*p->rx_queue_count, -ENOTSUP);
>>> +	if (qd == NULL)
>>>   		return -EINVAL;
>>>
>>> -	return (int)(*dev->rx_queue_count)(dev->data->rx_queues[queue_id]);
>>> +	return (int)(*p->rx_queue_count)(qd);
>>>   }
>>>
>>>   /**@{@name Rx hardware descriptor states
>>> @@ -5108,21 +5154,30 @@ static inline int
>>>   rte_eth_rx_descriptor_status(uint16_t port_id, uint16_t queue_id,
>>>   	uint16_t offset)
>>>   {
>>> -	struct rte_eth_dev *dev;
>>> -	void *rxq;
>>> +	struct rte_eth_fp_ops *p;
>>> +	void *qd;
>>
>> p -> fpo, qd -> rxq
>>
>>>
>>>   #ifdef RTE_ETHDEV_DEBUG_RX
>>> -	RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, -ENODEV);
>>> +	if (port_id >= RTE_MAX_ETHPORTS ||
>>> +			queue_id >= RTE_MAX_QUEUES_PER_PORT) {
>>> +		RTE_ETHDEV_LOG(ERR,
>>> +			"Invalid port_id=%u or queue_id=%u\n",
>>> +			port_id, queue_id);
>>> +		return -EINVAL;
>>> +	}
>>>   #endif
>>> -	dev = &rte_eth_devices[port_id];
>>> +
>>> +	/* fetch pointer to queue data */
>>> +	p = &rte_eth_fp_ops[port_id];
>>> +	qd = p->rxq.data[queue_id];
>>> +
>>>   #ifdef RTE_ETHDEV_DEBUG_RX
>>> -	if (queue_id >= dev->data->nb_rx_queues)
>>> +	RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, -ENODEV);
>>> +	if (qd == NULL)
>>>   		return -ENODEV;
>>>   #endif
>>> -	RTE_FUNC_PTR_OR_ERR_RET(*dev->rx_descriptor_status, -ENOTSUP);
>>> -	rxq = dev->data->rx_queues[queue_id];
>>> -
>>> -	return (*dev->rx_descriptor_status)(rxq, offset);
>>> +	RTE_FUNC_PTR_OR_ERR_RET(*p->rx_descriptor_status, -ENOTSUP);
>>> +	return (*p->rx_descriptor_status)(qd, offset);
>>>   }
>>>
>>>   /**@{@name Tx hardware descriptor states
>>> @@ -5169,23 +5224,54 @@ rte_eth_rx_descriptor_status(uint16_t port_id, uint16_t queue_id,
>>>   static inline int rte_eth_tx_descriptor_status(uint16_t port_id,
>>>   	uint16_t queue_id, uint16_t offset)
>>>   {
>>> -	struct rte_eth_dev *dev;
>>> -	void *txq;
>>> +	struct rte_eth_fp_ops *p;
>>> +	void *qd;
>>
>> p -> fpo, qd -> txq
>>
>>>
>>>   #ifdef RTE_ETHDEV_DEBUG_TX
>>> -	RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, -ENODEV);
>>> +	if (port_id >= RTE_MAX_ETHPORTS ||
>>> +			queue_id >= RTE_MAX_QUEUES_PER_PORT) {
>>> +		RTE_ETHDEV_LOG(ERR,
>>> +			"Invalid port_id=%u or queue_id=%u\n",
>>> +			port_id, queue_id);
>>> +		return -EINVAL;
>>> +	}
>>>   #endif
>>> -	dev = &rte_eth_devices[port_id];
>>> +
>>> +	/* fetch pointer to queue data */
>>> +	p = &rte_eth_fp_ops[port_id];
>>> +	qd = p->txq.data[queue_id];
>>> +
>>>   #ifdef RTE_ETHDEV_DEBUG_TX
>>> -	if (queue_id >= dev->data->nb_tx_queues)
>>> +	RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, -ENODEV);
>>> +	if (qd == NULL)
>>>   		return -ENODEV;
>>>   #endif
>>> -	RTE_FUNC_PTR_OR_ERR_RET(*dev->tx_descriptor_status, -ENOTSUP);
>>> -	txq = dev->data->tx_queues[queue_id];
>>> -
>>> -	return (*dev->tx_descriptor_status)(txq, offset);
>>> +	RTE_FUNC_PTR_OR_ERR_RET(*p->tx_descriptor_status, -ENOTSUP);
>>> +	return (*p->tx_descriptor_status)(qd, offset);
>>>   }
>>>
>>> +/**
>>> + * @internal
>>> + * Helper routine for eth driver tx_burst API.
>>> + * Should be called before entry PMD's rte_eth_tx_bulk implementation.
>>> + * Does necessary pre-processing - invokes TX callbacks if any, etc.
>>
>> TX -> Tx
>>
>>> + *
>>> + * @param port_id
>>> + *   The port identifier of the Ethernet device.
>>> + * @param queue_id
>>> + *   The index of the transmit queue through which output packets must be
>>> + *   sent.
>>> + * @param tx_pkts
>>> + *   The address of an array of *nb_pkts* pointers to *rte_mbuf* structures
>>> + *   which contain the output packets.
>>
>> *nb_pkts* -> @p nb_pkts
>>
>>> + * @param nb_pkts
>>> + *   The maximum number of packets to transmit.
>>> + * @return
>>> + *   The number of output packets to transmit.
>>> + */
>>> +uint16_t rte_eth_call_tx_callbacks(uint16_t port_id, uint16_t queue_id,
>>> +	struct rte_mbuf **tx_pkts, uint16_t nb_pkts, void *opaque);
>>> +
>>>   /**
>>>    * Send a burst of output packets on a transmit queue of an Ethernet device.
>>>    *
>>> @@ -5256,20 +5342,34 @@ static inline uint16_t
>>>   rte_eth_tx_burst(uint16_t port_id, uint16_t queue_id,
>>>   		 struct rte_mbuf **tx_pkts, uint16_t nb_pkts)
>>>   {
>>> -	struct rte_eth_dev *dev = &rte_eth_devices[port_id];
>>> +	struct rte_eth_fp_ops *p;
>>> +	void *cb, *qd;
>>
>> Same as above
>>
>>> +
>>> +#ifdef RTE_ETHDEV_DEBUG_TX
>>> +	if (port_id >= RTE_MAX_ETHPORTS ||
>>> +			queue_id >= RTE_MAX_QUEUES_PER_PORT) {
>>> +		RTE_ETHDEV_LOG(ERR,
>>> +			"Invalid port_id=%u or queue_id=%u\n",
>>> +			port_id, queue_id);
>>> +		return 0;
>>> +	}
>>> +#endif
>>> +
>>> +	/* fetch pointer to queue data */
>>> +	p = &rte_eth_fp_ops[port_id];
>>> +	qd = p->txq.data[queue_id];
>>>
>>>   #ifdef RTE_ETHDEV_DEBUG_TX
>>>   	RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, 0);
>>> -	RTE_FUNC_PTR_OR_ERR_RET(*dev->tx_pkt_burst, 0);
>>>
>>> -	if (queue_id >= dev->data->nb_tx_queues) {
>>> -		RTE_ETHDEV_LOG(ERR, "Invalid TX queue_id=%u\n", queue_id);
>>> +	if (qd == NULL) {
>>> +		RTE_ETHDEV_LOG(ERR, "Invalid TX queue_id=%u for port_id=%u\n",
>>
>> TX -> Tx
>>
>>> +			queue_id, port_id);
>>>   		return 0;
>>>   	}
>>>   #endif
>>>
>>>   #ifdef RTE_ETHDEV_RXTX_CALLBACKS
>>> -	struct rte_eth_rxtx_callback *cb;
>>>
>>>   	/* __ATOMIC_RELEASE memory order was used when the
>>>   	 * call back was inserted into the list.
>>> @@ -5277,21 +5377,16 @@ rte_eth_tx_burst(uint16_t port_id, uint16_t queue_id,
>>>   	 * cb and cb->fn/cb->next, __ATOMIC_ACQUIRE memory order is
>>>   	 * not required.
>>>   	 */
>>> -	cb = __atomic_load_n(&dev->pre_tx_burst_cbs[queue_id],
>>> -				__ATOMIC_RELAXED);
>>> -
>>> -	if (unlikely(cb != NULL)) {
>>> -		do {
>>> -			nb_pkts = cb->fn.tx(port_id, queue_id, tx_pkts, nb_pkts,
>>> -					cb->param);
>>> -			cb = cb->next;
>>> -		} while (cb != NULL);
>>> -	}
>>> +	cb = __atomic_load_n((void **)&p->txq.clbk[queue_id], __ATOMIC_RELAXED);
>>> +	if (unlikely(cb != NULL))
>>> +		nb_pkts = rte_eth_call_tx_callbacks(port_id, queue_id, tx_pkts,
>>> +				nb_pkts, cb);
>>>   #endif
>>>
>>> -	rte_ethdev_trace_tx_burst(port_id, queue_id, (void **)tx_pkts,
>>> -		nb_pkts);
>>> -	return (*dev->tx_pkt_burst)(dev->data->tx_queues[queue_id], tx_pkts, nb_pkts);
>>> +	nb_pkts = p->tx_pkt_burst(qd, tx_pkts, nb_pkts);
>>> +
>>> +	rte_ethdev_trace_tx_burst(port_id, queue_id, (void **)tx_pkts, nb_pkts);
>>> +	return nb_pkts;
>>>   }
>>>
>>>   /**
>>> @@ -5354,31 +5449,42 @@ static inline uint16_t
>>>   rte_eth_tx_prepare(uint16_t port_id, uint16_t queue_id,
>>>   		struct rte_mbuf **tx_pkts, uint16_t nb_pkts)
>>>   {
>>> -	struct rte_eth_dev *dev;
>>> +	struct rte_eth_fp_ops *p;
>>> +	void *qd;
>>
>> p->fpo, qd->txq
>>
>>>
>>>   #ifdef RTE_ETHDEV_DEBUG_TX
>>> -	if (!rte_eth_dev_is_valid_port(port_id)) {
>>> -		RTE_ETHDEV_LOG(ERR, "Invalid TX port_id=%u\n", port_id);
>>> +	if (port_id >= RTE_MAX_ETHPORTS ||
>>> +			queue_id >= RTE_MAX_QUEUES_PER_PORT) {
>>> +		RTE_ETHDEV_LOG(ERR,
>>> +			"Invalid port_id=%u or queue_id=%u\n",
>>> +			port_id, queue_id);
>>>   		rte_errno = ENODEV;
>>>   		return 0;
>>>   	}
>>>   #endif
>>>
>>> -	dev = &rte_eth_devices[port_id];
>>> +	/* fetch pointer to queue data */
>>> +	p = &rte_eth_fp_ops[port_id];
>>> +	qd = p->txq.data[queue_id];
>>>
>>>   #ifdef RTE_ETHDEV_DEBUG_TX
>>> -	if (queue_id >= dev->data->nb_tx_queues) {
>>> -		RTE_ETHDEV_LOG(ERR, "Invalid TX queue_id=%u\n", queue_id);
>>> +	if (!rte_eth_dev_is_valid_port(port_id)) {
>>> +		RTE_ETHDEV_LOG(ERR, "Invalid TX port_id=%u\n", port_id);
>>
>> TX -> Tx
>>
>>> +		rte_errno = ENODEV;
>>> +		return 0;
>>> +	}
>>> +	if (qd == NULL) {
>>> +		RTE_ETHDEV_LOG(ERR, "Invalid TX queue_id=%u for port_id=%u\n",
>>
>> TX -> Tx
>>
>>> +			queue_id, port_id);
>>>   		rte_errno = EINVAL;
>>>   		return 0;
>>>   	}
>>>   #endif
>>>
>>> -	if (!dev->tx_pkt_prepare)
>>> +	if (!p->tx_pkt_prepare)
>>
>> Please, change it to compare vs NULL since you touch the line.
>> Just to be consistent with DPDK coding style and lines above.
> 
> Ok, I am also fond of explicit comparisons :)

Thanks.

> 
>   
>>>   		return nb_pkts;
>>>
>>> -	return (*dev->tx_pkt_prepare)(dev->data->tx_queues[queue_id],
>>> -			tx_pkts, nb_pkts);
>>> +	return p->tx_pkt_prepare(qd, tx_pkts, nb_pkts);
>>>   }
>>>
>>>   #else
>>> diff --git a/lib/ethdev/version.map b/lib/ethdev/version.map
>>> index 904bce6ea1..79e62dcf61 100644
>>> --- a/lib/ethdev/version.map
>>> +++ b/lib/ethdev/version.map
>>> @@ -7,6 +7,8 @@ DPDK_22 {
>>>   	rte_eth_allmulticast_disable;
>>>   	rte_eth_allmulticast_enable;
>>>   	rte_eth_allmulticast_get;
>>> +	rte_eth_call_rx_callbacks;
>>> +	rte_eth_call_tx_callbacks;
>>>   	rte_eth_dev_adjust_nb_rx_tx_desc;
>>>   	rte_eth_dev_callback_register;
>>>   	rte_eth_dev_callback_unregister;
>>> @@ -76,6 +78,7 @@ DPDK_22 {
>>>   	rte_eth_find_next_of;
>>>   	rte_eth_find_next_owned_by;
>>>   	rte_eth_find_next_sibling;
>>> +	rte_eth_fp_ops;
>>>   	rte_eth_iterator_cleanup;
>>>   	rte_eth_iterator_init;
>>>   	rte_eth_iterator_next;
>>>
>
diff mbox series

Patch

diff --git a/lib/ethdev/ethdev_private.c b/lib/ethdev/ethdev_private.c
index 3eeda6e9f9..1222c6f84e 100644
--- a/lib/ethdev/ethdev_private.c
+++ b/lib/ethdev/ethdev_private.c
@@ -226,3 +226,34 @@  eth_dev_fp_ops_setup(struct rte_eth_fp_ops *fpo,
 	fpo->txq.data = dev->data->tx_queues;
 	fpo->txq.clbk = (void **)(uintptr_t)dev->pre_tx_burst_cbs;
 }
+
+uint16_t
+rte_eth_call_rx_callbacks(uint16_t port_id, uint16_t queue_id,
+	struct rte_mbuf **rx_pkts, uint16_t nb_rx, uint16_t nb_pkts,
+	void *opaque)
+{
+	const struct rte_eth_rxtx_callback *cb = opaque;
+
+	while (cb != NULL) {
+		nb_rx = cb->fn.rx(port_id, queue_id, rx_pkts, nb_rx,
+				nb_pkts, cb->param);
+		cb = cb->next;
+	}
+
+	return nb_rx;
+}
+
+uint16_t
+rte_eth_call_tx_callbacks(uint16_t port_id, uint16_t queue_id,
+	struct rte_mbuf **tx_pkts, uint16_t nb_pkts, void *opaque)
+{
+	const struct rte_eth_rxtx_callback *cb = opaque;
+
+	while (cb != NULL) {
+		nb_pkts = cb->fn.tx(port_id, queue_id, tx_pkts, nb_pkts,
+				cb->param);
+		cb = cb->next;
+	}
+
+	return nb_pkts;
+}
diff --git a/lib/ethdev/rte_ethdev.h b/lib/ethdev/rte_ethdev.h
index cdd16d6e57..c0e1a40681 100644
--- a/lib/ethdev/rte_ethdev.h
+++ b/lib/ethdev/rte_ethdev.h
@@ -4904,6 +4904,33 @@  int rte_eth_representor_info_get(uint16_t port_id,
 
 #include <rte_ethdev_core.h>
 
+/**
+ * @internal
+ * Helper routine for eth driver rx_burst API.
+ * Should be called at exit from PMD's rte_eth_rx_bulk implementation.
+ * Does necessary post-processing - invokes RX callbacks if any, etc.
+ *
+ * @param port_id
+ *  The port identifier of the Ethernet device.
+ * @param queue_id
+ *  The index of the receive queue from which to retrieve input packets.
+ * @param rx_pkts
+ *   The address of an array of pointers to *rte_mbuf* structures that
+ *   have been retrieved from the device.
+ * @param nb_pkts
+ *   The number of packets that were retrieved from the device.
+ * @param nb_pkts
+ *   The number of elements in *rx_pkts* array.
+ * @param opaque
+ *   Opaque pointer of RX queue callback related data.
+ *
+ * @return
+ *  The number of packets effectively supplied to the *rx_pkts* array.
+ */
+uint16_t rte_eth_call_rx_callbacks(uint16_t port_id, uint16_t queue_id,
+		struct rte_mbuf **rx_pkts, uint16_t nb_rx, uint16_t nb_pkts,
+		void *opaque);
+
 /**
  *
  * Retrieve a burst of input packets from a receive queue of an Ethernet
@@ -4995,23 +5022,37 @@  static inline uint16_t
 rte_eth_rx_burst(uint16_t port_id, uint16_t queue_id,
 		 struct rte_mbuf **rx_pkts, const uint16_t nb_pkts)
 {
-	struct rte_eth_dev *dev = &rte_eth_devices[port_id];
 	uint16_t nb_rx;
+	struct rte_eth_fp_ops *p;
+	void *cb, *qd;
+
+#ifdef RTE_ETHDEV_DEBUG_RX
+	if (port_id >= RTE_MAX_ETHPORTS ||
+			queue_id >= RTE_MAX_QUEUES_PER_PORT) {
+		RTE_ETHDEV_LOG(ERR,
+			"Invalid port_id=%u or queue_id=%u\n",
+			port_id, queue_id);
+		return 0;
+	}
+#endif
+
+	/* fetch pointer to queue data */
+	p = &rte_eth_fp_ops[port_id];
+	qd = p->rxq.data[queue_id];
 
 #ifdef RTE_ETHDEV_DEBUG_RX
 	RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, 0);
-	RTE_FUNC_PTR_OR_ERR_RET(*dev->rx_pkt_burst, 0);
 
-	if (queue_id >= dev->data->nb_rx_queues) {
-		RTE_ETHDEV_LOG(ERR, "Invalid RX queue_id=%u\n", queue_id);
+	if (qd == NULL) {
+		RTE_ETHDEV_LOG(ERR, "Invalid RX queue_id=%u for port_id=%u\n",
+			queue_id, port_id);
 		return 0;
 	}
 #endif
-	nb_rx = (*dev->rx_pkt_burst)(dev->data->rx_queues[queue_id],
-				     rx_pkts, nb_pkts);
+
+	nb_rx = p->rx_pkt_burst(qd, rx_pkts, nb_pkts);
 
 #ifdef RTE_ETHDEV_RXTX_CALLBACKS
-	struct rte_eth_rxtx_callback *cb;
 
 	/* __ATOMIC_RELEASE memory order was used when the
 	 * call back was inserted into the list.
@@ -5019,16 +5060,10 @@  rte_eth_rx_burst(uint16_t port_id, uint16_t queue_id,
 	 * cb and cb->fn/cb->next, __ATOMIC_ACQUIRE memory order is
 	 * not required.
 	 */
-	cb = __atomic_load_n(&dev->post_rx_burst_cbs[queue_id],
-				__ATOMIC_RELAXED);
-
-	if (unlikely(cb != NULL)) {
-		do {
-			nb_rx = cb->fn.rx(port_id, queue_id, rx_pkts, nb_rx,
-						nb_pkts, cb->param);
-			cb = cb->next;
-		} while (cb != NULL);
-	}
+	cb = __atomic_load_n((void **)&p->rxq.clbk[queue_id], __ATOMIC_RELAXED);
+	if (unlikely(cb != NULL))
+		nb_rx = rte_eth_call_rx_callbacks(port_id, queue_id, rx_pkts,
+				nb_rx, nb_pkts, cb);
 #endif
 
 	rte_ethdev_trace_rx_burst(port_id, queue_id, (void **)rx_pkts, nb_rx);
@@ -5051,16 +5086,27 @@  rte_eth_rx_burst(uint16_t port_id, uint16_t queue_id,
 static inline int
 rte_eth_rx_queue_count(uint16_t port_id, uint16_t queue_id)
 {
-	struct rte_eth_dev *dev;
+	struct rte_eth_fp_ops *p;
+	void *qd;
+
+	if (port_id >= RTE_MAX_ETHPORTS ||
+			queue_id >= RTE_MAX_QUEUES_PER_PORT) {
+		RTE_ETHDEV_LOG(ERR,
+			"Invalid port_id=%u or queue_id=%u\n",
+			port_id, queue_id);
+		return -EINVAL;
+	}
+
+	/* fetch pointer to queue data */
+	p = &rte_eth_fp_ops[port_id];
+	qd = p->rxq.data[queue_id];
 
 	RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, -ENODEV);
-	dev = &rte_eth_devices[port_id];
-	RTE_FUNC_PTR_OR_ERR_RET(*dev->rx_queue_count, -ENOTSUP);
-	if (queue_id >= dev->data->nb_rx_queues ||
-	    dev->data->rx_queues[queue_id] == NULL)
+	RTE_FUNC_PTR_OR_ERR_RET(*p->rx_queue_count, -ENOTSUP);
+	if (qd == NULL)
 		return -EINVAL;
 
-	return (int)(*dev->rx_queue_count)(dev->data->rx_queues[queue_id]);
+	return (int)(*p->rx_queue_count)(qd);
 }
 
 /**@{@name Rx hardware descriptor states
@@ -5108,21 +5154,30 @@  static inline int
 rte_eth_rx_descriptor_status(uint16_t port_id, uint16_t queue_id,
 	uint16_t offset)
 {
-	struct rte_eth_dev *dev;
-	void *rxq;
+	struct rte_eth_fp_ops *p;
+	void *qd;
 
 #ifdef RTE_ETHDEV_DEBUG_RX
-	RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, -ENODEV);
+	if (port_id >= RTE_MAX_ETHPORTS ||
+			queue_id >= RTE_MAX_QUEUES_PER_PORT) {
+		RTE_ETHDEV_LOG(ERR,
+			"Invalid port_id=%u or queue_id=%u\n",
+			port_id, queue_id);
+		return -EINVAL;
+	}
 #endif
-	dev = &rte_eth_devices[port_id];
+
+	/* fetch pointer to queue data */
+	p = &rte_eth_fp_ops[port_id];
+	qd = p->rxq.data[queue_id];
+
 #ifdef RTE_ETHDEV_DEBUG_RX
-	if (queue_id >= dev->data->nb_rx_queues)
+	RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, -ENODEV);
+	if (qd == NULL)
 		return -ENODEV;
 #endif
-	RTE_FUNC_PTR_OR_ERR_RET(*dev->rx_descriptor_status, -ENOTSUP);
-	rxq = dev->data->rx_queues[queue_id];
-
-	return (*dev->rx_descriptor_status)(rxq, offset);
+	RTE_FUNC_PTR_OR_ERR_RET(*p->rx_descriptor_status, -ENOTSUP);
+	return (*p->rx_descriptor_status)(qd, offset);
 }
 
 /**@{@name Tx hardware descriptor states
@@ -5169,23 +5224,54 @@  rte_eth_rx_descriptor_status(uint16_t port_id, uint16_t queue_id,
 static inline int rte_eth_tx_descriptor_status(uint16_t port_id,
 	uint16_t queue_id, uint16_t offset)
 {
-	struct rte_eth_dev *dev;
-	void *txq;
+	struct rte_eth_fp_ops *p;
+	void *qd;
 
 #ifdef RTE_ETHDEV_DEBUG_TX
-	RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, -ENODEV);
+	if (port_id >= RTE_MAX_ETHPORTS ||
+			queue_id >= RTE_MAX_QUEUES_PER_PORT) {
+		RTE_ETHDEV_LOG(ERR,
+			"Invalid port_id=%u or queue_id=%u\n",
+			port_id, queue_id);
+		return -EINVAL;
+	}
 #endif
-	dev = &rte_eth_devices[port_id];
+
+	/* fetch pointer to queue data */
+	p = &rte_eth_fp_ops[port_id];
+	qd = p->txq.data[queue_id];
+
 #ifdef RTE_ETHDEV_DEBUG_TX
-	if (queue_id >= dev->data->nb_tx_queues)
+	RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, -ENODEV);
+	if (qd == NULL)
 		return -ENODEV;
 #endif
-	RTE_FUNC_PTR_OR_ERR_RET(*dev->tx_descriptor_status, -ENOTSUP);
-	txq = dev->data->tx_queues[queue_id];
-
-	return (*dev->tx_descriptor_status)(txq, offset);
+	RTE_FUNC_PTR_OR_ERR_RET(*p->tx_descriptor_status, -ENOTSUP);
+	return (*p->tx_descriptor_status)(qd, offset);
 }
 
+/**
+ * @internal
+ * Helper routine for eth driver tx_burst API.
+ * Should be called before entry PMD's rte_eth_tx_bulk implementation.
+ * Does necessary pre-processing - invokes TX callbacks if any, etc.
+ *
+ * @param port_id
+ *   The port identifier of the Ethernet device.
+ * @param queue_id
+ *   The index of the transmit queue through which output packets must be
+ *   sent.
+ * @param tx_pkts
+ *   The address of an array of *nb_pkts* pointers to *rte_mbuf* structures
+ *   which contain the output packets.
+ * @param nb_pkts
+ *   The maximum number of packets to transmit.
+ * @return
+ *   The number of output packets to transmit.
+ */
+uint16_t rte_eth_call_tx_callbacks(uint16_t port_id, uint16_t queue_id,
+	struct rte_mbuf **tx_pkts, uint16_t nb_pkts, void *opaque);
+
 /**
  * Send a burst of output packets on a transmit queue of an Ethernet device.
  *
@@ -5256,20 +5342,34 @@  static inline uint16_t
 rte_eth_tx_burst(uint16_t port_id, uint16_t queue_id,
 		 struct rte_mbuf **tx_pkts, uint16_t nb_pkts)
 {
-	struct rte_eth_dev *dev = &rte_eth_devices[port_id];
+	struct rte_eth_fp_ops *p;
+	void *cb, *qd;
+
+#ifdef RTE_ETHDEV_DEBUG_TX
+	if (port_id >= RTE_MAX_ETHPORTS ||
+			queue_id >= RTE_MAX_QUEUES_PER_PORT) {
+		RTE_ETHDEV_LOG(ERR,
+			"Invalid port_id=%u or queue_id=%u\n",
+			port_id, queue_id);
+		return 0;
+	}
+#endif
+
+	/* fetch pointer to queue data */
+	p = &rte_eth_fp_ops[port_id];
+	qd = p->txq.data[queue_id];
 
 #ifdef RTE_ETHDEV_DEBUG_TX
 	RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, 0);
-	RTE_FUNC_PTR_OR_ERR_RET(*dev->tx_pkt_burst, 0);
 
-	if (queue_id >= dev->data->nb_tx_queues) {
-		RTE_ETHDEV_LOG(ERR, "Invalid TX queue_id=%u\n", queue_id);
+	if (qd == NULL) {
+		RTE_ETHDEV_LOG(ERR, "Invalid TX queue_id=%u for port_id=%u\n",
+			queue_id, port_id);
 		return 0;
 	}
 #endif
 
 #ifdef RTE_ETHDEV_RXTX_CALLBACKS
-	struct rte_eth_rxtx_callback *cb;
 
 	/* __ATOMIC_RELEASE memory order was used when the
 	 * call back was inserted into the list.
@@ -5277,21 +5377,16 @@  rte_eth_tx_burst(uint16_t port_id, uint16_t queue_id,
 	 * cb and cb->fn/cb->next, __ATOMIC_ACQUIRE memory order is
 	 * not required.
 	 */
-	cb = __atomic_load_n(&dev->pre_tx_burst_cbs[queue_id],
-				__ATOMIC_RELAXED);
-
-	if (unlikely(cb != NULL)) {
-		do {
-			nb_pkts = cb->fn.tx(port_id, queue_id, tx_pkts, nb_pkts,
-					cb->param);
-			cb = cb->next;
-		} while (cb != NULL);
-	}
+	cb = __atomic_load_n((void **)&p->txq.clbk[queue_id], __ATOMIC_RELAXED);
+	if (unlikely(cb != NULL))
+		nb_pkts = rte_eth_call_tx_callbacks(port_id, queue_id, tx_pkts,
+				nb_pkts, cb);
 #endif
 
-	rte_ethdev_trace_tx_burst(port_id, queue_id, (void **)tx_pkts,
-		nb_pkts);
-	return (*dev->tx_pkt_burst)(dev->data->tx_queues[queue_id], tx_pkts, nb_pkts);
+	nb_pkts = p->tx_pkt_burst(qd, tx_pkts, nb_pkts);
+
+	rte_ethdev_trace_tx_burst(port_id, queue_id, (void **)tx_pkts, nb_pkts);
+	return nb_pkts;
 }
 
 /**
@@ -5354,31 +5449,42 @@  static inline uint16_t
 rte_eth_tx_prepare(uint16_t port_id, uint16_t queue_id,
 		struct rte_mbuf **tx_pkts, uint16_t nb_pkts)
 {
-	struct rte_eth_dev *dev;
+	struct rte_eth_fp_ops *p;
+	void *qd;
 
 #ifdef RTE_ETHDEV_DEBUG_TX
-	if (!rte_eth_dev_is_valid_port(port_id)) {
-		RTE_ETHDEV_LOG(ERR, "Invalid TX port_id=%u\n", port_id);
+	if (port_id >= RTE_MAX_ETHPORTS ||
+			queue_id >= RTE_MAX_QUEUES_PER_PORT) {
+		RTE_ETHDEV_LOG(ERR,
+			"Invalid port_id=%u or queue_id=%u\n",
+			port_id, queue_id);
 		rte_errno = ENODEV;
 		return 0;
 	}
 #endif
 
-	dev = &rte_eth_devices[port_id];
+	/* fetch pointer to queue data */
+	p = &rte_eth_fp_ops[port_id];
+	qd = p->txq.data[queue_id];
 
 #ifdef RTE_ETHDEV_DEBUG_TX
-	if (queue_id >= dev->data->nb_tx_queues) {
-		RTE_ETHDEV_LOG(ERR, "Invalid TX queue_id=%u\n", queue_id);
+	if (!rte_eth_dev_is_valid_port(port_id)) {
+		RTE_ETHDEV_LOG(ERR, "Invalid TX port_id=%u\n", port_id);
+		rte_errno = ENODEV;
+		return 0;
+	}
+	if (qd == NULL) {
+		RTE_ETHDEV_LOG(ERR, "Invalid TX queue_id=%u for port_id=%u\n",
+			queue_id, port_id);
 		rte_errno = EINVAL;
 		return 0;
 	}
 #endif
 
-	if (!dev->tx_pkt_prepare)
+	if (!p->tx_pkt_prepare)
 		return nb_pkts;
 
-	return (*dev->tx_pkt_prepare)(dev->data->tx_queues[queue_id],
-			tx_pkts, nb_pkts);
+	return p->tx_pkt_prepare(qd, tx_pkts, nb_pkts);
 }
 
 #else
diff --git a/lib/ethdev/version.map b/lib/ethdev/version.map
index 904bce6ea1..79e62dcf61 100644
--- a/lib/ethdev/version.map
+++ b/lib/ethdev/version.map
@@ -7,6 +7,8 @@  DPDK_22 {
 	rte_eth_allmulticast_disable;
 	rte_eth_allmulticast_enable;
 	rte_eth_allmulticast_get;
+	rte_eth_call_rx_callbacks;
+	rte_eth_call_tx_callbacks;
 	rte_eth_dev_adjust_nb_rx_tx_desc;
 	rte_eth_dev_callback_register;
 	rte_eth_dev_callback_unregister;
@@ -76,6 +78,7 @@  DPDK_22 {
 	rte_eth_find_next_of;
 	rte_eth_find_next_owned_by;
 	rte_eth_find_next_sibling;
+	rte_eth_fp_ops;
 	rte_eth_iterator_cleanup;
 	rte_eth_iterator_init;
 	rte_eth_iterator_next;