[dpdk-dev,v8,02/11] eal/linux: add rte_epoll_wait/ctl support
Commit Message
The patch adds 'rte_epoll_wait' and 'rte_epoll_ctl' for async event wakeup.
It defines 'struct rte_epoll_event' as the event parameter.
The 'op' argument uses the same values as epoll_ctl does.
The epoll event supports carrying raw user data and registering a callback which is executed during wakeup.
Signed-off-by: Cunming Liang <cunming.liang@intel.com>
---
v8 changes
- support safe event deletion during wakeup execution
- add EINTR handling during epoll_wait
v7 changes
- split v6 [4/8] into two patches: one for the epoll event (this one),
another for rx intr (next patch)
- introduce rte_epoll_event definition
- rte_epoll_wait/ctl for more generic RTE epoll API
v6 changes
- split rte_intr_wait_rx_pkt into two functions, wait and set.
- rewrite rte_intr_rx_wait/rte_intr_rx_set to remove queue visibility on eal.
- rte_intr_rx_wait to support multiplexing.
- allow epfd as input to support flexible event fd combination.
lib/librte_eal/linuxapp/eal/eal_interrupts.c | 137 +++++++++++++++++++++
.../linuxapp/eal/include/exec-env/rte_interrupts.h | 82 +++++++++++-
lib/librte_eal/linuxapp/eal/rte_eal_version.map | 3 +
3 files changed, 219 insertions(+), 3 deletions(-)
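To illustrate the intended use of the new calls, here is a minimal, hypothetical usage sketch (application-side code, not part of the patch; 'efd' is assumed to be an existing eventfd or other file descriptor to monitor):

#include <sys/epoll.h>
#include <rte_interrupts.h>

/* hypothetical wakeup callback, executed inside rte_epoll_wait() on wakeup */
static void
my_wakeup_cb(int fd, void *arg)
{
	(void)fd;	/* unused in this sketch */
	(void)arg;
	/* e.g. drain the eventfd or note which queue woke us up */
}

static int
wait_on_fd(int efd)
{
	/* the event object must stay valid while it is registered */
	static struct rte_epoll_event ev;
	struct rte_epoll_event out[1];

	ev.epdata.event  = EPOLLIN;		/* same event bits as epoll(7) */
	ev.epdata.data   = NULL;		/* raw user data carried back */
	ev.epdata.cb_fun = my_wakeup_cb;	/* optional callback on wakeup */
	ev.epdata.cb_arg = NULL;

	/* register on the per-thread epoll instance */
	if (rte_epoll_ctl(RTE_EPOLL_PER_THREAD, EPOLL_CTL_ADD, efd, &ev) < 0)
		return -1;

	/* block until at least one registered fd is ready */
	return rte_epoll_wait(RTE_EPOLL_PER_THREAD, out, 1, -1);
}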
Comments
On Thu, 21 May 2015 16:55:54 +0800
Cunming Liang <cunming.liang@intel.com> wrote:
> +static int
> +eal_epoll_process_event(struct epoll_event *evs, int n,
> + struct rte_epoll_event *events)
> +{
> + int i;
> + int count = 0;
> + struct rte_epoll_event *rev;
> + for (i = 0; i < n; i++) {
> + rev = (struct rte_epoll_event *)evs[i].data.ptr;
> + if (!rev || !rte_atomic32_cmpset(&rev->status, RTE_EPOLL_VALID,
> + RTE_EPOLL_EXEC))
> + continue;
> +
> + events[count].status = RTE_EPOLL_VALID;
> + events[count].fd = rev->fd;
> + events[count].epfd = rev->epfd;
> + events[count].epdata.event = rev->epdata.event;
> + events[count].epdata.data = rev->epdata.data;
This code has several style issues:
1. Always put blank line after declarations
2. Use unsigned where ever it makes sense as a matter of habit.
unsigned int i, count = 0;
3. Don't add casts where not necessary, it reduces compiler type checking
and is a bad habit. In this case evs[i].data.ptr is void *
and therefore no cast is needed.
On 5/22/2015 2:17 AM, Stephen Hemminger wrote:
> On Thu, 21 May 2015 16:55:54 +0800
> Cunming Liang <cunming.liang@intel.com> wrote:
>
>> +static int
>> +eal_epoll_process_event(struct epoll_event *evs, int n,
>> + struct rte_epoll_event *events)
>> +{
>> + int i;
>> + int count = 0;
>> + struct rte_epoll_event *rev;
>> + for (i = 0; i < n; i++) {
>> + rev = (struct rte_epoll_event *)evs[i].data.ptr;
>> + if (!rev || !rte_atomic32_cmpset(&rev->status, RTE_EPOLL_VALID,
>> + RTE_EPOLL_EXEC))
>> + continue;
>> +
>> + events[count].status = RTE_EPOLL_VALID;
>> + events[count].fd = rev->fd;
>> + events[count].epfd = rev->epfd;
>> + events[count].epdata.event = rev->epdata.event;
>> + events[count].epdata.data = rev->epdata.data;
> This code has several style issues:
> 1. Always put blank line after declarations
>
> 2. Use unsigned where ever it makes sense as a matter of habit.
> unsigned int i, count = 0;
>
> 3. Don't add casts where not necessary, it reduces compiler type checking
> and is a bad habit. In this case evs[i].data.ptr is void *
> and therefore no cast is needed.
Fully agree, thanks for the comment.
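For illustration, a sketch of the same loop with those three style points applied (blank line after declarations, unsigned counters, no cast from void *); this is not part of the posted v8 patch:

static int
eal_epoll_process_event(struct epoll_event *evs, int n,
			struct rte_epoll_event *events)
{
	unsigned int i, count = 0;
	struct rte_epoll_event *rev;

	for (i = 0; i < (unsigned int)n; i++) {
		rev = evs[i].data.ptr;	/* data.ptr is void *, no cast needed */
		if (!rev || !rte_atomic32_cmpset(&rev->status, RTE_EPOLL_VALID,
						 RTE_EPOLL_EXEC))
			continue;

		events[count].status = RTE_EPOLL_VALID;
		events[count].fd = rev->fd;
		events[count].epfd = rev->epfd;
		events[count].epdata.event = rev->epdata.event;
		events[count].epdata.data = rev->epdata.data;
		if (rev->epdata.cb_fun)
			rev->epdata.cb_fun(rev->fd, rev->epdata.cb_arg);

		/* ensure the copy completes before the status is restored */
		rte_compiler_barrier();
		rev->status = RTE_EPOLL_VALID;
		count++;
	}

	return count;
}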
@@ -69,6 +69,8 @@
#define EAL_INTR_EPOLL_WAIT_FOREVER (-1)
+static RTE_DEFINE_PER_LCORE(int, _epfd) = -1; /**< epoll fd per thread */
+
/**
* union for pipe fds.
*/
@@ -859,3 +861,138 @@ rte_eal_intr_init(void)
return -ret;
}
+static int
+eal_epoll_process_event(struct epoll_event *evs, int n,
+			struct rte_epoll_event *events)
+{
+	int i;
+	int count = 0;
+	struct rte_epoll_event *rev;
+	for (i = 0; i < n; i++) {
+		rev = (struct rte_epoll_event *)evs[i].data.ptr;
+		if (!rev || !rte_atomic32_cmpset(&rev->status, RTE_EPOLL_VALID,
+						 RTE_EPOLL_EXEC))
+			continue;
+
+		events[count].status = RTE_EPOLL_VALID;
+		events[count].fd = rev->fd;
+		events[count].epfd = rev->epfd;
+		events[count].epdata.event = rev->epdata.event;
+		events[count].epdata.data = rev->epdata.data;
+		if (rev->epdata.cb_fun)
+			rev->epdata.cb_fun(rev->fd,
+					   rev->epdata.cb_arg);
+
+		rte_compiler_barrier();
+		rev->status = RTE_EPOLL_VALID;
+		count++;
+	}
+	return count;
+}
+
+static inline int
+eal_init_tls_epfd(void)
+{
+	int pfd = epoll_create(255);
+	if (pfd < 0) {
+		RTE_LOG(ERR, EAL,
+			"Cannot create epoll instance\n");
+		return -1;
+	}
+	return pfd;
+}
+
+int
+rte_intr_tls_epfd(void)
+{
+	if (RTE_PER_LCORE(_epfd) == -1)
+		RTE_PER_LCORE(_epfd) = eal_init_tls_epfd();
+
+	return RTE_PER_LCORE(_epfd);
+}
+
+int
+rte_epoll_wait(int epfd, struct rte_epoll_event *events,
+	       int maxevents, int timeout)
+{
+	struct epoll_event evs[maxevents];
+	int rc;
+
+	if (!events) {
+		RTE_LOG(ERR, EAL, "rte_epoll_event can't be NULL\n");
+		return -1;
+	}
+
+	/* using per thread epoll fd */
+	if (epfd == RTE_EPOLL_PER_THREAD)
+		epfd = rte_intr_tls_epfd();
+
+	while (1) {
+		rc = epoll_wait(epfd, evs, maxevents, timeout);
+		if (likely(rc > 0)) {
+			/* epoll_wait has at least one fd ready to read */
+			rc = eal_epoll_process_event(evs, rc, events);
+			break;
+		} else if (rc < 0) {
+			if (errno == EINTR)
+				continue;
+			/* epoll_wait fail */
+			RTE_LOG(ERR, EAL, "epoll_wait returns with fail %s\n",
+				strerror(errno));
+			rc = -1;
+			break;
+		}
+	}
+
+	return rc;
+}
+
+static inline void
+eal_epoll_data_safe_free(struct rte_epoll_event *ev)
+{
+	while (!rte_atomic32_cmpset(&ev->status, RTE_EPOLL_VALID,
+				    RTE_EPOLL_INVALID))
+		while (ev->status != RTE_EPOLL_VALID)
+			rte_pause();
+	memset(&ev->epdata, 0, sizeof(ev->epdata));
+	ev->fd = -1;
+	ev->epfd = -1;
+}
+
+int
+rte_epoll_ctl(int epfd, int op, int fd,
+	      struct rte_epoll_event *event)
+{
+	struct epoll_event ev;
+
+	if (!event) {
+		RTE_LOG(ERR, EAL, "rte_epoll_event can't be NULL\n");
+		return -1;
+	}
+
+	/* using per thread epoll fd */
+	if (epfd == RTE_EPOLL_PER_THREAD)
+		epfd = rte_intr_tls_epfd();
+
+	if (op == EPOLL_CTL_ADD) {
+		event->status = RTE_EPOLL_VALID;
+		event->fd = fd; /* ignore fd in event */
+		event->epfd = epfd;
+		ev.data.ptr = (void *)event;
+	}
+
+	ev.events = event->epdata.event;
+	if (epoll_ctl(epfd, op, fd, &ev) < 0) {
+		RTE_LOG(ERR, EAL, "Error op %d fd %d epoll_ctl, %s\n",
+			op, fd, strerror(errno));
+		if (op == EPOLL_CTL_ADD)
+			/* rollback status when CTL_ADD fail */
+			event->status = RTE_EPOLL_INVALID;
+		return -1;
+	}
+
+	if (op == EPOLL_CTL_DEL && event->status != RTE_EPOLL_INVALID)
+		eal_epoll_data_safe_free(event);
+
+	return 0;
+}
@@ -50,7 +50,31 @@ enum rte_intr_handle_type {
RTE_INTR_HANDLE_MAX
};
-struct rte_epoll_event;
+#define RTE_INTR_EVENT_ADD 1UL
+#define RTE_INTR_EVENT_DEL 2UL
+
+typedef void (*rte_intr_event_cb_t)(int fd, void *arg);
+
+struct rte_epoll_data {
+	uint32_t event;			/**< event type */
+	void *data;			/**< User data */
+	rte_intr_event_cb_t cb_fun;	/**< IN: callback fun */
+	void *cb_arg;			/**< IN: callback arg */
+};
+
+enum {
+	RTE_EPOLL_INVALID = 0,
+	RTE_EPOLL_VALID,
+	RTE_EPOLL_EXEC,
+};
+
+/** interrupt epoll event obj, taken by epoll_event.ptr */
+struct rte_epoll_event {
+	volatile uint32_t status;	/**< OUT: event status */
+	int fd;				/**< OUT: event fd */
+	int epfd;	/**< OUT: epoll instance the ev associated with */
+	struct rte_epoll_data epdata;
+};
/** Handle for interrupts. */
struct rte_intr_handle {
@@ -64,9 +88,61 @@ struct rte_intr_handle {
uint32_t max_intr; /**< max interrupt requested */
uint32_t nb_efd; /**< number of available efds */
int efds[RTE_MAX_RXTX_INTR_VEC_ID]; /**< intr vectors/efds mapping */
- struct rte_epoll_event *elist[RTE_MAX_RXTX_INTR_VEC_ID];
- /**< intr vector epoll event ptr */
+ struct rte_epoll_event elist[RTE_MAX_RXTX_INTR_VEC_ID];
+ /**< intr vector epoll event */
int *intr_vec; /**< intr vector number array */
};
+#define RTE_EPOLL_PER_THREAD -1 /**< to hint using per thread epfd */
+
+/**
+ * It waits for events on the epoll instance.
+ *
+ * @param epfd
+ *   Epoll instance fd on which the caller waits for events.
+ * @param events
+ *   Memory area containing the events that will be available to the caller.
+ * @param maxevents
+ *   Up to maxevents are returned; must be greater than zero.
+ * @param timeout
+ *   Specifying a timeout of -1 causes the call to block indefinitely.
+ *   Specifying a timeout equal to zero causes it to return immediately.
+ * @return
+ *   - On success, the number of available events.
+ *   - On failure, a negative value.
+ */
+int
+rte_epoll_wait(int epfd, struct rte_epoll_event *events,
+	       int maxevents, int timeout);
+
+/**
+ * It performs control operations on the epoll instance referred to by epfd.
+ * It requests that the operation op be performed on the target fd.
+ *
+ * @param epfd
+ *   Epoll instance fd on which the caller performs control operations.
+ * @param op
+ *   The operation to be performed on the target fd.
+ * @param fd
+ *   The target fd on which the control operation is performed.
+ * @param event
+ *   Describes the object linked to the fd.
+ *   Note: The caller must take care of the object deletion after CTL_DEL.
+ * @return
+ *   - On success, zero.
+ *   - On failure, a negative value.
+ */
+int
+rte_epoll_ctl(int epfd, int op, int fd,
+	      struct rte_epoll_event *event);
+
+/**
+ * The function returns the per-thread epoll instance.
+ *
+ * @return
+ *   epfd, the epoll instance referred to.
+ */
+int
+rte_intr_tls_epfd(void);
+
#endif /* _RTE_LINUXAPP_INTERRUPTS_H_ */
@@ -52,6 +52,8 @@ DPDK_2.0 {
rte_eal_vdev_init;
rte_eal_vdev_uninit;
rte_eal_wait_lcore;
+ rte_epoll_ctl;
+ rte_epoll_wait;
rte_exit;
rte_get_hpet_cycles;
rte_get_hpet_hz;
@@ -61,6 +63,7 @@ DPDK_2.0 {
rte_intr_callback_unregister;
rte_intr_disable;
rte_intr_enable;
+ rte_intr_tls_epfd;
rte_log;
rte_log_add_in_history;
rte_log_cur_msg_loglevel;