From patchwork Thu Feb 18 18:30:11 2021
X-Patchwork-Submitter: Mattias Rönnblom
X-Patchwork-Id: 87991
X-Patchwork-Delegate: jerinj@marvell.com
From: Mattias Rönnblom <mattias.ronnblom@ericsson.com>
To: jerinj@marvell.com
Cc: dev@dpdk.org, bruce.richardson@intel.com, Mattias Rönnblom
Date: Thu, 18 Feb 2021 19:30:11 +0100
Message-Id: <20210218183011.254447-1-mattias.ronnblom@ericsson.com>
Subject: [dpdk-dev] [RFC] eventdev: introduce event dispatcher

The purpose of the event dispatcher is primarily to decouple different
parts of an application (e.g., processing pipeline stages) which share
the same underlying event device.

The event dispatcher replaces the conditional logic (often a switch
statement) that typically follows an event device dequeue operation,
where events are dispatched to different parts of the application
based on their destination queue id.

The concept is similar to a UNIX file descriptor event loop library:
instead of tying callback functions to fds, as for example libevent
does, the event dispatcher binds callbacks to queue ids.

An event dispatcher is configured to dequeue events from a specific
event device and ties into the service core framework to do its (and
the application's) work. The event dispatcher provides a convenient
way for an eventdev-based application to use service cores for
application-level processing, and thus to share those cores with other
DPDK services.
Signed-off-by: Mattias Rönnblom <mattias.ronnblom@ericsson.com>
---
 lib/librte_eventdev/Makefile                 |   2 +
 lib/librte_eventdev/meson.build              |   6 +-
 lib/librte_eventdev/rte_event_dispatcher.c   | 420 +++++++++++++++++++
 lib/librte_eventdev/rte_event_dispatcher.h   | 251 +++++++++++
 lib/librte_eventdev/rte_eventdev_version.map |  10 +
 5 files changed, 687 insertions(+), 2 deletions(-)
 create mode 100644 lib/librte_eventdev/rte_event_dispatcher.c
 create mode 100644 lib/librte_eventdev/rte_event_dispatcher.h

diff --git a/lib/librte_eventdev/Makefile b/lib/librte_eventdev/Makefile
index 0715256bb4..614d53af1b 100644
--- a/lib/librte_eventdev/Makefile
+++ b/lib/librte_eventdev/Makefile
@@ -26,6 +26,7 @@ SRCS-y += rte_event_eth_rx_adapter.c
 SRCS-y += rte_event_timer_adapter.c
 SRCS-y += rte_event_crypto_adapter.c
 SRCS-y += rte_event_eth_tx_adapter.c
+SRCS-y += rte_event_dispatcher.c

 # export include files
 SYMLINK-y-include += rte_eventdev.h
@@ -40,6 +41,7 @@ SYMLINK-y-include += rte_event_timer_adapter.h
 SYMLINK-y-include += rte_event_timer_adapter_pmd.h
 SYMLINK-y-include += rte_event_crypto_adapter.h
 SYMLINK-y-include += rte_event_eth_tx_adapter.h
+SYMLINK-y-include += rte_event_dispatcher.h

 # versioning export map
 EXPORT_MAP := rte_eventdev_version.map
diff --git a/lib/librte_eventdev/meson.build b/lib/librte_eventdev/meson.build
index d1f25ee5ca..2ca81983b5 100644
--- a/lib/librte_eventdev/meson.build
+++ b/lib/librte_eventdev/meson.build
@@ -13,7 +13,8 @@ sources = files('rte_eventdev.c',
 		'rte_event_eth_rx_adapter.c',
 		'rte_event_timer_adapter.c',
 		'rte_event_crypto_adapter.c',
-		'rte_event_eth_tx_adapter.c')
+		'rte_event_eth_tx_adapter.c',
+		'rte_event_dispatcher.c')
 headers = files('rte_eventdev.h',
 		'rte_eventdev_pmd.h',
 		'rte_eventdev_pmd_pci.h',
@@ -25,5 +26,6 @@ headers = files('rte_eventdev.h',
 		'rte_event_timer_adapter.h',
 		'rte_event_timer_adapter_pmd.h',
 		'rte_event_crypto_adapter.h',
-		'rte_event_eth_tx_adapter.h')
+		'rte_event_eth_tx_adapter.h',
+		'rte_event_dispatcher.h')
 deps += ['ring', 'ethdev', 'hash', 'mempool', 'mbuf', 'timer', 'cryptodev']
diff --git a/lib/librte_eventdev/rte_event_dispatcher.c b/lib/librte_eventdev/rte_event_dispatcher.c
new file mode 100644
index 0000000000..1c7e55a752
--- /dev/null
+++ b/lib/librte_eventdev/rte_event_dispatcher.c
@@ -0,0 +1,420 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2021 Ericsson AB
+ */
+
+#include <stdbool.h>
+#include <stdint.h>
+
+#include <rte_lcore.h>
+#include <rte_malloc.h>
+#include <rte_service_component.h>
+
+#include <rte_event_dispatcher.h>
+
+#define RED_MAX_PORTS_PER_LCORE (4)
+
+struct rte_event_dispatcher_lcore_port {
+	uint8_t port_id;
+	uint16_t batch_size;
+	uint64_t timeout;
+};
+
+struct rte_event_dispatcher_lcore {
+	uint8_t num_ports;
+	struct rte_event_dispatcher_lcore_port ports[RED_MAX_PORTS_PER_LCORE];
+};
+
+struct rte_event_dispatcher_cb {
+	rte_event_dispatcher_cb_t cb_fun;
+	void *cb_data;
+};
+
+struct rte_event_dispatcher {
+	uint8_t id;
+	uint8_t event_dev_id;
+	int socket_id;
+	uint32_t service_id;
+	struct rte_event_dispatcher_lcore lcores[RTE_MAX_LCORE];
+	struct rte_event_dispatcher_cb queue_cbs[UINT8_MAX];
+	struct rte_event_dispatcher_cb fallback;
+};
+
+static struct rte_event_dispatcher *dispatchers[UINT8_MAX];
+
+static bool
+red_has_dispatcher(uint8_t id)
+{
+	return dispatchers[id] != NULL;
+}
+
+static struct rte_event_dispatcher *
+red_get_dispatcher(uint8_t id)
+{
+	return dispatchers[id];
+}
+
+static void
+red_set_dispatcher(uint8_t id, struct rte_event_dispatcher *dispatcher)
+{
+	dispatchers[id] = dispatcher;
+}
+
+#define RED_VALID_ID_OR_RET_EINVAL(id)					\
+	do {								\
+		if (unlikely(!red_has_dispatcher(id))) {		\
+			RTE_EDEV_LOG_ERR("Invalid dispatcher id %d\n", id); \
+			return -EINVAL;					\
+		}							\
+	} while (0)
+
+static struct rte_event_dispatcher_cb *
+red_lookup_cb(struct rte_event_dispatcher *dispatcher, uint8_t queue_id)
+{
+	struct rte_event_dispatcher_cb *cb = &dispatcher->queue_cbs[queue_id];
+
+	if (unlikely(cb->cb_fun == NULL))
+		cb = &dispatcher->fallback;
+
+	return cb;
+}
+
+static void
+red_dispatch_events(struct rte_event_dispatcher *dispatcher,
+		    struct rte_event *events, uint16_t num_events)
+{
+	uint16_t cb_start;
+	uint16_t cb_len;
+
+	for (cb_start = 0; cb_start < num_events; cb_start += cb_len) {
+		uint16_t cb_end = cb_start;
+		uint8_t queue_id = events[cb_start].queue_id;
+		struct rte_event_dispatcher_cb *cb;
+
+		while (++cb_end < num_events &&
+		       events[cb_end].queue_id == queue_id)
+			;
+
+		cb_len = cb_end - cb_start;
+
+		cb = red_lookup_cb(dispatcher, queue_id);
+
+		if (unlikely(cb->cb_fun == NULL)) {
+			RTE_EDEV_LOG_ERR("Attempted to dispatch %d events "
+					 "for queue id %d, but no queue or "
+					 "fallback cb were configured\n",
+					 cb_len, queue_id);
+			continue;
+		}
+
+		cb->cb_fun(&events[cb_start], cb_len, cb->cb_data);
+	}
+}
+
+static void
+red_port_dequeue(struct rte_event_dispatcher *dispatcher,
+		 struct rte_event_dispatcher_lcore_port *port)
+{
+	uint16_t batch_size = port->batch_size;
+	struct rte_event events[batch_size];
+	uint16_t n;
+
+	n = rte_event_dequeue_burst(dispatcher->event_dev_id, port->port_id,
+				    events, batch_size, port->timeout);
+
+	red_dispatch_events(dispatcher, events, n);
+}
+
+static int32_t
+red_lcore_process(void *userdata)
+{
+	uint16_t i;
+	struct rte_event_dispatcher *dispatcher = userdata;
+	unsigned int lcore_id = rte_lcore_id();
+	struct rte_event_dispatcher_lcore *lcore =
+		&dispatcher->lcores[lcore_id];
+
+	for (i = 0; i < lcore->num_ports; i++) {
+		struct rte_event_dispatcher_lcore_port *port =
+			&lcore->ports[i];
+
+		red_port_dequeue(dispatcher, port);
+	}
+
+	return 0;
+}
+
+static int
+red_service_runstate_set(uint32_t service_id, int state)
+{
+	int rc;
+
+	rc = rte_service_component_runstate_set(service_id, state);
+
+	if (rc)
+		RTE_EDEV_LOG_ERR("Error %d occurred while setting service "
+				 "component run state to %d\n", rc, state);
+
+	return rc;
+}
+
+static int
+red_service_register(struct rte_event_dispatcher *dispatcher)
+{
+	struct rte_service_spec service = {
+		.callback = red_lcore_process,
+		.callback_userdata = dispatcher,
+		.capabilities = RTE_SERVICE_CAP_MT_SAFE,
+		.socket_id = dispatcher->socket_id
+	};
+	int rc;
+
+	snprintf(service.name, RTE_SERVICE_NAME_MAX - 1, "red_%d",
+		 dispatcher->id);
+
+	rc = rte_service_component_register(&service, &dispatcher->service_id);
+
+	if (rc) {
+		RTE_EDEV_LOG_ERR("Registration of event dispatcher service "
+				 "%s failed with error code %d\n",
+				 service.name, rc);
+		return rc;
+	}
+
+	rc = red_service_runstate_set(dispatcher->service_id, 1);
+
+	if (rc)
+		rte_service_component_unregister(dispatcher->service_id);
+
+	return rc;
+}
+
+static int
+red_service_unregister(struct rte_event_dispatcher *dispatcher)
+{
+	int rc;
+
+	rc = red_service_runstate_set(dispatcher->service_id, 0);
+
+	if (rc)
+		return rc;
+
+	rc = rte_service_component_unregister(dispatcher->service_id);
+
+	if (rc)
+		RTE_EDEV_LOG_ERR("Unregistration of event dispatcher service "
+				 "failed with error code %d\n", rc);
+
+	return rc;
+}
+
+int
+rte_event_dispatcher_create(uint8_t id, uint8_t event_dev_id)
+{
+	int socket_id;
+	struct rte_event_dispatcher *dispatcher;
+	int rc;
+
+	if (red_has_dispatcher(id)) {
+		RTE_EDEV_LOG_ERR("Dispatcher with id %d already exists\n",
+				 id);
+		return -EEXIST;
+	}
+
+	socket_id = rte_event_dev_socket_id(event_dev_id);
+
+	dispatcher =
+		rte_malloc_socket("event dispatcher",
+				  sizeof(struct rte_event_dispatcher),
+				  RTE_CACHE_LINE_SIZE, socket_id);
+
+	if (dispatcher == NULL) {
+		RTE_EDEV_LOG_ERR("Unable to allocate memory for event "
+				 "dispatcher\n");
+		return -ENOMEM;
+	}
+
+	*dispatcher = (struct rte_event_dispatcher) {
+		.id = id,
+		.event_dev_id = event_dev_id,
+		.socket_id = socket_id
+	};
+
+	rc = red_service_register(dispatcher);
+
+	if (rc < 0) {
+		rte_free(dispatcher);
+		return rc;
+	}
+
+	red_set_dispatcher(id, dispatcher);
+
+	return 0;
+}
+
+int
+rte_event_dispatcher_free(uint8_t id)
+{
+	struct rte_event_dispatcher *dispatcher;
+	int rc;
+
+	RED_VALID_ID_OR_RET_EINVAL(id);
+	dispatcher = red_get_dispatcher(id);
+
+	rc = red_service_unregister(dispatcher);
+
+	if (rc)
+		return rc;
+
+	red_set_dispatcher(id, NULL);
+
+	rte_free(dispatcher);
+
+	return 0;
+}
+
+int
+rte_event_dispatcher_service_id_get(uint8_t id, uint32_t *service_id)
+{
+	struct rte_event_dispatcher *dispatcher;
+
+	RED_VALID_ID_OR_RET_EINVAL(id);
+	dispatcher = red_get_dispatcher(id);
+
+	*service_id = dispatcher->service_id;
+
+	return 0;
+}
+
+static int16_t
+lcore_port_index(struct rte_event_dispatcher_lcore *lcore,
+		 uint8_t event_port_id)
+{
+	uint16_t i;
+
+	for (i = 0; i < lcore->num_ports; i++) {
+		struct rte_event_dispatcher_lcore_port *port =
+			&lcore->ports[i];
+
+		if (port->port_id == event_port_id)
+			return i;
+	}
+
+	return -1;
+}
+
+int
+rte_event_dispatcher_bind_port_to_lcore(uint8_t id, uint8_t event_port_id,
+					uint16_t batch_size, uint64_t timeout,
+					unsigned int lcore_id)
+{
+	struct rte_event_dispatcher *dispatcher;
+	struct rte_event_dispatcher_lcore *lcore;
+	struct rte_event_dispatcher_lcore_port *port;
+
+	RED_VALID_ID_OR_RET_EINVAL(id);
+	dispatcher = red_get_dispatcher(id);
+
+	lcore = &dispatcher->lcores[lcore_id];
+
+	if (lcore->num_ports == RED_MAX_PORTS_PER_LCORE)
+		return -ENOMEM;
+
+	if (lcore_port_index(lcore, event_port_id) >= 0)
+		return -EEXIST;
+
+	port = &lcore->ports[lcore->num_ports];
+
+	*port = (struct rte_event_dispatcher_lcore_port) {
+		.port_id = event_port_id,
+		.batch_size = batch_size,
+		.timeout = timeout
+	};
+
+	lcore->num_ports++;
+
+	return 0;
+}
+
+int
+rte_event_dispatcher_unbind_port_from_lcore(uint8_t id, uint8_t event_port_id,
+					    unsigned int lcore_id)
+{
+	struct rte_event_dispatcher *dispatcher;
+	struct rte_event_dispatcher_lcore *lcore;
+	int16_t port_idx;
+	struct rte_event_dispatcher_lcore_port *port;
+	struct rte_event_dispatcher_lcore_port *last;
+
+	RED_VALID_ID_OR_RET_EINVAL(id);
+	dispatcher = red_get_dispatcher(id);
+
+	lcore = &dispatcher->lcores[lcore_id];
+
+	port_idx = lcore_port_index(lcore, event_port_id);
+
+	if (port_idx < 0)
+		return -ENOENT;
+
+	port = &lcore->ports[port_idx];
+	last = &lcore->ports[lcore->num_ports - 1];
+
+	if (port != last)
+		*port = *last;
+
+	lcore->num_ports--;
+
+	return 0;
+}
+
+static int
+red_cb_manage(uint8_t id, uint8_t queue_id, bool reg, bool fallback,
+	      rte_event_dispatcher_cb_t cb_fun, void *cb_data)
+{
+	struct rte_event_dispatcher *dispatcher;
+	struct rte_event_dispatcher_cb *cb;
+
+	RED_VALID_ID_OR_RET_EINVAL(id);
+	dispatcher = red_get_dispatcher(id);
+
+	if (fallback)
+		cb = &dispatcher->fallback;
+	else
+		cb = &dispatcher->queue_cbs[queue_id];
+
+	if (reg && cb->cb_fun != NULL)
+		return -EEXIST;
+
+	if (!reg && cb->cb_fun == NULL)
+		return -ENOENT;
+
+	*cb = (struct rte_event_dispatcher_cb) {
+		.cb_fun = cb_fun,
+		.cb_data = cb_data
+	};
+
+	return 0;
+}
+
+int
+rte_event_dispatcher_register(uint8_t id, uint8_t queue_id,
+			      rte_event_dispatcher_cb_t cb_fun, void *cb_data)
+{
+	return red_cb_manage(id, queue_id, true, false, cb_fun, cb_data);
+}
+
+int
+rte_event_dispatcher_unregister(uint8_t id, uint8_t queue_id)
+{
+	return red_cb_manage(id, queue_id, false, false, NULL, NULL);
+}
+
+int
+rte_event_dispatcher_register_fallback(uint8_t id,
+				       rte_event_dispatcher_cb_t cb_fun,
+				       void *cb_data)
+{
+	return red_cb_manage(id, 0, true, true, cb_fun, cb_data);
+}
+
+int
+rte_event_dispatcher_unregister_fallback(uint8_t id)
+{
+	return red_cb_manage(id, 0, false, true, NULL, NULL);
+}
diff --git a/lib/librte_eventdev/rte_event_dispatcher.h b/lib/librte_eventdev/rte_event_dispatcher.h
new file mode 100644
index 0000000000..11f57571ab
--- /dev/null
+++ b/lib/librte_eventdev/rte_event_dispatcher.h
@@ -0,0 +1,251 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2021 Ericsson AB
+ */
+
+#ifndef __RTE_EVENT_DISPATCHER_H__
+#define __RTE_EVENT_DISPATCHER_H__
+
+/**
+ * @file
+ *
+ * RTE Event Dispatcher
+ */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <rte_eventdev.h>
+
+/**
+ * Function prototype for dispatcher callbacks.
+ *
+ * @param events
+ *  Pointer to an array of events.
+ *
+ * @param num
+ *  The number of events in the @p events array.
+ *
+ * @param cb_data
+ *  The pointer supplied by the application in
+ *  rte_event_dispatcher_register() or
+ *  rte_event_dispatcher_register_fallback().
+ */
+typedef void (*rte_event_dispatcher_cb_t)(struct rte_event *events,
+					  uint16_t num, void *cb_data);
+
+/**
+ * Create an event dispatcher with the specified id.
+ *
+ * @param id
+ *  An application-specified, unique (across all event dispatcher
+ *  instances) identifier.
+ *
+ * @param event_dev_id
+ *  The identifier of the event device from which this event dispatcher
+ *  will dequeue events.
+ *
+ * @return
+ *  - 0: Success
+ *  - <0: Error code on failure
+ */
+__rte_experimental
+int
+rte_event_dispatcher_create(uint8_t id, uint8_t event_dev_id);
+
+/**
+ * Free an event dispatcher with the specified id.
+ *
+ * @param id
+ *  The event dispatcher identifier.
+ *
+ * @return
+ *  - 0: Success
+ *  - <0: Error code on failure
+ */
+__rte_experimental
+int
+rte_event_dispatcher_free(uint8_t id);
+
+/**
+ * Retrieve the service identifier of the event dispatcher.
+ *
+ * @param id
+ *  The event dispatcher identifier.
+ *
+ * @param [out] service_id
+ *  A pointer to a caller-supplied buffer where the event dispatcher's
+ *  service id will be stored.
+ *
+ * @return
+ *  - 0: Success
+ *  - <0: Error code on failure.
+ */
+__rte_experimental
+int
+rte_event_dispatcher_service_id_get(uint8_t id, uint32_t *service_id);
+
+/**
+ * Bind an event device port to a specific lcore on the specified
+ * event dispatcher.
+ *
+ * This function configures an event dispatcher to dequeue events from
+ * an event device port (as specified by @p event_port_id), in case
+ * its service function is run on a particular lcore (as specified by
+ * @p lcore_id).
+ *
+ * Multiple event device ports may be bound to the same lcore. A
+ * particular port may only be bound to one lcore.
+ *
+ * If the event dispatcher service is mapped (with
+ * rte_service_map_lcore_set()) to an lcore for which no ports are
+ * bound, the service function will be a no-operation.
+ *
+ * @param id
+ *  The event dispatcher identifier.
+ *
+ * @param event_port_id
+ *  The event device port identifier.
+ *
+ * @param batch_size
+ *  The batch size to use in rte_event_dequeue_burst(), for the
+ *  configured event device port and lcore.
+ *
+ * @param timeout
+ *  The timeout parameter to use in rte_event_dequeue_burst(), for the
+ *  configured event device port and lcore.
+ *
+ * @param lcore_id
+ *  The lcore on which the event device port should be used.
+ *
+ * @return
+ *  - 0: Success
+ *  - <0: Error code on failure.
+ */
+__rte_experimental
+int
+rte_event_dispatcher_bind_port_to_lcore(uint8_t id, uint8_t event_port_id,
+					uint16_t batch_size, uint64_t timeout,
+					unsigned int lcore_id);
+
+/**
+ * Unbind an event device port from a specific lcore.
+ *
+ * @param id
+ *  The event dispatcher identifier.
+ *
+ * @param event_port_id
+ *  The event device port identifier.
+ *
+ * @param lcore_id
+ *  The lcore from which the event device port should be unbound.
+ *
+ * @return
+ *  - 0: Success
+ *  - <0: Error code on failure.
+ */
+__rte_experimental
+int
+rte_event_dispatcher_unbind_port_from_lcore(uint8_t id, uint8_t event_port_id,
+					    unsigned int lcore_id);
+
+/**
+ * Register a callback function for the specified queue identifier.
+ *
+ * At most one callback may be registered per queue id.
+ *
+ * The same callback function may be registered for multiple queue ids.
+ *
+ * For each callback invocation, only events belonging to a single
+ * queue id will be dispatched.
+ *
+ * @param id
+ *  The event dispatcher identifier.
+ *
+ * @param queue_id
+ *  The event device queue id for which @p cb_fun should be called.
+ *
+ * @param cb_fun
+ *  The callback function.
+ *
+ * @param cb_data
+ *  A pointer to some application-specific opaque data (or NULL),
+ *  which is supplied back to the application in the callback.
+ *
+ * @return
+ *  - 0: Success
+ *  - <0: Error code on failure.
+ */
+__rte_experimental
+int
+rte_event_dispatcher_register(uint8_t id, uint8_t queue_id,
+			      rte_event_dispatcher_cb_t cb_fun, void *cb_data);
+
+/**
+ * Unregister a callback function for the specified queue identifier.
+ *
+ * @param id
+ *  The event dispatcher identifier.
+ *
+ * @param queue_id
+ *  The event device queue id for which the callback should be removed.
+ *
+ * @return
+ *  - 0: Success
+ *  - <0: Error code on failure.
+ */
+__rte_experimental
+int
+rte_event_dispatcher_unregister(uint8_t id, uint8_t queue_id);
+
+/**
+ * Register a fallback callback function.
+ *
+ * Only events for which no queue-specific callback function is
+ * registered will be dispatched to the @p cb_fun callback.
+ *
+ * At most one fallback callback function may be registered.
+ *
+ * For each callback invocation, only events belonging to a single
+ * queue id will be included.
+ *
+ * If the event dispatcher encounters an event with a queue id for
+ * which the application has not registered any specific callback, and
+ * there is also no fallback configured, the event will be dropped.
+ *
+ * @param id
+ *  The event dispatcher identifier.
+ *
+ * @param cb_fun
+ *  The fallback callback function.
+ *
+ * @param cb_data
+ *  A pointer to some application-specific opaque data (or NULL),
+ *  which is supplied back to the application in the callback.
+ *
+ * @return
+ *  - 0: Success
+ *  - <0: Error code on failure.
+ */
+__rte_experimental
+int
+rte_event_dispatcher_register_fallback(uint8_t id,
+				       rte_event_dispatcher_cb_t cb_fun,
+				       void *cb_data);
+
+/**
+ * Unregister the fallback callback function.
+ *
+ * @param id
+ *  The event dispatcher identifier.
+ *
+ * @return
+ *  - 0: Success
+ *  - <0: Error code on failure.
+ */
+__rte_experimental
+int
+rte_event_dispatcher_unregister_fallback(uint8_t id);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* __RTE_EVENT_DISPATCHER_H__ */
diff --git a/lib/librte_eventdev/rte_eventdev_version.map b/lib/librte_eventdev/rte_eventdev_version.map
index 91a62cd077..dcb887601b 100644
--- a/lib/librte_eventdev/rte_eventdev_version.map
+++ b/lib/librte_eventdev/rte_eventdev_version.map
@@ -134,4 +134,14 @@ EXPERIMENTAL {
 	__rte_eventdev_trace_crypto_adapter_queue_pair_del;
 	__rte_eventdev_trace_crypto_adapter_start;
 	__rte_eventdev_trace_crypto_adapter_stop;
+
+	rte_event_dispatcher_create;
+	rte_event_dispatcher_free;
+	rte_event_dispatcher_service_id_get;
+	rte_event_dispatcher_bind_port_to_lcore;
+	rte_event_dispatcher_unbind_port_from_lcore;
+	rte_event_dispatcher_register;
+	rte_event_dispatcher_unregister;
+	rte_event_dispatcher_register_fallback;
+	rte_event_dispatcher_unregister_fallback;
 };
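
To illustrate how the proposed API is meant to be used, here is a minimal usage sketch (not part of the patch, and not compilable outside a DPDK application). It assumes an already-configured and started event device 0; the dispatcher id 0, queue id 7, batch size 32, and the stage_one_cb/process_stage_one names are hypothetical placeholders.

```c
#include <rte_event_dispatcher.h>
#include <rte_service.h>

#define DISPATCHER_ID 0 /* hypothetical, application-chosen */
#define EVENT_DEV_ID 0

/* Application-level handler for events from one queue id. */
static void
stage_one_cb(struct rte_event *events, uint16_t num, void *cb_data)
{
	uint16_t i;

	for (i = 0; i < num; i++)
		process_stage_one(&events[i]); /* hypothetical app function */

	(void)cb_data;
}

static int
setup_dispatcher(unsigned int lcore_id, uint8_t event_port_id)
{
	uint32_t service_id;
	int rc;

	rc = rte_event_dispatcher_create(DISPATCHER_ID, EVENT_DEV_ID);
	if (rc)
		return rc;

	/* Dispatch events destined for queue id 7 to stage_one_cb. */
	rc = rte_event_dispatcher_register(DISPATCHER_ID, 7, stage_one_cb,
					   NULL);
	if (rc)
		return rc;

	/* When the service function runs on lcore_id, dequeue up to 32
	 * events at a time from event_port_id, without waiting.
	 */
	rc = rte_event_dispatcher_bind_port_to_lcore(DISPATCHER_ID,
						     event_port_id, 32, 0,
						     lcore_id);
	if (rc)
		return rc;

	/* Map the dispatcher's service to the lcore and start it. */
	rc = rte_event_dispatcher_service_id_get(DISPATCHER_ID, &service_id);
	if (rc)
		return rc;

	rte_service_map_lcore_set(service_id, lcore_id, 1);
	rte_service_runstate_set(service_id, 1);
	rte_service_lcore_start(lcore_id);

	return 0;
}
```

Since the dispatcher is a regular service component, the same service core may also run other DPDK services (e.g., the eventdev RX adapter) alongside it.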