[dpdk-dev,v13,02/14] eal/linux: add rte_epoll_wait/ctl support

Message ID 1434686442-578-3-git-send-email-cunming.liang@intel.com (mailing list archive)
State Changes Requested, archived
Headers

Commit Message

Cunming Liang June 19, 2015, 4 a.m. UTC
  The patch adds 'rte_epoll_wait' and 'rte_epoll_ctl' for async event wakeup.
It defines 'struct rte_epoll_event' as the event param.
When the event fds add to a specified epoll instance, 'eptrs' will hold the rte_epoll_event object pointer.
The 'op' uses the same enum as epoll_wait/ctl does.
The epoll event support to carry a raw user data and to register a callback which is executed during wakeup.

Signed-off-by: Cunming Liang <cunming.liang@intel.com>
---
v13 changes
 - version map cleanup for v2.1

v11 changes
 - cleanup spelling error

v9 changes
 - rework on coding style

v8 changes
 - support delete event in safety during the wakeup execution
 - add EINTR process during epoll_wait

v7 changes
 - split v6[4/8] into two patches, one for epoll event(this one)
   another for rx intr(next patch)
 - introduce rte_epoll_event definition
 - rte_epoll_wait/ctl for more generic RTE epoll API

v6 changes
 - split rte_intr_wait_rx_pkt into two function, wait and set.
 - rewrite rte_intr_rx_wait/rte_intr_rx_set to remove queue visibility on eal.
 - rte_intr_rx_wait to support multiplexing.
 - allow epfd as input to support flexible event fd combination.

 lib/librte_eal/linuxapp/eal/eal_interrupts.c       | 139 +++++++++++++++++++++
 .../linuxapp/eal/include/exec-env/rte_interrupts.h |  80 ++++++++++++
 lib/librte_eal/linuxapp/eal/rte_eal_version.map    |  10 ++
 3 files changed, 229 insertions(+)
  

Comments

Thomas Monjalon July 13, 2015, 4:46 p.m. UTC | #1
2015-06-19 12:00, Cunming Liang:
> --- a/lib/librte_eal/linuxapp/eal/include/exec-env/rte_interrupts.h
> +++ b/lib/librte_eal/linuxapp/eal/include/exec-env/rte_interrupts.h
> @@ -51,6 +51,32 @@ enum rte_intr_handle_type {
>  	RTE_INTR_HANDLE_MAX
>  };
>  
> +#define RTE_INTR_EVENT_ADD            1UL
> +#define	RTE_INTR_EVENT_DEL            2UL

Wrong alignment here.

> --- a/lib/librte_eal/linuxapp/eal/rte_eal_version.map
> +++ b/lib/librte_eal/linuxapp/eal/rte_eal_version.map
> @@ -98,3 +98,13 @@ DPDK_2.0 {
>  
>  	local: *;
>  };
> +
> +DPDK_2.1 {
> +	global:
> +
> +	rte_epoll_ctl;
> +	rte_epoll_wait;
> +	rte_intr_tls_epfd;
> +
> +	local: *;
> +} DPDK_2.0;

local is already defined above.
  
Thomas Monjalon July 13, 2015, 4:56 p.m. UTC | #2
2015-06-19 12:00, Cunming Liang:
> +int
> +rte_epoll_wait(int epfd, struct rte_epoll_event *events,
> +	       int maxevents, int timeout)
> +{
> +	struct epoll_event evs[maxevents];
> +	int rc;
> +
> +	if (!events) {
> +		RTE_LOG(ERR, EAL, "rte_epoll_event can't be NULL\n");
> +		return -1;
> +	}
> +
> +	/* using per thread epoll fd */
> +	if (epfd == RTE_EPOLL_PER_THREAD)
> +		epfd = rte_intr_tls_epfd();
> +
> +	while (1) {
> +		rc = epoll_wait(epfd, evs, maxevents, timeout);
> +		if (likely(rc > 0)) {
> +			/* epoll_wait has at least one fd ready to read */
> +			rc = eal_epoll_process_event(evs, rc, events);
> +			break;
> +		} else if (rc < 0) {
> +			if (errno == EINTR)
> +				continue;
> +			/* epoll_wait fail */
> +			RTE_LOG(ERR, EAL, "epoll_wait returns with fail %s\n",
> +				strerror(errno));
> +			rc = -1;
> +			break;
> +		}
> +	}
> +
> +	return rc;
> +}

In general, such loop is application-level.
What is the added value of rte_epoll_wait()?
Do we need some wrappers to libc in DPDK?
  
Cunming Liang July 17, 2015, 5:27 a.m. UTC | #3
> -----Original Message-----
> From: Thomas Monjalon [mailto:thomas.monjalon@6wind.com]
> Sent: Tuesday, July 14, 2015 12:46 AM
> To: Liang, Cunming
> Cc: dev@dpdk.org; shemming@brocade.com; david.marchand@6wind.com;
> Zhou, Danny; Wang, Liang-min; Richardson, Bruce; Liu, Yong;
> nhorman@tuxdriver.com
> Subject: Re: [PATCH v13 02/14] eal/linux: add rte_epoll_wait/ctl support
> 
> 2015-06-19 12:00, Cunming Liang:
> > --- a/lib/librte_eal/linuxapp/eal/include/exec-env/rte_interrupts.h
> > +++ b/lib/librte_eal/linuxapp/eal/include/exec-env/rte_interrupts.h
> > @@ -51,6 +51,32 @@ enum rte_intr_handle_type {
> >  	RTE_INTR_HANDLE_MAX
> >  };
> >
> > +#define RTE_INTR_EVENT_ADD            1UL
> > +#define	RTE_INTR_EVENT_DEL            2UL
> 
> Wrong alignment here.
> 
> > --- a/lib/librte_eal/linuxapp/eal/rte_eal_version.map
> > +++ b/lib/librte_eal/linuxapp/eal/rte_eal_version.map
> > @@ -98,3 +98,13 @@ DPDK_2.0 {
> >
> >  	local: *;
> >  };
> > +
> > +DPDK_2.1 {
> > +	global:
> > +
> > +	rte_epoll_ctl;
> > +	rte_epoll_wait;
> > +	rte_intr_tls_epfd;
> > +
> > +	local: *;
> > +} DPDK_2.0;
> 
> local is already defined above.
> 
Will rework it, thanks.
  
Cunming Liang July 17, 2015, 5:47 a.m. UTC | #4
> -----Original Message-----
> From: Thomas Monjalon [mailto:thomas.monjalon@6wind.com]
> Sent: Tuesday, July 14, 2015 12:56 AM
> To: Liang, Cunming
> Cc: dev@dpdk.org; shemming@brocade.com; david.marchand@6wind.com;
> Zhou, Danny; Wang, Liang-min; Richardson, Bruce; Liu, Yong;
> nhorman@tuxdriver.com
> Subject: Re: [PATCH v13 02/14] eal/linux: add rte_epoll_wait/ctl support
> 
> 2015-06-19 12:00, Cunming Liang:
> > +int
> > +rte_epoll_wait(int epfd, struct rte_epoll_event *events,
> > +	       int maxevents, int timeout)
> > +{
> > +	struct epoll_event evs[maxevents];
> > +	int rc;
> > +
> > +	if (!events) {
> > +		RTE_LOG(ERR, EAL, "rte_epoll_event can't be NULL\n");
> > +		return -1;
> > +	}
> > +
> > +	/* using per thread epoll fd */
> > +	if (epfd == RTE_EPOLL_PER_THREAD)
> > +		epfd = rte_intr_tls_epfd();
> > +
> > +	while (1) {
> > +		rc = epoll_wait(epfd, evs, maxevents, timeout);
> > +		if (likely(rc > 0)) {
> > +			/* epoll_wait has at least one fd ready to read */
> > +			rc = eal_epoll_process_event(evs, rc, events);
> > +			break;
> > +		} else if (rc < 0) {
> > +			if (errno == EINTR)
> > +				continue;
> > +			/* epoll_wait fail */
> > +			RTE_LOG(ERR, EAL, "epoll_wait returns with fail %s\n",
> > +				strerror(errno));
> > +			rc = -1;
> > +			break;
> > +		}
> > +	}
> > +
> > +	return rc;
> > +}
> 
> In general, such loop is application-level.
> What is the added value of rte_epoll_wait()?
> Do we need some wrappers to libc in DPDK?
Some motivations to do it,
1) 'epoll_event' takes either fd or a data point. However we require more to cover both rx interrupt and other user's events.
2) Some errno processing can be addressed commonly, it's helpful to focus on the real event we're interested in.
3) Usually there's one epoll instance per lcore to serve the events. Here gives a default one if it isn't assigned.
  

Patch

diff --git a/lib/librte_eal/linuxapp/eal/eal_interrupts.c b/lib/librte_eal/linuxapp/eal/eal_interrupts.c
index a6325c6..dc327a4 100644
--- a/lib/librte_eal/linuxapp/eal/eal_interrupts.c
+++ b/lib/librte_eal/linuxapp/eal/eal_interrupts.c
@@ -69,6 +69,8 @@ 
 
 #define EAL_INTR_EPOLL_WAIT_FOREVER (-1)
 
+static RTE_DEFINE_PER_LCORE(int, _epfd) = -1; /**< epoll fd per thread */
+
 /**
  * union for pipe fds.
  */
@@ -894,3 +896,140 @@  rte_eal_intr_init(void)
 
 	return -ret;
 }
+
+static int
+eal_epoll_process_event(struct epoll_event *evs, unsigned int n,
+			struct rte_epoll_event *events)
+{
+	unsigned int i, count = 0;
+	struct rte_epoll_event *rev;
+
+	for (i = 0; i < n; i++) {
+		rev = evs[i].data.ptr;
+		if (!rev || !rte_atomic32_cmpset(&rev->status, RTE_EPOLL_VALID,
+						 RTE_EPOLL_EXEC))
+			continue;
+
+		events[count].status        = RTE_EPOLL_VALID;
+		events[count].fd            = rev->fd;
+		events[count].epfd          = rev->epfd;
+		events[count].epdata.event  = rev->epdata.event;
+		events[count].epdata.data   = rev->epdata.data;
+		if (rev->epdata.cb_fun)
+			rev->epdata.cb_fun(rev->fd,
+					   rev->epdata.cb_arg);
+
+		rte_compiler_barrier();
+		rev->status = RTE_EPOLL_VALID;
+		count++;
+	}
+	return count;
+}
+
+static inline int
+eal_init_tls_epfd(void)
+{
+	int pfd = epoll_create(255);
+
+	if (pfd < 0) {
+		RTE_LOG(ERR, EAL,
+			"Cannot create epoll instance\n");
+		return -1;
+	}
+	return pfd;
+}
+
+int
+rte_intr_tls_epfd(void)
+{
+	if (RTE_PER_LCORE(_epfd) == -1)
+		RTE_PER_LCORE(_epfd) = eal_init_tls_epfd();
+
+	return RTE_PER_LCORE(_epfd);
+}
+
+int
+rte_epoll_wait(int epfd, struct rte_epoll_event *events,
+	       int maxevents, int timeout)
+{
+	struct epoll_event evs[maxevents];
+	int rc;
+
+	if (!events) {
+		RTE_LOG(ERR, EAL, "rte_epoll_event can't be NULL\n");
+		return -1;
+	}
+
+	/* using per thread epoll fd */
+	if (epfd == RTE_EPOLL_PER_THREAD)
+		epfd = rte_intr_tls_epfd();
+
+	while (1) {
+		rc = epoll_wait(epfd, evs, maxevents, timeout);
+		if (likely(rc > 0)) {
+			/* epoll_wait has at least one fd ready to read */
+			rc = eal_epoll_process_event(evs, rc, events);
+			break;
+		} else if (rc < 0) {
+			if (errno == EINTR)
+				continue;
+			/* epoll_wait fail */
+			RTE_LOG(ERR, EAL, "epoll_wait returns with fail %s\n",
+				strerror(errno));
+			rc = -1;
+			break;
+		}
+	}
+
+	return rc;
+}
+
+static inline void
+eal_epoll_data_safe_free(struct rte_epoll_event *ev)
+{
+	while (!rte_atomic32_cmpset(&ev->status, RTE_EPOLL_VALID,
+				    RTE_EPOLL_INVALID))
+		while (ev->status != RTE_EPOLL_VALID)
+			rte_pause();
+	memset(&ev->epdata, 0, sizeof(ev->epdata));
+	ev->fd = -1;
+	ev->epfd = -1;
+}
+
+int
+rte_epoll_ctl(int epfd, int op, int fd,
+	      struct rte_epoll_event *event)
+{
+	struct epoll_event ev;
+
+	if (!event) {
+		RTE_LOG(ERR, EAL, "rte_epoll_event can't be NULL\n");
+		return -1;
+	}
+
+	/* using per thread epoll fd */
+	if (epfd == RTE_EPOLL_PER_THREAD)
+		epfd = rte_intr_tls_epfd();
+
+	if (op == EPOLL_CTL_ADD) {
+		event->status = RTE_EPOLL_VALID;
+		event->fd = fd;  /* ignore fd in event */
+		event->epfd = epfd;
+		ev.data.ptr = (void *)event;
+	}
+
+	ev.events = event->epdata.event;
+	if (epoll_ctl(epfd, op, fd, &ev) < 0) {
+		RTE_LOG(ERR, EAL, "Error op %d fd %d epoll_ctl, %s\n",
+			op, fd, strerror(errno));
+		if (op == EPOLL_CTL_ADD)
+			/* rollback status when CTL_ADD fail */
+			event->status = RTE_EPOLL_INVALID;
+		return -1;
+	}
+
+	if (op == EPOLL_CTL_DEL && event->status != RTE_EPOLL_INVALID)
+		eal_epoll_data_safe_free(event);
+
+	return 0;
+}
diff --git a/lib/librte_eal/linuxapp/eal/include/exec-env/rte_interrupts.h b/lib/librte_eal/linuxapp/eal/include/exec-env/rte_interrupts.h
index b6f2c41..4db95bb 100644
--- a/lib/librte_eal/linuxapp/eal/include/exec-env/rte_interrupts.h
+++ b/lib/librte_eal/linuxapp/eal/include/exec-env/rte_interrupts.h
@@ -51,6 +51,32 @@  enum rte_intr_handle_type {
 	RTE_INTR_HANDLE_MAX
 };
 
+#define RTE_INTR_EVENT_ADD            1UL
+#define	RTE_INTR_EVENT_DEL            2UL
+
+typedef void (*rte_intr_event_cb_t)(int fd, void *arg);
+
+struct rte_epoll_data {
+	uint32_t event;               /**< event type */
+	void *data;                   /**< User data */
+	rte_intr_event_cb_t cb_fun;   /**< IN: callback fun */
+	void *cb_arg;	              /**< IN: callback arg */
+};
+
+enum {
+	RTE_EPOLL_INVALID = 0,
+	RTE_EPOLL_VALID,
+	RTE_EPOLL_EXEC,
+};
+
+/** interrupt epoll event obj, taken by epoll_event.ptr */
+struct rte_epoll_event {
+	volatile uint32_t status;  /**< OUT: event status */
+	int fd;                    /**< OUT: event fd */
+	int epfd;       /**< OUT: epoll instance the ev associated with */
+	struct rte_epoll_data epdata;
+};
+
 /** Handle for interrupts. */
 struct rte_intr_handle {
 	union {
@@ -63,7 +89,61 @@  struct rte_intr_handle {
 	uint32_t max_intr;               /**< max interrupt requested */
 	uint32_t nb_efd;                 /**< number of available efds */
 	int efds[RTE_MAX_RXTX_INTR_VEC_ID];  /**< intr vectors/efds mapping */
+	struct rte_epoll_event elist[RTE_MAX_RXTX_INTR_VEC_ID];
+					 /**< intr vector epoll event */
 	int *intr_vec;                   /**< intr vector number array */
 };
 
+#define RTE_EPOLL_PER_THREAD        -1  /**< to hint using per thread epfd */
+
+/**
+ * It waits for events on the epoll instance.
+ *
+ * @param epfd
+ *   Epoll instance fd on which the caller wait for events.
+ * @param events
+ *   Memory area contains the events that will be available for the caller.
+ * @param maxevents
+ *   Up to maxevents are returned, must greater than zero.
+ * @param timeout
+ *   Specifying a timeout of -1 causes a block indefinitely.
+ *   Specifying a timeout equal to zero cause to return immediately.
+ * @return
+ *   - On success, returns the number of available event.
+ *   - On failure, a negative value.
+ */
+int
+rte_epoll_wait(int epfd, struct rte_epoll_event *events,
+	       int maxevents, int timeout);
+
+/**
+ * It performs control operations on epoll instance referred by the epfd.
+ * It requests that the operation op be performed for the target fd.
+ *
+ * @param epfd
+ *   Epoll instance fd on which the caller perform control operations.
+ * @param op
+ *   The operation be performed for the target fd.
+ * @param fd
+ *   The target fd on which the control ops perform.
+ * @param event
+ *   Describes the object linked to the fd.
+ *   Note: The caller must take care the object deletion after CTL_DEL.
+ * @return
+ *   - On success, zero.
+ *   - On failure, a negative value.
+ */
+int
+rte_epoll_ctl(int epfd, int op, int fd,
+	      struct rte_epoll_event *event);
+
+/**
+ * The function returns the per thread epoll instance.
+ *
+ * @return
+ *   epfd the epoll instance referred to.
+ */
+int
+rte_intr_tls_epfd(void);
+
 #endif /* _RTE_LINUXAPP_INTERRUPTS_H_ */
diff --git a/lib/librte_eal/linuxapp/eal/rte_eal_version.map b/lib/librte_eal/linuxapp/eal/rte_eal_version.map
index 7e850a9..f388af6 100644
--- a/lib/librte_eal/linuxapp/eal/rte_eal_version.map
+++ b/lib/librte_eal/linuxapp/eal/rte_eal_version.map
@@ -98,3 +98,13 @@  DPDK_2.0 {
 
 	local: *;
 };
+
+DPDK_2.1 {
+	global:
+
+	rte_epoll_ctl;
+	rte_epoll_wait;
+	rte_intr_tls_epfd;
+
+	local: *;
+} DPDK_2.0;