[dpdk-dev,v5,5/6] eal: add per rx queue interrupt handling based on VFIO

Message ID 1424710542-14637-6-git-send-email-danny.zhou@intel.com (mailing list archive)
State Changes Requested, archived
Headers

Commit Message

Zhou, Danny Feb. 23, 2015, 4:55 p.m. UTC
v5 changes:
- Rebase the patchset onto HEAD
- Isolate ethdev from EAL for the newly added wait-for-rx-interrupt function
- Export the wait-for-rx-interrupt function for shared libraries

v4 changes:
- Adjust the position of newly added structure fields

v3 changes:
- Fix review comments

v2 changes:
- Fix compilation issue for a missed header file
- Bug fix: free unreleased resources on the exception path before return
- Consolidate coding style related review comments

This patch does the following:
- Creates multiple VFIO eventfds for rx queues.
- Handles per rx queue interrupts.
- Eliminates the unnecessary wakeup mechanism for suspended DPDK polling
threads by letting the polling thread epoll_wait on rx queue interrupt
notifications directly.

Signed-off-by: Danny Zhou <danny.zhou@intel.com>
Tested-by: Yong Liu <yong.liu@intel.com>
---
 lib/librte_eal/bsdapp/eal/rte_eal_version.map   |   1 +
 lib/librte_eal/common/include/rte_eal.h         |   1 +
 lib/librte_eal/common/include/rte_interrupts.h  |  12 ++
 lib/librte_eal/linuxapp/eal/eal_interrupts.c    | 191 ++++++++++++++++++------
 lib/librte_eal/linuxapp/eal/eal_pci_vfio.c      |  12 +-
 lib/librte_eal/linuxapp/eal/rte_eal_version.map |   1 +
 6 files changed, 174 insertions(+), 44 deletions(-)
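The mechanism the commit message outlines — one eventfd per rx queue, with the polling thread sleeping in epoll_wait() until that queue's interrupt fires, then reading the fd to clear it — can be sketched with plain Linux APIs. This is an illustration only, with no DPDK dependencies; wait_rx_queue() is an invented name mirroring the flow of rte_intr_wait_rx_pkt() in the patch below.

```c
#include <stdint.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <unistd.h>

/* Sketch: each rx queue gets an eventfd that VFIO signals on interrupt;
 * the polling thread sleeps in epoll_wait() on that fd, then reads it
 * to clear the pending state so the next interrupt is observable. */
int wait_rx_queue(int epfd, int queue_efd)
{
	struct epoll_event ev;
	uint64_t cnt;

	if (epoll_wait(epfd, &ev, 1, -1) != 1)
		return -1;
	/* reading the eventfd resets its counter */
	if (read(queue_efd, &cnt, sizeof(cnt)) != sizeof(cnt))
		return -1;
	return 0;
}
```

In the real patch the eventfds are handed to the kernel via VFIO_DEVICE_SET_IRQS, so the device, not a writer thread, signals them.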
  

Comments

David Marchand Feb. 24, 2015, 10:42 a.m. UTC | #1
Hello Danny,

On Mon, Feb 23, 2015 at 5:55 PM, Zhou Danny <danny.zhou@intel.com> wrote:

[snip]

> +/**
> + * @param intr_handle
> + *   pointer to the interrupt handle.
> + * @param queue_id
> + *   the queue id
> + * @return
> + *   - On success, return 0
> + *   - On failure, returns -1.
> + */
> +int rte_intr_wait_rx_pkt(struct rte_intr_handle *intr_handle,
> +                       uint8_t queue_id);
> +

From my point of view, the queue_id (just like port_id) is something that
should be handled by ethdev, not eal.

> diff --git a/lib/librte_eal/linuxapp/eal/eal_interrupts.c b/lib/librte_eal/linuxapp/eal/eal_interrupts.c
> index 8c5b834..ee0f019 100644
> --- a/lib/librte_eal/linuxapp/eal/eal_interrupts.c
> +++ b/lib/librte_eal/linuxapp/eal/eal_interrupts.c

 [snip]

> +int
> +rte_intr_wait_rx_pkt(struct rte_intr_handle *intr_handle, uint8_t queue_id)
> +{
> +       struct epoll_event ev;
> +       unsigned numfds = 0;
> +
> +       if (!intr_handle || intr_handle->fd < 0 || intr_handle->uio_cfg_fd < 0)
> +               return -1;
> +       if (queue_id >= VFIO_MAX_QUEUE_ID)
> +               return -1;
> +
> +       /* create epoll fd */
> +       int pfd = epoll_create(1);
> +       if (pfd < 0) {
> +               RTE_LOG(ERR, EAL, "Cannot create epoll instance\n");
> +               return -1;
> +       }

Why recreate the epoll instance at each call to this function ?

> +
> +       rte_spinlock_lock(&intr_lock);
> +
> +       ev.events = EPOLLIN | EPOLLPRI;
> +       switch (intr_handle->type) {
> +       case RTE_INTR_HANDLE_UIO:
> +               ev.data.fd = intr_handle->fd;
> +               break;
> +#ifdef VFIO_PRESENT
> +       case RTE_INTR_HANDLE_VFIO_MSIX:
> +       case RTE_INTR_HANDLE_VFIO_MSI:
> +       case RTE_INTR_HANDLE_VFIO_LEGACY:
> +               ev.data.fd = intr_handle->queue_fd[queue_id];
> +               break;
> +#endif
> +       default:
> +               rte_spinlock_unlock(&intr_lock);
> +               close(pfd);
> +               return -1;
> +       }
> +
> +       if (epoll_ctl(pfd, EPOLL_CTL_ADD, ev.data.fd, &ev) < 0) {
> +               RTE_LOG(ERR, EAL, "Error adding fd %d epoll_ctl, %s\n",
> +                       intr_handle->queue_fd[queue_id], strerror(errno));
> +       } else
> +               numfds++;
> +
> +       rte_spinlock_unlock(&intr_lock);
> +       /* serve the interrupt */
> +       eal_intr_handle_rx_interrupts(intr_handle, pfd, numfds);
> +
> +       /**
> +       * when we return, we need to rebuild the
> +       * list of fds to monitor.
> +       */
> +       close(pfd);

Why do we need to rebuild this "list of fds" ?
Afaics, the fds we want to observe are not supposed to change in the
meantime.
epoll maintains this list, you don't have to care about this.
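A minimal sketch of this point (plain Linux APIs, observe_event() is an invented name): the fd is registered once with epoll_ctl(), and the same epoll instance then serves every subsequent wait — there is no list to rebuild and no reason to re-create the instance per call.

```c
#include <stdint.h>
#include <sys/epoll.h>
#include <unistd.h>

/* Register an fd once; epoll keeps it in its interest list, so every
 * subsequent epoll_wait() on the same epfd sees new events without any
 * further epoll_ctl() calls. */
int observe_event(int epfd)
{
	struct epoll_event ev;
	uint64_t cnt;

	if (epoll_wait(epfd, &ev, 1, -1) != 1)
		return -1;
	/* clear the eventfd counter so the level-triggered fd rearms */
	read(ev.data.fd, &cnt, sizeof(cnt));
	return ev.data.fd;
}
```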


Looking at this patchset, I think there is a design issue.
eal does not need to know about portid neither queueid.

eal can provide an api to retrieve the interrupt fds, configure an epoll
instance, wait on an epoll instance etc...
ethdev is then responsible to setup the mapping between port id / queue id
and interrupt fds by asking the eal about those fds.

This would result in an eal api even simpler and we could add other fds in
a single epoll fd for other uses.
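A hypothetical sketch of this layering — all names are invented, none of them DPDK APIs. The "eal" side deals only in bare fds (the wait primitive is reduced to poll() for brevity), while the "ethdev" side owns the (port id, queue id) to fd mapping:

```c
#include <poll.h>
#include <stdint.h>
#include <unistd.h>

#define MAX_PORTS  2
#define MAX_QUEUES 4

/* ethdev-owned table: maps (port, queue) to the interrupt fd it
 * retrieved from the eal. Names here are purely illustrative. */
int rxq_fd[MAX_PORTS][MAX_QUEUES];

/* "eal" side: no notion of ports or queues, only fds */
int eal_wait_fd(int fd)
{
	struct pollfd pfd = { .fd = fd, .events = POLLIN };
	uint64_t cnt;

	if (poll(&pfd, 1, -1) != 1)
		return -1;
	/* clear the eventfd counter */
	return read(fd, &cnt, sizeof(cnt)) == sizeof(cnt) ? 0 : -1;
}

/* "ethdev" side: resolves port/queue to the fd the eal handed out */
int ethdev_rx_intr_wait(uint8_t port, uint8_t queue)
{
	if (port >= MAX_PORTS || queue >= MAX_QUEUES)
		return -1;
	return eal_wait_fd(rxq_fd[port][queue]);
}
```

With this split, the eal API stays generic (any fd can be waited on), and port/queue semantics live entirely in ethdev, as suggested.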
  
Zhou, Danny Feb. 25, 2015, 6:58 a.m. UTC | #2
Thanks for the comments; please see my answers inline.

From: David Marchand [mailto:david.marchand@6wind.com]

Sent: Tuesday, February 24, 2015 6:42 PM
To: Zhou, Danny
Cc: dev@dpdk.org
Subject: Re: [dpdk-dev] [PATCH v5 5/6] eal: add per rx queue interrupt handling based on VFIO

Hello Danny,

On Mon, Feb 23, 2015 at 5:55 PM, Zhou Danny <danny.zhou@intel.com<mailto:danny.zhou@intel.com>> wrote:

[snip]

+/**
+ * @param intr_handle
+ *   pointer to the interrupt handle.
+ * @param queue_id
+ *   the queue id
+ * @return
+ *   - On success, return 0
+ *   - On failure, returns -1.
+ */
+int rte_intr_wait_rx_pkt(struct rte_intr_handle *intr_handle,
+                       uint8_t queue_id);
+

From my point of view, the queue_id (just like port_id) is something that should be handled by ethdev, not eal.

DZ: See comments below.

diff --git a/lib/librte_eal/linuxapp/eal/eal_interrupts.c b/lib/librte_eal/linuxapp/eal/eal_interrupts.c
index 8c5b834..ee0f019 100644
--- a/lib/librte_eal/linuxapp/eal/eal_interrupts.c
+++ b/lib/librte_eal/linuxapp/eal/eal_interrupts.c

 [snip]

+int
+rte_intr_wait_rx_pkt(struct rte_intr_handle *intr_handle, uint8_t queue_id)
+{
+       struct epoll_event ev;
+       unsigned numfds = 0;
+
+       if (!intr_handle || intr_handle->fd < 0 || intr_handle->uio_cfg_fd < 0)
+               return -1;
+       if (queue_id >= VFIO_MAX_QUEUE_ID)
+               return -1;
+
+       /* create epoll fd */
+       int pfd = epoll_create(1);
+       if (pfd < 0) {
+               RTE_LOG(ERR, EAL, "Cannot create epoll instance\n");
+               return -1;
+       }

Why recreate the epoll instance at each call to this function ?

DZ: To avoid recreating the epoll instance for each queue, the struct rte_intr_handle (or a new structure added to ethdev)
could be extended with fields storing a per-queue pfd. This would avoid the user/kernel context switch overhead
of calling epoll_create() on every invocation.

Sounds good?

+
+       rte_spinlock_lock(&intr_lock);
+
+       ev.events = EPOLLIN | EPOLLPRI;
+       switch (intr_handle->type) {
+       case RTE_INTR_HANDLE_UIO:
+               ev.data.fd = intr_handle->fd;
+               break;
+#ifdef VFIO_PRESENT
+       case RTE_INTR_HANDLE_VFIO_MSIX:
+       case RTE_INTR_HANDLE_VFIO_MSI:
+       case RTE_INTR_HANDLE_VFIO_LEGACY:
+               ev.data.fd = intr_handle->queue_fd[queue_id];
+               break;
+#endif
+       default:
+               rte_spinlock_unlock(&intr_lock);
+               close(pfd);
+               return -1;
+       }
+
+       if (epoll_ctl(pfd, EPOLL_CTL_ADD, ev.data.fd, &ev) < 0) {
+               RTE_LOG(ERR, EAL, "Error adding fd %d epoll_ctl, %s\n",
+                       intr_handle->queue_fd[queue_id], strerror(errno));
+       } else
+               numfds++;
+
+       rte_spinlock_unlock(&intr_lock);
+       /* serve the interrupt */
+       eal_intr_handle_rx_interrupts(intr_handle, pfd, numfds);
+
+       /**
+       * when we return, we need to rebuild the
+       * list of fds to monitor.
+       */
+       close(pfd);

Why do we need to rebuild this "list of fds" ?
Afaics, the fds we want to observe are not supposed to change in the meantime.
epoll maintains this list, you don't have to care about this.

Agreed, it is not needed.

Looking at this patchset, I think there is a design issue.
eal does not need to know about portid neither queueid.

eal can provide an api to retrieve the interrupt fds, configure an epoll instance, wait on an epoll instance etc...
ethdev is then responsible to setup the mapping between port id / queue id and interrupt fds by asking the eal about those fds.

This would result in an eal api even simpler and we could add other fds in a single epoll fd for other uses.

DZ: The queueid is just an index into the queue-related eventfd array stored in the EAL. If this array stays in the EAL and ethdev applies for it and sets up the mapping for a given queue, there
might be an issue in the multi-process use case: the fd resources allocated for a secondary process are not freed if that process exits unexpectedly.

Probably we can set up the eventfd array inside ethdev, and we just need an EAL API to wait on an ethdev fd. So the application invokes the ethdev API with portid and queueid, and ethdev calls the EAL
API to wait on the ethdev fd that correlates with the specified portid and queueid.

Sounds ok to you?

I am going to travel tomorrow, and Steve Liang may follow up on the V6 patch submission while I am away. Thanks, Steve!

--
David Marchand
  
David Marchand Feb. 25, 2015, 10:22 a.m. UTC | #3
Hello Danny,

On Wed, Feb 25, 2015 at 7:58 AM, Zhou, Danny <danny.zhou@intel.com> wrote:

>
> +int
> +rte_intr_wait_rx_pkt(struct rte_intr_handle *intr_handle, uint8_t queue_id)
> +{
> +       struct epoll_event ev;
> +       unsigned numfds = 0;
> +
> +       if (!intr_handle || intr_handle->fd < 0 || intr_handle->uio_cfg_fd < 0)
> +               return -1;
> +       if (queue_id >= VFIO_MAX_QUEUE_ID)
> +               return -1;
> +
> +       /* create epoll fd */
> +       int pfd = epoll_create(1);
> +       if (pfd < 0) {
> +               RTE_LOG(ERR, EAL, "Cannot create epoll instance\n");
> +               return -1;
> +       }
>
> Why recreate the epoll instance at each call to this function ?
>
> DZ: To avoid recreating the epoll instance for each queue, the struct
> rte_intr_handle (or a new structure added to ethdev) should be extended
> by adding fields storing per-queue pfd. This way, it could reduce
> user/kernel context switch overhead when calling epoll_create() each time.
>
> Sounds good?

You don't need an epfd per queue. And hardcoding epfd == eventfd will give a
not very usable api.

Plus, epoll is something linux-specific, so you can't move it out of
eal/linux.
I suppose you need an abstraction here (and in the future we could add
something for bsd ?).
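One possible shape for such an abstraction, sketched with an invented name: the prototype exposes no epoll types, so the Linux backend below could in principle be swapped for a kqueue-based one on BSD without touching callers. This is a sketch, not a proposed DPDK API.

```c
#include <stdint.h>
#include <sys/epoll.h>
#include <unistd.h>

/* OS-neutral prototype: callers see only an fd and a timeout */
int rte_rx_event_wait(int fd, int timeout_ms);

/* linuxapp backend, built on epoll */
int rte_rx_event_wait(int fd, int timeout_ms)
{
	struct epoll_event ev = { .events = EPOLLIN, .data.fd = fd };
	uint64_t cnt;
	int n = -1;
	int epfd = epoll_create(1);

	if (epfd < 0)
		return -1;
	if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev) == 0)
		n = epoll_wait(epfd, &ev, 1, timeout_ms);
	if (n == 1)
		read(fd, &cnt, sizeof(cnt)); /* clear the eventfd */
	close(epfd);
	return n == 1 ? 0 : -1; /* 0 = event arrived, -1 = timeout or error */
}
```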



>
> Looking at this patchset, I think there is a design issue.
> eal does not need to know about portid neither queueid.
>
> eal can provide an api to retrieve the interrupt fds, configure an epoll
> instance, wait on an epoll instance etc...
> ethdev is then responsible to setup the mapping between port id / queue id
> and interrupt fds by asking the eal about those fds.
>
> This would result in an eal api even simpler and we could add other fds in
> a single epoll fd for other uses.
>
> DZ: The queueid is just an index to the queue related eventfd array stored
> in EAL. If this array is still in the EAL and ethdev can apply for it and
> setup mapping for certain queue, there might be issue for multiple-process
> use case where the fd resources allocated for secondary process are not
> freed if the secondary process exits unexpectedly.

Not sure I follow you.
If a secondary process exits, the eventfds created in primary process
should still be valid and reusable.
Why would you need to free them ? Something to do with vfio ?



>
> Probably we can setup the eventfd array inside ethdev, and we just need
> EAL API to wait for ethdev fd. So application invokes ethdev API with
> portid and queueid, and ethdev calls eal API to wait on a ethdev fd which
> correlates with the specified portid and queueid.
>
> Sounds ok to you?

eventfd creation cannot be handled by ethdev, since it needs
infrastructure and information from within eal/linux.
Again, do we need an abstraction ?

ethdev must be the one that does the mappings between port/queue and
eventfds (or any object that represents a way to wake up for a given
port/queue).
  
Zhou, Danny Feb. 25, 2015, 3:29 p.m. UTC | #4
From: David Marchand [mailto:david.marchand@6wind.com]

Sent: Wednesday, February 25, 2015 6:22 PM
To: Zhou, Danny
Cc: dev@dpdk.org; Liang, Cunming
Subject: Re: [dpdk-dev] [PATCH v5 5/6] eal: add per rx queue interrupt handling based on VFIO

Hello Danny,

On Wed, Feb 25, 2015 at 7:58 AM, Zhou, Danny <danny.zhou@intel.com<mailto:danny.zhou@intel.com>> wrote:

+int
+rte_intr_wait_rx_pkt(struct rte_intr_handle *intr_handle, uint8_t queue_id)
+{
+       struct epoll_event ev;
+       unsigned numfds = 0;
+
+       if (!intr_handle || intr_handle->fd < 0 || intr_handle->uio_cfg_fd < 0)
+               return -1;
+       if (queue_id >= VFIO_MAX_QUEUE_ID)
+               return -1;
+
+       /* create epoll fd */
+       int pfd = epoll_create(1);
+       if (pfd < 0) {
+               RTE_LOG(ERR, EAL, "Cannot create epoll instance\n");
+               return -1;
+       }

Why recreate the epoll instance at each call to this function ?

DZ: To avoid recreating the epoll instance for each queue, the struct rte_intr_handle(or a new structure added to ethdev)
should be extended by adding fields storing per-queue pfd. This way, it could reduce user/kernel context  switch overhead
when calling epoll_create() each time.

Sounds good?

You don't need a epfd per queue. And hardcoding epfd == eventfd will give a not very usable api.

Plus, epoll is something linux-specific, so you can't move it out of eal/linux.
I suppose you need an abstraction here (and in the future we could add something for bsd ?).

DZ: libev provides an abstraction layer which is a good candidate to integrate, rather than
reinventing one, I think. The BSD support can be implemented in the files under the
lib/librte_eal/bsdapp/eal folder by calling BSD-specific APIs. Maybe it is a good idea to introduce
a separate component, like an OS Adaptation Layer, into EAL in the future once DPDK is widely adopted on
BSD as well as Windows; then all DPDK components that invoke Linux-specific APIs could call abstraction APIs instead.

Adding an abstraction here specifically for epoll does not resolve all the porting/migration problems, in my mind.


Looking at this patchset, I think there is a design issue.
eal does not need to know about portid neither queueid.

eal can provide an api to retrieve the interrupt fds, configure an epoll instance, wait on an epoll instance etc...
ethdev is then responsible to setup the mapping between port id / queue id and interrupt fds by asking the eal about those fds.

This would result in an eal api even simpler and we could add other fds in a single epoll fd for other uses.

DZ: The queueid is just an index to the queue related eventfd array stored in EAL. If this array is still in the EAL and ethdev can apply for it and setup mapping for certain queue, there
might be issue for multiple-process use case where the fd resources allocated for secondary process are not freed if the secondary process exits unexpectedly.

Not sure I follow you.
If a secondary process exits, the eventfds created in primary process should still be valid and reusable.
Why would you need to free them ? Something to do with vfio ?

DZ: See below.

Probably we can setup the eventfd array inside ethdev, and we just need EAL API to wait for ethdev fd. So application invokes ethdev API with portid and queueid, and ethdev calls eal
API to wait on a ethdev fd which correlates with the specified portid and queueid.

Sounds ok to you?

eventfds creation can not be handled by ethdev, since it needs infrastructure and informations from within the eal/linux.
Again, do we need an abstraction ?

ethdev must be the one that does the mappings between port/queue and eventfds (or any object that represents a way to wake up for a given port/queue).

DZ: agreed after revisiting the code. Let us follow your direction and create an ethdev API, similar to rte_eth_dev_rx_queue_intr_enable()/rte_eth_dev_rx_queue_intr_disable(), that takes portid and queueid as arguments. Then this ethdev API uses the mapped eventfds to invoke the corresponding EAL API, waiting for interrupt event notification from the kernel. A V6 patchset will be created for this.

--
David Marchand
  
Thomas Monjalon Feb. 25, 2015, 3:44 p.m. UTC | #5
Please Danny, click on the button "uninstall Outlook"
or configure it to have quote marks.
This email is really hard to read.

2015-02-25 15:29, Zhou, Danny:
> From: David Marchand [mailto:david.marchand@6wind.com]
> Sent: Wednesday, February 25, 2015 6:22 PM
> To: Zhou, Danny
> Cc: dev@dpdk.org; Liang, Cunming
> Subject: Re: [dpdk-dev] [PATCH v5 5/6] eal: add per rx queue interrupt handling based on VFIO
> 
> Hello Danny,
> 
> On Wed, Feb 25, 2015 at 7:58 AM, Zhou, Danny <danny.zhou@intel.com<mailto:danny.zhou@intel.com>> wrote:
> 
> +int
> +rte_intr_wait_rx_pkt(struct rte_intr_handle *intr_handle, uint8_t queue_id)
> +{
> +       struct epoll_event ev;
> +       unsigned numfds = 0;
> +
> +       if (!intr_handle || intr_handle->fd < 0 || intr_handle->uio_cfg_fd < 0)
> +               return -1;
> +       if (queue_id >= VFIO_MAX_QUEUE_ID)
> +               return -1;
> +
> +       /* create epoll fd */
> +       int pfd = epoll_create(1);
> +       if (pfd < 0) {
> +               RTE_LOG(ERR, EAL, "Cannot create epoll instance\n");
> +               return -1;
> +       }
> 
> Why recreate the epoll instance at each call to this function ?
> 
> DZ: To avoid recreating the epoll instance for each queue, the struct rte_intr_handle(or a new structure added to ethdev)
> should be extended by adding fields storing per-queue pfd. This way, it could reduce user/kernel context  switch overhead
> when calling epoll_create() each time.
> 
> Sounds good?
> 
> You don't need a epfd per queue. And hardcoding epfd == eventfd will give a not very usable api.
> 
> Plus, epoll is something linux-specific, so you can't move it out of eal/linux.
> I suppose you need an abstraction here (and in the future we could add something for bsd ?).
> 
> DZ: libev provides abstraction layer which is a good candidate to integrate, rather than
> reinventing one I think. The BSD support can be implemented in the files under
> lib\librte_eal\bsdapp\eal folder by calling BSD specific APIs. Maybe it is a good idea to introduce
> a separated component like OS Adaption Layer into EAL in the future once DPDK is widely adopted in
> BSD as well as Windows, then all DPDK components invoke Linux specific APIs could instead calling abstraction APIs.

EAL means Environment Abstraction Layer.
In my mind, OS is part of the environment.
DPDK components don't invoke Linux specific APIs, they use EAL!
What are you thinking about?

> Adding an abstraction here specifically for epoll does not resolve all the porting/migration problem in my mind.
  
David Marchand Feb. 25, 2015, 3:52 p.m. UTC | #6
On Wed, Feb 25, 2015 at 4:29 PM, Zhou, Danny <danny.zhou@intel.com> wrote:

> From: David Marchand [mailto:david.marchand@6wind.com]
> Sent: Wednesday, February 25, 2015 6:22 PM
> To: Zhou, Danny
> Cc: dev@dpdk.org; Liang, Cunming
> Subject: Re: [dpdk-dev] [PATCH v5 5/6] eal: add per rx queue interrupt
> handling based on VFIO
>
> DZ: To avoid recreating the epoll instance for each queue, the struct
> rte_intr_handle (or a new structure added to ethdev) should be extended
> by adding fields storing per-queue pfd. This way, it could reduce
> user/kernel context switch overhead when calling epoll_create() each time.
>
> Sounds good?
>
> You don't need an epfd per queue. And hardcoding epfd == eventfd will give
> a not very usable api.
>
> Plus, epoll is something linux-specific, so you can't move it out of
> eal/linux.
> I suppose you need an abstraction here (and in the future we could add
> something for bsd ?).
>
> DZ: libev provides an abstraction layer which is a good candidate to
> integrate, rather than reinventing one, I think. The BSD support can be
> implemented in the files under lib/librte_eal/bsdapp/eal by calling
> BSD-specific APIs. Maybe it is a good idea to introduce a separate
> component, like an OS Adaptation Layer, into EAL in the future once DPDK
> is widely adopted on BSD as well as Windows; then all DPDK components
> that invoke Linux-specific APIs could call abstraction APIs instead.
>
> Adding an abstraction here specifically for epoll does not resolve all
> the porting/migration problems, in my mind.

Yes, reusing this kind of library (or libevent) looks like a good idea.

Hum, I would say eal/common is there for the common part and for the
different abstractions.
Do you see anything that would not fit in ?



> eventfd creation cannot be handled by ethdev, since it needs
> infrastructure and information from within eal/linux.
>
> Again, do we need an abstraction ?
>
> ethdev must be the one that does the mappings between port/queue and
> eventfds (or any object that represents a way to wake up for a given
> port/queue).
>
> DZ: agreed after revisiting the code. Let us follow your direction and
> create an ethdev API, similar to
> rte_eth_dev_rx_queue_intr_enable()/rte_eth_dev_rx_queue_intr_disable(),
> that takes portid and queueid as arguments. Then this ethdev API uses the
> mapped eventfds to invoke the corresponding EAL API, waiting for interrupt
> event notification from the kernel. A V6 patchset will be created for this.

Ok, I will look at it when available.
  

Patch

diff --git a/lib/librte_eal/bsdapp/eal/rte_eal_version.map b/lib/librte_eal/bsdapp/eal/rte_eal_version.map
index 5ed6e4d..dd300ea 100644
--- a/lib/librte_eal/bsdapp/eal/rte_eal_version.map
+++ b/lib/librte_eal/bsdapp/eal/rte_eal_version.map
@@ -57,6 +57,7 @@  DPDK_2.0 {
 	rte_intr_callback_unregister;
 	rte_intr_disable;
 	rte_intr_enable;
+	rte_intr_wait_rx_pkt;
 	rte_log;
 	rte_log_add_in_history;
 	rte_log_cur_msg_loglevel;
diff --git a/lib/librte_eal/common/include/rte_eal.h b/lib/librte_eal/common/include/rte_eal.h
index f4ecd2e..b9d5230 100644
--- a/lib/librte_eal/common/include/rte_eal.h
+++ b/lib/librte_eal/common/include/rte_eal.h
@@ -150,6 +150,7 @@  int rte_eal_iopl_init(void);
  *   - On failure, a negative error value.
  */
 int rte_eal_init(int argc, char **argv);
+
 /**
  * Usage function typedef used by the application usage function.
  *
diff --git a/lib/librte_eal/common/include/rte_interrupts.h b/lib/librte_eal/common/include/rte_interrupts.h
index 609c34b..1ba2421 100644
--- a/lib/librte_eal/common/include/rte_interrupts.h
+++ b/lib/librte_eal/common/include/rte_interrupts.h
@@ -113,6 +113,18 @@  int rte_intr_enable(struct rte_intr_handle *intr_handle);
  */
 int rte_intr_disable(struct rte_intr_handle *intr_handle);
 
+/**
+ * @param intr_handle
+ *   pointer to the interrupt handle.
+ * @param queue_id
+ *   the queue id
+ * @return
+ *   - On success, return 0
+ *   - On failure, returns -1.
+ */
+int rte_intr_wait_rx_pkt(struct rte_intr_handle *intr_handle,
+			uint8_t queue_id);
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/lib/librte_eal/linuxapp/eal/eal_interrupts.c b/lib/librte_eal/linuxapp/eal/eal_interrupts.c
index 8c5b834..ee0f019 100644
--- a/lib/librte_eal/linuxapp/eal/eal_interrupts.c
+++ b/lib/librte_eal/linuxapp/eal/eal_interrupts.c
@@ -127,6 +127,9 @@  static pthread_t intr_thread;
 #ifdef VFIO_PRESENT
 
 #define IRQ_SET_BUF_LEN  (sizeof(struct vfio_irq_set) + sizeof(int))
+/* irq set buffer length for queue interrupts and LSC interrupt */
+#define MSIX_IRQ_SET_BUF_LEN (sizeof(struct vfio_irq_set) + \
+				sizeof(int) * (VFIO_MAX_QUEUE_ID + 1))
 
 /* enable legacy (INTx) interrupts */
 static int
@@ -218,10 +221,10 @@  vfio_disable_intx(struct rte_intr_handle *intr_handle) {
 	return 0;
 }
 
-/* enable MSI-X interrupts */
+/* enable MSI interrupts */
 static int
 vfio_enable_msi(struct rte_intr_handle *intr_handle) {
-	int len, ret;
+	int len, ret, max_intr;
 	char irq_set_buf[IRQ_SET_BUF_LEN];
 	struct vfio_irq_set *irq_set;
 	int *fd_ptr;
@@ -230,12 +233,19 @@  vfio_enable_msi(struct rte_intr_handle *intr_handle) {
 
 	irq_set = (struct vfio_irq_set *) irq_set_buf;
 	irq_set->argsz = len;
-	irq_set->count = 1;
+	if ((!intr_handle->max_intr) ||
+		(intr_handle->max_intr > VFIO_MAX_QUEUE_ID))
+		max_intr = VFIO_MAX_QUEUE_ID + 1;
+	else
+		max_intr = intr_handle->max_intr;
+
+	irq_set->count = max_intr;
 	irq_set->flags = VFIO_IRQ_SET_DATA_EVENTFD | VFIO_IRQ_SET_ACTION_TRIGGER;
 	irq_set->index = VFIO_PCI_MSI_IRQ_INDEX;
 	irq_set->start = 0;
 	fd_ptr = (int *) &irq_set->data;
-	*fd_ptr = intr_handle->fd;
+	memcpy(fd_ptr, intr_handle->queue_fd, sizeof(intr_handle->queue_fd));
+	fd_ptr[max_intr - 1] = intr_handle->fd;
 
 	ret = ioctl(intr_handle->vfio_dev_fd, VFIO_DEVICE_SET_IRQS, irq_set);
 
@@ -244,27 +254,10 @@  vfio_enable_msi(struct rte_intr_handle *intr_handle) {
 						intr_handle->fd);
 		return -1;
 	}
-
-	/* manually trigger interrupt to enable it */
-	memset(irq_set, 0, len);
-	len = sizeof(struct vfio_irq_set);
-	irq_set->argsz = len;
-	irq_set->count = 1;
-	irq_set->flags = VFIO_IRQ_SET_DATA_NONE | VFIO_IRQ_SET_ACTION_TRIGGER;
-	irq_set->index = VFIO_PCI_MSI_IRQ_INDEX;
-	irq_set->start = 0;
-
-	ret = ioctl(intr_handle->vfio_dev_fd, VFIO_DEVICE_SET_IRQS, irq_set);
-
-	if (ret) {
-		RTE_LOG(ERR, EAL, "Error triggering MSI interrupts for fd %d\n",
-						intr_handle->fd);
-		return -1;
-	}
 	return 0;
 }
 
-/* disable MSI-X interrupts */
+/* disable MSI interrupts */
 static int
 vfio_disable_msi(struct rte_intr_handle *intr_handle) {
 	struct vfio_irq_set *irq_set;
@@ -292,8 +285,8 @@  vfio_disable_msi(struct rte_intr_handle *intr_handle) {
 /* enable MSI-X interrupts */
 static int
 vfio_enable_msix(struct rte_intr_handle *intr_handle) {
-	int len, ret;
-	char irq_set_buf[IRQ_SET_BUF_LEN];
+	int len, ret, max_intr;
+	char irq_set_buf[MSIX_IRQ_SET_BUF_LEN];
 	struct vfio_irq_set *irq_set;
 	int *fd_ptr;
 
@@ -301,12 +294,19 @@  vfio_enable_msix(struct rte_intr_handle *intr_handle) {
 
 	irq_set = (struct vfio_irq_set *) irq_set_buf;
 	irq_set->argsz = len;
-	irq_set->count = 1;
+	if ((!intr_handle->max_intr) ||
+		(intr_handle->max_intr > VFIO_MAX_QUEUE_ID))
+		max_intr = VFIO_MAX_QUEUE_ID + 1;
+	else
+		max_intr = intr_handle->max_intr;
+
+	irq_set->count = max_intr;
 	irq_set->flags = VFIO_IRQ_SET_DATA_EVENTFD | VFIO_IRQ_SET_ACTION_TRIGGER;
 	irq_set->index = VFIO_PCI_MSIX_IRQ_INDEX;
 	irq_set->start = 0;
 	fd_ptr = (int *) &irq_set->data;
-	*fd_ptr = intr_handle->fd;
+	memcpy(fd_ptr, intr_handle->queue_fd, sizeof(intr_handle->queue_fd));
+	fd_ptr[max_intr - 1] = intr_handle->fd;
 
 	ret = ioctl(intr_handle->vfio_dev_fd, VFIO_DEVICE_SET_IRQS, irq_set);
 
@@ -316,22 +316,6 @@  vfio_enable_msix(struct rte_intr_handle *intr_handle) {
 		return -1;
 	}
 
-	/* manually trigger interrupt to enable it */
-	memset(irq_set, 0, len);
-	len = sizeof(struct vfio_irq_set);
-	irq_set->argsz = len;
-	irq_set->count = 1;
-	irq_set->flags = VFIO_IRQ_SET_DATA_NONE | VFIO_IRQ_SET_ACTION_TRIGGER;
-	irq_set->index = VFIO_PCI_MSIX_IRQ_INDEX;
-	irq_set->start = 0;
-
-	ret = ioctl(intr_handle->vfio_dev_fd, VFIO_DEVICE_SET_IRQS, irq_set);
-
-	if (ret) {
-		RTE_LOG(ERR, EAL, "Error triggering MSI-X interrupts for fd %d\n",
-						intr_handle->fd);
-		return -1;
-	}
 	return 0;
 }
 
@@ -339,7 +323,7 @@  vfio_enable_msix(struct rte_intr_handle *intr_handle) {
 static int
 vfio_disable_msix(struct rte_intr_handle *intr_handle) {
 	struct vfio_irq_set *irq_set;
-	char irq_set_buf[IRQ_SET_BUF_LEN];
+	char irq_set_buf[MSIX_IRQ_SET_BUF_LEN];
 	int len, ret;
 
 	len = sizeof(struct vfio_irq_set);
@@ -860,3 +844,124 @@  rte_eal_intr_init(void)
 	return -ret;
 }
 
+static void
+eal_intr_process_rx_interrupts(struct rte_intr_handle *intr_handle,
+			struct epoll_event *events, int nfds)
+{
+	int n, bytes_read;
+	union rte_intr_read_buffer buf;
+
+	for (n = 0; n < nfds; n++) {
+		/* set the length to be read for different handle type */
+		switch (intr_handle->type) {
+		case RTE_INTR_HANDLE_UIO:
+			bytes_read = sizeof(buf.uio_intr_count);
+			break;
+		case RTE_INTR_HANDLE_ALARM:
+			bytes_read = sizeof(buf.timerfd_num);
+			break;
+#ifdef VFIO_PRESENT
+		case RTE_INTR_HANDLE_VFIO_MSIX:
+		case RTE_INTR_HANDLE_VFIO_MSI:
+		case RTE_INTR_HANDLE_VFIO_LEGACY:
+			bytes_read = sizeof(buf.vfio_intr_count);
+			break;
+#endif
+		default:
+			bytes_read = 1;
+			break;
+		}
+
+		/**
+		* read out to clear the ready-to-be-read flag
+		* for epoll_wait.
+		*/
+		bytes_read = read(events[n].data.fd, &buf, bytes_read);
+		if (bytes_read < 0)
+			RTE_LOG(ERR, EAL, "Error reading from file "
+				"descriptor %d: %s\n", events[n].data.fd,
+							strerror(errno));
+		else if (bytes_read == 0)
+			RTE_LOG(ERR, EAL, "Read nothing from file "
+				"descriptor %d\n", events[n].data.fd);
+	}
+}
+
+static void
+eal_intr_handle_rx_interrupts(struct rte_intr_handle *intr_handle,
+			int pfd, unsigned totalfds)
+{
+	struct epoll_event events[totalfds];
+	int nfds = 0;
+
+	do {
+		nfds = epoll_wait(pfd, events, totalfds,
+				EAL_INTR_EPOLL_WAIT_FOREVER);
+		/* epoll_wait fail */
+		if (nfds < 0) {
+			RTE_LOG(ERR, EAL,
+				"epoll_wait returns with fail\n");
+			return;
+		}
+	} while (nfds == 0);
+
+	/* epoll_wait has at least one fd ready to read */
+	eal_intr_process_rx_interrupts(intr_handle, events, nfds);
+}
+
+int
+rte_intr_wait_rx_pkt(struct rte_intr_handle *intr_handle, uint8_t queue_id)
+{
+	struct epoll_event ev;
+	unsigned numfds = 0;
+
+	if (!intr_handle || intr_handle->fd < 0 || intr_handle->uio_cfg_fd < 0)
+		return -1;
+	if (queue_id >= VFIO_MAX_QUEUE_ID)
+		return -1;
+
+	/* create epoll fd */
+	int pfd = epoll_create(1);
+	if (pfd < 0) {
+		RTE_LOG(ERR, EAL, "Cannot create epoll instance\n");
+		return -1;
+	}
+
+	rte_spinlock_lock(&intr_lock);
+
+	ev.events = EPOLLIN | EPOLLPRI;
+	switch (intr_handle->type) {
+	case RTE_INTR_HANDLE_UIO:
+		ev.data.fd = intr_handle->fd;
+		break;
+#ifdef VFIO_PRESENT
+	case RTE_INTR_HANDLE_VFIO_MSIX:
+	case RTE_INTR_HANDLE_VFIO_MSI:
+	case RTE_INTR_HANDLE_VFIO_LEGACY:
+		ev.data.fd = intr_handle->queue_fd[queue_id];
+		break;
+#endif
+	default:
+		rte_spinlock_unlock(&intr_lock);
+		close(pfd);
+		return -1;
+	}
+
+	if (epoll_ctl(pfd, EPOLL_CTL_ADD, ev.data.fd, &ev) < 0) {
+		RTE_LOG(ERR, EAL, "Error adding fd %d epoll_ctl, %s\n",
+			intr_handle->queue_fd[queue_id], strerror(errno));
+	} else
+		numfds++;
+
+	rte_spinlock_unlock(&intr_lock);
+	/* serve the interrupt */
+	eal_intr_handle_rx_interrupts(intr_handle, pfd, numfds);
+
+	/**
+	* when we return, we need to rebuild the
+	* list of fds to monitor.
+	*/
+	close(pfd);
+
+	return 0;
+}
diff --git a/lib/librte_eal/linuxapp/eal/eal_pci_vfio.c b/lib/librte_eal/linuxapp/eal/eal_pci_vfio.c
index 20e0977..0e5fa76 100644
--- a/lib/librte_eal/linuxapp/eal/eal_pci_vfio.c
+++ b/lib/librte_eal/linuxapp/eal/eal_pci_vfio.c
@@ -283,11 +283,21 @@  pci_vfio_setup_interrupts(struct rte_pci_device *dev, int vfio_dev_fd)
 
 		dev->intr_handle.fd = fd;
 		dev->intr_handle.vfio_dev_fd = vfio_dev_fd;
-
 		switch (i) {
 		case VFIO_PCI_MSIX_IRQ_INDEX:
 			internal_config.vfio_intr_mode = RTE_INTR_MODE_MSIX;
 			dev->intr_handle.type = RTE_INTR_HANDLE_VFIO_MSIX;
+			for (i = 0; i < VFIO_MAX_QUEUE_ID; i++) {
+				fd = eventfd(0, 0);
+				if (fd < 0) {
+					RTE_LOG(ERR, EAL,
+					"cannot setup eventfd,"
+					"error %i (%s)\n",
+					errno, strerror(errno));
+					return -1;
+				}
+				dev->intr_handle.queue_fd[i] = fd;
+			}
 			break;
 		case VFIO_PCI_MSI_IRQ_INDEX:
 			internal_config.vfio_intr_mode = RTE_INTR_MODE_MSI;
diff --git a/lib/librte_eal/linuxapp/eal/rte_eal_version.map b/lib/librte_eal/linuxapp/eal/rte_eal_version.map
index 5ed6e4d..dd300ea 100644
--- a/lib/librte_eal/linuxapp/eal/rte_eal_version.map
+++ b/lib/librte_eal/linuxapp/eal/rte_eal_version.map
@@ -57,6 +57,7 @@  DPDK_2.0 {
 	rte_intr_callback_unregister;
 	rte_intr_disable;
 	rte_intr_enable;
+	rte_intr_wait_rx_pkt;
 	rte_log;
 	rte_log_add_in_history;
 	rte_log_cur_msg_loglevel;