[dpdk-dev,v6,4/8] eal/linux: add per rx queue interrupt handling based on VFIO

Message ID 1425012976-10173-5-git-send-email-cunming.liang@intel.com (mailing list archive)
State Changes Requested, archived
Headers

Commit Message

Cunming Liang Feb. 27, 2015, 4:56 a.m. UTC
  This patch does below:
 - Create multiple VFIO eventfd for rx queues.
 - Handle per rx queue interrupt.
 - Eliminate unnecessary suspended DPDK polling thread wakeup mechanism
   for rx interrupt by allowing polling thread epoll_wait rx queue
   interrupt notification.

Signed-off-by: Danny Zhou <danny.zhou@intel.com>
Signed-off-by: Cunming Liang <cunming.liang@intel.com>
---
v6 changes
 - split rte_intr_wait_rx_pkt into two function, wait and set.
 - rewrite rte_intr_rx_wait/rte_intr_rx_set to remove queue visibility on eal.
 - rte_intr_rx_wait to support multiplexing.
 - allow epfd as input to support flexible event fd combination.

v5 changes
 - Rebase the patchset onto the HEAD
 - Isolate ethdev from EAL for new-added wait-for-rx interrupt function
 - Export wait-for-rx interrupt function for shared libraries

v4 changes:
 - Adjust position of new-added structure fields

v3 changes:
 - Fix review comments

v2 changes:
 - Fix compilation issue for a missed header file
 - Bug fix: free unreleased resources on the exception path before return
 - Consolidate coding style related review comments

 lib/librte_eal/linuxapp/eal/eal_interrupts.c    | 224 +++++++++++++++++++-----
 lib/librte_eal/linuxapp/eal/eal_pci_vfio.c      |  23 ++-
 lib/librte_eal/linuxapp/eal/rte_eal_version.map |   2 +
 3 files changed, 201 insertions(+), 48 deletions(-)
  

Comments

David Marchand Feb. 27, 2015, 10:33 a.m. UTC | #1
I am not really comfortable with this api.

This is just creating something on top of the standard epoll api with
limitations.
In the end, we could just use an external lib that does this already.

So ok, this will work for your limited use case, but this will not be
really useful for anything else.
Not sure it has its place in eal, this is more an example to me.


On Fri, Feb 27, 2015 at 5:56 AM, Cunming Liang <cunming.liang@intel.com>
wrote:

> This patch does below:
>  - Create multiple VFIO eventfd for rx queues.
>  - Handle per rx queue interrupt.
>  - Eliminate unnecessary suspended DPDK polling thread wakeup mechanism
>    for rx interrupt by allowing polling thread epoll_wait rx queue
>    interrupt notification.
>
> Signed-off-by: Danny Zhou <danny.zhou@intel.com>
> Signed-off-by: Cunming Liang <cunming.liang@intel.com>
> ---
> v6 changes
>  - split rte_intr_wait_rx_pkt into two function, wait and set.
>  - rewrite rte_intr_rx_wait/rte_intr_rx_set to remove queue visibility on
> eal.
>  - rte_intr_rx_wait to support multiplexing.
>  - allow epfd as input to support flexible event fd combination.
>
>
>  lib/librte_eal/linuxapp/eal/eal_interrupts.c    | 224
> +++++++++++++++++++-----
>  lib/librte_eal/linuxapp/eal/eal_pci_vfio.c      |  23 ++-
>  lib/librte_eal/linuxapp/eal/rte_eal_version.map |   2 +
>  3 files changed, 201 insertions(+), 48 deletions(-)
>
> diff --git a/lib/librte_eal/linuxapp/eal/eal_interrupts.c
> b/lib/librte_eal/linuxapp/eal/eal_interrupts.c
> index 8c5b834..f90c2b4 100644
> --- a/lib/librte_eal/linuxapp/eal/eal_interrupts.c
> +++ b/lib/librte_eal/linuxapp/eal/eal_interrupts.c
>
>
[snip]


>
> +static void
> +eal_intr_process_rxtx_interrupts(struct rte_intr_handle *intr_handle,
> +                                struct epoll_event *events,
> +                                uint32_t *vec, int nfds)
> +{
> +       int i, bytes_read;
> +       union rte_intr_read_buffer buf;
> +       int fd;
> +
> +       for (i = 0; i < nfds; i++) {
> +               /* set the length to be read for different handle type */
> +               switch (intr_handle->type) {
> +               case RTE_INTR_HANDLE_UIO:
> +                       bytes_read = sizeof(buf.uio_intr_count);
> +                       break;
> +               case RTE_INTR_HANDLE_ALARM:
> +                       bytes_read = sizeof(buf.timerfd_num);
> +                       break;
> +#ifdef VFIO_PRESENT
> +               case RTE_INTR_HANDLE_VFIO_MSIX:
> +               case RTE_INTR_HANDLE_VFIO_MSI:
> +               case RTE_INTR_HANDLE_VFIO_LEGACY:
> +                       bytes_read = sizeof(buf.vfio_intr_count);
> +                       break;
> +#endif
> +               default:
> +                       bytes_read = 1;
> +                       break;
> +               }
> +
> +               /**
> +               * read out to clear the ready-to-be-read flag
> +               * for epoll_wait.
> +               */
> +               vec[i] = events[i].data.u32;
> +               assert(vec[i] < VFIO_MAX_RXTX_INTR_ID);
> +
> +               fd = intr_handle->efds[vec[i]];
> +               bytes_read = read(fd, &buf, bytes_read);
> +               if (bytes_read < 0)
> +                       RTE_LOG(ERR, EAL, "Error reading from file "
> +                               "descriptor %d: %s\n", fd,
> strerror(errno));
> +               else if (bytes_read == 0)
> +                       RTE_LOG(ERR, EAL, "Read nothing from file "
> +                               "descriptor %d\n", fd);
> +       }
> +}
>

Why unconditionnally read ?
You are absorbing events from the application if the application gave you
an external epfd and populated it with its own fds.


> +
> +static int init_tls_epfd(void)
> +{
> +       int pfd = epoll_create(1);
> +       if (pfd < 0) {
> +               RTE_LOG(ERR, EAL,
> +                       "Cannot create epoll instance\n");
> +               return -1;
> +       }
> +       return pfd;
> +}
> +
> +int
> +rte_intr_rx_wait(struct rte_intr_handle *intr_handle, int epfd,
> +                uint32_t *vec, uint16_t num)
> +{
>

In the end, this "rx" does not mean anything to eal.


+#define MAX_EVENTS      8
> +       struct epoll_event events[MAX_EVENTS];
> +       int ret, nfds = 0;
> +
> +       if (!intr_handle || !vec) {
> +               RTE_LOG(ERR, EAL, "invalid input parameter\n");
> +               return -1;
> +       }
> +
> +       if (intr_handle->type != RTE_INTR_HANDLE_VFIO_MSIX) {
> +               RTE_LOG(ERR, EAL, "intr type should be VFIO_MSIX\n");
> +               return -1;
> +       }
> +
> +       if (epfd == RTE_EPOLL_FD_ANY) {
> +               /* using per thread epoll fd */
> +               if (unlikely(RTE_PER_LCORE(_epfd) == -1))
> +                       RTE_PER_LCORE(_epfd) = init_tls_epfd();
> +               epfd = RTE_PER_LCORE(_epfd);
> +       }
>

Rather than testing every time, this should be set by the caller, i.e. epfd
is always valid.
If application does not want to create a epfd, then it calls
 rte_intr_rx_wait with RTE_EPOLL_FD_ANY (this name is not well chosen) that
is a macro wrapped to RTE_PER_LCORE(_epfd).

init_tls_epfd() should be called only once at init time.
No need to check every time.

+
> +       do {
> +               ret = epoll_wait(epfd, events,
> +                                RTE_MIN(num, MAX_EVENTS),
> +                                EAL_INTR_EPOLL_WAIT_FOREVER);
> +               if (unlikely(ret < 0)) {
> +                       /* epoll_wait fail */
> +                       RTE_LOG(ERR, EAL, "epoll_wait returns with
> fail\n");
> +                       return -1;
> +               } else if (ret > 0) {
> +                       /* epoll_wait has at least one fd ready to read */
> +                       eal_intr_process_rxtx_interrupts(intr_handle,
> events,
> +                                                        vec, ret);
> +                       num -= ret;
> +                       vec += ret;
> +                       nfds += ret;
> +               } else if (nfds > 0)
> +                       break;
> +       } while (num > 0);
> +
> +       return nfds;
> +}
>

You are blocking unless all fds have been set, so you are serialising all
events.

+
> +int
> +rte_intr_rx_set(struct rte_intr_handle *intr_handle, int epfd,
> +               int op, uint32_t vec)
> +{
> +       struct epoll_event ev;
> +
> +       if (!intr_handle || vec >= VFIO_MAX_RXTX_INTR_ID) {
> +               RTE_LOG(ERR, EAL, "invalid input parameter\n");
> +               return -1;
> +       }
> +
> +       if (intr_handle->type != RTE_INTR_HANDLE_VFIO_MSIX) {
> +               RTE_LOG(ERR, EAL, "intr type should be VFIO_MSIX\n");
> +               return -1;
> +       }
> +
> +       switch (op) {
> +       case RTE_INTR_EVENT_ADD:
> +               op = EPOLL_CTL_ADD;
> +               break;
> +       case RTE_INTR_EVENT_DEL:
> +               op = EPOLL_CTL_DEL;
> +               break;
> +       default:
> +               RTE_LOG(ERR, EAL, "event op type mismatch\n");
> +               return -1;
> +       }
> +
> +       if (epfd == RTE_EPOLL_FD_ANY) {
> +               /* using per thread epoll fd */
> +               if (RTE_PER_LCORE(_epfd) == -1)
> +                       RTE_PER_LCORE(_epfd) = init_tls_epfd();
> +               epfd = RTE_PER_LCORE(_epfd);
> +       }
> +
> +       ev.data.u32 = vec;
> +       ev.events = EPOLLIN | EPOLLPRI;
> +       if (epoll_ctl(epfd, op, intr_handle->efds[vec], &ev) < 0) {
> +               RTE_LOG(ERR, EAL, "Error op %d fd %d epoll_ctl, %s\n",
> +                       op, intr_handle->efds[vec], strerror(errno));
> +               return -1;
> +       }
> +
> +       return 0;
> +}
>



> diff --git a/lib/librte_eal/linuxapp/eal/eal_pci_vfio.c
> b/lib/librte_eal/linuxapp/eal/eal_pci_vfio.c
> index ee9660f..d90d23c 100644
> --- a/lib/librte_eal/linuxapp/eal/eal_pci_vfio.c
> +++ b/lib/librte_eal/linuxapp/eal/eal_pci_vfio.c
> @@ -38,6 +38,7 @@
>  #include <sys/socket.h>
>  #include <sys/ioctl.h>
>  #include <sys/mman.h>
> +#include <sys/epoll.h>
>
>  #include <rte_log.h>
>  #include <rte_pci.h>
> @@ -274,16 +275,18 @@ pci_vfio_setup_interrupts(struct rte_pci_device
> *dev, int vfio_dev_fd)
>                 ret = ioctl(vfio_dev_fd, VFIO_DEVICE_GET_IRQ_INFO, &irq);
>                 if (ret < 0) {
>                         RTE_LOG(ERR, EAL, "  cannot get IRQ info, "
> -                                       "error %i (%s)\n", errno,
> strerror(errno));
> +                               "error %i (%s)\n", errno, strerror(errno));
>                         return -1;
>                 }
>

Garbage, this has nothing to do with the patch.


>
>                 /* if this vector cannot be used with eventfd, fail if we
> explicitly
>                  * specified interrupt type, otherwise continue */
>                 if ((irq.flags & VFIO_IRQ_INFO_EVENTFD) == 0) {
> -                       if (internal_config.vfio_intr_mode !=
> RTE_INTR_MODE_NONE) {
> +                       if (internal_config.vfio_intr_mode !=
> +                           RTE_INTR_MODE_NONE) {
>                                 RTE_LOG(ERR, EAL,
> -                                               "  interrupt vector does
> not support eventfd!\n");
> +                                       "  interrupt vector "
> +                                       "does not support eventfd!\n");
>                                 return -1;
>                         } else
>                                 continue;
>

Idem.



> @@ -293,17 +296,27 @@ pci_vfio_setup_interrupts(struct rte_pci_device
> *dev, int vfio_dev_fd)
>                 fd = eventfd(0, 0);
>                 if (fd < 0) {
>                         RTE_LOG(ERR, EAL, "  cannot set up eventfd, "
> -                                       "error %i (%s)\n", errno,
> strerror(errno));
> +                               "error %i (%s)\n", errno, strerror(errno));
>

Idem.


>                         return -1;
>                 }
>
>                 dev->intr_handle.fd = fd;
>                 dev->intr_handle.vfio_dev_fd = vfio_dev_fd;
> -
>

Idem.


>                 switch (i) {
>                 case VFIO_PCI_MSIX_IRQ_INDEX:
>                         internal_config.vfio_intr_mode =
> RTE_INTR_MODE_MSIX;
>                         dev->intr_handle.type = RTE_INTR_HANDLE_VFIO_MSIX;
> +                       for (i = 0; i < VFIO_MAX_RXTX_INTR_ID; i++) {
> +                               fd = eventfd(0, 0);
> +                               if (fd < 0) {
> +                                       RTE_LOG(ERR, EAL,
> +                                               "cannot setup eventfd,"
> +                                               "error %i (%s)\n",
> +                                               errno, strerror(errno));
> +                                       return -1;
> +                               }
> +                               dev->intr_handle.efds[i] = fd;
> +                       }
>                         break;
>                 case VFIO_PCI_MSI_IRQ_INDEX:
>                         internal_config.vfio_intr_mode = RTE_INTR_MODE_MSI;
> diff --git a/lib/librte_eal/linuxapp/eal/rte_eal_version.map
> b/lib/librte_eal/linuxapp/eal/rte_eal_version.map
> index 5f1857d..892a452 100644
> --- a/lib/librte_eal/linuxapp/eal/rte_eal_version.map
> +++ b/lib/librte_eal/linuxapp/eal/rte_eal_version.map
> @@ -64,6 +64,8 @@ DPDK_2.0 {
>         rte_intr_callback_unregister;
>         rte_intr_disable;
>         rte_intr_enable;
> +       rte_intr_rx_set;
> +       rte_intr_rx_wait;
>         rte_log;
>         rte_log_add_in_history;
>         rte_log_cur_msg_loglevel;
> --
> 1.8.1.4
>
  

Patch

diff --git a/lib/librte_eal/linuxapp/eal/eal_interrupts.c b/lib/librte_eal/linuxapp/eal/eal_interrupts.c
index 8c5b834..f90c2b4 100644
--- a/lib/librte_eal/linuxapp/eal/eal_interrupts.c
+++ b/lib/librte_eal/linuxapp/eal/eal_interrupts.c
@@ -44,6 +44,7 @@ 
 #include <sys/epoll.h>
 #include <sys/signalfd.h>
 #include <sys/ioctl.h>
+#include <assert.h>
 
 #include <rte_common.h>
 #include <rte_interrupts.h>
@@ -70,6 +71,8 @@ 
 
 #define EAL_INTR_EPOLL_WAIT_FOREVER (-1)
 
+static RTE_DEFINE_PER_LCORE(int, _epfd) = -1; /**< epoll fd per thread */
+
 /**
  * union for pipe fds.
  */
@@ -127,6 +130,9 @@  static pthread_t intr_thread;
 #ifdef VFIO_PRESENT
 
 #define IRQ_SET_BUF_LEN  (sizeof(struct vfio_irq_set) + sizeof(int))
+/* irq set buffer length for queue interrupts and LSC interrupt */
+#define MSIX_IRQ_SET_BUF_LEN (sizeof(struct vfio_irq_set) + \
+			      sizeof(int) * (VFIO_MAX_RXTX_INTR_ID + 1))
 
 /* enable legacy (INTx) interrupts */
 static int
@@ -218,10 +224,10 @@  vfio_disable_intx(struct rte_intr_handle *intr_handle) {
 	return 0;
 }
 
-/* enable MSI-X interrupts */
+/* enable MSI interrupts */
 static int
 vfio_enable_msi(struct rte_intr_handle *intr_handle) {
-	int len, ret;
+	int len, ret, max_intr;
 	char irq_set_buf[IRQ_SET_BUF_LEN];
 	struct vfio_irq_set *irq_set;
 	int *fd_ptr;
@@ -230,12 +236,19 @@  vfio_enable_msi(struct rte_intr_handle *intr_handle) {
 
 	irq_set = (struct vfio_irq_set *) irq_set_buf;
 	irq_set->argsz = len;
-	irq_set->count = 1;
+	if ((!intr_handle->max_intr) ||
+		(intr_handle->max_intr > VFIO_MAX_RXTX_INTR_ID))
+		max_intr = VFIO_MAX_RXTX_INTR_ID + 1;
+	else
+		max_intr = intr_handle->max_intr;
+
+	irq_set->count = max_intr;
 	irq_set->flags = VFIO_IRQ_SET_DATA_EVENTFD | VFIO_IRQ_SET_ACTION_TRIGGER;
 	irq_set->index = VFIO_PCI_MSI_IRQ_INDEX;
 	irq_set->start = 0;
 	fd_ptr = (int *) &irq_set->data;
-	*fd_ptr = intr_handle->fd;
+	memcpy(fd_ptr, intr_handle->efds, sizeof(intr_handle->efds));
+	fd_ptr[max_intr - 1] = intr_handle->fd;
 
 	ret = ioctl(intr_handle->vfio_dev_fd, VFIO_DEVICE_SET_IRQS, irq_set);
 
@@ -244,27 +257,10 @@  vfio_enable_msi(struct rte_intr_handle *intr_handle) {
 						intr_handle->fd);
 		return -1;
 	}
-
-	/* manually trigger interrupt to enable it */
-	memset(irq_set, 0, len);
-	len = sizeof(struct vfio_irq_set);
-	irq_set->argsz = len;
-	irq_set->count = 1;
-	irq_set->flags = VFIO_IRQ_SET_DATA_NONE | VFIO_IRQ_SET_ACTION_TRIGGER;
-	irq_set->index = VFIO_PCI_MSI_IRQ_INDEX;
-	irq_set->start = 0;
-
-	ret = ioctl(intr_handle->vfio_dev_fd, VFIO_DEVICE_SET_IRQS, irq_set);
-
-	if (ret) {
-		RTE_LOG(ERR, EAL, "Error triggering MSI interrupts for fd %d\n",
-						intr_handle->fd);
-		return -1;
-	}
 	return 0;
 }
 
-/* disable MSI-X interrupts */
+/* disable MSI interrupts */
 static int
 vfio_disable_msi(struct rte_intr_handle *intr_handle) {
 	struct vfio_irq_set *irq_set;
@@ -292,8 +288,8 @@  vfio_disable_msi(struct rte_intr_handle *intr_handle) {
 /* enable MSI-X interrupts */
 static int
 vfio_enable_msix(struct rte_intr_handle *intr_handle) {
-	int len, ret;
-	char irq_set_buf[IRQ_SET_BUF_LEN];
+	int len, ret, max_intr;
+	char irq_set_buf[MSIX_IRQ_SET_BUF_LEN];
 	struct vfio_irq_set *irq_set;
 	int *fd_ptr;
 
@@ -301,12 +297,19 @@  vfio_enable_msix(struct rte_intr_handle *intr_handle) {
 
 	irq_set = (struct vfio_irq_set *) irq_set_buf;
 	irq_set->argsz = len;
-	irq_set->count = 1;
+	if ((!intr_handle->max_intr) ||
+		(intr_handle->max_intr > VFIO_MAX_RXTX_INTR_ID))
+		max_intr = VFIO_MAX_RXTX_INTR_ID + 1;
+	else
+		max_intr = intr_handle->max_intr;
+
+	irq_set->count = max_intr;
 	irq_set->flags = VFIO_IRQ_SET_DATA_EVENTFD | VFIO_IRQ_SET_ACTION_TRIGGER;
 	irq_set->index = VFIO_PCI_MSIX_IRQ_INDEX;
 	irq_set->start = 0;
 	fd_ptr = (int *) &irq_set->data;
-	*fd_ptr = intr_handle->fd;
+	memcpy(fd_ptr, intr_handle->efds, sizeof(intr_handle->efds));
+	fd_ptr[max_intr - 1] = intr_handle->fd;
 
 	ret = ioctl(intr_handle->vfio_dev_fd, VFIO_DEVICE_SET_IRQS, irq_set);
 
@@ -316,22 +319,6 @@  vfio_enable_msix(struct rte_intr_handle *intr_handle) {
 		return -1;
 	}
 
-	/* manually trigger interrupt to enable it */
-	memset(irq_set, 0, len);
-	len = sizeof(struct vfio_irq_set);
-	irq_set->argsz = len;
-	irq_set->count = 1;
-	irq_set->flags = VFIO_IRQ_SET_DATA_NONE | VFIO_IRQ_SET_ACTION_TRIGGER;
-	irq_set->index = VFIO_PCI_MSIX_IRQ_INDEX;
-	irq_set->start = 0;
-
-	ret = ioctl(intr_handle->vfio_dev_fd, VFIO_DEVICE_SET_IRQS, irq_set);
-
-	if (ret) {
-		RTE_LOG(ERR, EAL, "Error triggering MSI-X interrupts for fd %d\n",
-						intr_handle->fd);
-		return -1;
-	}
 	return 0;
 }
 
@@ -339,7 +326,7 @@  vfio_enable_msix(struct rte_intr_handle *intr_handle) {
 static int
 vfio_disable_msix(struct rte_intr_handle *intr_handle) {
 	struct vfio_irq_set *irq_set;
-	char irq_set_buf[IRQ_SET_BUF_LEN];
+	char irq_set_buf[MSIX_IRQ_SET_BUF_LEN];
 	int len, ret;
 
 	len = sizeof(struct vfio_irq_set);
@@ -860,3 +847,154 @@  rte_eal_intr_init(void)
 	return -ret;
 }
 
+static void
+eal_intr_process_rxtx_interrupts(struct rte_intr_handle *intr_handle,
+				 struct epoll_event *events,
+				 uint32_t *vec, int nfds)
+{
+	int i, bytes_read;
+	union rte_intr_read_buffer buf;
+	int fd;
+
+	for (i = 0; i < nfds; i++) {
+		/* set the length to be read for different handle type */
+		switch (intr_handle->type) {
+		case RTE_INTR_HANDLE_UIO:
+			bytes_read = sizeof(buf.uio_intr_count);
+			break;
+		case RTE_INTR_HANDLE_ALARM:
+			bytes_read = sizeof(buf.timerfd_num);
+			break;
+#ifdef VFIO_PRESENT
+		case RTE_INTR_HANDLE_VFIO_MSIX:
+		case RTE_INTR_HANDLE_VFIO_MSI:
+		case RTE_INTR_HANDLE_VFIO_LEGACY:
+			bytes_read = sizeof(buf.vfio_intr_count);
+			break;
+#endif
+		default:
+			bytes_read = 1;
+			break;
+		}
+
+		/**
+		* read out to clear the ready-to-be-read flag
+		* for epoll_wait.
+		*/
+		vec[i] = events[i].data.u32;
+		assert(vec[i] < VFIO_MAX_RXTX_INTR_ID);
+
+		fd = intr_handle->efds[vec[i]];
+		bytes_read = read(fd, &buf, bytes_read);
+		if (bytes_read < 0)
+			RTE_LOG(ERR, EAL, "Error reading from file "
+				"descriptor %d: %s\n", fd, strerror(errno));
+		else if (bytes_read == 0)
+			RTE_LOG(ERR, EAL, "Read nothing from file "
+				"descriptor %d\n", fd);
+	}
+}
+
+static int init_tls_epfd(void)
+{
+	int pfd = epoll_create(1);
+	if (pfd < 0) {
+		RTE_LOG(ERR, EAL,
+			"Cannot create epoll instance\n");
+		return -1;
+	}
+	return pfd;
+}
+
+int
+rte_intr_rx_wait(struct rte_intr_handle *intr_handle, int epfd,
+		 uint32_t *vec, uint16_t num)
+{
+#define MAX_EVENTS      8
+	struct epoll_event events[MAX_EVENTS];
+	int ret, nfds = 0;
+
+	if (!intr_handle || !vec) {
+		RTE_LOG(ERR, EAL, "invalid input parameter\n");
+		return -1;
+	}
+
+	if (intr_handle->type != RTE_INTR_HANDLE_VFIO_MSIX) {
+		RTE_LOG(ERR, EAL, "intr type should be VFIO_MSIX\n");
+		return -1;
+	}
+
+	if (epfd == RTE_EPOLL_FD_ANY) {
+		/* using per thread epoll fd */
+		if (unlikely(RTE_PER_LCORE(_epfd) == -1))
+			RTE_PER_LCORE(_epfd) = init_tls_epfd();
+		epfd = RTE_PER_LCORE(_epfd);
+	}
+
+	do {
+		ret = epoll_wait(epfd, events,
+				 RTE_MIN(num, MAX_EVENTS),
+				 EAL_INTR_EPOLL_WAIT_FOREVER);
+		if (unlikely(ret < 0)) {
+			/* epoll_wait fail */
+			RTE_LOG(ERR, EAL, "epoll_wait returns with fail\n");
+			return -1;
+		} else if (ret > 0) {
+			/* epoll_wait has at least one fd ready to read */
+			eal_intr_process_rxtx_interrupts(intr_handle, events,
+							 vec, ret);
+			num -= ret;
+			vec += ret;
+			nfds += ret;
+		} else if (nfds > 0)
+			break;
+	} while (num > 0);
+
+	return nfds;
+}
+
+int
+rte_intr_rx_set(struct rte_intr_handle *intr_handle, int epfd,
+		int op, uint32_t vec)
+{
+	struct epoll_event ev;
+
+	if (!intr_handle || vec >= VFIO_MAX_RXTX_INTR_ID) {
+		RTE_LOG(ERR, EAL, "invalid input parameter\n");
+		return -1;
+	}
+
+	if (intr_handle->type != RTE_INTR_HANDLE_VFIO_MSIX) {
+		RTE_LOG(ERR, EAL, "intr type should be VFIO_MSIX\n");
+		return -1;
+	}
+
+	switch (op) {
+	case RTE_INTR_EVENT_ADD:
+		op = EPOLL_CTL_ADD;
+		break;
+	case RTE_INTR_EVENT_DEL:
+		op = EPOLL_CTL_DEL;
+		break;
+	default:
+		RTE_LOG(ERR, EAL, "event op type mismatch\n");
+		return -1;
+	}
+
+	if (epfd == RTE_EPOLL_FD_ANY) {
+		/* using per thread epoll fd */
+		if (RTE_PER_LCORE(_epfd) == -1)
+			RTE_PER_LCORE(_epfd) = init_tls_epfd();
+		epfd = RTE_PER_LCORE(_epfd);
+	}
+
+	ev.data.u32 = vec;
+	ev.events = EPOLLIN | EPOLLPRI;
+	if (epoll_ctl(epfd, op, intr_handle->efds[vec], &ev) < 0) {
+		RTE_LOG(ERR, EAL, "Error op %d fd %d epoll_ctl, %s\n",
+			op, intr_handle->efds[vec], strerror(errno));
+		return -1;
+	}
+
+	return 0;
+}
diff --git a/lib/librte_eal/linuxapp/eal/eal_pci_vfio.c b/lib/librte_eal/linuxapp/eal/eal_pci_vfio.c
index ee9660f..d90d23c 100644
--- a/lib/librte_eal/linuxapp/eal/eal_pci_vfio.c
+++ b/lib/librte_eal/linuxapp/eal/eal_pci_vfio.c
@@ -38,6 +38,7 @@ 
 #include <sys/socket.h>
 #include <sys/ioctl.h>
 #include <sys/mman.h>
+#include <sys/epoll.h>
 
 #include <rte_log.h>
 #include <rte_pci.h>
@@ -274,16 +275,18 @@  pci_vfio_setup_interrupts(struct rte_pci_device *dev, int vfio_dev_fd)
 		ret = ioctl(vfio_dev_fd, VFIO_DEVICE_GET_IRQ_INFO, &irq);
 		if (ret < 0) {
 			RTE_LOG(ERR, EAL, "  cannot get IRQ info, "
-					"error %i (%s)\n", errno, strerror(errno));
+				"error %i (%s)\n", errno, strerror(errno));
 			return -1;
 		}
 
 		/* if this vector cannot be used with eventfd, fail if we explicitly
 		 * specified interrupt type, otherwise continue */
 		if ((irq.flags & VFIO_IRQ_INFO_EVENTFD) == 0) {
-			if (internal_config.vfio_intr_mode != RTE_INTR_MODE_NONE) {
+			if (internal_config.vfio_intr_mode !=
+			    RTE_INTR_MODE_NONE) {
 				RTE_LOG(ERR, EAL,
-						"  interrupt vector does not support eventfd!\n");
+					"  interrupt vector "
+					"does not support eventfd!\n");
 				return -1;
 			} else
 				continue;
@@ -293,17 +296,27 @@  pci_vfio_setup_interrupts(struct rte_pci_device *dev, int vfio_dev_fd)
 		fd = eventfd(0, 0);
 		if (fd < 0) {
 			RTE_LOG(ERR, EAL, "  cannot set up eventfd, "
-					"error %i (%s)\n", errno, strerror(errno));
+				"error %i (%s)\n", errno, strerror(errno));
 			return -1;
 		}
 
 		dev->intr_handle.fd = fd;
 		dev->intr_handle.vfio_dev_fd = vfio_dev_fd;
-
 		switch (i) {
 		case VFIO_PCI_MSIX_IRQ_INDEX:
 			internal_config.vfio_intr_mode = RTE_INTR_MODE_MSIX;
 			dev->intr_handle.type = RTE_INTR_HANDLE_VFIO_MSIX;
+			for (i = 0; i < VFIO_MAX_RXTX_INTR_ID; i++) {
+				fd = eventfd(0, 0);
+				if (fd < 0) {
+					RTE_LOG(ERR, EAL,
+						"cannot setup eventfd,"
+						"error %i (%s)\n",
+						errno, strerror(errno));
+					return -1;
+				}
+				dev->intr_handle.efds[i] = fd;
+			}
 			break;
 		case VFIO_PCI_MSI_IRQ_INDEX:
 			internal_config.vfio_intr_mode = RTE_INTR_MODE_MSI;
diff --git a/lib/librte_eal/linuxapp/eal/rte_eal_version.map b/lib/librte_eal/linuxapp/eal/rte_eal_version.map
index 5f1857d..892a452 100644
--- a/lib/librte_eal/linuxapp/eal/rte_eal_version.map
+++ b/lib/librte_eal/linuxapp/eal/rte_eal_version.map
@@ -64,6 +64,8 @@  DPDK_2.0 {
 	rte_intr_callback_unregister;
 	rte_intr_disable;
 	rte_intr_enable;
+	rte_intr_rx_set;
+	rte_intr_rx_wait;
 	rte_log;
 	rte_log_add_in_history;
 	rte_log_cur_msg_loglevel;