[v3,1/9] ethdev: introduce Rx buffer split

Message ID 1602519585-5194-2-git-send-email-viacheslavo@nvidia.com (mailing list archive)
State Superseded, archived
Series: ethdev: introduce Rx buffer split

Checks

Context Check Description
ci/checkpatch success coding style OK
ci/Performance-Testing fail build patch failure

Commit Message

Slava Ovsiienko Oct. 12, 2020, 4:19 p.m. UTC
  The DPDK datapath in the transmit direction is very flexible.
An application can build multi-segment packets and manage
almost all data aspects - the memory pools the segments are
allocated from, the segment lengths, the memory attributes
such as external buffers registered for DMA, etc.

In the receive direction the datapath is much less flexible:
an application can only specify the memory pool to configure
the receive queue and nothing more. To extend the receive
datapath capabilities, a way is proposed to provide extended
information describing how to split the packets being received.

The following structure is introduced to specify the Rx packet
segment:

struct rte_eth_rxseg {
    struct rte_mempool *mp; /* memory pool to allocate segment from */
    uint16_t length; /* segment maximal data length,
                        configures the "split point" */
    uint16_t offset; /* data offset from the beginning
                        of the mbuf data buffer */
    uint32_t reserved; /* reserved field */
};

The new routine rte_eth_rxseg_queue_setup() is introduced to
set up the given Rx queue using the new extended Rx packet segment
description:

int
rte_eth_rxseg_queue_setup(uint16_t port_id, uint16_t rx_queue_id,
                          uint16_t nb_rx_desc, unsigned int socket_id,
                          const struct rte_eth_rxconf *rx_conf,
                          const struct rte_eth_rxseg *rx_seg,
                          uint16_t n_seg)

This routine adds two new parameters:
    rx_seg - pointer to the array of segment descriptions; each element
             describes the memory pool, the maximal data length and the
             initial data offset from the beginning of the data buffer
             in the mbuf. This array allows specifying different
             settings for each segment individually.
    n_seg - number of elements in the array

The new offload flag RTE_ETH_RX_OFFLOAD_BUFFER_SPLIT in the device
capabilities is introduced to let a PMD report to the application
that it supports splitting received packets into configurable
segments. Before invoking the rte_eth_rxseg_queue_setup() routine
the application should check the RTE_ETH_RX_OFFLOAD_BUFFER_SPLIT flag.
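
For illustration, a minimal capability check might look like the
following (illustrative fragment only, not part of this patch;
port_id, queue_id, nb_rxd, socket_id, rx_conf and mb_pool are
assumed to be defined by the application):

struct rte_eth_dev_info dev_info;
int ret;

ret = rte_eth_dev_info_get(port_id, &dev_info);
if (ret != 0)
    return ret;
if (!(dev_info.rx_offload_capa & RTE_ETH_RX_OFFLOAD_BUFFER_SPLIT)) {
    /* No buffer split support - fall back to the regular setup. */
    return rte_eth_rx_queue_setup(port_id, queue_id, nb_rxd,
                                  socket_id, &rx_conf, mb_pool);
}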

If the Rx queue is configured with the new routine, the packets
being received will be split into multiple segments pushed to mbufs
with the specified attributes. The PMD will split the received
packets into multiple segments according to the specification in
the description array:

- the first network buffer will be allocated from the memory pool
  specified in the first segment description element, the second
  network buffer from the pool in the second segment description
  element, and so on. If there are not enough elements to describe
  the buffers for an entire packet of maximal length, the pool from
  the last valid element will be used to allocate the buffers for
  the rest of the segments

- the offsets from the segment description elements provide the
  data offset from the buffer beginning, except for the first mbuf -
  for this one the offset is added to RTE_PKTMBUF_HEADROOM to get
  the actual offset from the buffer beginning. If there are not
  enough elements to describe the buffers for an entire packet of
  maximal length, the offsets for the rest of the segments are
  assumed to be zero.

- the data length received into each segment is limited by the
  length specified in the segment description element. Receiving
  starts by filling up the first mbuf data buffer; if the specified
  maximal segment length is reached and there are data remaining
  (the packet is longer than the buffer in the first mbuf), the
  following data are pushed to the next segment up to its own
  maximal length. If the first two segments are not enough to store
  all the remaining packet data, the next (third) segment is
  engaged, and so on. If the length in a segment description
  element is zero, the actual buffer size is deduced from the
  corresponding memory pool properties. If there are not enough
  elements to describe the buffers for an entire packet of maximal
  length, the buffer size for the remaining segments is deduced
  from the pool of the last valid element.

For example, let's suppose we configured the Rx queue with the
following segments:
    seg0 - pool0, len0=14B, off0=2B
    seg1 - pool1, len1=20B, off1=128B
    seg2 - pool2, len2=20B, off2=0B
    seg3 - pool3, len3=512B, off3=0B

A packet 46 bytes long will be received as follows:
    seg0 - 14B long @ RTE_PKTMBUF_HEADROOM + 2 in mbuf from pool0
    seg1 - 20B long @ 128 in mbuf from pool1
    seg2 - 12B long @ 0 in mbuf from pool2

A packet 1500 bytes long will be received as follows:
    seg0 - 14B @ RTE_PKTMBUF_HEADROOM + 2 in mbuf from pool0
    seg1 - 20B @ 128 in mbuf from pool1
    seg2 - 20B @ 0 in mbuf from pool2
    seg3 - 512B @ 0 in mbuf from pool3
    seg4 - 512B @ 0 in mbuf from pool3
    seg5 - 422B @ 0 in mbuf from pool3
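
From the application's point of view the result is a regular mbuf
chain, so the received segments can be walked with the standard mbuf
API, for example (illustrative fragment; "pkt" is an mbuf returned
by rte_eth_rx_burst()):

struct rte_mbuf *seg = pkt;
uint16_t i = 0;

while (seg != NULL) {
    /* data_len is bounded by the configured segment length and the
     * data starts at the configured offset within the buffer. */
    printf("seg%u: %u bytes @ %p\n", i++, seg->data_len,
           rte_pktmbuf_mtod(seg, void *));
    seg = seg->next;
}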

The RTE_ETH_RX_OFFLOAD_SCATTER offload must be supported and
configured to enable the new buffer split feature (if n_seg
is greater than one).
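
Putting the pieces together, the example queue above might be set up
as follows (illustrative sketch; pool0..pool3 are assumed to be
mempools created beforehand with data rooms large enough for
length + offset, plus RTE_PKTMBUF_HEADROOM for the first segment;
DEV_RX_OFFLOAD_SCATTER is the existing name of the scatter offload
mentioned above):

struct rte_eth_rxseg seg[] = {
    { .mp = pool0, .length = 14,  .offset = 2   },
    { .mp = pool1, .length = 20,  .offset = 128 },
    { .mp = pool2, .length = 20,  .offset = 0   },
    { .mp = pool3, .length = 512, .offset = 0   },
};

/* Both offloads must be enabled, either per-port in
 * rte_eth_dev_configure() or per-queue here if the PMD allows it. */
rx_conf.offloads |= RTE_ETH_RX_OFFLOAD_BUFFER_SPLIT |
                    DEV_RX_OFFLOAD_SCATTER;

ret = rte_eth_rxseg_queue_setup(port_id, queue_id, nb_rxd, socket_id,
                                &rx_conf, seg, RTE_DIM(seg));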

The new approach allows splitting ingress packets into multiple
parts pushed to memory with different attributes. For example,
the packet headers can be pushed to the embedded data buffers
within mbufs and the application data to external buffers attached
to mbufs allocated from different memory pools. The memory
attributes of the split parts may differ as well - for example,
the application data may be pushed into external memory located
on a dedicated physical device, say a GPU or NVMe drive. This
improves the flexibility of the DPDK receive datapath while
preserving compatibility with the existing API.
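
For instance, a header/payload split of this kind only needs two
pools with different data room sizes; the sizes below are purely
illustrative, and in the GPU/NVMe case the payload pool would
instead provide mbufs with attached external buffers:

/* Small buffers in host memory for the packet headers. */
struct rte_mempool *hdr_pool = rte_pktmbuf_pool_create("hdr_pool",
    8192, 256, 0, RTE_PKTMBUF_HEADROOM + 128, socket_id);

/* Larger buffers for the application payload. */
struct rte_mempool *pay_pool = rte_pktmbuf_pool_create("pay_pool",
    8192, 256, 0, 2048, socket_id);

struct rte_eth_rxseg seg[] = {
    { .mp = hdr_pool, .length = 128, .offset = 0 },
    { .mp = pay_pool, .length = 0 /* deduce from pool */, .offset = 0 },
};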

Signed-off-by: Viacheslav Ovsiienko <viacheslavo@nvidia.com>
---
 doc/guides/nics/features.rst             |  15 +++
 doc/guides/rel_notes/release_20_11.rst   |   6 ++
 lib/librte_ethdev/rte_ethdev.c           | 178 +++++++++++++++++++++++++++++++
 lib/librte_ethdev/rte_ethdev.h           | 107 +++++++++++++++++++
 lib/librte_ethdev/rte_ethdev_driver.h    |  10 ++
 lib/librte_ethdev/rte_ethdev_version.map |   1 +
 6 files changed, 317 insertions(+)
  

Comments

Andrew Rybchenko Oct. 12, 2020, 4:38 p.m. UTC | #1
On 10/12/20 7:19 PM, Viacheslav Ovsiienko wrote:
> The DPDK datapath in the transmit direction is very flexible.
> An application can build the multi-segment packet and manages
> almost all data aspects - the memory pools where segments
> are allocated from, the segment lengths, the memory attributes
> like external buffers, registered for DMA, etc.
> 
> In the receiving direction, the datapath is much less flexible,
> an application can only specify the memory pool to configure the
> receiving queue and nothing more. In order to extend receiving
> datapath capabilities it is proposed to add the way to provide
> extended information how to split the packets being received.
> 
> The following structure is introduced to specify the Rx packet
> segment:
> 
> struct rte_eth_rxseg {
>     struct rte_mempool *mp; /* memory pools to allocate segment from */
>     uint16_t length; /* segment maximal data length,
> 		       	configures "split point" */
>     uint16_t offset; /* data offset from beginning
> 		       	of mbuf data buffer */
>     uint32_t reserved; /* reserved field */
> };
> 
> The new routine rte_eth_rxseg_queue_setup_ex() is introduced to
> setup the given Rx queue using the new extended Rx packet segment
> description:
> 
> int
> rte_eth_rx_queue_setup_ex(uint16_t port_id, uint16_t rx_queue_id,
>                           uint16_t nb_rx_desc, unsigned int socket_id,
>                           const struct rte_eth_rxconf *rx_conf,
> 		          const struct rte_eth_rxseg *rx_seg,
>                           uint16_t n_seg)
> 
> This routine presents the two new parameters:
>     rx_seg - pointer the array of segment descriptions, each element
>              describes the memory pool, maximal data length, initial
>              data offset from the beginning of data buffer in mbuf.
> 	     This array allows to specify the different settings for
> 	     each segment in individual fashion.
>     n_seg - number of elements in the array
> 
> The new offload flag RTE_ETH_RX_OFFLOAD_BUFFER_SPLIT in device
> capabilities is introduced to present the way for PMD to report to
> application about supporting Rx packet split to configurable
> segments. Prior invoking the rte_eth_rx_queue_setup_ex() routine
> application should check RTE_ETH_RX_OFFLOAD_BUFFER_SPLIT flag.
> 
> If the Rx queue is configured with new routine the packets being
> received will be split into multiple segments pushed to the mbufs
> with specified attributes. The PMD will split the received packets
> into multiple segments according to the specification in the
> description array:
> 
> - the first network buffer will be allocated from the memory pool,
>   specified in the first segment description element, the second
>   network buffer - from the pool in the second segment description
>   element and so on. If there is no enough elements to describe
>   the buffer for entire packet of maximal length the pool from the
>   last valid element will be used to allocate the buffers from for the
>   rest of segments
> 
> - the offsets from the segment description elements will provide
>   the data offset from the buffer beginning except the first mbuf -
>   for this one the offset is added to the RTE_PKTMBUF_HEADROOM to get
>   actual offset from the buffer beginning. If there is no enough
>   elements to describe the buffer for entire packet of maximal length
>   the offsets for the rest of segment will be supposed to be zero.
> 
> - the data length being received to each segment is limited  by the
>   length specified in the segment description element. The data
>   receiving starts with filling up the first mbuf data buffer, if the
>   specified maximal segment length is reached and there are data
>   remaining (packet is longer than buffer in the first mbuf) the
>   following data will be pushed to the next segment up to its own
>   maximal length. If the first two segments is not enough to store
>   all the packet remaining data  the next (third) segment will
>   be engaged and so on. If the length in the segment description
>   element is zero the actual buffer size will be deduced from
>   the appropriate memory pool properties. If there is no enough
>   elements to describe the buffer for entire packet of maximal
>   length the buffer size will be deduced from the pool of the last
>   valid element for the remaining segments.
> 
> For example, let's suppose we configured the Rx queue with the
> following segments:
>     seg0 - pool0, len0=14B, off0=2
>     seg1 - pool1, len1=20B, off1=128B
>     seg2 - pool2, len2=20B, off2=0B
>     seg3 - pool3, len3=512B, off3=0B
> 
> The packet 46 bytes long will look like the following:
>     seg0 - 14B long @ RTE_PKTMBUF_HEADROOM + 2 in mbuf from pool0
>     seg1 - 20B long @ 128 in mbuf from pool1
>     seg2 - 12B long @ 0 in mbuf from pool2
> 
> The packet 1500 bytes long will look like the following:
>     seg0 - 14B @ RTE_PKTMBUF_HEADROOM + 2 in mbuf from pool0
>     seg1 - 20B @ 128 in mbuf from pool1
>     seg2 - 20B @ 0 in mbuf from pool2
>     seg3 - 512B @ 0 in mbuf from pool3
>     seg4 - 512B @ 0 in mbuf from pool3
>     seg5 - 422B @ 0 in mbuf from pool3
> 
> The offload RTE_ETH_RX_OFFLOAD_SCATTER must be present and
> configured to support new buffer split feature (if n_seg
> is greater than one).
> 
> The new approach would allow splitting the ingress packets into
> multiple parts pushed to the memory with different attributes.
> For example, the packet headers can be pushed to the embedded
> data buffers within mbufs and the application data into
> the external buffers attached to mbufs allocated from the
> different memory pools. The memory attributes for the split
> parts may differ either - for example the application data
> may be pushed into the external memory located on the dedicated
> physical device, say GPU or NVMe. This would improve the DPDK
> receiving datapath flexibility with preserving compatibility
> with existing API.
> 
> Signed-off-by: Viacheslav Ovsiienko <viacheslavo@nvidia.com>
> ---
>  doc/guides/nics/features.rst             |  15 +++
>  doc/guides/rel_notes/release_20_11.rst   |   6 ++
>  lib/librte_ethdev/rte_ethdev.c           | 178 +++++++++++++++++++++++++++++++
>  lib/librte_ethdev/rte_ethdev.h           | 107 +++++++++++++++++++
>  lib/librte_ethdev/rte_ethdev_driver.h    |  10 ++
>  lib/librte_ethdev/rte_ethdev_version.map |   1 +
>  6 files changed, 317 insertions(+)
> 
> diff --git a/doc/guides/nics/features.rst b/doc/guides/nics/features.rst
> index dd8c955..21b91db 100644
> --- a/doc/guides/nics/features.rst
> +++ b/doc/guides/nics/features.rst
> @@ -185,6 +185,21 @@ Supports receiving segmented mbufs.
>  * **[related]    eth_dev_ops**: ``rx_pkt_burst``.
>  
>  
> +.. _nic_features_buffer_split:
> +
> +Buffer Split on Rx
> +------------
> +
> +Scatters the packets being received on specified boundaries to segmented mbufs.
> +
> +* **[uses]       rte_eth_rxconf,rte_eth_rxmode**: ``offloads:RTE_ETH_RX_OFFLOAD_BUFFER_SPLIT``.
> +* **[implements] datapath**: ``Buffer Split functionality``.
> +* **[implements] rte_eth_dev_data**: ``buffer_split``.
> +* **[provides]   rte_eth_dev_info**: ``rx_offload_capa:RTE_ETH_RX_OFFLOAD_BUFFER_SPLIT``.
> +* **[provides]   eth_dev_ops**: ``rxq_info_get:buffer_split``.
> +* **[related] API**: ``rte_eth_rxseg_queue_setup()``.
> +
> +
>  .. _nic_features_lro:
>  
>  LRO
> diff --git a/doc/guides/rel_notes/release_20_11.rst b/doc/guides/rel_notes/release_20_11.rst
> index 2cec9dd..d87247a 100644
> --- a/doc/guides/rel_notes/release_20_11.rst
> +++ b/doc/guides/rel_notes/release_20_11.rst
> @@ -60,6 +60,12 @@ New Features
>    Added the FEC API which provides functions for query FEC capabilities and
>    current FEC mode from device. Also, API for configuring FEC mode is also provided.
>  
> +* **Introduced extended buffer description for receiving.**
> +
> +  Added the extended Rx queue setup routine providing the individual
> +  descriptions for each Rx segment with maximal size, buffer offset and memory
> +  pool to allocate data buffers from.
> +
>  * **Updated Broadcom bnxt driver.**
>  
>    Updated the Broadcom bnxt driver with new features and improvements, including:
> diff --git a/lib/librte_ethdev/rte_ethdev.c b/lib/librte_ethdev/rte_ethdev.c
> index 59beb8a..3a55567 100644
> --- a/lib/librte_ethdev/rte_ethdev.c
> +++ b/lib/librte_ethdev/rte_ethdev.c
> @@ -105,6 +105,9 @@ struct rte_eth_xstats_name_off {
>  #define RTE_RX_OFFLOAD_BIT2STR(_name)	\
>  	{ DEV_RX_OFFLOAD_##_name, #_name }
>  
> +#define RTE_ETH_RX_OFFLOAD_BIT2STR(_name)	\
> +	{ RTE_ETH_RX_OFFLOAD_##_name, #_name }
> +
>  static const struct {
>  	uint64_t offload;
>  	const char *name;
> @@ -128,9 +131,11 @@ struct rte_eth_xstats_name_off {
>  	RTE_RX_OFFLOAD_BIT2STR(SCTP_CKSUM),
>  	RTE_RX_OFFLOAD_BIT2STR(OUTER_UDP_CKSUM),
>  	RTE_RX_OFFLOAD_BIT2STR(RSS_HASH),
> +	RTE_ETH_RX_OFFLOAD_BIT2STR(BUFFER_SPLIT),
>  };
>  
>  #undef RTE_RX_OFFLOAD_BIT2STR
> +#undef RTE_ETH_RX_OFFLOAD_BIT2STR
>  
>  #define RTE_TX_OFFLOAD_BIT2STR(_name)	\
>  	{ DEV_TX_OFFLOAD_##_name, #_name }
> @@ -1920,6 +1925,179 @@ struct rte_eth_dev *
>  }
>  
>  int
> +rte_eth_rxseg_queue_setup(uint16_t port_id, uint16_t rx_queue_id,
> +			  uint16_t nb_rx_desc, unsigned int socket_id,
> +			  const struct rte_eth_rxconf *rx_conf,
> +			  const struct rte_eth_rxseg *rx_seg, uint16_t n_seg)
> +{
> +	int ret;
> +	uint16_t seg_idx;
> +	uint32_t mbp_buf_size;

<start-of-dup>

> +	struct rte_eth_dev *dev;
> +	struct rte_eth_dev_info dev_info;
> +	struct rte_eth_rxconf local_conf;
> +	void **rxq;
> +
> +	RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, -EINVAL);
> +
> +	dev = &rte_eth_devices[port_id];
> +	if (rx_queue_id >= dev->data->nb_rx_queues) {
> +		RTE_ETHDEV_LOG(ERR, "Invalid RX queue_id=%u\n", rx_queue_id);
> +		return -EINVAL;
> +	}

<end-of-dup>

> +
> +	if (rx_seg == NULL) {
> +		RTE_ETHDEV_LOG(ERR, "Invalid null description pointer\n");
> +		return -EINVAL;
> +	}
> +
> +	if (n_seg == 0) {
> +		RTE_ETHDEV_LOG(ERR, "Invalid zero description number\n");
> +		return -EINVAL;
> +	}
> +
> +	RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->rxseg_queue_setup, -ENOTSUP);
> +

<start-of-dup>

> +	/*
> +	 * Check the size of the mbuf data buffer.
> +	 * This value must be provided in the private data of the memory pool.
> +	 * First check that the memory pool has a valid private data.
> +	 */
> +	ret = rte_eth_dev_info_get(port_id, &dev_info);
> +	if (ret != 0)
> +		return ret;

<end-of-dup>

> +
> +	for (seg_idx = 0; seg_idx < n_seg; seg_idx++) {
> +		struct rte_mempool *mp = rx_seg[seg_idx].mp;
> +
> +		if (mp->private_data_size <
> +				sizeof(struct rte_pktmbuf_pool_private)) {
> +			RTE_ETHDEV_LOG(ERR, "%s private_data_size %d < %d\n",
> +				mp->name, (int)mp->private_data_size,
> +				(int)sizeof(struct rte_pktmbuf_pool_private));
> +			return -ENOSPC;
> +		}
> +
> +		mbp_buf_size = rte_pktmbuf_data_room_size(mp);
> +		if (mbp_buf_size < rx_seg[seg_idx].length +
> +				   rx_seg[seg_idx].offset +
> +				   (seg_idx ? 0 :
> +				    (uint32_t)RTE_PKTMBUF_HEADROOM)) {
> +			RTE_ETHDEV_LOG(ERR,
> +				"%s mbuf_data_room_size %d < %d"
> +				" (segment length=%d + segment offset=%d)\n",
> +				mp->name, (int)mbp_buf_size,
> +				(int)(rx_seg[seg_idx].length +
> +				      rx_seg[seg_idx].offset),
> +				(int)rx_seg[seg_idx].length,
> +				(int)rx_seg[seg_idx].offset);
> +			return -EINVAL;
> +		}
> +	}
> +

<start-of-huge-dup>

> +	/* Use default specified by driver, if nb_rx_desc is zero */
> +	if (nb_rx_desc == 0) {
> +		nb_rx_desc = dev_info.default_rxportconf.ring_size;
> +		/* If driver default is also zero, fall back on EAL default */
> +		if (nb_rx_desc == 0)
> +			nb_rx_desc = RTE_ETH_DEV_FALLBACK_RX_RINGSIZE;
> +	}
> +
> +	if (nb_rx_desc > dev_info.rx_desc_lim.nb_max ||
> +			nb_rx_desc < dev_info.rx_desc_lim.nb_min ||
> +			nb_rx_desc % dev_info.rx_desc_lim.nb_align != 0) {
> +
> +		RTE_ETHDEV_LOG(ERR,
> +			"Invalid value for nb_rx_desc(=%hu), should be: "
> +			"<= %hu, >= %hu, and a product of %hu\n",
> +			nb_rx_desc, dev_info.rx_desc_lim.nb_max,
> +			dev_info.rx_desc_lim.nb_min,
> +			dev_info.rx_desc_lim.nb_align);
> +		return -EINVAL;
> +	}
> +
> +	if (dev->data->dev_started &&
> +		!(dev_info.dev_capa &
> +			RTE_ETH_DEV_CAPA_RUNTIME_RX_QUEUE_SETUP))
> +		return -EBUSY;
> +
> +	if (dev->data->dev_started &&
> +		(dev->data->rx_queue_state[rx_queue_id] !=
> +			RTE_ETH_QUEUE_STATE_STOPPED))
> +		return -EBUSY;
> +
> +	rxq = dev->data->rx_queues;
> +	if (rxq[rx_queue_id]) {
> +		RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->rx_queue_release,
> +					-ENOTSUP);
> +		(*dev->dev_ops->rx_queue_release)(rxq[rx_queue_id]);
> +		rxq[rx_queue_id] = NULL;
> +	}
> +
> +	if (rx_conf == NULL)
> +		rx_conf = &dev_info.default_rxconf;
> +
> +	local_conf = *rx_conf;
> +
> +	/*
> +	 * If an offloading has already been enabled in
> +	 * rte_eth_dev_configure(), it has been enabled on all queues,
> +	 * so there is no need to enable it in this queue again.
> +	 * The local_conf.offloads input to underlying PMD only carries
> +	 * those offloadings which are only enabled on this queue and
> +	 * not enabled on all queues.
> +	 */
> +	local_conf.offloads &= ~dev->data->dev_conf.rxmode.offloads;
> +
> +	/*
> +	 * New added offloadings for this queue are those not enabled in
> +	 * rte_eth_dev_configure() and they must be per-queue type.
> +	 * A pure per-port offloading can't be enabled on a queue while
> +	 * disabled on another queue. A pure per-port offloading can't
> +	 * be enabled for any queue as new added one if it hasn't been
> +	 * enabled in rte_eth_dev_configure().
> +	 */
> +	if ((local_conf.offloads & dev_info.rx_queue_offload_capa) !=
> +	     local_conf.offloads) {
> +		RTE_ETHDEV_LOG(ERR,
> +			"Ethdev port_id=%d rx_queue_id=%d, new added offloads"
> +			" 0x%"PRIx64" must be within per-queue offload"
> +			" capabilities 0x%"PRIx64" in %s()\n",
> +			port_id, rx_queue_id, local_conf.offloads,
> +			dev_info.rx_queue_offload_capa,
> +			__func__);
> +		return -EINVAL;
> +	}
> +
> +	/*
> +	 * If LRO is enabled, check that the maximum aggregated packet
> +	 * size is supported by the configured device.
> +	 */
> +	if (local_conf.offloads & DEV_RX_OFFLOAD_TCP_LRO) {
> +		if (dev->data->dev_conf.rxmode.max_lro_pkt_size == 0)
> +			dev->data->dev_conf.rxmode.max_lro_pkt_size =
> +				dev->data->dev_conf.rxmode.max_rx_pkt_len;
> +		int ret = check_lro_pkt_size(port_id,
> +				dev->data->dev_conf.rxmode.max_lro_pkt_size,
> +				dev->data->dev_conf.rxmode.max_rx_pkt_len,
> +				dev_info.max_lro_pkt_size);
> +		if (ret != 0)
> +			return ret;
> +	}

<end-of-huge-dup>

IMO it is not acceptable to duplicate so much code.
It is simply unmaintainable.

NACK

> +
> +	ret = (*dev->dev_ops->rxseg_queue_setup)(dev, rx_queue_id, nb_rx_desc,
> +						 socket_id, &local_conf,
> +						 rx_seg, n_seg);
> +	if (!ret) {
> +		if (!dev->data->min_rx_buf_size ||
> +		    dev->data->min_rx_buf_size > mbp_buf_size)
> +			dev->data->min_rx_buf_size = mbp_buf_size;
> +	}
> +
> +	return eth_err(port_id, ret);
> +}
> +
> +int
>  rte_eth_rx_hairpin_queue_setup(uint16_t port_id, uint16_t rx_queue_id,
>  			       uint16_t nb_rx_desc,
>  			       const struct rte_eth_hairpin_conf *conf)
> diff --git a/lib/librte_ethdev/rte_ethdev.h b/lib/librte_ethdev/rte_ethdev.h
> index 3a31f94..bbf25c8 100644
> --- a/lib/librte_ethdev/rte_ethdev.h
> +++ b/lib/librte_ethdev/rte_ethdev.h
> @@ -970,6 +970,16 @@ struct rte_eth_txmode {
>  };
>  
>  /**
> + * A structure used to configure an RX packet segment to split.
> + */
> +struct rte_eth_rxseg {
> +	struct rte_mempool *mp; /**< Memory pools to allocate segment from */
> +	uint16_t length; /**< Segment data length, configures split point. */
> +	uint16_t offset; /**< Data offset from beginning of mbuf data buffer */
> +	uint32_t reserved; /**< Reserved field */
> +};
> +
> +/**
>   * A structure used to configure an RX ring of an Ethernet port.
>   */
>  struct rte_eth_rxconf {
> @@ -1260,6 +1270,7 @@ struct rte_eth_conf {
>  #define DEV_RX_OFFLOAD_SCTP_CKSUM	0x00020000
>  #define DEV_RX_OFFLOAD_OUTER_UDP_CKSUM  0x00040000
>  #define DEV_RX_OFFLOAD_RSS_HASH		0x00080000
> +#define RTE_ETH_RX_OFFLOAD_BUFFER_SPLIT 0x00100000
>  
>  #define DEV_RX_OFFLOAD_CHECKSUM (DEV_RX_OFFLOAD_IPV4_CKSUM | \
>  				 DEV_RX_OFFLOAD_UDP_CKSUM | \
> @@ -2037,6 +2048,102 @@ int rte_eth_rx_queue_setup(uint16_t port_id, uint16_t rx_queue_id,
>  		uint16_t nb_rx_desc, unsigned int socket_id,
>  		const struct rte_eth_rxconf *rx_conf,
>  		struct rte_mempool *mb_pool);
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice.
> + *
> + * Allocate and set up a receive queue for an Ethernet device
> + * with specifying receiving segments parameters.
> + *
> + * The function allocates a contiguous block of memory for *nb_rx_desc*
> + * receive descriptors from a memory zone associated with *socket_id*.
> + * The descriptors might be divided into groups by PMD to receive the data
> + * into multi-segment packet presented by the chain of mbufs.
> + *
> + * Each descriptor within the group is initialized accordingly with
> + * the network buffers allocated from the specified memory pool and with
> + * specified buffer offset and maximal segment length.
> + *
> + * @param port_id
> + *   The port identifier of the Ethernet device.
> + * @param rx_queue_id
> + *   The index of the receive queue to set up.
> + *   The value must be in the range [0, nb_rx_queue - 1] previously supplied
> + *   to rte_eth_dev_configure().
> + * @param nb_rx_desc
> + *   The number of receive descriptors to allocate for the receive ring.
> + * @param socket_id
> + *   The *socket_id* argument is the socket identifier in case of NUMA.
> + *   The value can be *SOCKET_ID_ANY* if there is no NUMA constraint for
> + *   the DMA memory allocated for the receive descriptors of the ring.
> + * @param rx_conf
> + *   The pointer to the configuration data to be used for the receive queue.
> + *   NULL value is allowed, in which case default RX configuration
> + *   will be used.
> + *   The *rx_conf* structure contains an *rx_thresh* structure with the values
> + *   of the Prefetch, Host, and Write-Back threshold registers of the receive
> + *   ring.
> + *   In addition it contains the hardware offloads features to activate using
> + *   the DEV_RX_OFFLOAD_* flags.
> + *   If an offloading set in rx_conf->offloads
> + *   hasn't been set in the input argument eth_conf->rxmode.offloads
> + *   to rte_eth_dev_configure(), it is a new added offloading, it must be
> + *   per-queue type and it is enabled for the queue.
> + *   No need to repeat any bit in rx_conf->offloads which has already been
> + *   enabled in rte_eth_dev_configure() at port level. An offloading enabled
> + *   at port level can't be disabled at queue level.
> + * @param rx_seg
> + *   The pointer to the array of segment descriptions, each element describes
> + *   the memory pool, maximal segment data length, initial data offset from
> + *   the beginning of data buffer in mbuf. This allow to specify the dedicated
> + *   properties for each segment in the receiving buffer - pool, buffer
> + *   offset, maximal segment size. If RTE_ETH_RX_OFFLOAD_BUFFER_SPLIT offload
> + *   flag is configured the PMD will split the received packets into multiple
> + *   segments according to the specification in the description array:
> + *   - the first network buffer will be allocated from the memory pool,
> + *     specified in the first segment description element, the second
> + *     network buffer - from the pool in the second segment description
> + *     element and so on. If there is no enough elements to describe
> + *     the buffer for entire packet of maximal length the pool from the last
> + *     valid element will be used to allocate the buffers from for the rest
> + *     of segments.
> + *   - the offsets from the segment description elements will provide the
> + *     data offset from the buffer beginning except the first mbuf - for this
> + *     one the offset is added to the RTE_PKTMBUF_HEADROOM to get actual
> + *     offset from the buffer beginning. If there is no enough elements
> + *     to describe the buffer for entire packet of maximal length the offsets
> + *     for the rest of segment will be supposed to be zero.
> + *   - the data length being received to each segment is limited by the
> + *     length specified in the segment description element. The data receiving
> + *     starts with filling up the first mbuf data buffer, if the specified
> + *     maximal segment length is reached and there are data remaining
> + *     (packet is longer than buffer in the first mbuf) the following data
> + *     will be pushed to the next segment up to its own length. If the first
> + *     two segments is not enough to store all the packet data the next
> + *     (third) segment will be engaged and so on. If the length in the segment
> + *     description element is zero the actual buffer size will be deduced
> + *     from the appropriate memory pool properties. If there is no enough
> + *     elements to describe the buffer for entire packet of maximal length
> + *     the buffer size will be deduced from the pool of the last valid
> + *     element for the all remaining segments.
> + * @param n_seg
> + *   The number of elements in the segment description array.
> + * @return
> + *   - 0: Success, receive queue correctly set up.
> + *   - -EIO: if device is removed.
> + *   - -EINVAL: The segment descriptors array is empty (pointer to is null or
> + *      zero number of elements) or the size of network buffers which can be
> + *      allocated from this memory pool does not fit the various buffer sizes
> + *      allowed by the device controller.
> + *   - -ENOMEM: Unable to allocate the receive ring descriptors or to
> + *      allocate network memory buffers from the memory pool when
> + *      initializing receive descriptors.
> + */
> +__rte_experimental
> +int rte_eth_rxseg_queue_setup(uint16_t port_id, uint16_t rx_queue_id,
> +		uint16_t nb_rx_desc, unsigned int socket_id,
> +		const struct rte_eth_rxconf *rx_conf,
> +		const struct rte_eth_rxseg *rx_seg, uint16_t n_seg);
>  
>  /**
>   * @warning
> diff --git a/lib/librte_ethdev/rte_ethdev_driver.h b/lib/librte_ethdev/rte_ethdev_driver.h
> index 35cc4fb..5dee210 100644
> --- a/lib/librte_ethdev/rte_ethdev_driver.h
> +++ b/lib/librte_ethdev/rte_ethdev_driver.h
> @@ -264,6 +264,15 @@ typedef int (*eth_rx_queue_setup_t)(struct rte_eth_dev *dev,
>  				    struct rte_mempool *mb_pool);
>  /**< @internal Set up a receive queue of an Ethernet device. */
>  
> +typedef int (*eth_rxseg_queue_setup_t)(struct rte_eth_dev *dev,
> +				       uint16_t rx_queue_id,
> +				       uint16_t nb_rx_desc,
> +				       unsigned int socket_id,
> +				       const struct rte_eth_rxconf *rx_conf,
> +				       const struct rte_eth_rxseg *rx_seg,
> +				       uint16_t n_seg);
> +/**< @internal extended Set up a receive queue of an Ethernet device. */
> +
>  typedef int (*eth_tx_queue_setup_t)(struct rte_eth_dev *dev,
>  				    uint16_t tx_queue_id,
>  				    uint16_t nb_tx_desc,
> @@ -711,6 +720,7 @@ struct eth_dev_ops {
>  	eth_queue_start_t          tx_queue_start;/**< Start TX for a queue. */
>  	eth_queue_stop_t           tx_queue_stop; /**< Stop TX for a queue. */
>  	eth_rx_queue_setup_t       rx_queue_setup;/**< Set up device RX queue. */
> +	eth_rxseg_queue_setup_t    rxseg_queue_setup;/**< Extended RX setup. */
>  	eth_queue_release_t        rx_queue_release; /**< Release RX queue. */
>  
>  	eth_rx_enable_intr_t       rx_queue_intr_enable;  /**< Enable Rx queue interrupt. */
> diff --git a/lib/librte_ethdev/rte_ethdev_version.map b/lib/librte_ethdev/rte_ethdev_version.map
> index f8a0945..d4b9849 100644
> --- a/lib/librte_ethdev/rte_ethdev_version.map
> +++ b/lib/librte_ethdev/rte_ethdev_version.map
> @@ -195,6 +195,7 @@ EXPERIMENTAL {
>  	rte_flow_get_aged_flows;
>  
>  	# Marked as experimental in 20.11
> +	rte_eth_rxseg_queue_setup;
>  	rte_tm_capabilities_get;
>  	rte_tm_get_number_of_leaf_nodes;
>  	rte_tm_hierarchy_commit;
>
  
Thomas Monjalon Oct. 12, 2020, 5:03 p.m. UTC | #2
12/10/2020 18:38, Andrew Rybchenko:
> On 10/12/20 7:19 PM, Viacheslav Ovsiienko wrote:
> >  int
> > +rte_eth_rxseg_queue_setup(uint16_t port_id, uint16_t rx_queue_id,
> > +			  uint16_t nb_rx_desc, unsigned int socket_id,
> > +			  const struct rte_eth_rxconf *rx_conf,
> > +			  const struct rte_eth_rxseg *rx_seg, uint16_t n_seg)
> > +{
> > +	int ret;
> > +	uint16_t seg_idx;
> > +	uint32_t mbp_buf_size;
> 
> <start-of-dup>
> 
> > +	struct rte_eth_dev *dev;
> > +	struct rte_eth_dev_info dev_info;
> > +	struct rte_eth_rxconf local_conf;
> > +	void **rxq;
> > +
> > +	RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, -EINVAL);
> > +
> > +	dev = &rte_eth_devices[port_id];
> > +	if (rx_queue_id >= dev->data->nb_rx_queues) {
> > +		RTE_ETHDEV_LOG(ERR, "Invalid RX queue_id=%u\n", rx_queue_id);
> > +		return -EINVAL;
> > +	}
> 
> <end-of-dup>
> 
> > +
> > +	if (rx_seg == NULL) {
> > +		RTE_ETHDEV_LOG(ERR, "Invalid null description pointer\n");
> > +		return -EINVAL;
> > +	}
> > +
> > +	if (n_seg == 0) {
> > +		RTE_ETHDEV_LOG(ERR, "Invalid zero description number\n");
> > +		return -EINVAL;
> > +	}
> > +
> > +	RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->rxseg_queue_setup, -ENOTSUP);
> > +
> 
> <start-of-dup>
> 
> > +	/*
> > +	 * Check the size of the mbuf data buffer.
> > +	 * This value must be provided in the private data of the memory pool.
> > +	 * First check that the memory pool has a valid private data.
> > +	 */
> > +	ret = rte_eth_dev_info_get(port_id, &dev_info);
> > +	if (ret != 0)
> > +		return ret;
> 
> <end-of-dup>
> 
> > +
> > +	for (seg_idx = 0; seg_idx < n_seg; seg_idx++) {
> > +		struct rte_mempool *mp = rx_seg[seg_idx].mp;
> > +
> > +		if (mp->private_data_size <
> > +				sizeof(struct rte_pktmbuf_pool_private)) {
> > +			RTE_ETHDEV_LOG(ERR, "%s private_data_size %d < %d\n",
> > +				mp->name, (int)mp->private_data_size,
> > +				(int)sizeof(struct rte_pktmbuf_pool_private));
> > +			return -ENOSPC;
> > +		}
> > +
> > +		mbp_buf_size = rte_pktmbuf_data_room_size(mp);
> > +		if (mbp_buf_size < rx_seg[seg_idx].length +
> > +				   rx_seg[seg_idx].offset +
> > +				   (seg_idx ? 0 :
> > +				    (uint32_t)RTE_PKTMBUF_HEADROOM)) {
> > +			RTE_ETHDEV_LOG(ERR,
> > +				"%s mbuf_data_room_size %d < %d"
> > +				" (segment length=%d + segment offset=%d)\n",
> > +				mp->name, (int)mbp_buf_size,
> > +				(int)(rx_seg[seg_idx].length +
> > +				      rx_seg[seg_idx].offset),
> > +				(int)rx_seg[seg_idx].length,
> > +				(int)rx_seg[seg_idx].offset);
> > +			return -EINVAL;
> > +		}
> > +	}
> > +
> 
> <start-of-huge-dup>
> 
> > +	/* Use default specified by driver, if nb_rx_desc is zero */
> > +	if (nb_rx_desc == 0) {
> > +		nb_rx_desc = dev_info.default_rxportconf.ring_size;
> > +		/* If driver default is also zero, fall back on EAL default */
> > +		if (nb_rx_desc == 0)
> > +			nb_rx_desc = RTE_ETH_DEV_FALLBACK_RX_RINGSIZE;
> > +	}
> > +
> > +	if (nb_rx_desc > dev_info.rx_desc_lim.nb_max ||
> > +			nb_rx_desc < dev_info.rx_desc_lim.nb_min ||
> > +			nb_rx_desc % dev_info.rx_desc_lim.nb_align != 0) {
> > +
> > +		RTE_ETHDEV_LOG(ERR,
> > +			"Invalid value for nb_rx_desc(=%hu), should be: "
> > +			"<= %hu, >= %hu, and a product of %hu\n",
> > +			nb_rx_desc, dev_info.rx_desc_lim.nb_max,
> > +			dev_info.rx_desc_lim.nb_min,
> > +			dev_info.rx_desc_lim.nb_align);
> > +		return -EINVAL;
> > +	}
> > +
> > +	if (dev->data->dev_started &&
> > +		!(dev_info.dev_capa &
> > +			RTE_ETH_DEV_CAPA_RUNTIME_RX_QUEUE_SETUP))
> > +		return -EBUSY;
> > +
> > +	if (dev->data->dev_started &&
> > +		(dev->data->rx_queue_state[rx_queue_id] !=
> > +			RTE_ETH_QUEUE_STATE_STOPPED))
> > +		return -EBUSY;
> > +
> > +	rxq = dev->data->rx_queues;
> > +	if (rxq[rx_queue_id]) {
> > +		RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->rx_queue_release,
> > +					-ENOTSUP);
> > +		(*dev->dev_ops->rx_queue_release)(rxq[rx_queue_id]);
> > +		rxq[rx_queue_id] = NULL;
> > +	}
> > +
> > +	if (rx_conf == NULL)
> > +		rx_conf = &dev_info.default_rxconf;
> > +
> > +	local_conf = *rx_conf;
> > +
> > +	/*
> > +	 * If an offloading has already been enabled in
> > +	 * rte_eth_dev_configure(), it has been enabled on all queues,
> > +	 * so there is no need to enable it in this queue again.
> > +	 * The local_conf.offloads input to underlying PMD only carries
> > +	 * those offloadings which are only enabled on this queue and
> > +	 * not enabled on all queues.
> > +	 */
> > +	local_conf.offloads &= ~dev->data->dev_conf.rxmode.offloads;
> > +
> > +	/*
> > +	 * New added offloadings for this queue are those not enabled in
> > +	 * rte_eth_dev_configure() and they must be per-queue type.
> > +	 * A pure per-port offloading can't be enabled on a queue while
> > +	 * disabled on another queue. A pure per-port offloading can't
> > +	 * be enabled for any queue as new added one if it hasn't been
> > +	 * enabled in rte_eth_dev_configure().
> > +	 */
> > +	if ((local_conf.offloads & dev_info.rx_queue_offload_capa) !=
> > +	     local_conf.offloads) {
> > +		RTE_ETHDEV_LOG(ERR,
> > +			"Ethdev port_id=%d rx_queue_id=%d, new added offloads"
> > +			" 0x%"PRIx64" must be within per-queue offload"
> > +			" capabilities 0x%"PRIx64" in %s()\n",
> > +			port_id, rx_queue_id, local_conf.offloads,
> > +			dev_info.rx_queue_offload_capa,
> > +			__func__);
> > +		return -EINVAL;
> > +	}
> > +
> > +	/*
> > +	 * If LRO is enabled, check that the maximum aggregated packet
> > +	 * size is supported by the configured device.
> > +	 */
> > +	if (local_conf.offloads & DEV_RX_OFFLOAD_TCP_LRO) {
> > +		if (dev->data->dev_conf.rxmode.max_lro_pkt_size == 0)
> > +			dev->data->dev_conf.rxmode.max_lro_pkt_size =
> > +				dev->data->dev_conf.rxmode.max_rx_pkt_len;
> > +		int ret = check_lro_pkt_size(port_id,
> > +				dev->data->dev_conf.rxmode.max_lro_pkt_size,
> > +				dev->data->dev_conf.rxmode.max_rx_pkt_len,
> > +				dev_info.max_lro_pkt_size);
> > +		if (ret != 0)
> > +			return ret;
> > +	}
> 
> <end-of-huge-dup>
> 
> IMO It is not acceptable to duplication so much code.
> It is simply unmaintainable.
> 
> NACK

Can it be solved by making rte_eth_rx_queue_setup() a wrapper
on top of this new rte_eth_rxseg_queue_setup() ?
  
Andrew Rybchenko Oct. 12, 2020, 5:11 p.m. UTC | #3
On 10/12/20 8:03 PM, Thomas Monjalon wrote:
> 12/10/2020 18:38, Andrew Rybchenko:
>> On 10/12/20 7:19 PM, Viacheslav Ovsiienko wrote:
>>>  int
>>> +rte_eth_rxseg_queue_setup(uint16_t port_id, uint16_t rx_queue_id,
>>> +			  uint16_t nb_rx_desc, unsigned int socket_id,
>>> +			  const struct rte_eth_rxconf *rx_conf,
>>> +			  const struct rte_eth_rxseg *rx_seg, uint16_t n_seg)
>>> +{
>>> +	int ret;
>>> +	uint16_t seg_idx;
>>> +	uint32_t mbp_buf_size;
>>
>> <start-of-dup>
>>
>>> +	struct rte_eth_dev *dev;
>>> +	struct rte_eth_dev_info dev_info;
>>> +	struct rte_eth_rxconf local_conf;
>>> +	void **rxq;
>>> +
>>> +	RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, -EINVAL);
>>> +
>>> +	dev = &rte_eth_devices[port_id];
>>> +	if (rx_queue_id >= dev->data->nb_rx_queues) {
>>> +		RTE_ETHDEV_LOG(ERR, "Invalid RX queue_id=%u\n", rx_queue_id);
>>> +		return -EINVAL;
>>> +	}
>>
>> <end-of-dup>
>>
>>> +
>>> +	if (rx_seg == NULL) {
>>> +		RTE_ETHDEV_LOG(ERR, "Invalid null description pointer\n");
>>> +		return -EINVAL;
>>> +	}
>>> +
>>> +	if (n_seg == 0) {
>>> +		RTE_ETHDEV_LOG(ERR, "Invalid zero description number\n");
>>> +		return -EINVAL;
>>> +	}
>>> +
>>> +	RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->rxseg_queue_setup, -ENOTSUP);
>>> +
>>
>> <start-of-dup>
>>
>>> +	/*
>>> +	 * Check the size of the mbuf data buffer.
>>> +	 * This value must be provided in the private data of the memory pool.
>>> +	 * First check that the memory pool has a valid private data.
>>> +	 */
>>> +	ret = rte_eth_dev_info_get(port_id, &dev_info);
>>> +	if (ret != 0)
>>> +		return ret;
>>
>> <end-of-dup>
>>
>>> +
>>> +	for (seg_idx = 0; seg_idx < n_seg; seg_idx++) {
>>> +		struct rte_mempool *mp = rx_seg[seg_idx].mp;
>>> +
>>> +		if (mp->private_data_size <
>>> +				sizeof(struct rte_pktmbuf_pool_private)) {
>>> +			RTE_ETHDEV_LOG(ERR, "%s private_data_size %d < %d\n",
>>> +				mp->name, (int)mp->private_data_size,
>>> +				(int)sizeof(struct rte_pktmbuf_pool_private));
>>> +			return -ENOSPC;
>>> +		}
>>> +
>>> +		mbp_buf_size = rte_pktmbuf_data_room_size(mp);
>>> +		if (mbp_buf_size < rx_seg[seg_idx].length +
>>> +				   rx_seg[seg_idx].offset +
>>> +				   (seg_idx ? 0 :
>>> +				    (uint32_t)RTE_PKTMBUF_HEADROOM)) {
>>> +			RTE_ETHDEV_LOG(ERR,
>>> +				"%s mbuf_data_room_size %d < %d"
>>> +				" (segment length=%d + segment offset=%d)\n",
>>> +				mp->name, (int)mbp_buf_size,
>>> +				(int)(rx_seg[seg_idx].length +
>>> +				      rx_seg[seg_idx].offset),
>>> +				(int)rx_seg[seg_idx].length,
>>> +				(int)rx_seg[seg_idx].offset);
>>> +			return -EINVAL;
>>> +		}
>>> +	}
>>> +
>>
>> <start-of-huge-dup>
>>
>>> +	/* Use default specified by driver, if nb_rx_desc is zero */
>>> +	if (nb_rx_desc == 0) {
>>> +		nb_rx_desc = dev_info.default_rxportconf.ring_size;
>>> +		/* If driver default is also zero, fall back on EAL default */
>>> +		if (nb_rx_desc == 0)
>>> +			nb_rx_desc = RTE_ETH_DEV_FALLBACK_RX_RINGSIZE;
>>> +	}
>>> +
>>> +	if (nb_rx_desc > dev_info.rx_desc_lim.nb_max ||
>>> +			nb_rx_desc < dev_info.rx_desc_lim.nb_min ||
>>> +			nb_rx_desc % dev_info.rx_desc_lim.nb_align != 0) {
>>> +
>>> +		RTE_ETHDEV_LOG(ERR,
>>> +			"Invalid value for nb_rx_desc(=%hu), should be: "
>>> +			"<= %hu, >= %hu, and a product of %hu\n",
>>> +			nb_rx_desc, dev_info.rx_desc_lim.nb_max,
>>> +			dev_info.rx_desc_lim.nb_min,
>>> +			dev_info.rx_desc_lim.nb_align);
>>> +		return -EINVAL;
>>> +	}
>>> +
>>> +	if (dev->data->dev_started &&
>>> +		!(dev_info.dev_capa &
>>> +			RTE_ETH_DEV_CAPA_RUNTIME_RX_QUEUE_SETUP))
>>> +		return -EBUSY;
>>> +
>>> +	if (dev->data->dev_started &&
>>> +		(dev->data->rx_queue_state[rx_queue_id] !=
>>> +			RTE_ETH_QUEUE_STATE_STOPPED))
>>> +		return -EBUSY;
>>> +
>>> +	rxq = dev->data->rx_queues;
>>> +	if (rxq[rx_queue_id]) {
>>> +		RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->rx_queue_release,
>>> +					-ENOTSUP);
>>> +		(*dev->dev_ops->rx_queue_release)(rxq[rx_queue_id]);
>>> +		rxq[rx_queue_id] = NULL;
>>> +	}
>>> +
>>> +	if (rx_conf == NULL)
>>> +		rx_conf = &dev_info.default_rxconf;
>>> +
>>> +	local_conf = *rx_conf;
>>> +
>>> +	/*
>>> +	 * If an offloading has already been enabled in
>>> +	 * rte_eth_dev_configure(), it has been enabled on all queues,
>>> +	 * so there is no need to enable it in this queue again.
>>> +	 * The local_conf.offloads input to underlying PMD only carries
>>> +	 * those offloadings which are only enabled on this queue and
>>> +	 * not enabled on all queues.
>>> +	 */
>>> +	local_conf.offloads &= ~dev->data->dev_conf.rxmode.offloads;
>>> +
>>> +	/*
>>> +	 * New added offloadings for this queue are those not enabled in
>>> +	 * rte_eth_dev_configure() and they must be per-queue type.
>>> +	 * A pure per-port offloading can't be enabled on a queue while
>>> +	 * disabled on another queue. A pure per-port offloading can't
>>> +	 * be enabled for any queue as new added one if it hasn't been
>>> +	 * enabled in rte_eth_dev_configure().
>>> +	 */
>>> +	if ((local_conf.offloads & dev_info.rx_queue_offload_capa) !=
>>> +	     local_conf.offloads) {
>>> +		RTE_ETHDEV_LOG(ERR,
>>> +			"Ethdev port_id=%d rx_queue_id=%d, new added offloads"
>>> +			" 0x%"PRIx64" must be within per-queue offload"
>>> +			" capabilities 0x%"PRIx64" in %s()\n",
>>> +			port_id, rx_queue_id, local_conf.offloads,
>>> +			dev_info.rx_queue_offload_capa,
>>> +			__func__);
>>> +		return -EINVAL;
>>> +	}
>>> +
>>> +	/*
>>> +	 * If LRO is enabled, check that the maximum aggregated packet
>>> +	 * size is supported by the configured device.
>>> +	 */
>>> +	if (local_conf.offloads & DEV_RX_OFFLOAD_TCP_LRO) {
>>> +		if (dev->data->dev_conf.rxmode.max_lro_pkt_size == 0)
>>> +			dev->data->dev_conf.rxmode.max_lro_pkt_size =
>>> +				dev->data->dev_conf.rxmode.max_rx_pkt_len;
>>> +		int ret = check_lro_pkt_size(port_id,
>>> +				dev->data->dev_conf.rxmode.max_lro_pkt_size,
>>> +				dev->data->dev_conf.rxmode.max_rx_pkt_len,
>>> +				dev_info.max_lro_pkt_size);
>>> +		if (ret != 0)
>>> +			return ret;
>>> +	}
>>
>> <end-of-huge-dup>
>>
>> IMO It is not acceptable to duplication so much code.
>> It is simply unmaintainable.
>>
>> NACK
> 
> Can it be solved by making rte_eth_rx_queue_setup() a wrapper
> on top of this new rte_eth_rxseg_queue_setup() ?
> 

Could be, but strictly speaking it would break the argument
validation order and error reporting in various cases.
So, refactoring is required to keep it consistent.
  
Slava Ovsiienko Oct. 12, 2020, 5:11 p.m. UTC | #4
> -----Original Message-----
> From: Thomas Monjalon <thomas@monjalon.net>
> Sent: Monday, October 12, 2020 20:03
> To: Slava Ovsiienko <viacheslavo@nvidia.com>; Andrew Rybchenko
> <andrew.rybchenko@oktetlabs.ru>
> Cc: dev@dpdk.org; stephen@networkplumber.org; ferruh.yigit@intel.com;
> olivier.matz@6wind.com; jerinjacobk@gmail.com;
> maxime.coquelin@redhat.com; david.marchand@redhat.com
> Subject: Re: [dpdk-dev] [PATCH v3 1/9] ethdev: introduce Rx buffer split
> 
> 12/10/2020 18:38, Andrew Rybchenko:
> > On 10/12/20 7:19 PM, Viacheslav Ovsiienko wrote:
> > >  int
> > > +rte_eth_rxseg_queue_setup(uint16_t port_id, uint16_t rx_queue_id,
> > > +			  uint16_t nb_rx_desc, unsigned int socket_id,
> > > +			  const struct rte_eth_rxconf *rx_conf,
> > > +			  const struct rte_eth_rxseg *rx_seg, uint16_t n_seg) {
> > > +	int ret;
> > > +	uint16_t seg_idx;
> > > +	uint32_t mbp_buf_size;
> >
> > <start-of-dup>
> >
> > > +	struct rte_eth_dev *dev;
> > > +	struct rte_eth_dev_info dev_info;
> > > +	struct rte_eth_rxconf local_conf;
> > > +	void **rxq;
> > > +
> > > +	RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, -EINVAL);
> > > +
> > > +	dev = &rte_eth_devices[port_id];
> > > +	if (rx_queue_id >= dev->data->nb_rx_queues) {
> > > +		RTE_ETHDEV_LOG(ERR, "Invalid RX queue_id=%u\n",
> rx_queue_id);
> > > +		return -EINVAL;
> > > +	}
> >
> > <end-of-dup>
> >
> > > +
> > > +	if (rx_seg == NULL) {
> > > +		RTE_ETHDEV_LOG(ERR, "Invalid null description pointer\n");
> > > +		return -EINVAL;
> > > +	}
> > > +
> > > +	if (n_seg == 0) {
> > > +		RTE_ETHDEV_LOG(ERR, "Invalid zero description
> number\n");
> > > +		return -EINVAL;
> > > +	}
> > > +
> > > +	RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->rxseg_queue_setup,
> > > +-ENOTSUP);
> > > +
> >
> > <start-of-dup>
> >
> > > +	/*
> > > +	 * Check the size of the mbuf data buffer.
> > > +	 * This value must be provided in the private data of the memory
> pool.
> > > +	 * First check that the memory pool has a valid private data.
> > > +	 */
> > > +	ret = rte_eth_dev_info_get(port_id, &dev_info);
> > > +	if (ret != 0)
> > > +		return ret;
> >
> > <end-of-dup>
> >
> > > +
> > > +	for (seg_idx = 0; seg_idx < n_seg; seg_idx++) {
> > > +		struct rte_mempool *mp = rx_seg[seg_idx].mp;
> > > +
> > > +		if (mp->private_data_size <
> > > +				sizeof(struct rte_pktmbuf_pool_private)) {
> > > +			RTE_ETHDEV_LOG(ERR, "%s private_data_size %d <
> %d\n",
> > > +				mp->name, (int)mp->private_data_size,
> > > +				(int)sizeof(struct
> rte_pktmbuf_pool_private));
> > > +			return -ENOSPC;
> > > +		}
> > > +
> > > +		mbp_buf_size = rte_pktmbuf_data_room_size(mp);
> > > +		if (mbp_buf_size < rx_seg[seg_idx].length +
> > > +				   rx_seg[seg_idx].offset +
> > > +				   (seg_idx ? 0 :
> > > +				    (uint32_t)RTE_PKTMBUF_HEADROOM)) {
> > > +			RTE_ETHDEV_LOG(ERR,
> > > +				"%s mbuf_data_room_size %d < %d"
> > > +				" (segment length=%d + segment
> offset=%d)\n",
> > > +				mp->name, (int)mbp_buf_size,
> > > +				(int)(rx_seg[seg_idx].length +
> > > +				      rx_seg[seg_idx].offset),
> > > +				(int)rx_seg[seg_idx].length,
> > > +				(int)rx_seg[seg_idx].offset);
> > > +			return -EINVAL;
> > > +		}
> > > +	}
> > > +
> >
> > <start-of-huge-dup>
> >
> > > +	/* Use default specified by driver, if nb_rx_desc is zero */
> > > +	if (nb_rx_desc == 0) {
> > > +		nb_rx_desc = dev_info.default_rxportconf.ring_size;
> > > +		/* If driver default is also zero, fall back on EAL default */
> > > +		if (nb_rx_desc == 0)
> > > +			nb_rx_desc =
> RTE_ETH_DEV_FALLBACK_RX_RINGSIZE;
> > > +	}
> > > +
> > > +	if (nb_rx_desc > dev_info.rx_desc_lim.nb_max ||
> > > +			nb_rx_desc < dev_info.rx_desc_lim.nb_min ||
> > > +			nb_rx_desc % dev_info.rx_desc_lim.nb_align != 0) {
> > > +
> > > +		RTE_ETHDEV_LOG(ERR,
> > > +			"Invalid value for nb_rx_desc(=%hu), should be: "
> > > +			"<= %hu, >= %hu, and a product of %hu\n",
> > > +			nb_rx_desc, dev_info.rx_desc_lim.nb_max,
> > > +			dev_info.rx_desc_lim.nb_min,
> > > +			dev_info.rx_desc_lim.nb_align);
> > > +		return -EINVAL;
> > > +	}
> > > +
> > > +	if (dev->data->dev_started &&
> > > +		!(dev_info.dev_capa &
> > > +			RTE_ETH_DEV_CAPA_RUNTIME_RX_QUEUE_SETUP))
> > > +		return -EBUSY;
> > > +
> > > +	if (dev->data->dev_started &&
> > > +		(dev->data->rx_queue_state[rx_queue_id] !=
> > > +			RTE_ETH_QUEUE_STATE_STOPPED))
> > > +		return -EBUSY;
> > > +
> > > +	rxq = dev->data->rx_queues;
> > > +	if (rxq[rx_queue_id]) {
> > > +		RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops-
> >rx_queue_release,
> > > +					-ENOTSUP);
> > > +		(*dev->dev_ops->rx_queue_release)(rxq[rx_queue_id]);
> > > +		rxq[rx_queue_id] = NULL;
> > > +	}
> > > +
> > > +	if (rx_conf == NULL)
> > > +		rx_conf = &dev_info.default_rxconf;
> > > +
> > > +	local_conf = *rx_conf;
> > > +
> > > +	/*
> > > +	 * If an offloading has already been enabled in
> > > +	 * rte_eth_dev_configure(), it has been enabled on all queues,
> > > +	 * so there is no need to enable it in this queue again.
> > > +	 * The local_conf.offloads input to underlying PMD only carries
> > > +	 * those offloadings which are only enabled on this queue and
> > > +	 * not enabled on all queues.
> > > +	 */
> > > +	local_conf.offloads &= ~dev->data->dev_conf.rxmode.offloads;
> > > +
> > > +	/*
> > > +	 * New added offloadings for this queue are those not enabled in
> > > +	 * rte_eth_dev_configure() and they must be per-queue type.
> > > +	 * A pure per-port offloading can't be enabled on a queue while
> > > +	 * disabled on another queue. A pure per-port offloading can't
> > > +	 * be enabled for any queue as new added one if it hasn't been
> > > +	 * enabled in rte_eth_dev_configure().
> > > +	 */
> > > +	if ((local_conf.offloads & dev_info.rx_queue_offload_capa) !=
> > > +	     local_conf.offloads) {
> > > +		RTE_ETHDEV_LOG(ERR,
> > > +			"Ethdev port_id=%d rx_queue_id=%d, new added
> offloads"
> > > +			" 0x%"PRIx64" must be within per-queue offload"
> > > +			" capabilities 0x%"PRIx64" in %s()\n",
> > > +			port_id, rx_queue_id, local_conf.offloads,
> > > +			dev_info.rx_queue_offload_capa,
> > > +			__func__);
> > > +		return -EINVAL;
> > > +	}
> > > +
> > > +	/*
> > > +	 * If LRO is enabled, check that the maximum aggregated packet
> > > +	 * size is supported by the configured device.
> > > +	 */
> > > +	if (local_conf.offloads & DEV_RX_OFFLOAD_TCP_LRO) {
> > > +		if (dev->data->dev_conf.rxmode.max_lro_pkt_size == 0)
> > > +			dev->data->dev_conf.rxmode.max_lro_pkt_size =
> > > +				dev->data-
> >dev_conf.rxmode.max_rx_pkt_len;
> > > +		int ret = check_lro_pkt_size(port_id,
> > > +				dev->data-
> >dev_conf.rxmode.max_lro_pkt_size,
> > > +				dev->data-
> >dev_conf.rxmode.max_rx_pkt_len,
> > > +				dev_info.max_lro_pkt_size);
> > > +		if (ret != 0)
> > > +			return ret;
> > > +	}
> >
> > <end-of-huge-dup>
> >
> > IMO It is not acceptable to duplication so much code.
> > It is simply unmaintainable.
> >
> > NACK
> 
> Can it be solved by making rte_eth_rx_queue_setup() a wrapper on top of
> this new rte_eth_rxseg_queue_setup() ?
> 
That would be a code refactoring. The simpler solution is to provide a subroutine performing the common part of the parameter checks.

It seems there are no strong decision-making pros and cons for these two approaches.
As I said, from my side the main concern with including the segment descriptions in the config structure
is that it introduces some ambiguity. But if we decide to switch to that approach, I will handle it.

With best regards, Slava
  
Slava Ovsiienko Oct. 12, 2020, 8:22 p.m. UTC | #5
Hi, Andrew

You are right - the amount of code duplicated from rte_eth_rx_queue_setup()
was large and it did not look good indeed.

I've updated the code, now rte_eth_rx_queue_setup() and rte_eth_rxseg_queue_setup()
share the underlying internal routine __rte_eth_rx_queue_setup().

Of course, there is some refactoring, but it is fairly straightforward and I hope
you will find it acceptable; please see v4 of the patchset.

As I said, I do not see decisive pros or cons for either approach.
Anyway, if we decide to move the segment descriptions into the config struct,
only a small step over the existing code remains to implement that approach.
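
Roughly, the resulting structure is the following (simplified sketch
only, not the exact v4 code - the omitted part is the common
validation that used to be duplicated):

static int
__rte_eth_rx_queue_setup(uint16_t port_id, uint16_t rx_queue_id,
			 uint16_t nb_rx_desc, unsigned int socket_id,
			 const struct rte_eth_rxconf *rx_conf,
			 const struct rte_eth_rxseg *rx_seg, uint16_t n_seg,
			 struct rte_mempool *mb_pool)
{
	struct rte_eth_dev *dev = &rte_eth_devices[port_id];
	int ret;

	/* ... port/queue validation, descriptor limits, offload and
	 * LRO checks - present here exactly once ... */

	if (mb_pool != NULL)
		ret = (*dev->dev_ops->rx_queue_setup)(dev, rx_queue_id,
				nb_rx_desc, socket_id, rx_conf, mb_pool);
	else
		ret = (*dev->dev_ops->rxseg_queue_setup)(dev, rx_queue_id,
				nb_rx_desc, socket_id, rx_conf,
				rx_seg, n_seg);
	return eth_err(port_id, ret);
}

int
rte_eth_rx_queue_setup(uint16_t port_id, uint16_t rx_queue_id,
		       uint16_t nb_rx_desc, unsigned int socket_id,
		       const struct rte_eth_rxconf *rx_conf,
		       struct rte_mempool *mb_pool)
{
	return __rte_eth_rx_queue_setup(port_id, rx_queue_id, nb_rx_desc,
					socket_id, rx_conf, NULL, 0, mb_pool);
}

int
rte_eth_rxseg_queue_setup(uint16_t port_id, uint16_t rx_queue_id,
			  uint16_t nb_rx_desc, unsigned int socket_id,
			  const struct rte_eth_rxconf *rx_conf,
			  const struct rte_eth_rxseg *rx_seg, uint16_t n_seg)
{
	return __rte_eth_rx_queue_setup(port_id, rx_queue_id, nb_rx_desc,
					socket_id, rx_conf, rx_seg, n_seg,
					NULL);
}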

With best regards, Slava

> -----Original Message-----
> From: Andrew Rybchenko <andrew.rybchenko@oktetlabs.ru>
> Sent: Monday, October 12, 2020 20:11
> To: NBU-Contact-Thomas Monjalon <thomas@monjalon.net>; Slava
> Ovsiienko <viacheslavo@nvidia.com>
> Cc: dev@dpdk.org; stephen@networkplumber.org; ferruh.yigit@intel.com;
> olivier.matz@6wind.com; jerinjacobk@gmail.com;
> maxime.coquelin@redhat.com; david.marchand@redhat.com
> Subject: Re: [dpdk-dev] [PATCH v3 1/9] ethdev: introduce Rx buffer split
> 
> On 10/12/20 8:03 PM, Thomas Monjalon wrote:
> > 12/10/2020 18:38, Andrew Rybchenko:
> >> On 10/12/20 7:19 PM, Viacheslav Ovsiienko wrote:
> >>>  int
> >>> +rte_eth_rxseg_queue_setup(uint16_t port_id, uint16_t rx_queue_id,
> >>> +			  uint16_t nb_rx_desc, unsigned int socket_id,
> >>> +			  const struct rte_eth_rxconf *rx_conf,
> >>> +			  const struct rte_eth_rxseg *rx_seg, uint16_t n_seg) {
> >>> +	int ret;
> >>> +	uint16_t seg_idx;
> >>> +	uint32_t mbp_buf_size;
> >>
> >> <start-of-dup>
> >>
> >>> +	struct rte_eth_dev *dev;
> >>> +	struct rte_eth_dev_info dev_info;
> >>> +	struct rte_eth_rxconf local_conf;
> >>> +	void **rxq;
> >>> +
> >>> +	RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, -EINVAL);
> >>> +
> >>> +	dev = &rte_eth_devices[port_id];
> >>> +	if (rx_queue_id >= dev->data->nb_rx_queues) {
> >>> +		RTE_ETHDEV_LOG(ERR, "Invalid RX queue_id=%u\n",
> rx_queue_id);
> >>> +		return -EINVAL;
> >>> +	}
> >>
> >> <end-of-dup>
> >>
> >>> +
> >>> +	if (rx_seg == NULL) {
> >>> +		RTE_ETHDEV_LOG(ERR, "Invalid null description pointer\n");
> >>> +		return -EINVAL;
> >>> +	}
> >>> +
> >>> +	if (n_seg == 0) {
> >>> +		RTE_ETHDEV_LOG(ERR, "Invalid zero description
> number\n");
> >>> +		return -EINVAL;
> >>> +	}
> >>> +
> >>> +	RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->rxseg_queue_setup,
> >>> +-ENOTSUP);
> >>> +
> >>
> >> <start-of-dup>
> >>
> >>> +	/*
> >>> +	 * Check the size of the mbuf data buffer.
> >>> +	 * This value must be provided in the private data of the memory
> pool.
> >>> +	 * First check that the memory pool has a valid private data.
> >>> +	 */
> >>> +	ret = rte_eth_dev_info_get(port_id, &dev_info);
> >>> +	if (ret != 0)
> >>> +		return ret;
> >>
> >> <end-of-dup>
> >>
> >>> +
> >>> +	for (seg_idx = 0; seg_idx < n_seg; seg_idx++) {
> >>> +		struct rte_mempool *mp = rx_seg[seg_idx].mp;
> >>> +
> >>> +		if (mp->private_data_size <
> >>> +				sizeof(struct rte_pktmbuf_pool_private)) {
> >>> +			RTE_ETHDEV_LOG(ERR, "%s private_data_size %d <
> %d\n",
> >>> +				mp->name, (int)mp->private_data_size,
> >>> +				(int)sizeof(struct
> rte_pktmbuf_pool_private));
> >>> +			return -ENOSPC;
> >>> +		}
> >>> +
> >>> +		mbp_buf_size = rte_pktmbuf_data_room_size(mp);
> >>> +		if (mbp_buf_size < rx_seg[seg_idx].length +
> >>> +				   rx_seg[seg_idx].offset +
> >>> +				   (seg_idx ? 0 :
> >>> +				    (uint32_t)RTE_PKTMBUF_HEADROOM)) {
> >>> +			RTE_ETHDEV_LOG(ERR,
> >>> +				"%s mbuf_data_room_size %d < %d"
> >>> +				" (segment length=%d + segment
> offset=%d)\n",
> >>> +				mp->name, (int)mbp_buf_size,
> >>> +				(int)(rx_seg[seg_idx].length +
> >>> +				      rx_seg[seg_idx].offset),
> >>> +				(int)rx_seg[seg_idx].length,
> >>> +				(int)rx_seg[seg_idx].offset);
> >>> +			return -EINVAL;
> >>> +		}
> >>> +	}
> >>> +
> >>
> >> <start-of-huge-dup>
> >>
> >>> +	/* Use default specified by driver, if nb_rx_desc is zero */
> >>> +	if (nb_rx_desc == 0) {
> >>> +		nb_rx_desc = dev_info.default_rxportconf.ring_size;
> >>> +		/* If driver default is also zero, fall back on EAL default */
> >>> +		if (nb_rx_desc == 0)
> >>> +			nb_rx_desc =
> RTE_ETH_DEV_FALLBACK_RX_RINGSIZE;
> >>> +	}
> >>> +
> >>> +	if (nb_rx_desc > dev_info.rx_desc_lim.nb_max ||
> >>> +			nb_rx_desc < dev_info.rx_desc_lim.nb_min ||
> >>> +			nb_rx_desc % dev_info.rx_desc_lim.nb_align != 0) {
> >>> +
> >>> +		RTE_ETHDEV_LOG(ERR,
> >>> +			"Invalid value for nb_rx_desc(=%hu), should be: "
> >>> +			"<= %hu, >= %hu, and a product of %hu\n",
> >>> +			nb_rx_desc, dev_info.rx_desc_lim.nb_max,
> >>> +			dev_info.rx_desc_lim.nb_min,
> >>> +			dev_info.rx_desc_lim.nb_align);
> >>> +		return -EINVAL;
> >>> +	}
> >>> +
> >>> +	if (dev->data->dev_started &&
> >>> +		!(dev_info.dev_capa &
> >>> +			RTE_ETH_DEV_CAPA_RUNTIME_RX_QUEUE_SETUP))
> >>> +		return -EBUSY;
> >>> +
> >>> +	if (dev->data->dev_started &&
> >>> +		(dev->data->rx_queue_state[rx_queue_id] !=
> >>> +			RTE_ETH_QUEUE_STATE_STOPPED))
> >>> +		return -EBUSY;
> >>> +
> >>> +	rxq = dev->data->rx_queues;
> >>> +	if (rxq[rx_queue_id]) {
> >>> +		RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops-
> >rx_queue_release,
> >>> +					-ENOTSUP);
> >>> +		(*dev->dev_ops->rx_queue_release)(rxq[rx_queue_id]);
> >>> +		rxq[rx_queue_id] = NULL;
> >>> +	}
> >>> +
> >>> +	if (rx_conf == NULL)
> >>> +		rx_conf = &dev_info.default_rxconf;
> >>> +
> >>> +	local_conf = *rx_conf;
> >>> +
> >>> +	/*
> >>> +	 * If an offloading has already been enabled in
> >>> +	 * rte_eth_dev_configure(), it has been enabled on all queues,
> >>> +	 * so there is no need to enable it in this queue again.
> >>> +	 * The local_conf.offloads input to underlying PMD only carries
> >>> +	 * those offloadings which are only enabled on this queue and
> >>> +	 * not enabled on all queues.
> >>> +	 */
> >>> +	local_conf.offloads &= ~dev->data->dev_conf.rxmode.offloads;
> >>> +
> >>> +	/*
> >>> +	 * New added offloadings for this queue are those not enabled in
> >>> +	 * rte_eth_dev_configure() and they must be per-queue type.
> >>> +	 * A pure per-port offloading can't be enabled on a queue while
> >>> +	 * disabled on another queue. A pure per-port offloading can't
> >>> +	 * be enabled for any queue as new added one if it hasn't been
> >>> +	 * enabled in rte_eth_dev_configure().
> >>> +	 */
> >>> +	if ((local_conf.offloads & dev_info.rx_queue_offload_capa) !=
> >>> +	     local_conf.offloads) {
> >>> +		RTE_ETHDEV_LOG(ERR,
> >>> +			"Ethdev port_id=%d rx_queue_id=%d, new added offloads"
> >>> +			" 0x%"PRIx64" must be within per-queue offload"
> >>> +			" capabilities 0x%"PRIx64" in %s()\n",
> >>> +			port_id, rx_queue_id, local_conf.offloads,
> >>> +			dev_info.rx_queue_offload_capa,
> >>> +			__func__);
> >>> +		return -EINVAL;
> >>> +	}
> >>> +
> >>> +	/*
> >>> +	 * If LRO is enabled, check that the maximum aggregated packet
> >>> +	 * size is supported by the configured device.
> >>> +	 */
> >>> +	if (local_conf.offloads & DEV_RX_OFFLOAD_TCP_LRO) {
> >>> +		if (dev->data->dev_conf.rxmode.max_lro_pkt_size == 0)
> >>> +			dev->data->dev_conf.rxmode.max_lro_pkt_size =
> >>> +				dev->data->dev_conf.rxmode.max_rx_pkt_len;
> >>> +		int ret = check_lro_pkt_size(port_id,
> >>> +				dev->data->dev_conf.rxmode.max_lro_pkt_size,
> >>> +				dev->data->dev_conf.rxmode.max_rx_pkt_len,
> >>> +				dev_info.max_lro_pkt_size);
> >>> +		if (ret != 0)
> >>> +			return ret;
> >>> +	}
> >>
> >> <end-of-huge-dup>
> >>
> >> IMO it is not acceptable to duplicate so much code.
> >> It is simply unmaintainable.
> >>
> >> NACK
> >
> > Can it be solved by making rte_eth_rx_queue_setup() a wrapper on top
> > of this new rte_eth_rxseg_queue_setup() ?
> >
> 
> Could be, but strictly speaking it would break the argument validation order and
> error reporting in various cases.
> So, refactoring is required to keep it consistent.
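
For illustration, a minimal sketch of the wrapper idea discussed above (not
part of this patch), assuming the classic setup could delegate to the new
segment-aware routine with a single descriptor; a real refactoring would also
have to preserve the current argument validation order and handle PMDs that
do not implement the rxseg callback:

/*
 * Sketch only, inside rte_ethdev.c: rte_eth_rx_queue_setup() expressed on
 * top of rte_eth_rxseg_queue_setup() with one segment covering the whole
 * mbuf data buffer. A zero length means the buffer size is deduced from
 * the memory pool properties.
 */
int
rte_eth_rx_queue_setup(uint16_t port_id, uint16_t rx_queue_id,
		       uint16_t nb_rx_desc, unsigned int socket_id,
		       const struct rte_eth_rxconf *rx_conf,
		       struct rte_mempool *mb_pool)
{
	struct rte_eth_rxseg seg = {
		.mp = mb_pool,
		.length = 0,	/* deduce from mempool data room size */
		.offset = 0,	/* no extra offset beyond the headroom */
	};

	return rte_eth_rxseg_queue_setup(port_id, rx_queue_id, nb_rx_desc,
					 socket_id, rx_conf, &seg, 1);
}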
  

Patch

diff --git a/doc/guides/nics/features.rst b/doc/guides/nics/features.rst
index dd8c955..21b91db 100644
--- a/doc/guides/nics/features.rst
+++ b/doc/guides/nics/features.rst
@@ -185,6 +185,21 @@  Supports receiving segmented mbufs.
 * **[related]    eth_dev_ops**: ``rx_pkt_burst``.
 
 
+.. _nic_features_buffer_split:
+
+Buffer Split on Rx
+------------------
+
+Scatters the packets being received at the specified boundaries into segmented mbufs.
+
+* **[uses]       rte_eth_rxconf,rte_eth_rxmode**: ``offloads:RTE_ETH_RX_OFFLOAD_BUFFER_SPLIT``.
+* **[implements] datapath**: ``Buffer Split functionality``.
+* **[implements] rte_eth_dev_data**: ``buffer_split``.
+* **[provides]   rte_eth_dev_info**: ``rx_offload_capa:RTE_ETH_RX_OFFLOAD_BUFFER_SPLIT``.
+* **[provides]   eth_dev_ops**: ``rxq_info_get:buffer_split``.
+* **[related]    API**: ``rte_eth_rxseg_queue_setup()``.
+
+
 .. _nic_features_lro:
 
 LRO
diff --git a/doc/guides/rel_notes/release_20_11.rst b/doc/guides/rel_notes/release_20_11.rst
index 2cec9dd..d87247a 100644
--- a/doc/guides/rel_notes/release_20_11.rst
+++ b/doc/guides/rel_notes/release_20_11.rst
@@ -60,6 +60,12 @@  New Features
   Added the FEC API which provides functions for query FEC capabilities and
   current FEC mode from device. Also, API for configuring FEC mode is also provided.
 
+* **Introduced extended buffer description for receiving.**
+
+  Added an extended Rx queue setup routine that accepts an individual
+  description for each Rx segment: the maximal data length, the buffer offset
+  and the memory pool to allocate data buffers from.
+
 * **Updated Broadcom bnxt driver.**
 
   Updated the Broadcom bnxt driver with new features and improvements, including:
diff --git a/lib/librte_ethdev/rte_ethdev.c b/lib/librte_ethdev/rte_ethdev.c
index 59beb8a..3a55567 100644
--- a/lib/librte_ethdev/rte_ethdev.c
+++ b/lib/librte_ethdev/rte_ethdev.c
@@ -105,6 +105,9 @@  struct rte_eth_xstats_name_off {
 #define RTE_RX_OFFLOAD_BIT2STR(_name)	\
 	{ DEV_RX_OFFLOAD_##_name, #_name }
 
+#define RTE_ETH_RX_OFFLOAD_BIT2STR(_name)	\
+	{ RTE_ETH_RX_OFFLOAD_##_name, #_name }
+
 static const struct {
 	uint64_t offload;
 	const char *name;
@@ -128,9 +131,11 @@  struct rte_eth_xstats_name_off {
 	RTE_RX_OFFLOAD_BIT2STR(SCTP_CKSUM),
 	RTE_RX_OFFLOAD_BIT2STR(OUTER_UDP_CKSUM),
 	RTE_RX_OFFLOAD_BIT2STR(RSS_HASH),
+	RTE_ETH_RX_OFFLOAD_BIT2STR(BUFFER_SPLIT),
 };
 
 #undef RTE_RX_OFFLOAD_BIT2STR
+#undef RTE_ETH_RX_OFFLOAD_BIT2STR
 
 #define RTE_TX_OFFLOAD_BIT2STR(_name)	\
 	{ DEV_TX_OFFLOAD_##_name, #_name }
@@ -1920,6 +1925,179 @@  struct rte_eth_dev *
 }
 
 int
+rte_eth_rxseg_queue_setup(uint16_t port_id, uint16_t rx_queue_id,
+			  uint16_t nb_rx_desc, unsigned int socket_id,
+			  const struct rte_eth_rxconf *rx_conf,
+			  const struct rte_eth_rxseg *rx_seg, uint16_t n_seg)
+{
+	int ret;
+	uint16_t seg_idx;
+	uint32_t mbp_buf_size;
+	struct rte_eth_dev *dev;
+	struct rte_eth_dev_info dev_info;
+	struct rte_eth_rxconf local_conf;
+	void **rxq;
+
+	RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, -EINVAL);
+
+	dev = &rte_eth_devices[port_id];
+	if (rx_queue_id >= dev->data->nb_rx_queues) {
+		RTE_ETHDEV_LOG(ERR, "Invalid RX queue_id=%u\n", rx_queue_id);
+		return -EINVAL;
+	}
+
+	if (rx_seg == NULL) {
+		RTE_ETHDEV_LOG(ERR, "Invalid null description pointer\n");
+		return -EINVAL;
+	}
+
+	if (n_seg == 0) {
+		RTE_ETHDEV_LOG(ERR, "Invalid zero description number\n");
+		return -EINVAL;
+	}
+
+	RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->rxseg_queue_setup, -ENOTSUP);
+
+	/*
+	 * Check the size of the mbuf data buffer.
+	 * This value must be provided in the private data of the memory pool.
+	 * First check that the memory pool has a valid private data.
+	 */
+	ret = rte_eth_dev_info_get(port_id, &dev_info);
+	if (ret != 0)
+		return ret;
+
+	for (seg_idx = 0; seg_idx < n_seg; seg_idx++) {
+		struct rte_mempool *mp = rx_seg[seg_idx].mp;
+
+		if (mp->private_data_size <
+				sizeof(struct rte_pktmbuf_pool_private)) {
+			RTE_ETHDEV_LOG(ERR, "%s private_data_size %d < %d\n",
+				mp->name, (int)mp->private_data_size,
+				(int)sizeof(struct rte_pktmbuf_pool_private));
+			return -ENOSPC;
+		}
+
+		mbp_buf_size = rte_pktmbuf_data_room_size(mp);
+		if (mbp_buf_size < rx_seg[seg_idx].length +
+				   rx_seg[seg_idx].offset +
+				   (seg_idx ? 0 :
+				    (uint32_t)RTE_PKTMBUF_HEADROOM)) {
+			RTE_ETHDEV_LOG(ERR,
+				"%s mbuf_data_room_size %d < %d"
+				" (segment length=%d + segment offset=%d)\n",
+				mp->name, (int)mbp_buf_size,
+				(int)(rx_seg[seg_idx].length +
+				      rx_seg[seg_idx].offset),
+				(int)rx_seg[seg_idx].length,
+				(int)rx_seg[seg_idx].offset);
+			return -EINVAL;
+		}
+	}
+
+	/* Use default specified by driver, if nb_rx_desc is zero */
+	if (nb_rx_desc == 0) {
+		nb_rx_desc = dev_info.default_rxportconf.ring_size;
+		/* If driver default is also zero, fall back on EAL default */
+		if (nb_rx_desc == 0)
+			nb_rx_desc = RTE_ETH_DEV_FALLBACK_RX_RINGSIZE;
+	}
+
+	if (nb_rx_desc > dev_info.rx_desc_lim.nb_max ||
+			nb_rx_desc < dev_info.rx_desc_lim.nb_min ||
+			nb_rx_desc % dev_info.rx_desc_lim.nb_align != 0) {
+
+		RTE_ETHDEV_LOG(ERR,
+			"Invalid value for nb_rx_desc(=%hu), should be: "
+			"<= %hu, >= %hu, and a product of %hu\n",
+			nb_rx_desc, dev_info.rx_desc_lim.nb_max,
+			dev_info.rx_desc_lim.nb_min,
+			dev_info.rx_desc_lim.nb_align);
+		return -EINVAL;
+	}
+
+	if (dev->data->dev_started &&
+		!(dev_info.dev_capa &
+			RTE_ETH_DEV_CAPA_RUNTIME_RX_QUEUE_SETUP))
+		return -EBUSY;
+
+	if (dev->data->dev_started &&
+		(dev->data->rx_queue_state[rx_queue_id] !=
+			RTE_ETH_QUEUE_STATE_STOPPED))
+		return -EBUSY;
+
+	rxq = dev->data->rx_queues;
+	if (rxq[rx_queue_id]) {
+		RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->rx_queue_release,
+					-ENOTSUP);
+		(*dev->dev_ops->rx_queue_release)(rxq[rx_queue_id]);
+		rxq[rx_queue_id] = NULL;
+	}
+
+	if (rx_conf == NULL)
+		rx_conf = &dev_info.default_rxconf;
+
+	local_conf = *rx_conf;
+
+	/*
+	 * If an offloading has already been enabled in
+	 * rte_eth_dev_configure(), it has been enabled on all queues,
+	 * so there is no need to enable it in this queue again.
+	 * The local_conf.offloads input to underlying PMD only carries
+	 * those offloadings which are only enabled on this queue and
+	 * not enabled on all queues.
+	 */
+	local_conf.offloads &= ~dev->data->dev_conf.rxmode.offloads;
+
+	/*
+	 * New added offloadings for this queue are those not enabled in
+	 * rte_eth_dev_configure() and they must be per-queue type.
+	 * A pure per-port offloading can't be enabled on a queue while
+	 * disabled on another queue. A pure per-port offloading can't
+	 * be enabled for any queue as new added one if it hasn't been
+	 * enabled in rte_eth_dev_configure().
+	 */
+	if ((local_conf.offloads & dev_info.rx_queue_offload_capa) !=
+	     local_conf.offloads) {
+		RTE_ETHDEV_LOG(ERR,
+			"Ethdev port_id=%d rx_queue_id=%d, new added offloads"
+			" 0x%"PRIx64" must be within per-queue offload"
+			" capabilities 0x%"PRIx64" in %s()\n",
+			port_id, rx_queue_id, local_conf.offloads,
+			dev_info.rx_queue_offload_capa,
+			__func__);
+		return -EINVAL;
+	}
+
+	/*
+	 * If LRO is enabled, check that the maximum aggregated packet
+	 * size is supported by the configured device.
+	 */
+	if (local_conf.offloads & DEV_RX_OFFLOAD_TCP_LRO) {
+		if (dev->data->dev_conf.rxmode.max_lro_pkt_size == 0)
+			dev->data->dev_conf.rxmode.max_lro_pkt_size =
+				dev->data->dev_conf.rxmode.max_rx_pkt_len;
+		int ret = check_lro_pkt_size(port_id,
+				dev->data->dev_conf.rxmode.max_lro_pkt_size,
+				dev->data->dev_conf.rxmode.max_rx_pkt_len,
+				dev_info.max_lro_pkt_size);
+		if (ret != 0)
+			return ret;
+	}
+
+	ret = (*dev->dev_ops->rxseg_queue_setup)(dev, rx_queue_id, nb_rx_desc,
+						 socket_id, &local_conf,
+						 rx_seg, n_seg);
+	if (!ret) {
+		if (!dev->data->min_rx_buf_size ||
+		    dev->data->min_rx_buf_size > mbp_buf_size)
+			dev->data->min_rx_buf_size = mbp_buf_size;
+	}
+
+	return eth_err(port_id, ret);
+}
+
+int
 rte_eth_rx_hairpin_queue_setup(uint16_t port_id, uint16_t rx_queue_id,
 			       uint16_t nb_rx_desc,
 			       const struct rte_eth_hairpin_conf *conf)
diff --git a/lib/librte_ethdev/rte_ethdev.h b/lib/librte_ethdev/rte_ethdev.h
index 3a31f94..bbf25c8 100644
--- a/lib/librte_ethdev/rte_ethdev.h
+++ b/lib/librte_ethdev/rte_ethdev.h
@@ -970,6 +970,16 @@  struct rte_eth_txmode {
 };
 
 /**
+ * A structure used to configure an RX packet segment for buffer split.
+ */
+struct rte_eth_rxseg {
+	struct rte_mempool *mp; /**< Memory pool to allocate segment from. */
+	uint16_t length; /**< Segment data length, configures split point. */
+	uint16_t offset; /**< Data offset from beginning of mbuf data buffer. */
+	uint32_t reserved; /**< Reserved field. */
+};
+
+/**
  * A structure used to configure an RX ring of an Ethernet port.
  */
 struct rte_eth_rxconf {
@@ -1260,6 +1270,7 @@  struct rte_eth_conf {
 #define DEV_RX_OFFLOAD_SCTP_CKSUM	0x00020000
 #define DEV_RX_OFFLOAD_OUTER_UDP_CKSUM  0x00040000
 #define DEV_RX_OFFLOAD_RSS_HASH		0x00080000
+#define RTE_ETH_RX_OFFLOAD_BUFFER_SPLIT 0x00100000
 
 #define DEV_RX_OFFLOAD_CHECKSUM (DEV_RX_OFFLOAD_IPV4_CKSUM | \
 				 DEV_RX_OFFLOAD_UDP_CKSUM | \
@@ -2037,6 +2048,102 @@  int rte_eth_rx_queue_setup(uint16_t port_id, uint16_t rx_queue_id,
 		uint16_t nb_rx_desc, unsigned int socket_id,
 		const struct rte_eth_rxconf *rx_conf,
 		struct rte_mempool *mb_pool);
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice.
+ *
+ * Allocate and set up a receive queue for an Ethernet device
+ * with the specified receiving segment parameters.
+ *
+ * The function allocates a contiguous block of memory for *nb_rx_desc*
+ * receive descriptors from a memory zone associated with *socket_id*.
+ * The descriptors might be divided into groups by the PMD to receive the
+ * data into a multi-segment packet presented by a chain of mbufs.
+ *
+ * Each descriptor within the group is initialized with a network buffer
+ * allocated from the specified memory pool and with the specified buffer
+ * offset and maximal segment length.
+ *
+ * @param port_id
+ *   The port identifier of the Ethernet device.
+ * @param rx_queue_id
+ *   The index of the receive queue to set up.
+ *   The value must be in the range [0, nb_rx_queue - 1] previously supplied
+ *   to rte_eth_dev_configure().
+ * @param nb_rx_desc
+ *   The number of receive descriptors to allocate for the receive ring.
+ * @param socket_id
+ *   The *socket_id* argument is the socket identifier in case of NUMA.
+ *   The value can be *SOCKET_ID_ANY* if there is no NUMA constraint for
+ *   the DMA memory allocated for the receive descriptors of the ring.
+ * @param rx_conf
+ *   The pointer to the configuration data to be used for the receive queue.
+ *   NULL value is allowed, in which case default RX configuration
+ *   will be used.
+ *   The *rx_conf* structure contains an *rx_thresh* structure with the values
+ *   of the Prefetch, Host, and Write-Back threshold registers of the receive
+ *   ring.
+ *   In addition it contains the hardware offloads features to activate using
+ *   the DEV_RX_OFFLOAD_* flags.
+ *   If an offloading set in rx_conf->offloads
+ *   hasn't been set in the input argument eth_conf->rxmode.offloads
+ *   to rte_eth_dev_configure(), it is a new added offloading, it must be
+ *   per-queue type and it is enabled for the queue.
+ *   No need to repeat any bit in rx_conf->offloads which has already been
+ *   enabled in rte_eth_dev_configure() at port level. An offloading enabled
+ *   at port level can't be disabled at queue level.
+ * @param rx_seg
+ *   The pointer to the array of segment descriptions. Each element describes
+ *   the memory pool, the maximal segment data length and the initial data
+ *   offset from the beginning of the data buffer in the mbuf. This allows
+ *   specifying dedicated properties for each segment of the receiving
+ *   buffer - pool, buffer offset, maximal segment size. If the
+ *   RTE_ETH_RX_OFFLOAD_BUFFER_SPLIT offload flag is configured, the PMD will
+ *   split the received packets into multiple segments according to the
+ *   specification in the description array:
+ *   - the first network buffer is allocated from the memory pool specified
+ *     in the first segment description element, the second network buffer -
+ *     from the pool in the second segment description element and so on.
+ *     If there are not enough elements to describe the buffer for an entire
+ *     packet of maximal length, the pool from the last valid element is used
+ *     to allocate the buffers for all remaining segments.
+ *   - the offsets from the segment description elements specify the data
+ *     offset from the buffer beginning, except for the first mbuf - for this
+ *     one the offset is added to RTE_PKTMBUF_HEADROOM to get the actual
+ *     offset from the buffer beginning. If there are not enough elements to
+ *     describe the buffer for an entire packet of maximal length, the offsets
+ *     for the remaining segments are assumed to be zero.
+ *   - the data length received into each segment is limited by the length
+ *     specified in the segment description element. Receiving starts by
+ *     filling up the first mbuf data buffer; if the specified maximal segment
+ *     length is reached and there are data remaining (the packet is longer
+ *     than the buffer in the first mbuf), the following data are pushed to
+ *     the next segment up to its own length. If the first two segments are
+ *     not enough to store all the packet data, the next (third) segment is
+ *     engaged and so on. If the length in a segment description element is
+ *     zero, the actual buffer size is deduced from the appropriate memory
+ *     pool properties. If there are not enough elements to describe the
+ *     buffer for an entire packet of maximal length, the buffer size is
+ *     deduced from the pool of the last valid element for all remaining
+ *     segments.
+ * @param n_seg
+ *   The number of elements in the segment description array.
+ * @return
+ *   - 0: Success, receive queue correctly set up.
+ *   - -EIO: if device is removed.
+ *   - -EINVAL: The segment descriptors array is empty (the pointer is NULL
+ *      or the number of elements is zero), or the size of the network
+ *      buffers which can be allocated from the memory pools does not fit
+ *      the various buffer sizes allowed by the device controller.
+ *   - -ENOMEM: Unable to allocate the receive ring descriptors or to
+ *      allocate network memory buffers from the memory pool when
+ *      initializing receive descriptors.
+ */
+__rte_experimental
+int rte_eth_rxseg_queue_setup(uint16_t port_id, uint16_t rx_queue_id,
+		uint16_t nb_rx_desc, unsigned int socket_id,
+		const struct rte_eth_rxconf *rx_conf,
+		const struct rte_eth_rxseg *rx_seg, uint16_t n_seg);
 
 /**
  * @warning
diff --git a/lib/librte_ethdev/rte_ethdev_driver.h b/lib/librte_ethdev/rte_ethdev_driver.h
index 35cc4fb..5dee210 100644
--- a/lib/librte_ethdev/rte_ethdev_driver.h
+++ b/lib/librte_ethdev/rte_ethdev_driver.h
@@ -264,6 +264,15 @@  typedef int (*eth_rx_queue_setup_t)(struct rte_eth_dev *dev,
 				    struct rte_mempool *mb_pool);
 /**< @internal Set up a receive queue of an Ethernet device. */
 
+typedef int (*eth_rxseg_queue_setup_t)(struct rte_eth_dev *dev,
+				       uint16_t rx_queue_id,
+				       uint16_t nb_rx_desc,
+				       unsigned int socket_id,
+				       const struct rte_eth_rxconf *rx_conf,
+				       const struct rte_eth_rxseg *rx_seg,
+				       uint16_t n_seg);
+/**< @internal Extended setup of a receive queue of an Ethernet device. */
+
 typedef int (*eth_tx_queue_setup_t)(struct rte_eth_dev *dev,
 				    uint16_t tx_queue_id,
 				    uint16_t nb_tx_desc,
@@ -711,6 +720,7 @@  struct eth_dev_ops {
 	eth_queue_start_t          tx_queue_start;/**< Start TX for a queue. */
 	eth_queue_stop_t           tx_queue_stop; /**< Stop TX for a queue. */
 	eth_rx_queue_setup_t       rx_queue_setup;/**< Set up device RX queue. */
+	eth_rxseg_queue_setup_t    rxseg_queue_setup;/**< Extended RX setup. */
 	eth_queue_release_t        rx_queue_release; /**< Release RX queue. */
 
 	eth_rx_enable_intr_t       rx_queue_intr_enable;  /**< Enable Rx queue interrupt. */
diff --git a/lib/librte_ethdev/rte_ethdev_version.map b/lib/librte_ethdev/rte_ethdev_version.map
index f8a0945..d4b9849 100644
--- a/lib/librte_ethdev/rte_ethdev_version.map
+++ b/lib/librte_ethdev/rte_ethdev_version.map
@@ -195,6 +195,7 @@  EXPERIMENTAL {
 	rte_flow_get_aged_flows;
 
 	# Marked as experimental in 20.11
+	rte_eth_rxseg_queue_setup;
 	rte_tm_capabilities_get;
 	rte_tm_get_number_of_leaf_nodes;
 	rte_tm_hierarchy_commit;
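
For completeness, a hedged application-level sketch of how the new API is
expected to be used, assuming the application has already created a header
pool and a payload pool and configured the port. The pool names, the 128/2048
byte lengths, queue 0 and 512 descriptors below are illustrative assumptions,
not values mandated by this patch.

#include <errno.h>
#include <string.h>

#include <rte_ethdev.h>
#include <rte_memory.h>
#include <rte_mempool.h>

/*
 * Illustrative only: configure Rx queue 0 of port_id to split every received
 * packet into a header segment and a payload segment taken from two
 * different mempools. hdr_pool and pay_pool are application-created pools.
 */
static int
setup_buffer_split_rxq(uint16_t port_id, struct rte_mempool *hdr_pool,
		       struct rte_mempool *pay_pool)
{
	struct rte_eth_dev_info dev_info;
	struct rte_eth_rxconf rxconf;
	struct rte_eth_rxseg rx_seg[2];
	int ret;

	ret = rte_eth_dev_info_get(port_id, &dev_info);
	if (ret != 0)
		return ret;

	/* The PMD must report Rx buffer split support before using the API. */
	if (!(dev_info.rx_offload_capa & RTE_ETH_RX_OFFLOAD_BUFFER_SPLIT))
		return -ENOTSUP;

	memset(rx_seg, 0, sizeof(rx_seg));
	rx_seg[0].mp = hdr_pool;  /* first segment: packet headers */
	rx_seg[0].length = 128;   /* split point after 128 bytes */
	rx_seg[0].offset = 0;     /* added to RTE_PKTMBUF_HEADROOM */
	rx_seg[1].mp = pay_pool;  /* remaining data: payload pool */
	rx_seg[1].length = 2048;
	rx_seg[1].offset = 0;

	/*
	 * Request the split on this queue; depending on the PMD the offload
	 * may instead have to be enabled at port level via rxmode.offloads
	 * in rte_eth_dev_configure().
	 */
	rxconf = dev_info.default_rxconf;
	rxconf.offloads |= RTE_ETH_RX_OFFLOAD_BUFFER_SPLIT;

	return rte_eth_rxseg_queue_setup(port_id, 0, 512, SOCKET_ID_ANY,
					 &rxconf, rx_seg, 2);
}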