[dpdk-dev,v1,5/5] ixgbe: Add LRO support

Message ID 1425412123-5227-6-git-send-email-vladz@cloudius-systems.com (mailing list archive)
State Superseded, archived

Commit Message

Vladislav Zolotarov March 3, 2015, 7:48 p.m. UTC
  - Only x540 and 82599 devices support LRO.
    - Add the appropriate HW configuration.
    - Add RSC aware rx_pkt_burst() handlers:
       - Implemented bulk allocation and non-bulk allocation versions.
       - Add LRO-specific fields to rte_eth_rxmode, to rte_eth_dev_data
         and to igb_rx_queue.
       - Use the appropriate handler when LRO is requested.

Signed-off-by: Vlad Zolotarov <vladz@cloudius-systems.com>
---
 lib/librte_ether/rte_ethdev.h       |   7 +-
 lib/librte_pmd_ixgbe/ixgbe_ethdev.c |  17 ++
 lib/librte_pmd_ixgbe/ixgbe_ethdev.h |   5 +
 lib/librte_pmd_ixgbe/ixgbe_rxtx.c   | 563 +++++++++++++++++++++++++++++++++++-
 lib/librte_pmd_ixgbe/ixgbe_rxtx.h   |   6 +
 5 files changed, 591 insertions(+), 7 deletions(-)
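
A minimal usage sketch of the API surface this series adds - the enable_lro
bit in struct rte_eth_rxmode and the DEV_RX_OFFLOAD_TCP_LRO capability the
PMD reports on 82599/x540. This is an illustration only; the port/queue
numbers, ring size and helper name below are not taken from the patch:

#include <errno.h>
#include <rte_mempool.h>
#include <rte_ethdev.h>

static int
configure_port_with_lro(uint8_t port_id, struct rte_mempool *mb_pool)
{
	struct rte_eth_dev_info dev_info;
	struct rte_eth_conf port_conf = {
		.rxmode = {
			.jumbo_frame    = 1,
			.max_rx_pkt_len = 9000,
			.hw_strip_crc   = 1,	/* RSC requires HW CRC stripping */
			.enable_lro     = 1,	/* bit added by this series */
		},
	};
	int ret;

	rte_eth_dev_info_get(port_id, &dev_info);
	if (!(dev_info.rx_offload_capa & DEV_RX_OFFLOAD_TCP_LRO))
		return -ENOTSUP;	/* only 82599/x540 report LRO support */

	ret = rte_eth_dev_configure(port_id, 1, 1, &port_conf);
	if (ret < 0)
		return ret;

	return rte_eth_rx_queue_setup(port_id, 0, 512,
				      rte_eth_dev_socket_id(port_id),
				      NULL, mb_pool);
}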
  

Comments

Stephen Hemminger March 4, 2015, 12:33 a.m. UTC | #1
On Tue,  3 Mar 2015 21:48:43 +0200
Vlad Zolotarov <vladz@cloudius-systems.com> wrote:

> +		lro_bulk_alloc: 1, /**< RX LRO with bulk alloc is ON(1) / OFF(0) */

This is an internal decision and should not be exposed in the API.
We need less knobs not more.
  
Stephen Hemminger March 4, 2015, 12:33 a.m. UTC | #2
On Tue,  3 Mar 2015 21:48:43 +0200
Vlad Zolotarov <vladz@cloudius-systems.com> wrote:

> +	next_desc:
> +		/*
> +		 * The code in this whole file uses the volatile pointer to
> +		 * ensure the read ordering of the status and the rest of the
> +		 * descriptor fields (on the compiler level only!!!). This is so
> +		 * UGLY - why not to just use the compiler barrier instead? DPDK
> +		 * even has the rte_compiler_barrier() for that.
> +		 *
> +		 * But most importantly this is just wrong because this doesn't
> +		 * ensure memory ordering in a general case at all. For
> +		 * instance, DPDK is supposed to work on Power CPUs where
> +		 * compiler barrier may just not be enough!
> +		 *
> +		 * I tried to write only this function properly to have a
> +		 * starting point (as a part of an LRO/RSC series) but the
> +		 * compiler cursed at me when I tried to cast away the
> +		 * "volatile" from rx_ring (yes, it's volatile too!!!). So, I'm
> +		 * keeping it the way it is for now.
> +		 *
> +		 * The code in this file is broken in so many other places and
> +		 * will just not work on a big endian CPU anyway therefore the
> +		 * lines below will have to be revisited together with the rest
> +		 * of the ixgbe PMD.
> +		 *
> +		 * TODO:
> +		 *    - Get rid of "volatile" crap and let the compiler do its
> +		 *      job.
> +		 *    - Use the proper memory barrier (rte_rmb()) to ensure the
> +		 *      memory ordering below.

This comment screams "this is broken".
Why not get proper architecture independent barriers in DPDK first.
  
Stephen Hemminger March 4, 2015, 12:34 a.m. UTC | #3
On Tue,  3 Mar 2015 21:48:43 +0200
Vlad Zolotarov <vladz@cloudius-systems.com> wrote:

> +
> +		if (!bulk_alloc) {
> +			__le64 dma =
> +			  rte_cpu_to_le_64(RTE_MBUF_DATA_DMA_ADDR_DEFAULT(nmb));
> +			/*
> +			 * Update RX descriptor with the physical address of the
> +			 * new data buffer of the new allocated mbuf.
> +			 */
> +			rxe->mbuf = nmb;
> +
> +                        rxm->data_off = RTE_PKTMBUF_HEADROOM;

Incorrect indentation.
  
Stephen Hemminger March 4, 2015, 12:36 a.m. UTC | #4
On Tue,  3 Mar 2015 21:48:43 +0200
Vlad Zolotarov <vladz@cloudius-systems.com> wrote:

>     - Only x540 and 82599 devices support LRO.
>     - Add the appropriate HW configuration.
>     - Add RSC aware rx_pkt_burst() handlers:
>        - Implemented bulk allocation and non-bulk allocation versions.
>        - Add LRO-specific fields to rte_eth_rxmode, to rte_eth_dev_data
>          and to igb_rx_queue.
>        - Use the appropriate handler when LRO is requested.
> 
> Signed-off-by: Vlad Zolotarov <vladz@cloudius-systems.com>

Checkpatch warnings (edited to remove ones that should be ok)


WARNING: 'recieved' may be misspelled - perhaps 'received'?
#196: FILE: lib/librte_pmd_ixgbe/ixgbe_rxtx.c:1426:
+ * @rx_pkts table of recieved packets

WARNING: Missing a blank line after declarations
#223: FILE: lib/librte_pmd_ixgbe/ixgbe_rxtx.c:1453:
+	struct igb_rx_queue *rxq = rx_queue;
+	volatile union ixgbe_adv_rx_desc *rx_ring = rxq->rx_ring;



WARNING: labels should not be indented
#246: FILE: lib/librte_pmd_ixgbe/ixgbe_rxtx.c:1476:
+	next_desc:

WARNING: quoted string split across lines
#285: FILE: lib/librte_pmd_ixgbe/ixgbe_rxtx.c:1515:
+		PMD_RX_LOG(DEBUG, "port_id=%u queue_id=%u rx_id=%u "
+				  "staterr=0x%x data_len=%u",

WARNING: quoted string split across lines
#293: FILE: lib/librte_pmd_ixgbe/ixgbe_rxtx.c:1523:
+				PMD_RX_LOG(DEBUG, "RX mbuf alloc failed "
+						  "port_id=%u queue_id=%u",

WARNING: Missing a blank line after declarations
#302: FILE: lib/librte_pmd_ixgbe/ixgbe_rxtx.c:1532:
+			uint16_t next_rdt = rxq->rx_free_trigger;
+			if (!ixgbe_rx_alloc_bufs(rxq, false)) {

WARNING: quoted string split across lines
#309: FILE: lib/librte_pmd_ixgbe/ixgbe_rxtx.c:1539:
+				PMD_RX_LOG(DEBUG, "RX bulk alloc failed "
+						  "port_id=%u queue_id=%u",

ERROR: code indent should use tabs where possible
#350: FILE: lib/librte_pmd_ixgbe/ixgbe_rxtx.c:1580:
+                        rxm->data_off = RTE_PKTMBUF_HEADROOM;$

WARNING: please, no spaces at the start of a line
#350: FILE: lib/librte_pmd_ixgbe/ixgbe_rxtx.c:1580:
+                        rxm->data_off = RTE_PKTMBUF_HEADROOM;$

WARNING: quoted string split across lines
#452: FILE: lib/librte_pmd_ixgbe/ixgbe_rxtx.c:1682:
+		PMD_RX_LOG(DEBUG, "port_id=%u queue_id=%u rx_tail=%u "
+			   "nb_hold=%u nb_rx=%u",

WARNING: quoted string split across lines
#536: FILE: lib/librte_pmd_ixgbe/ixgbe_rxtx.c:2580:
+	PMD_INIT_LOG(DEBUG, "sw_ring=%p sw_rsc_ring=%p hw_ring=%p "
+			    "dma_addr=0x%"PRIx64,

WARNING: Possible switch case/default not preceeded by break or fallthrough comment
#617: FILE: lib/librte_pmd_ixgbe/ixgbe_rxtx.c:3926:
+	default:

WARNING: quoted string split across lines
#648: FILE: lib/librte_pmd_ixgbe/ixgbe_rxtx.c:3966:
+		PMD_INIT_LOG(CRIT, "LRO is requested on HW that doesn't "
+				   "support it");

WARNING: quoted string split across lines
#693: FILE: lib/librte_pmd_ixgbe/ixgbe_rxtx.c:4022:
+			PMD_INIT_LOG(CRIT, "LRO can't be enabled when HW CRC "
+					    "is disabled");

WARNING: quoted string split across lines
#711: FILE: lib/librte_pmd_ixgbe/ixgbe_rxtx.c:4084:
+				PMD_INIT_LOG(INFO, "split_hdr_size less than "
+						   "128 bytes (%d)!",

WARNING: quoted string split across lines
#835: FILE: lib/librte_pmd_ixgbe/ixgbe_rxtx.c:4267:
+			PMD_INIT_LOG(INFO, "LRO is requested. Using a bulk "
+					   "allocation version");

WARNING: quoted string split across lines
#840: FILE: lib/librte_pmd_ixgbe/ixgbe_rxtx.c:4272:
+			PMD_INIT_LOG(INFO, "LRO is requested. Using a single "
+					   "allocation version");
  
Vladislav Zolotarov March 4, 2015, 7:24 a.m. UTC | #5
On 03/04/15 02:33, Stephen Hemminger wrote:
> On Tue,  3 Mar 2015 21:48:43 +0200
> Vlad Zolotarov <vladz@cloudius-systems.com> wrote:
>
>> +		lro_bulk_alloc: 1, /**< RX LRO with bulk alloc is ON(1) / OFF(0) */
> This is an internal decision and should not be exposed in the API.
> We need less knobs not more.


I get your point. I'll move this to ixgbe-specific dev data.
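
Something along these lines, perhaps - a sketch only, with an assumed field
name; the actual layout of the private data in ixgbe_ethdev.h may differ:

/* Keep the Rx-handler choice private to the PMD instead of exposing a
 * bit in struct rte_eth_dev_data. */
struct ixgbe_adapter {
	struct ixgbe_hw hw;
	/* ... existing ixgbe-private state ... */
	uint8_t rx_bulk_alloc_allowed;	/* decided at Rx init time, read when
					 * picking the rx_pkt_burst handler */
};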
  
Vladislav Zolotarov March 4, 2015, 7:57 a.m. UTC | #6
On 03/04/15 02:33, Stephen Hemminger wrote:
> On Tue,  3 Mar 2015 21:48:43 +0200
> Vlad Zolotarov <vladz@cloudius-systems.com> wrote:
>
>> +	next_desc:
>> +		/*
>> +		 * The code in this whole file uses the volatile pointer to
>> +		 * ensure the read ordering of the status and the rest of the
>> +		 * descriptor fields (on the compiler level only!!!). This is so
>> +		 * UGLY - why not to just use the compiler barrier instead? DPDK
>> +		 * even has the rte_compiler_barrier() for that.
>> +		 *
>> +		 * But most importantly this is just wrong because this doesn't
>> +		 * ensure memory ordering in a general case at all. For
>> +		 * instance, DPDK is supposed to work on Power CPUs where
>> +		 * compiler barrier may just not be enough!
>> +		 *
>> +		 * I tried to write only this function properly to have a
>> +		 * starting point (as a part of an LRO/RSC series) but the
>> +		 * compiler cursed at me when I tried to cast away the
>> +		 * "volatile" from rx_ring (yes, it's volatile too!!!). So, I'm
>> +		 * keeping it the way it is for now.
>> +		 *
>> +		 * The code in this file is broken in so many other places and
>> +		 * will just not work on a big endian CPU anyway therefore the
>> +		 * lines below will have to be revisited together with the rest
>> +		 * of the ixgbe PMD.
>> +		 *
>> +		 * TODO:
>> +		 *    - Get rid of "volatile" crap and let the compiler do its
>> +		 *      job.
>> +		 *    - Use the proper memory barrier (rte_rmb()) to ensure the
>> +		 *      memory ordering below.
> This comment screams "this is broken".
> Why not get proper architecture independent barriers in DPDK first.

This series is orthogonal to the issue above. I just couldn't help 
mentioning this ugliness when I noticed it on the way.
Note that although this is obviously not the right way to write this 
kind of code, it is still not a bug, and most likely the performance 
implications are minimal here.
The only overhead is that we may read "too much" data from the 
descriptor - data we don't actually need. The descriptor is 16 bytes, so 
this doesn't seem to be a critical issue.

So, fixing the above issue may wait, especially since the same s..t may 
be found in other Intel PMDs (see i40e for example). Fixing this issue 
should be a matter of a massive cleanup series that covers all the 
relevant PMDs. Of course we may start with ixgbe, but even in this single 
PMD there are at least 3 non-LRO related functions that have to be 
fixed, so IMHO even fixing ONLY ixgbe should be a matter of a separate 
series.
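
For the record, the end state the TODO aims at would look roughly like the
sketch below, once "volatile" can eventually be dropped from rx_ring. The
helper name is illustrative; rte_rmb(), rte_le_to_cpu_32() and the ixgbe
descriptor definitions are the existing ones:

/* Check DD with one read, then use a real read barrier so the snapshot
 * of the descriptor cannot be reordered before it - also on weakly
 * ordered CPUs such as Power. */
static inline int
ixgbe_rx_desc_done(union ixgbe_adv_rx_desc *rx_ring, uint16_t rx_id,
		   union ixgbe_adv_rx_desc *rxd)
{
	uint32_t staterr =
		rte_le_to_cpu_32(rx_ring[rx_id].wb.upper.status_error);

	if (!(staterr & IXGBE_RXDADV_STAT_DD))
		return 0;		/* descriptor not completed yet */

	rte_rmb();			/* order the reads below after the DD check */
	*rxd = rx_ring[rx_id];		/* now safe to read the rest */
	return 1;
}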
  
Vladislav Zolotarov March 4, 2015, 7:57 a.m. UTC | #7
On 03/04/15 02:34, Stephen Hemminger wrote:
> On Tue,  3 Mar 2015 21:48:43 +0200
> Vlad Zolotarov <vladz@cloudius-systems.com> wrote:
>
>> +
>> +		if (!bulk_alloc) {
>> +			__le64 dma =
>> +			  rte_cpu_to_le_64(RTE_MBUF_DATA_DMA_ADDR_DEFAULT(nmb));
>> +			/*
>> +			 * Update RX descriptor with the physical address of the
>> +			 * new data buffer of the new allocated mbuf.
>> +			 */
>> +			rxe->mbuf = nmb;
>> +
>> +                        rxm->data_off = RTE_PKTMBUF_HEADROOM;
> Incorrect indentation.

Oops... ;)
  
Vladislav Zolotarov March 4, 2015, 7:59 a.m. UTC | #8
On 03/04/15 02:36, Stephen Hemminger wrote:
> On Tue,  3 Mar 2015 21:48:43 +0200
> Vlad Zolotarov <vladz@cloudius-systems.com> wrote:
>
>>      - Only x540 and 82599 devices support LRO.
>>      - Add the appropriate HW configuration.
>>      - Add RSC aware rx_pkt_burst() handlers:
>>         - Implemented bulk allocation and non-bulk allocation versions.
>>         - Add LRO-specific fields to rte_eth_rxmode, to rte_eth_dev_data
>>           and to igb_rx_queue.
>>         - Use the appropriate handler when LRO is requested.
>>
>> Signed-off-by: Vlad Zolotarov <vladz@cloudius-systems.com>
> Checkpatch warnings (edited to remove ones that should be ok)

I was unaware that checkpatch rules apply here - at least looking at the 
current code it looks like it... ;)
But I'm all for it! I'll fix all the issues and respin.

Thanks.

>
>
> WARNING: 'recieved' may be misspelled - perhaps 'received'?
> #196: FILE: lib/librte_pmd_ixgbe/ixgbe_rxtx.c:1426:
> + * @rx_pkts table of recieved packets
>
> WARNING: Missing a blank line after declarations
> #223: FILE: lib/librte_pmd_ixgbe/ixgbe_rxtx.c:1453:
> +	struct igb_rx_queue *rxq = rx_queue;
> +	volatile union ixgbe_adv_rx_desc *rx_ring = rxq->rx_ring;
>
>
>
> WARNING: labels should not be indented
> #246: FILE: lib/librte_pmd_ixgbe/ixgbe_rxtx.c:1476:
> +	next_desc:
>
> WARNING: quoted string split across lines
> #285: FILE: lib/librte_pmd_ixgbe/ixgbe_rxtx.c:1515:
> +		PMD_RX_LOG(DEBUG, "port_id=%u queue_id=%u rx_id=%u "
> +				  "staterr=0x%x data_len=%u",
>
> WARNING: quoted string split across lines
> #293: FILE: lib/librte_pmd_ixgbe/ixgbe_rxtx.c:1523:
> +				PMD_RX_LOG(DEBUG, "RX mbuf alloc failed "
> +						  "port_id=%u queue_id=%u",
>
> WARNING: Missing a blank line after declarations
> #302: FILE: lib/librte_pmd_ixgbe/ixgbe_rxtx.c:1532:
> +			uint16_t next_rdt = rxq->rx_free_trigger;
> +			if (!ixgbe_rx_alloc_bufs(rxq, false)) {
>
> WARNING: quoted string split across lines
> #309: FILE: lib/librte_pmd_ixgbe/ixgbe_rxtx.c:1539:
> +				PMD_RX_LOG(DEBUG, "RX bulk alloc failed "
> +						  "port_id=%u queue_id=%u",
>
> ERROR: code indent should use tabs where possible
> #350: FILE: lib/librte_pmd_ixgbe/ixgbe_rxtx.c:1580:
> +                        rxm->data_off = RTE_PKTMBUF_HEADROOM;$
>
> WARNING: please, no spaces at the start of a line
> #350: FILE: lib/librte_pmd_ixgbe/ixgbe_rxtx.c:1580:
> +                        rxm->data_off = RTE_PKTMBUF_HEADROOM;$
>
> WARNING: quoted string split across lines
> #452: FILE: lib/librte_pmd_ixgbe/ixgbe_rxtx.c:1682:
> +		PMD_RX_LOG(DEBUG, "port_id=%u queue_id=%u rx_tail=%u "
> +			   "nb_hold=%u nb_rx=%u",
>
> WARNING: quoted string split across lines
> #536: FILE: lib/librte_pmd_ixgbe/ixgbe_rxtx.c:2580:
> +	PMD_INIT_LOG(DEBUG, "sw_ring=%p sw_rsc_ring=%p hw_ring=%p "
> +			    "dma_addr=0x%"PRIx64,
>
> WARNING: Possible switch case/default not preceeded by break or fallthrough comment
> #617: FILE: lib/librte_pmd_ixgbe/ixgbe_rxtx.c:3926:
> +	default:
>
> WARNING: quoted string split across lines
> #648: FILE: lib/librte_pmd_ixgbe/ixgbe_rxtx.c:3966:
> +		PMD_INIT_LOG(CRIT, "LRO is requested on HW that doesn't "
> +				   "support it");
>
> WARNING: quoted string split across lines
> #693: FILE: lib/librte_pmd_ixgbe/ixgbe_rxtx.c:4022:
> +			PMD_INIT_LOG(CRIT, "LRO can't be enabled when HW CRC "
> +					    "is disabled");
>
> WARNING: quoted string split across lines
> #711: FILE: lib/librte_pmd_ixgbe/ixgbe_rxtx.c:4084:
> +				PMD_INIT_LOG(INFO, "split_hdr_size less than "
> +						   "128 bytes (%d)!",
>
> WARNING: quoted string split across lines
> #835: FILE: lib/librte_pmd_ixgbe/ixgbe_rxtx.c:4267:
> +			PMD_INIT_LOG(INFO, "LRO is requested. Using a bulk "
> +					   "allocation version");
>
> WARNING: quoted string split across lines
> #840: FILE: lib/librte_pmd_ixgbe/ixgbe_rxtx.c:4272:
> +			PMD_INIT_LOG(INFO, "LRO is requested. Using a single "
> +					   "allocation version");
  
Avi Kivity March 4, 2015, 8:05 a.m. UTC | #9
On 03/04/2015 02:33 AM, Stephen Hemminger wrote:
> On Tue,  3 Mar 2015 21:48:43 +0200
> Vlad Zolotarov <vladz@cloudius-systems.com> wrote:
>
>> +		 * TODO:
>> +		 *    - Get rid of "volatile" crap and let the compiler do its
>> +		 *      job.
>> +		 *    - Use the proper memory barrier (rte_rmb()) to ensure the
>> +		 *      memory ordering below.
> This comment screams "this is broken".
> Why not get proper architecture independent barriers in DPDK first.

C11 has arch-independent memory barriers, so this can be as simple as 
-std=gnu11 (the default in gcc 5, anyway).

Not only do we get the barriers for free, but they are also properly 
integrated with the compiler, so for example a release barrier won't 
stop the compiler from hoisting later accesses to before the store, nor 
will it cause spurious reloads the way a memory clobber does.
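
For illustration, a C11 sketch (not taken from DPDK) of what the DD-bit
check could look like with an acquire load instead of a volatile ring;
desc_is_done and its arguments are made-up names:

#include <stdatomic.h>
#include <stdint.h>

/* The acquire load orders every later read of the descriptor after the
 * status read, on any architecture, and the compiler knows exactly what
 * has to stay ordered - no volatile, no blanket memory clobber. */
static inline int
desc_is_done(const _Atomic uint32_t *status_error, uint32_t dd_bit)
{
	uint32_t staterr = atomic_load_explicit(status_error,
						memory_order_acquire);

	return (staterr & dd_bit) != 0;
}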
  
Stephen Hemminger March 4, 2015, 6:51 p.m. UTC | #10
On Wed, 04 Mar 2015 09:59:38 +0200
Vlad Zolotarov <vladz@cloudius-systems.com> wrote:

> > Checkpatch warnings (edited to remove ones that should be ok)  
> 
> I was unaware that checkpatch rules apply here - at least looking at the 
> current code it looks like it... ;)
> But I'm all for it! I'll fix all the issues and respin

Checkpatch doesn't generally have to apply. But it is a good and useful tool
to find obvious style gaffes.

Just don't let it rule your world.
  
Stephen Hemminger March 4, 2015, 6:54 p.m. UTC | #11
On Wed, 04 Mar 2015 09:57:24 +0200
Vlad Zolotarov <vladz@cloudius-systems.com> wrote:

> 
> 
> On 03/04/15 02:33, Stephen Hemminger wrote:
> > On Tue,  3 Mar 2015 21:48:43 +0200
> > Vlad Zolotarov <vladz@cloudius-systems.com> wrote:
> >
> >> +	next_desc:
> >> +		/*
> >> +		 * The code in this whole file uses the volatile pointer to
> >> +		 * ensure the read ordering of the status and the rest of the
> >> +		 * descriptor fields (on the compiler level only!!!). This is so
> >> +		 * UGLY - why not to just use the compiler barrier instead? DPDK
> >> +		 * even has the rte_compiler_barrier() for that.
> >> +		 *
> >> +		 * But most importantly this is just wrong because this doesn't
> >> +		 * ensure memory ordering in a general case at all. For
> >> +		 * instance, DPDK is supposed to work on Power CPUs where
> >> +		 * compiler barrier may just not be enough!
> >> +		 *
> >> +		 * I tried to write only this function properly to have a
> >> +		 * starting point (as a part of an LRO/RSC series) but the
> >> +		 * compiler cursed at me when I tried to cast away the
> >> +		 * "volatile" from rx_ring (yes, it's volatile too!!!). So, I'm
> >> +		 * keeping it the way it is for now.
> >> +		 *
> >> +		 * The code in this file is broken in so many other places and
> >> +		 * will just not work on a big endian CPU anyway therefore the
> >> +		 * lines below will have to be revisited together with the rest
> >> +		 * of the ixgbe PMD.
> >> +		 *
> >> +		 * TODO:
> >> +		 *    - Get rid of "volatile" crap and let the compiler do its
> >> +		 *      job.
> >> +		 *    - Use the proper memory barrier (rte_rmb()) to ensure the
> >> +		 *      memory ordering below.
> > This comment screams "this is broken".
> > Why not get proper architecture independent barriers in DPDK first.
> 
> This series is orthogonal to the issue above. I just couldn't stand to 
> mention this ugliness when I noticed it on the way.
> Note that although this is obviously not the right way to write this 
> kind of code it is still not a bug and most likely the performance 
> implications are minimal here.
> The only overhead is that there may be read "too much" data from the 
> descriptor that we may not actually need. The descriptor is 16 bytes so 
> this doesn't seem to be a critical issue.
> 
> So, fixing the above issue may wait, especially since the same s..t may 
> be found in other Intel PMDs (see i40e for example). Fixing this issue 
> should be a matter of a massive cleanup series that cover all the 
> relevant PMDs. Of course we may start with ixgbe but even in this single 
> PMD there are at least 3 non-LRO related functions that have to be 
> fixed, so IMHO even fixing ONLY ixgbe should be a matter of a separate 
> series.

In userspace-rcu and the kernel there is a simple macro that would make this
kind of code more sane.

What about adding:

#define rte_access_once(x)  (*(volatile typeof(x) *)&(x))

Then doing
            rxdp = rte_access_once(rx_ring + idx);
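
For comparison, the kernel applies ACCESS_ONCE() to the lvalue being read,
so a compilable variant of the idea might look like the sketch below
(read_staterr is an illustrative name, not proposed code, and the ixgbe
descriptor type is assumed); on weakly ordered CPUs a read barrier is
still needed on top of it:

#include <stdint.h>

#define rte_access_once(x)  (*(volatile typeof(x) *)&(x))

/* Read the status word exactly once, without marking the whole ring
 * volatile. */
static inline uint32_t
read_staterr(union ixgbe_adv_rx_desc *rx_ring, uint16_t idx)
{
	return rte_access_once(rx_ring[idx].wb.upper.status_error);
}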
  
Vladislav Zolotarov March 5, 2015, 9:36 a.m. UTC | #12
On 03/04/15 20:54, Stephen Hemminger wrote:
> On Wed, 04 Mar 2015 09:57:24 +0200
> Vlad Zolotarov <vladz@cloudius-systems.com> wrote:
>
>>
>> On 03/04/15 02:33, Stephen Hemminger wrote:
>>> On Tue,  3 Mar 2015 21:48:43 +0200
>>> Vlad Zolotarov <vladz@cloudius-systems.com> wrote:
>>>
>>>> +	next_desc:
>>>> +		/*
>>>> +		 * The code in this whole file uses the volatile pointer to
>>>> +		 * ensure the read ordering of the status and the rest of the
>>>> +		 * descriptor fields (on the compiler level only!!!). This is so
>>>> +		 * UGLY - why not to just use the compiler barrier instead? DPDK
>>>> +		 * even has the rte_compiler_barrier() for that.
>>>> +		 *
>>>> +		 * But most importantly this is just wrong because this doesn't
>>>> +		 * ensure memory ordering in a general case at all. For
>>>> +		 * instance, DPDK is supposed to work on Power CPUs where
>>>> +		 * compiler barrier may just not be enough!
>>>> +		 *
>>>> +		 * I tried to write only this function properly to have a
>>>> +		 * starting point (as a part of an LRO/RSC series) but the
>>>> +		 * compiler cursed at me when I tried to cast away the
>>>> +		 * "volatile" from rx_ring (yes, it's volatile too!!!). So, I'm
>>>> +		 * keeping it the way it is for now.
>>>> +		 *
>>>> +		 * The code in this file is broken in so many other places and
>>>> +		 * will just not work on a big endian CPU anyway therefore the
>>>> +		 * lines below will have to be revisited together with the rest
>>>> +		 * of the ixgbe PMD.
>>>> +		 *
>>>> +		 * TODO:
>>>> +		 *    - Get rid of "volatile" crap and let the compiler do its
>>>> +		 *      job.
>>>> +		 *    - Use the proper memory barrier (rte_rmb()) to ensure the
>>>> +		 *      memory ordering below.
>>> This comment screams "this is broken".
>>> Why not get proper architecture independent barriers in DPDK first.
>> This series is orthogonal to the issue above. I just couldn't stand to
>> mention this ugliness when I noticed it on the way.
>> Note that although this is obviously not the right way to write this
>> kind of code it is still not a bug and most likely the performance
>> implications are minimal here.
>> The only overhead is that there may be read "too much" data from the
>> descriptor that we may not actually need. The descriptor is 16 bytes so
>> this doesn't seem to be a critical issue.
>>
>> So, fixing the above issue may wait, especially since the same s..t may
>> be found in other Intel PMDs (see i40e for example). Fixing this issue
>> should be a matter of a massive cleanup series that cover all the
>> relevant PMDs. Of course we may start with ixgbe but even in this single
>> PMD there are at least 3 non-LRO related functions that have to be
>> fixed, so IMHO even fixing ONLY ixgbe should be a matter of a separate
>> series.
> In userspace-rcu and kernel there is a simple macro that would make this
> kind of code more sane.
>
> What about adding:
>
> #define rte_access_once(x)  (*(volatile typeof(x) *)&(x))
>
> Then doing
>              rxdp = rte_access_once(rx_ring + idx);

This workaround doesn't address the issue described above - it just 
hides it inside a macro, which is even uglier.
The main reason I haven't fixed this issue in (at least) the function 
I've added is that the hw->rx_ring (HW ring) is defined as volatile, and 
this fact is relied upon all over the file in different places. All such 
places would have to be fixed if I dropped the "volatile" qualifier, 
which should be the first thing to do.

  

Patch

diff --git a/lib/librte_ether/rte_ethdev.h b/lib/librte_ether/rte_ethdev.h
index 8db3127..f5eff81 100644
--- a/lib/librte_ether/rte_ethdev.h
+++ b/lib/librte_ether/rte_ethdev.h
@@ -320,14 +320,15 @@  struct rte_eth_rxmode {
 	enum rte_eth_rx_mq_mode mq_mode;
 	uint32_t max_rx_pkt_len;  /**< Only used if jumbo_frame enabled. */
 	uint16_t split_hdr_size;  /**< hdr buf size (header_split enabled).*/
-	uint8_t header_split : 1, /**< Header Split enable. */
+	uint16_t header_split : 1, /**< Header Split enable. */
 		hw_ip_checksum   : 1, /**< IP/UDP/TCP checksum offload enable. */
 		hw_vlan_filter   : 1, /**< VLAN filter enable. */
 		hw_vlan_strip    : 1, /**< VLAN strip enable. */
 		hw_vlan_extend   : 1, /**< Extended VLAN enable. */
 		jumbo_frame      : 1, /**< Jumbo Frame Receipt enable. */
 		hw_strip_crc     : 1, /**< Enable CRC stripping by hardware. */
-		enable_scatter   : 1; /**< Enable scatter packets rx handler */
+		enable_scatter   : 1, /**< Enable scatter packets rx handler */
+		enable_lro       : 1; /**< Enable LRO */
 };
 
 /**
@@ -1515,6 +1516,8 @@  struct rte_eth_dev_data {
 	uint8_t port_id;           /**< Device [external] port identifier. */
 	uint8_t promiscuous   : 1, /**< RX promiscuous mode ON(1) / OFF(0). */
 		scattered_rx : 1,  /**< RX of scattered packets is ON(1) / OFF(0) */
+		lro          : 1,  /**< RX LRO is ON(1) / OFF(0) */
+		lro_bulk_alloc: 1, /**< RX LRO with bulk alloc is ON(1) / OFF(0) */
 		all_multicast : 1, /**< RX all multicast mode ON(1) / OFF(0). */
 		dev_started : 1;   /**< Device state: STARTED(1) / STOPPED(0). */
 };
diff --git a/lib/librte_pmd_ixgbe/ixgbe_ethdev.c b/lib/librte_pmd_ixgbe/ixgbe_ethdev.c
index 9bdc046..a5a4cb8 100644
--- a/lib/librte_pmd_ixgbe/ixgbe_ethdev.c
+++ b/lib/librte_pmd_ixgbe/ixgbe_ethdev.c
@@ -762,6 +762,14 @@  eth_ixgbe_dev_init(__attribute__((unused)) struct eth_driver *eth_drv,
 
 		if (eth_dev->data->scattered_rx)
 			eth_dev->rx_pkt_burst = ixgbe_recv_scattered_pkts;
+
+		if (eth_dev->data->lro) {
+			if (eth_dev->data->lro_bulk_alloc)
+				eth_dev->rx_pkt_burst = ixgbe_recv_pkts_lro_bulk_alloc;
+			else
+				eth_dev->rx_pkt_burst = ixgbe_recv_pkts_lro;
+		}
+
 		return 0;
 	}
 	pci_dev = eth_dev->pci_dev;
@@ -1641,6 +1649,8 @@  ixgbe_dev_stop(struct rte_eth_dev *dev)
 
 	/* Clear stored conf */
 	dev->data->scattered_rx = 0;
+	dev->data->lro = 0;
+	dev->data->lro_bulk_alloc = 0;
 
 	/* Clear recorded link status */
 	memset(&link, 0, sizeof(link));
@@ -2009,6 +2019,13 @@  ixgbe_dev_info_get(struct rte_eth_dev *dev, struct rte_eth_dev_info *dev_info)
 		DEV_RX_OFFLOAD_IPV4_CKSUM |
 		DEV_RX_OFFLOAD_UDP_CKSUM  |
 		DEV_RX_OFFLOAD_TCP_CKSUM;
+
+#ifdef RTE_ETHDEV_LRO_SUPPORT
+	if (hw->mac.type == ixgbe_mac_82599EB ||
+	    hw->mac.type == ixgbe_mac_X540)
+		dev_info->rx_offload_capa |= DEV_RX_OFFLOAD_TCP_LRO;
+#endif
+
 	dev_info->tx_offload_capa =
 		DEV_TX_OFFLOAD_VLAN_INSERT |
 		DEV_TX_OFFLOAD_IPV4_CKSUM  |
diff --git a/lib/librte_pmd_ixgbe/ixgbe_ethdev.h b/lib/librte_pmd_ixgbe/ixgbe_ethdev.h
index a549f5c..e206584 100644
--- a/lib/librte_pmd_ixgbe/ixgbe_ethdev.h
+++ b/lib/librte_pmd_ixgbe/ixgbe_ethdev.h
@@ -349,6 +349,11 @@  uint16_t ixgbe_recv_pkts_bulk_alloc(void *rx_queue, struct rte_mbuf **rx_pkts,
 uint16_t ixgbe_recv_scattered_pkts(void *rx_queue,
 		struct rte_mbuf **rx_pkts, uint16_t nb_pkts);
 
+uint16_t ixgbe_recv_pkts_lro(void *rx_queue,
+		struct rte_mbuf **rx_pkts, uint16_t nb_pkts);
+uint16_t ixgbe_recv_pkts_lro_bulk_alloc(void *rx_queue,
+		struct rte_mbuf **rx_pkts, uint16_t nb_pkts);
+
 uint16_t ixgbe_xmit_pkts(void *tx_queue, struct rte_mbuf **tx_pkts,
 		uint16_t nb_pkts);
 
diff --git a/lib/librte_pmd_ixgbe/ixgbe_rxtx.c b/lib/librte_pmd_ixgbe/ixgbe_rxtx.c
index 4c67a9e..9a70204 100644
--- a/lib/librte_pmd_ixgbe/ixgbe_rxtx.c
+++ b/lib/librte_pmd_ixgbe/ixgbe_rxtx.c
@@ -1366,6 +1366,15 @@  ixgbe_recv_pkts(void *rx_queue, struct rte_mbuf **rx_pkts,
 }
 
 /**
+ * Detect an RSC descriptor.
+ */
+static inline uint32_t ixgbe_rsc_count(union ixgbe_adv_rx_desc *rx)
+{
+	return (rte_le_to_cpu_32(rx->wb.lower.lo_dword.data) &
+		IXGBE_RXDADV_RSCCNT_MASK) >> IXGBE_RXDADV_RSCCNT_SHIFT;
+}
+
+/**
  * Initialize the first mbuf of the returned packet:
  *    - RX port identifier,
  *    - hardware offload data, if any:
@@ -1410,6 +1419,290 @@  static inline void ixgbe_fill_cluster_head_buf(
 	}
 }
 
+/**
+ * Bulk receive handler for the LRO case.
+ *
+ * @rx_queue Rx queue handle
+ * @rx_pkts table of recieved packets
+ * @nb_pkts size of rx_pkts table
+ * @bulk_alloc if TRUE bulk allocation is used for a HW ring refilling
+ *
+ * Handles the Rx HW ring completions when RSC feature is configured. Uses an
+ * additional ring of igb_rsc_entry's that will hold the relevant RSC info.
+ *
+ * We use the same logic as in the Linux and FreeBSD ixgbe drivers:
+ * 1) When non-EOP RSC completion arrives:
+ *    a) Update the HEAD of the current RSC aggregation cluster with the new
+ *       segment's data length.
+ *    b) Set the "next" pointer of the current segment to point to the segment
+ *       at the NEXTP index.
+ *    c) Pass the HEAD of RSC aggregation cluster on to the next NEXTP entry
+ *       in the sw_rsc_ring.
+ * 2) When EOP arrives we just update the cluster's total length and offload
+ *    flags and deliver the cluster up to the upper layers. In our case - put it
+ *    in the rx_pkts table.
+ *
+ * Returns the number of received packets/clusters (according to the "bulk
+ * receive" interface).
+ */
+static inline uint16_t
+_recv_pkts_lro(void *rx_queue, struct rte_mbuf **rx_pkts, uint16_t nb_pkts,
+	       bool bulk_alloc)
+{
+	struct igb_rx_queue *rxq = rx_queue;
+	volatile union ixgbe_adv_rx_desc *rx_ring = rxq->rx_ring;
+	struct igb_rx_entry *sw_ring = rxq->sw_ring;
+	struct igb_rsc_entry *sw_rsc_ring = rxq->sw_rsc_ring;
+	uint16_t rx_id = rxq->rx_tail;
+	uint16_t nb_rx = 0;
+	uint16_t nb_hold = rxq->nb_rx_hold;
+	uint16_t prev_id = rxq->rx_tail;
+
+	while (nb_rx < nb_pkts) {
+		bool eop;
+		struct igb_rx_entry *rxe;
+		struct igb_rsc_entry *rsc_entry;
+		struct igb_rsc_entry *next_rsc_entry;
+		struct igb_rx_entry *next_rxe;
+		struct rte_mbuf *first_seg;
+		struct rte_mbuf *rxm;
+		struct rte_mbuf *nmb;
+		union ixgbe_adv_rx_desc rxd;
+		uint16_t data_len;
+		uint16_t next_id;
+		volatile union ixgbe_adv_rx_desc *rxdp;
+		uint32_t staterr;
+
+	next_desc:
+		/*
+		 * The code in this whole file uses the volatile pointer to
+		 * ensure the read ordering of the status and the rest of the
+		 * descriptor fields (on the compiler level only!!!). This is so
+		 * UGLY - why not to just use the compiler barrier instead? DPDK
+		 * even has the rte_compiler_barrier() for that.
+		 *
+		 * But most importantly this is just wrong because this doesn't
+		 * ensure memory ordering in a general case at all. For
+		 * instance, DPDK is supposed to work on Power CPUs where
+		 * compiler barrier may just not be enough!
+		 *
+		 * I tried to write only this function properly to have a
+		 * starting point (as a part of an LRO/RSC series) but the
+		 * compiler cursed at me when I tried to cast away the
+		 * "volatile" from rx_ring (yes, it's volatile too!!!). So, I'm
+		 * keeping it the way it is for now.
+		 *
+		 * The code in this file is broken in so many other places and
+		 * will just not work on a big endian CPU anyway therefore the
+		 * lines below will have to be revisited together with the rest
+		 * of the ixgbe PMD.
+		 *
+		 * TODO:
+		 *    - Get rid of "volatile" crap and let the compiler do its
+		 *      job.
+		 *    - Use the proper memory barrier (rte_rmb()) to ensure the
+		 *      memory ordering below.
+		 */
+		rxdp = &rx_ring[rx_id];
+		staterr = rte_le_to_cpu_32(rxdp->wb.upper.status_error);
+
+		if (!(staterr & IXGBE_RXDADV_STAT_DD))
+			break;
+
+		rxd = *rxdp;
+
+		PMD_RX_LOG(DEBUG, "port_id=%u queue_id=%u rx_id=%u "
+				  "staterr=0x%x data_len=%u",
+			   rxq->port_id, rxq->queue_id, rx_id, staterr,
+			   rte_le_to_cpu_16(rxd.wb.upper.length));
+
+		if (!bulk_alloc) {
+			nmb = rte_rxmbuf_alloc(rxq->mb_pool);
+			if (nmb == NULL) {
+				PMD_RX_LOG(DEBUG, "RX mbuf alloc failed "
+						  "port_id=%u queue_id=%u",
+					   rxq->port_id, rxq->queue_id);
+
+				rte_eth_devices[rxq->port_id].data->
+							rx_mbuf_alloc_failed++;
+				break;
+			}
+		} else if (nb_hold > rxq->rx_free_thresh) {
+			uint16_t next_rdt = rxq->rx_free_trigger;
+			if (!ixgbe_rx_alloc_bufs(rxq, false)) {
+				rte_wmb();
+				IXGBE_PCI_REG_WRITE(rxq->rdt_reg_addr,
+						    next_rdt);
+				nb_hold -= rxq->rx_free_thresh;
+			} else {
+				PMD_RX_LOG(DEBUG, "RX bulk alloc failed "
+						  "port_id=%u queue_id=%u",
+					   rxq->port_id, rxq->queue_id);
+
+				rte_eth_devices[rxq->port_id].data->
+							rx_mbuf_alloc_failed++;
+				break;
+			}
+		}
+
+		nb_hold++;
+		rxe = &sw_ring[rx_id];
+		eop = staterr & IXGBE_RXDADV_STAT_EOP;
+
+		next_id = rx_id + 1;
+		if (next_id == rxq->nb_rx_desc)
+			next_id = 0;
+
+		/* Prefetch next mbuf while processing current one. */
+		rte_ixgbe_prefetch(sw_ring[next_id].mbuf);
+
+		/*
+		 * When next RX descriptor is on a cache-line boundary,
+		 * prefetch the next 4 RX descriptors and the next 4 pointers
+		 * to mbufs.
+		 */
+		if ((next_id & 0x3) == 0) {
+			rte_ixgbe_prefetch(&rx_ring[next_id]);
+			rte_ixgbe_prefetch(&sw_ring[next_id]);
+		}
+
+		rxm = rxe->mbuf;
+
+		if (!bulk_alloc) {
+			__le64 dma =
+			  rte_cpu_to_le_64(RTE_MBUF_DATA_DMA_ADDR_DEFAULT(nmb));
+			/*
+			 * Update RX descriptor with the physical address of the
+			 * new data buffer of the new allocated mbuf.
+			 */
+			rxe->mbuf = nmb;
+
+                        rxm->data_off = RTE_PKTMBUF_HEADROOM;
+			rxdp->read.hdr_addr = dma;
+			rxdp->read.pkt_addr = dma;
+		}
+		/*
+		 * Set data length & data buffer address of mbuf.
+		 */
+		data_len = rte_le_to_cpu_16(rxd.wb.upper.length);
+		rxm->data_len = data_len;
+
+		if (!eop) {
+			uint16_t nextp_id;
+			/*
+			 * Get next descriptor index:
+			 *  - For RSC it's in the NEXTP field.
+			 *  - For a scattered packet - it's just a following
+			 *    descriptor.
+			 */
+			if (ixgbe_rsc_count(&rxd))
+				nextp_id =
+					(staterr & IXGBE_RXDADV_NEXTP_MASK) >>
+						       IXGBE_RXDADV_NEXTP_SHIFT;
+			else
+				nextp_id = next_id;
+
+			next_rsc_entry = &sw_rsc_ring[nextp_id];
+			next_rxe = &sw_ring[nextp_id];
+			rte_ixgbe_prefetch(next_rxe);
+		}
+
+		rsc_entry = &sw_rsc_ring[rx_id];
+		first_seg = rsc_entry->fbuf;
+		rsc_entry->fbuf = NULL;
+
+		/*
+		 * If this is the first buffer of the received packet,
+		 * set the pointer to the first mbuf of the packet and
+		 * initialize its context.
+		 * Otherwise, update the total length and the number of segments
+		 * of the current scattered packet, and update the pointer to
+		 * the last mbuf of the current packet.
+		 */
+		if (first_seg == NULL) {
+			first_seg = rxm;
+			first_seg->pkt_len = data_len;
+			first_seg->nb_segs = 1;
+		} else {
+			first_seg->pkt_len += data_len;
+			first_seg->nb_segs++;
+		}
+
+		prev_id = rx_id;
+		rx_id = next_id;
+
+		/*
+		 * If this is not the last buffer of the received packet, update
+		 * the pointer to the first mbuf at the NEXTP entry in the
+		 * sw_rsc_ring and continue to parse the RX ring.
+		 */
+		if (!eop) {
+			rxm->next = next_rxe->mbuf;
+			next_rsc_entry->fbuf = first_seg;
+			goto next_desc;
+		}
+
+		/*
+		 * This is the last buffer of the received packet - return
+		 * the current cluster to the user.
+		 */
+		rxm->next = NULL;
+
+		/* Initialize the first mbuf of the returned packet */
+		ixgbe_fill_cluster_head_buf(first_seg, &rxd, rxq->port_id,
+					    staterr);
+
+		/* Prefetch data of first segment, if configured to do so. */
+		rte_packet_prefetch((char *)first_seg->buf_addr +
+			first_seg->data_off);
+
+		/*
+		 * Store the mbuf address into the next entry of the array
+		 * of returned packets.
+		 */
+		rx_pkts[nb_rx++] = first_seg;
+	}
+
+	/*
+	 * Record index of the next RX descriptor to probe.
+	 */
+	rxq->rx_tail = rx_id;
+
+	/*
+	 * If the number of free RX descriptors is greater than the RX free
+	 * threshold of the queue, advance the Receive Descriptor Tail (RDT)
+	 * register.
+	 * Update the RDT with the value of the last processed RX descriptor
+	 * minus 1, to guarantee that the RDT register is never equal to the
+	 * RDH register, which creates a "full" ring situtation from the
+	 * RDH register, which creates a "full" ring situation from the
+	 */
+	if (!bulk_alloc && nb_hold > rxq->rx_free_thresh) {
+		PMD_RX_LOG(DEBUG, "port_id=%u queue_id=%u rx_tail=%u "
+			   "nb_hold=%u nb_rx=%u",
+			   rxq->port_id, rxq->queue_id, rx_id, nb_hold, nb_rx);
+
+		IXGBE_PCI_REG_WRITE(rxq->rdt_reg_addr, prev_id);
+		nb_hold = 0;
+	}
+
+	rxq->nb_rx_hold = nb_hold;
+	return nb_rx;
+}
+
+uint16_t
+ixgbe_recv_pkts_lro(void *rx_queue, struct rte_mbuf **rx_pkts, uint16_t nb_pkts)
+{
+	return _recv_pkts_lro(rx_queue, rx_pkts, nb_pkts, false);
+}
+
+uint16_t
+ixgbe_recv_pkts_lro_bulk_alloc(void *rx_queue, struct rte_mbuf **rx_pkts,
+			       uint16_t nb_pkts)
+{
+	return _recv_pkts_lro(rx_queue, rx_pkts, nb_pkts, true);
+}
+
 uint16_t
 ixgbe_recv_scattered_pkts(void *rx_queue, struct rte_mbuf **rx_pkts,
 			  uint16_t nb_pkts)
@@ -2022,6 +2315,7 @@  ixgbe_rx_queue_release(struct igb_rx_queue *rxq)
 	if (rxq != NULL) {
 		ixgbe_rx_queue_release_mbufs(rxq);
 		rte_free(rxq->sw_ring);
+		rte_free(rxq->sw_rsc_ring);
 		rte_free(rxq);
 	}
 }
@@ -2149,6 +2443,7 @@  ixgbe_reset_rx_queue(struct igb_rx_queue *rxq)
 	rxq->nb_rx_hold = 0;
 	rxq->pkt_first_seg = NULL;
 	rxq->pkt_last_seg = NULL;
+	rxq->rsc_en = 0;
 }
 
 int
@@ -2164,6 +2459,14 @@  ixgbe_dev_rx_queue_setup(struct rte_eth_dev *dev,
 	struct ixgbe_hw     *hw;
 	int use_def_burst_func = 1;
 	uint16_t len;
+	struct rte_eth_dev_info dev_info = { 0 };
+	struct rte_eth_rxmode *dev_rx_mode = &dev->data->dev_conf.rxmode;
+	bool rsc_requested = false;
+
+	dev->dev_ops->dev_infos_get(dev, &dev_info);
+	if ((dev_info.rx_offload_capa & DEV_RX_OFFLOAD_TCP_LRO) &&
+	    dev_rx_mode->enable_lro)
+		rsc_requested = true;
 
 	PMD_INIT_FUNC_TRACE();
 	hw = IXGBE_DEV_PRIVATE_TO_HW(dev->data->dev_private);
@@ -2255,12 +2558,28 @@  ixgbe_dev_rx_queue_setup(struct rte_eth_dev *dev,
 	rxq->sw_ring = rte_zmalloc_socket("rxq->sw_ring",
 					  sizeof(struct igb_rx_entry) * len,
 					  RTE_CACHE_LINE_SIZE, socket_id);
-	if (rxq->sw_ring == NULL) {
+	if (!rxq->sw_ring) {
 		ixgbe_rx_queue_release(rxq);
 		return (-ENOMEM);
 	}
-	PMD_INIT_LOG(DEBUG, "sw_ring=%p hw_ring=%p dma_addr=0x%"PRIx64,
-		     rxq->sw_ring, rxq->rx_ring, rxq->rx_ring_phys_addr);
+
+	if (rsc_requested) {
+		rxq->sw_rsc_ring =
+			rte_zmalloc_socket("rxq->sw_rsc_ring",
+					   sizeof(struct igb_rsc_entry) * len,
+					   RTE_CACHE_LINE_SIZE, socket_id);
+		if (!rxq->sw_rsc_ring) {
+			ixgbe_rx_queue_release(rxq);
+			return (-ENOMEM);
+		}
+	} else {
+		rxq->sw_rsc_ring = NULL;
+	}
+
+	PMD_INIT_LOG(DEBUG, "sw_ring=%p sw_rsc_ring=%p hw_ring=%p "
+			    "dma_addr=0x%"PRIx64,
+		     rxq->sw_ring, rxq->sw_rsc_ring, rxq->rx_ring,
+		     rxq->rx_ring_phys_addr);
 
 	/*
 	 * Certain constraints must be met in order to use the bulk buffer
@@ -3533,6 +3852,83 @@  ixgbe_dev_mq_tx_configure(struct rte_eth_dev *dev)
 	return 0;
 }
 
+/**
+ * get_rscctl_maxdesc - Calculate the RSCCTL[n].MAXDESC for PF
+ *
+ * Return the RSCCTL[n].MAXDESC for 82599 and x540 PF devices according to the
+ * spec rev. 3.0 chapter 8.2.3.8.13.
+ *
+ * @pool Memory pool of the Rx queue
+ */
+static inline uint32_t get_rscctl_maxdesc(struct rte_mempool *pool)
+{
+	struct rte_pktmbuf_pool_private *mp_priv = rte_mempool_get_priv(pool);
+
+	/* MAXDESC * SRRCTL.BSIZEPKT must not exceed 64 KB minus one */
+	uint16_t maxdesc =
+		65535 / (mp_priv->mbuf_data_room_size - RTE_PKTMBUF_HEADROOM);
+
+	if (maxdesc >= 16)
+		return IXGBE_RSCCTL_MAXDESC_16;
+	else if (maxdesc >= 8)
+		return IXGBE_RSCCTL_MAXDESC_8;
+	else if (maxdesc >= 4)
+		return IXGBE_RSCCTL_MAXDESC_4;
+	else
+		return IXGBE_RSCCTL_MAXDESC_1;
+}
+
+/* (Taken from FreeBSD tree)
+** Setup the correct IVAR register for a particular MSIX interrupt
+**   (yes this is all very magic and confusing :)
+**  - entry is the register array entry
+**  - vector is the MSIX vector for this queue
+**  - type is RX/TX/MISC
+*/
+static void
+ixgbe_set_ivar(struct rte_eth_dev *dev, u8 entry, u8 vector, s8 type)
+{
+	struct ixgbe_hw *hw = IXGBE_DEV_PRIVATE_TO_HW(dev->data->dev_private);
+	u32 ivar, index;
+
+	vector |= IXGBE_IVAR_ALLOC_VAL;
+
+	switch (hw->mac.type) {
+
+	case ixgbe_mac_82598EB:
+		if (type == -1)
+			entry = IXGBE_IVAR_OTHER_CAUSES_INDEX;
+		else
+			entry += (type * 64);
+		index = (entry >> 2) & 0x1F;
+		ivar = IXGBE_READ_REG(hw, IXGBE_IVAR(index));
+		ivar &= ~(0xFF << (8 * (entry & 0x3)));
+		ivar |= (vector << (8 * (entry & 0x3)));
+		IXGBE_WRITE_REG(hw, IXGBE_IVAR(index), ivar);
+		break;
+
+	case ixgbe_mac_82599EB:
+	case ixgbe_mac_X540:
+		if (type == -1) { /* MISC IVAR */
+			index = (entry & 1) * 8;
+			ivar = IXGBE_READ_REG(hw, IXGBE_IVAR_MISC);
+			ivar &= ~(0xFF << index);
+			ivar |= (vector << index);
+			IXGBE_WRITE_REG(hw, IXGBE_IVAR_MISC, ivar);
+		} else {	/* RX/TX IVARS */
+			index = (16 * (entry & 1)) + (8 * type);
+			ivar = IXGBE_READ_REG(hw, IXGBE_IVAR(entry >> 1));
+			ivar &= ~(0xFF << index);
+			ivar |= (vector << index);
+			IXGBE_WRITE_REG(hw, IXGBE_IVAR(entry >> 1), ivar);
+		}
+
+	default:
+		break;
+	}
+}
+
+
 /*
  * Initializes Receive Unit.
  */
@@ -3549,10 +3945,27 @@  ixgbe_dev_rx_init(struct rte_eth_dev *dev)
 	uint32_t maxfrs;
 	uint32_t srrctl;
 	uint32_t rdrxctl;
+	uint32_t rscctl;
+	uint32_t psrtype;
+	uint32_t rfctl;
 	uint32_t rxcsum;
 	uint16_t buf_size;
 	uint16_t i;
 	struct rte_eth_rxmode *rx_conf = &dev->data->dev_conf.rxmode;
+	struct rte_eth_dev_info dev_info = { 0 };
+	bool rsc_capable = false;
+	int bulk_alloc_cond = 0;
+
+	/* Sanity check */
+	dev->dev_ops->dev_infos_get(dev, &dev_info);
+	if (dev_info.rx_offload_capa & DEV_RX_OFFLOAD_TCP_LRO)
+		rsc_capable = true;
+
+	if (!rsc_capable && rx_conf->enable_lro) {
+		PMD_INIT_LOG(CRIT, "LRO is requested on HW that doesn't "
+				   "support it");
+		return -EINVAL;
+	}
 
 	PMD_INIT_FUNC_TRACE();
 	hw = IXGBE_DEV_PRIVATE_TO_HW(dev->data->dev_private);
@@ -3572,13 +3985,44 @@  ixgbe_dev_rx_init(struct rte_eth_dev *dev)
 	IXGBE_WRITE_REG(hw, IXGBE_FCTRL, fctrl);
 
 	/*
+	 * RFCTL configuration
+	 *
+	 * Since NFS packets coalescing is not supported - clear RFCTL.NFSW_DIS
+	 * and RFCTL.NFSR_DIS when RSC is enabled.
+	 */
+	if (rsc_capable) {
+		rfctl = IXGBE_READ_REG(hw, IXGBE_RFCTL);
+		if (rx_conf->enable_lro) {
+			rfctl &= ~(IXGBE_RFCTL_RSC_DIS | IXGBE_RFCTL_NFSW_DIS |
+				   IXGBE_RFCTL_NFSR_DIS);
+		} else {
+			rfctl |= IXGBE_RFCTL_RSC_DIS;
+		}
+
+		IXGBE_WRITE_REG(hw, IXGBE_RFCTL, rfctl);
+	}
+
+
+	/*
 	 * Configure CRC stripping, if any.
 	 */
 	hlreg0 = IXGBE_READ_REG(hw, IXGBE_HLREG0);
 	if (rx_conf->hw_strip_crc)
 		hlreg0 |= IXGBE_HLREG0_RXCRCSTRP;
-	else
+	else {
 		hlreg0 &= ~IXGBE_HLREG0_RXCRCSTRP;
+		if (rx_conf->enable_lro) {
+			/*
+			 * According to chapter 4.6.7.2.1 of the Spec Rev.
+			 * 3.0 RSC configuration requires HW CRC stripping being
+			 * enabled. If user requested both HW CRC stripping off
+			 * and RSC on - return an error.
+			 */
+			PMD_INIT_LOG(CRIT, "LRO can't be enabled when HW CRC "
+					    "is disabled");
+			return -EINVAL;
+		}
+	}
 
 	/*
 	 * Configure jumbo frame support, if any.
@@ -3630,9 +4074,18 @@  ixgbe_dev_rx_init(struct rte_eth_dev *dev)
 		 * Configure Header Split
 		 */
 		if (rx_conf->header_split) {
+			/*
+			 * Print a warning if split_hdr_size is less
+			 * than 128 bytes when RSC is requested.
+			 */
+			if (rx_conf->enable_lro &&
+			    rx_conf->split_hdr_size < 128)
+				PMD_INIT_LOG(INFO, "split_hdr_size less than "
+						   "128 bytes (%d)!",
+					     rx_conf->split_hdr_size);
+
 			if (hw->mac.type == ixgbe_mac_82599EB) {
 				/* Must setup the PSRTYPE register */
-				uint32_t psrtype;
 				psrtype = IXGBE_PSRTYPE_TCPHDR |
 					IXGBE_PSRTYPE_UDPHDR   |
 					IXGBE_PSRTYPE_IPV4HDR  |
@@ -3645,7 +4098,20 @@  ixgbe_dev_rx_init(struct rte_eth_dev *dev)
 			srrctl |= IXGBE_SRRCTL_DESCTYPE_HDR_SPLIT_ALWAYS;
 		} else
 #endif
+		{
 			srrctl = IXGBE_SRRCTL_DESCTYPE_ADV_ONEBUF;
+			/*
+			 * Following the 4.6.7.2.1 chapter of the 82599/x540
+			 * Spec if RSC is enabled the SRRCTL[n].BSIZEHEADER
+			 * should be configured even if header split is not
+			 * enabled. In the latter case we will configure it to 128
+			 * bytes, following the recommendation in the spec.
+			 */
+			if (rx_conf->enable_lro)
+				srrctl |=
+				     ((128 << IXGBE_SRRCTL_BSIZEHDRSIZE_SHIFT) &
+						    IXGBE_SRRCTL_BSIZEHDR_MASK);
+		}
 
 		/* Set if packets are dropped when no descriptors available */
 		if (rxq->drop_en)
@@ -3662,6 +4128,13 @@  ixgbe_dev_rx_init(struct rte_eth_dev *dev)
 				       RTE_PKTMBUF_HEADROOM);
 		srrctl |= ((buf_size >> IXGBE_SRRCTL_BSIZEPKT_SHIFT) &
 			   IXGBE_SRRCTL_BSIZEPKT_MASK);
+
+		/*
+		 * TODO: Consider setting the Receive Descriptor Minimum
+		 * Threshold Size for an RSC case. This is not an obviously
+		 * beneficial option but one worth considering...
+		 */
+
 		IXGBE_WRITE_REG(hw, IXGBE_SRRCTL(rxq->reg_idx), srrctl);
 
 		buf_size = (uint16_t) ((srrctl & IXGBE_SRRCTL_BSIZEPKT_MASK) <<
@@ -3679,6 +4152,55 @@  ixgbe_dev_rx_init(struct rte_eth_dev *dev)
 			dev->rx_pkt_burst = ixgbe_recv_scattered_pkts;
 #endif
 		}
+
+		/* RSC per-queue configuration */
+		if (rx_conf->enable_lro) {
+			uint32_t eitr;
+
+			rscctl =
+				IXGBE_READ_REG(hw, IXGBE_RSCCTL(rxq->reg_idx));
+			psrtype =
+				IXGBE_READ_REG(hw, IXGBE_PSRTYPE(rxq->reg_idx));
+			eitr = IXGBE_READ_REG(hw, IXGBE_EITR(rxq->reg_idx));
+
+			rscctl |= IXGBE_RSCCTL_RSCEN;
+			rscctl |= get_rscctl_maxdesc(rxq->mb_pool);
+			psrtype |= IXGBE_PSRTYPE_TCPHDR;
+
+			/*
+			 * RSC: Set ITR interval corresponding to 2K ints/s.
+			 *
+			 * Full-sized RSC aggregations for a 10Gb/s link will
+			 * arrive at about 20K aggregation/s rate.
+			 *
+			 * 2K inst/s rate will make only 10% of the
+			 * aggregations to be closed due to the interrupt timer
+			 * expiration for a streaming at wire-speed case.
+			 *
+			 * For a sparse streaming case this setting will yield
+			 * at most 500us latency for a single RSC aggregation.
+			 */
+			eitr   |= (2000 | IXGBE_EITR_CNT_WDIS);
+
+			IXGBE_WRITE_REG(hw, IXGBE_RSCCTL(rxq->reg_idx), rscctl);
+			IXGBE_WRITE_REG(hw, IXGBE_PSRTYPE(rxq->reg_idx),
+								       psrtype);
+			IXGBE_WRITE_REG(hw, IXGBE_EITR(rxq->reg_idx), eitr);
+
+			/*
+			 * RSC requires the mapping of the queue to the
+			 * interrupt vector.
+			 */
+			ixgbe_set_ivar(dev, rxq->reg_idx, i, 0);
+
+			rxq->rsc_en = 1;
+		}
+
+		/*
+		 * We may use bulk allocation only if all queues satisfy the
+		 * preconditions.
+		 */
+		bulk_alloc_cond |= check_rx_burst_bulk_alloc_preconditions(rxq);
 	}
 
 	if (rx_conf->enable_scatter) {
@@ -3722,6 +4244,37 @@  ixgbe_dev_rx_init(struct rte_eth_dev *dev)
 		IXGBE_WRITE_REG(hw, IXGBE_RDRXCTL, rdrxctl);
 	}
 
+	/* Finalize RSC configuration  */
+	if (rx_conf->enable_lro) {
+		/*
+		 * Follow the instructions in the 4.6.7.2.1 of the Spec Rev. 3.0
+		 */
+		rdrxctl = IXGBE_READ_REG(hw, IXGBE_RDRXCTL);
+		rdrxctl |= IXGBE_RDRXCTL_RSCACKC;
+		IXGBE_WRITE_REG(hw, IXGBE_RDRXCTL, rdrxctl);
+
+		PMD_INIT_LOG(INFO, "enabling LRO mode");
+
+		dev->data->lro = 1;
+		/*
+		 * Initialize the appropriate LRO callback. If all queues
+		 * satisfy the bulk allocation preconditions (bulk_alloc_cond
+		 * is zero) then we may use bulk allocation. Otherwise use a
+		 * single allocation version.
+		 */
+		if (!bulk_alloc_cond) {
+			PMD_INIT_LOG(INFO, "LRO is requested. Using a bulk "
+					   "allocation version");
+			dev->rx_pkt_burst = ixgbe_recv_pkts_lro_bulk_alloc;
+			dev->data->lro_bulk_alloc = 1;
+		} else {
+			PMD_INIT_LOG(INFO, "LRO is requested. Using a single "
+					   "allocation version");
+			dev->rx_pkt_burst = ixgbe_recv_pkts_lro;
+		}
+	}
+
+
 	return 0;
 }
 
diff --git a/lib/librte_pmd_ixgbe/ixgbe_rxtx.h b/lib/librte_pmd_ixgbe/ixgbe_rxtx.h
index 329007c..e9f8b6c 100644
--- a/lib/librte_pmd_ixgbe/ixgbe_rxtx.h
+++ b/lib/librte_pmd_ixgbe/ixgbe_rxtx.h
@@ -79,6 +79,10 @@  struct igb_rx_entry {
 	struct rte_mbuf *mbuf; /**< mbuf associated with RX descriptor. */
 };
 
+struct igb_rsc_entry {
+	struct rte_mbuf *fbuf; /**< First segment of the fragmented packet. */
+};
+
 /**
  * Structure associated with each descriptor of the TX ring of a TX queue.
  */
@@ -105,6 +109,7 @@  struct igb_rx_queue {
 	volatile uint32_t   *rdt_reg_addr; /**< RDT register address. */
 	volatile uint32_t   *rdh_reg_addr; /**< RDH register address. */
 	struct igb_rx_entry *sw_ring; /**< address of RX software ring. */
+	struct igb_rsc_entry *sw_rsc_ring; /**< address of RSC software ring. */
 	struct rte_mbuf *pkt_first_seg; /**< First segment of current packet. */
 	struct rte_mbuf *pkt_last_seg; /**< Last segment of current packet. */
 	uint64_t            mbuf_initializer; /**< value to init mbufs */
@@ -126,6 +131,7 @@  struct igb_rx_queue {
 	uint8_t             port_id;  /**< Device port identifier. */
 	uint8_t             crc_len;  /**< 0 if CRC stripped, 4 otherwise. */
 	uint8_t             drop_en;  /**< If not 0, set SRRCTL.Drop_En. */
+	uint8_t             rsc_en;   /**< If not 0, RSC is enabled. */
 	uint8_t             rx_deferred_start; /**< not in global dev start. */
 #ifdef RTE_LIBRTE_IXGBE_RX_ALLOW_BULK_ALLOC
 	/** need to alloc dummy mbuf, for wraparound when scanning hw ring */