[RFC] eventdev: introduce event dispatcher

Message ID 20210218183011.254447-1-mattias.ronnblom@ericsson.com (mailing list archive)
State Changes Requested, archived
Delegated to: Jerin Jacob
Series [RFC] eventdev: introduce event dispatcher

Checks

Context               Check    Description
ci/checkpatch         warning  coding style issues
ci/Intel-compilation  fail     apply issues

Commit Message

Mattias Rönnblom Feb. 18, 2021, 6:30 p.m. UTC
  The purpose of the event dispatcher is primarily to decouple different
parts of an application (e.g., processing pipeline stages), which
share the same underlying event device.

The event dispatcher replaces the conditional logic (often, a switch
statement) that typically follows an event device dequeue operation,
where events are dispatched to different parts of the application
based on the destination queue id.

The concept is similar to a UNIX file descriptor event loop library.
Instead of tying callback functions to fds as for example libevent
does, the event dispatcher binds callbacks to queue ids.

An event dispatcher is configured to dequeue events from a specific
event device, and ties into the service core framework, to do its (and
the application's) work.

The event dispatcher provides a convenient way for an eventdev-based
application to use service cores for application-level processing, and
thus for sharing those cores with other DPDK services.

Signed-off-by: Mattias Rönnblom <mattias.ronnblom@ericsson.com>
---
 lib/librte_eventdev/Makefile                 |   2 +
 lib/librte_eventdev/meson.build              |   6 +-
 lib/librte_eventdev/rte_event_dispatcher.c   | 420 +++++++++++++++++++
 lib/librte_eventdev/rte_event_dispatcher.h   | 251 +++++++++++
 lib/librte_eventdev/rte_eventdev_version.map |  10 +
 5 files changed, 687 insertions(+), 2 deletions(-)
 create mode 100644 lib/librte_eventdev/rte_event_dispatcher.c
 create mode 100644 lib/librte_eventdev/rte_event_dispatcher.h
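
The dispatch model described in the commit message can be sketched
outside DPDK. What follows is a minimal stand-alone model, not the
patch's code: model_event, model_dispatcher, model_register() and
model_dispatch() are hypothetical names, and model_event keeps only the
queue id of a real struct rte_event. It shows callbacks bound to queue
ids, and a dequeued burst delivered as one callback call per run of
consecutive events sharing a queue id, which is roughly what
red_dispatch_events() in the patch does (fallback handling and logging
are left out).

```c
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for struct rte_event: only the dispatch key. */
struct model_event {
	uint8_t queue_id;
};

typedef void (*model_cb_t)(struct model_event *events, uint16_t num,
			   void *cb_data);

struct model_dispatcher {
	/* One slot per possible queue id (0..UINT8_MAX). */
	model_cb_t cbs[UINT8_MAX + 1];
	void *cb_data[UINT8_MAX + 1];
};

/* Bind a callback (and its opaque data) to a queue id. */
static void
model_register(struct model_dispatcher *d, uint8_t queue_id,
	       model_cb_t cb, void *cb_data)
{
	d->cbs[queue_id] = cb;
	d->cb_data[queue_id] = cb_data;
}

/*
 * Deliver a burst: one callback call per run of consecutive events
 * with the same queue id. Events without a callback are skipped.
 * Returns the number of callback calls made.
 */
static unsigned int
model_dispatch(struct model_dispatcher *d, struct model_event *events,
	       uint16_t num)
{
	unsigned int calls = 0;
	uint16_t start = 0;

	while (start < num) {
		uint8_t queue_id = events[start].queue_id;
		uint16_t end = start + 1;

		while (end < num && events[end].queue_id == queue_id)
			end++;

		if (d->cbs[queue_id] != NULL) {
			d->cbs[queue_id](&events[start],
					 (uint16_t)(end - start),
					 d->cb_data[queue_id]);
			calls++;
		}

		start = end;
	}

	return calls;
}

/* Example stage handler: adds the batch size to a counter in cb_data. */
static void
model_count_events(struct model_event *events, uint16_t num, void *cb_data)
{
	(void)events;
	*(unsigned int *)cb_data += num;
}
```

An application module would call model_register() once per queue id it
owns, with its own handler and context pointer, and never see events
destined for other modules.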
  

Comments

Luca Boccassi Feb. 22, 2021, 3:28 p.m. UTC | #1
On Thu, 2021-02-18 at 19:30 +0100, Mattias Rönnblom wrote:
> The purpose of the event dispatcher is primarily to decouple different
> parts of an application (e.g., processing pipeline stages), which
> share the same underlying event device.
> 
> The event dispatcher replaces the conditional logic (often, a switch
> statement) that typically follows an event device dequeue operation,
> where events are dispatched to different parts of the application
> based on the destination queue id.
> 
> The concept is similar to a UNIX file descriptor event loop library.
> Instead of tying callback functions to fds as for example libevent
> does, the event dispatcher binds callbacks to queue ids.
> 
> An event dispatcher is configured to dequeue events from a specific
> event device, and ties into the service core framework, to do its (and
> the application's) work.
> 
> The event dispatcher provides a convenient way for an eventdev-based
> application to use service cores for application-level processing, and
> thus for sharing those cores with other DPDK services.
> 
> Signed-off-by: Mattias Rönnblom <mattias.ronnblom@ericsson.com>
> ---
>  lib/librte_eventdev/Makefile                 |   2 +
>  lib/librte_eventdev/meson.build              |   6 +-
>  lib/librte_eventdev/rte_event_dispatcher.c   | 420 +++++++++++++++++++
>  lib/librte_eventdev/rte_event_dispatcher.h   | 251 +++++++++++
>  lib/librte_eventdev/rte_eventdev_version.map |  10 +
>  5 files changed, 687 insertions(+), 2 deletions(-)
>  create mode 100644 lib/librte_eventdev/rte_event_dispatcher.c
>  create mode 100644 lib/librte_eventdev/rte_event_dispatcher.h

Hi,

Is this intended to be used by applications or by PMDs? If the former,
then IMHO the interface should really be based around (or allow using)
FDs, so that it can be polled. Applications normally have more event
sources than just DPDK.
  
Jerin Jacob Feb. 25, 2021, 12:32 p.m. UTC | #2
On Fri, Feb 19, 2021 at 12:00 AM Mattias Rönnblom
<mattias.ronnblom@ericsson.com> wrote:
>
> The purpose of the event dispatcher is primarily to decouple different
> parts of an application (e.g., processing pipeline stages), which
> share the same underlying event device.
>
> The event dispatcher replaces the conditional logic (often, a switch
> statement) that typically follows an event device dequeue operation,
> where events are dispatched to different parts of the application
> based on the destination queue id.

# If the device has all-types queues [1], this RFC would restrict the
application to using a queue ONLY as a stage. A stage can also be a
queue type. How can we abstract this in this model?

# Also, I think it may make sense to add this type of infrastructure as
helper functions, since it is built on top of existing APIs, i.e., no
support is required from the driver to establish this model. IMO, if we
need to add such support as one fixed set of functionality, we could
have helper APIs that express a certain usage of eventdev, rather than
defining this as the only way to do it. I think a helper function can be
used as an abstraction to define this kind of model.

# Also, there is function pointer overhead, and the events are
aggregated in the implementation. That may not always be "the" optimized
model compared with a switch case in the application.
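
For comparison, the switch-based loop referred to above might look
roughly like the sketch below. It is a stand-alone model, not DPDK code:
model_event, model_stats and the stage functions are hypothetical, and a
real application would switch on the queue_id field of struct rte_event
after rte_event_dequeue_burst(). The stage functions are direct calls
that the compiler can inline, and events are handled one at a time
without grouping.

```c
#include <stdint.h>

/* Hypothetical stand-in for struct rte_event: only the queue id. */
struct model_event {
	uint8_t queue_id;
};

/* Per-stage counters, standing in for real per-stage processing. */
struct model_stats {
	unsigned int stage0;
	unsigned int stage1;
	unsigned int unknown;
};

static inline void
model_stage0(struct model_event *ev, struct model_stats *stats)
{
	(void)ev;
	stats->stage0++;
}

static inline void
model_stage1(struct model_event *ev, struct model_stats *stats)
{
	(void)ev;
	stats->stage1++;
}

/* Walk a dequeued burst and branch on each event's queue id. */
static void
model_switch_dispatch(struct model_event *events, uint16_t num,
		      struct model_stats *stats)
{
	uint16_t i;

	for (i = 0; i < num; i++) {
		switch (events[i].queue_id) {
		case 0:
			model_stage0(&events[i], stats);
			break;
		case 1:
			model_stage1(&events[i], stats);
			break;
		default:
			/* No stage owns this queue id. */
			stats->unknown++;
			break;
		}
	}
}
```

The cost of this form is that the loop must know every stage at compile
time, which is the coupling the RFC sets out to remove.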


[1]
See RTE_EVENT_DEV_CAP_QUEUE_ALL_TYPES in
https://doc.dpdk.org/guides/prog_guide/eventdev.html


>
> The concept is similar to a UNIX file descriptor event loop library.
> Instead of tying callback functions to fds as for example libevent
> does, the event dispatcher binds callbacks to queue ids.
>
> An event dispatcher is configured to dequeue events from a specific
> event device, and ties into the service core framework, to do its (and
> the application's) work.
>
> The event dispatcher provides a convenient way for an eventdev-based
> application to use service cores for application-level processing, and
> thus for sharing those cores with other DPDK services.
>
> Signed-off-by: Mattias Rönnblom <mattias.ronnblom@ericsson.com>
> ---
>  lib/librte_eventdev/Makefile                 |   2 +
>  lib/librte_eventdev/meson.build              |   6 +-
>  lib/librte_eventdev/rte_event_dispatcher.c   | 420 +++++++++++++++++++
>  lib/librte_eventdev/rte_event_dispatcher.h   | 251 +++++++++++
>  lib/librte_eventdev/rte_eventdev_version.map |  10 +
>  5 files changed, 687 insertions(+), 2 deletions(-)
>  create mode 100644 lib/librte_eventdev/rte_event_dispatcher.c
>  create mode 100644 lib/librte_eventdev/rte_event_dispatcher.h
>
> diff --git a/lib/librte_eventdev/Makefile b/lib/librte_eventdev/Makefile
> index 0715256bb4..614d53af1b 100644
> --- a/lib/librte_eventdev/Makefile
> +++ b/lib/librte_eventdev/Makefile
> @@ -26,6 +26,7 @@ SRCS-y += rte_event_eth_rx_adapter.c
>  SRCS-y += rte_event_timer_adapter.c
>  SRCS-y += rte_event_crypto_adapter.c
>  SRCS-y += rte_event_eth_tx_adapter.c
> +SRCS-y += rte_event_dispatcher.c
>
>  # export include files
>  SYMLINK-y-include += rte_eventdev.h
> @@ -40,6 +41,7 @@ SYMLINK-y-include += rte_event_timer_adapter.h
>  SYMLINK-y-include += rte_event_timer_adapter_pmd.h
>  SYMLINK-y-include += rte_event_crypto_adapter.h
>  SYMLINK-y-include += rte_event_eth_tx_adapter.h
> +SYMLINK-y-include += rte_event_dispatcher.h
>
>  # versioning export map
>  EXPORT_MAP := rte_eventdev_version.map
> diff --git a/lib/librte_eventdev/meson.build b/lib/librte_eventdev/meson.build
> index d1f25ee5ca..2ca81983b5 100644
> --- a/lib/librte_eventdev/meson.build
> +++ b/lib/librte_eventdev/meson.build
> @@ -13,7 +13,8 @@ sources = files('rte_eventdev.c',
>                 'rte_event_eth_rx_adapter.c',
>                 'rte_event_timer_adapter.c',
>                 'rte_event_crypto_adapter.c',
> -               'rte_event_eth_tx_adapter.c')
> +               'rte_event_eth_tx_adapter.c',
> +               'rte_event_dispatcher.c')
>  headers = files('rte_eventdev.h',
>                 'rte_eventdev_pmd.h',
>                 'rte_eventdev_pmd_pci.h',
> @@ -25,5 +26,6 @@ headers = files('rte_eventdev.h',
>                 'rte_event_timer_adapter.h',
>                 'rte_event_timer_adapter_pmd.h',
>                 'rte_event_crypto_adapter.h',
> -               'rte_event_eth_tx_adapter.h')
> +               'rte_event_eth_tx_adapter.h',
> +               'rte_event_dispatcher.h')
>  deps += ['ring', 'ethdev', 'hash', 'mempool', 'mbuf', 'timer', 'cryptodev']
> diff --git a/lib/librte_eventdev/rte_event_dispatcher.c b/lib/librte_eventdev/rte_event_dispatcher.c
> new file mode 100644
> index 0000000000..1c7e55a752
> --- /dev/null
> +++ b/lib/librte_eventdev/rte_event_dispatcher.c
> @@ -0,0 +1,420 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + * Copyright(c) 2021 Ericsson AB
> + */
> +
> +#include <stdbool.h>
> +#include <stdint.h>
> +
> +#include <rte_lcore.h>
> +#include <rte_service_component.h>
> +#include <rte_eventdev_pmd.h>
> +
> +#include <rte_event_dispatcher.h>
> +
> +#define RED_MAX_PORTS_PER_LCORE (4)
> +
> +struct rte_event_dispatcher_lcore_port {
> +       uint8_t port_id;
> +       uint16_t batch_size;
> +       uint64_t timeout;
> +};
> +
> +struct rte_event_dispatcher_lcore {
> +       uint8_t num_ports;
> +       struct rte_event_dispatcher_lcore_port ports[RED_MAX_PORTS_PER_LCORE];
> +};
> +
> +struct rte_event_dispatcher_cb {
> +       rte_event_dispatcher_cb_t cb_fun;
> +       void *cb_data;
> +};
> +
> +struct rte_event_dispatcher {
> +       uint8_t id;
> +       uint8_t event_dev_id;
> +       int socket_id;
> +       uint32_t service_id;
> +       struct rte_event_dispatcher_lcore lcores[RTE_MAX_LCORE];
> +       struct rte_event_dispatcher_cb queue_cbs[UINT8_MAX + 1];
> +       struct rte_event_dispatcher_cb fallback;
> +};
> +
> +static struct rte_event_dispatcher *dispatchers[UINT8_MAX + 1];
> +
> +static bool
> +red_has_dispatcher(uint8_t id)
> +{
> +       return dispatchers[id] != NULL;
> +}
> +
> +static struct rte_event_dispatcher *
> +red_get_dispatcher(uint8_t id)
> +{
> +       return dispatchers[id];
> +}
> +
> +static void
> +red_set_dispatcher(uint8_t id, struct rte_event_dispatcher *dispatcher)
> +{
> +       dispatchers[id] = dispatcher;
> +}
> +
> +#define RED_VALID_ID_OR_RET_EINVAL(id)                                 \
> +       do {                                                            \
> +               if (unlikely(!red_has_dispatcher(id))) {                \
> +                       RTE_EDEV_LOG_ERR("Invalid dispatcher id %d\n", id); \
> +                       return -EINVAL;                                 \
> +               }                                                       \
> +       } while (0)
> +
> +static struct rte_event_dispatcher_cb *
> +red_lookup_cb(struct rte_event_dispatcher *dispatcher, uint8_t queue_id)
> +{
> +       struct rte_event_dispatcher_cb *cb = &dispatcher->queue_cbs[queue_id];
> +
> +       if (unlikely(cb->cb_fun == NULL))
> +               cb = &dispatcher->fallback;
> +
> +       return cb;
> +}
> +
> +static void
> +red_dispatch_events(struct rte_event_dispatcher *dispatcher,
> +                   struct rte_event *events, uint16_t num_events)
> +{
> +       uint16_t cb_start;
> +       uint16_t cb_len;
> +
> +       for (cb_start = 0; cb_start < num_events; cb_start += cb_len) {
> +               uint16_t cb_end = cb_start;
> +               uint8_t queue_id = events[cb_start].queue_id;
> +               struct rte_event_dispatcher_cb *cb;
> +
> +               while (++cb_end < num_events &&
> +                      events[cb_end].queue_id == queue_id)
> +                       ;
> +
> +               cb_len = cb_end - cb_start;
> +
> +               cb = red_lookup_cb(dispatcher, queue_id);
> +
> +               if (unlikely(cb->cb_fun == NULL)) {
> +                       RTE_EDEV_LOG_ERR("Attempted to dispatch %d events "
> +                                        "for queue id %d, but no queue or "
> +                                        "fallback cb were configured\n",
> +                                        cb_len, queue_id);
> +                       continue;
> +               }
> +
> +               cb->cb_fun(&events[cb_start], cb_len, cb->cb_data);
> +       }
> +}
> +
> +static void
> +red_port_dequeue(struct rte_event_dispatcher *dispatcher,
> +                struct rte_event_dispatcher_lcore_port *port)
> +{
> +       uint16_t batch_size = port->batch_size;
> +       struct rte_event events[batch_size];
> +       uint16_t n;
> +
> +       n = rte_event_dequeue_burst(dispatcher->event_dev_id, port->port_id,
> +                                   events, batch_size, port->timeout);
> +
> +       red_dispatch_events(dispatcher, events, n);
> +}
> +
> +static int32_t
> +red_lcore_process(void *userdata)
> +{
> +       uint16_t i;
> +       struct rte_event_dispatcher *dispatcher = userdata;
> +       unsigned int lcore_id = rte_lcore_id();
> +       struct rte_event_dispatcher_lcore *lcore =
> +               &dispatcher->lcores[lcore_id];
> +
> +       for (i = 0; i < lcore->num_ports; i++) {
> +               struct rte_event_dispatcher_lcore_port *port =
> +                       &lcore->ports[i];
> +
> +               red_port_dequeue(dispatcher, port);
> +       }
> +
> +       return 0;
> +}
> +
> +static int
> +red_service_runstate_set(uint32_t service_id, int state)
> +{
> +       int rc;
> +
> +       rc = rte_service_component_runstate_set(service_id, state);
> +
> +       if (rc)
> +               RTE_EDEV_LOG_ERR("Error %d occurred while setting service "
> +                                "component run state to %d\n", rc, state);
> +
> +       return rc;
> +}
> +
> +static int
> +red_service_register(struct rte_event_dispatcher *dispatcher)
> +{
> +       struct rte_service_spec service = {
> +               .callback = red_lcore_process,
> +               .callback_userdata = dispatcher,
> +               .capabilities = RTE_SERVICE_CAP_MT_SAFE,
> +               .socket_id = dispatcher->socket_id
> +       };
> +       int rc;
> +
> +       snprintf(service.name, RTE_SERVICE_NAME_MAX - 1, "red_%d",
> +                dispatcher->id);
> +
> +       rc = rte_service_component_register(&service, &dispatcher->service_id);
> +
> +       if (rc) {
> +               RTE_EDEV_LOG_ERR("Registration of event dispatcher service "
> +                                "%s failed with error code %d\n",
> +                                service.name, rc);
> +               return rc;
> +       }
> +       rc = red_service_runstate_set(dispatcher->service_id, 1);
> +       if (rc)
> +               rte_service_component_unregister(dispatcher->service_id);
> +
> +       return rc;
> +}
> +
> +static int
> +red_service_unregister(struct rte_event_dispatcher *dispatcher)
> +{
> +       int rc;
> +
> +       rc = red_service_runstate_set(dispatcher->service_id, 0);
> +
> +       if (rc)
> +               return rc;
> +
> +       rc = rte_service_component_unregister(dispatcher->service_id);
> +
> +       if (rc)
> +               RTE_EDEV_LOG_ERR("Unregistration of event dispatcher service "
> +                                "failed with error code %d\n", rc);
> +
> +       return rc;
> +}
> +
> +int
> +rte_event_dispatcher_create(uint8_t id, uint8_t event_dev_id)
> +{
> +       int socket_id;
> +       struct rte_event_dispatcher *dispatcher;
> +       int rc;
> +
> +       if (red_has_dispatcher(id)) {
> +               RTE_EDEV_LOG_ERR("Dispatcher with id %d already exists\n",
> +                                id);
> +               return -EEXIST;
> +       }
> +
> +       socket_id = rte_event_dev_socket_id(event_dev_id);
> +
> +       dispatcher =
> +               rte_malloc_socket("event dispatcher",
> +                                 sizeof(struct rte_event_dispatcher),
> +                                 RTE_CACHE_LINE_SIZE, socket_id);
> +
> +       if (dispatcher == NULL) {
> +               RTE_EDEV_LOG_ERR("Unable to allocate memory for event "
> +                                "dispatcher\n");
> +               return -ENOMEM;
> +       }
> +
> +       *dispatcher = (struct rte_event_dispatcher) {
> +               .id = id,
> +               .event_dev_id = event_dev_id,
> +               .socket_id = socket_id
> +       };
> +
> +       rc = red_service_register(dispatcher);
> +
> +       if (rc < 0) {
> +               rte_free(dispatcher);
> +               return rc;
> +       }
> +
> +       red_set_dispatcher(id, dispatcher);
> +
> +       return 0;
> +}
> +
> +int
> +rte_event_dispatcher_free(uint8_t id)
> +{
> +       struct rte_event_dispatcher *dispatcher;
> +       int rc;
> +
> +       RED_VALID_ID_OR_RET_EINVAL(id);
> +       dispatcher = red_get_dispatcher(id);
> +
> +       rc = red_service_unregister(dispatcher);
> +
> +       if (rc)
> +               return rc;
> +
> +       red_set_dispatcher(id, NULL);
> +
> +       rte_free(dispatcher);
> +
> +       return 0;
> +}
> +
> +int
> +rte_event_dispatcher_service_id_get(uint8_t id, uint32_t *service_id)
> +{
> +       struct rte_event_dispatcher *dispatcher;
> +
> +       RED_VALID_ID_OR_RET_EINVAL(id);
> +       dispatcher = red_get_dispatcher(id);
> +
> +       *service_id = dispatcher->service_id;
> +
> +       return 0;
> +}
> +
> +static int16_t
> +lcore_port_index(struct rte_event_dispatcher_lcore *lcore,
> +                uint8_t event_port_id)
> +{
> +       uint16_t i;
> +
> +       for (i = 0; i < lcore->num_ports; i++) {
> +               struct rte_event_dispatcher_lcore_port *port =
> +                       &lcore->ports[i];
> +               if (port->port_id == event_port_id)
> +                       return i;
> +       }
> +
> +       return -1;
> +}
> +
> +int
> +rte_event_dispatcher_bind_port_to_lcore(uint8_t id, uint8_t event_port_id,
> +                                       uint16_t batch_size, uint64_t timeout,
> +                                       unsigned int lcore_id)
> +{
> +       struct rte_event_dispatcher *dispatcher;
> +       struct rte_event_dispatcher_lcore *lcore;
> +       struct rte_event_dispatcher_lcore_port *port;
> +
> +       RED_VALID_ID_OR_RET_EINVAL(id);
> +       dispatcher = red_get_dispatcher(id);
> +
> +       lcore = &dispatcher->lcores[lcore_id];
> +
> +       if (lcore->num_ports == RED_MAX_PORTS_PER_LCORE)
> +               return -ENOMEM;
> +
> +       if (lcore_port_index(lcore, event_port_id) >= 0)
> +               return -EEXIST;
> +
> +       port = &lcore->ports[lcore->num_ports];
> +
> +       *port = (struct rte_event_dispatcher_lcore_port) {
> +               .port_id = event_port_id,
> +               .batch_size = batch_size,
> +               .timeout = timeout
> +       };
> +
> +       lcore->num_ports++;
> +
> +       return 0;
> +}
> +
> +int
> +rte_event_dispatcher_unbind_port_from_lcore(uint8_t id, uint8_t event_port_id,
> +                                           unsigned int lcore_id)
> +{
> +       struct rte_event_dispatcher *dispatcher;
> +       struct rte_event_dispatcher_lcore *lcore;
> +       int16_t port_idx;
> +       struct rte_event_dispatcher_lcore_port *port;
> +       struct rte_event_dispatcher_lcore_port *last;
> +
> +       RED_VALID_ID_OR_RET_EINVAL(id);
> +       dispatcher = red_get_dispatcher(id);
> +
> +       lcore = &dispatcher->lcores[lcore_id];
> +
> +       port_idx = lcore_port_index(lcore, event_port_id);
> +
> +       if (port_idx < 0)
> +               return -ENOENT;
> +
> +       port = &lcore->ports[port_idx];
> +       last = &lcore->ports[lcore->num_ports - 1];
> +
> +       if (port != last)
> +               *port = *last;
> +
> +       lcore->num_ports--;
> +
> +       return 0;
> +}
> +
> +static int
> +red_cb_manage(uint8_t id, uint8_t queue_id, bool reg, bool fallback,
> +          rte_event_dispatcher_cb_t cb_fun, void *cb_data)
> +{
> +       struct rte_event_dispatcher *dispatcher;
> +       struct rte_event_dispatcher_cb *cb;
> +
> +       RED_VALID_ID_OR_RET_EINVAL(id);
> +       dispatcher = red_get_dispatcher(id);
> +
> +       if (fallback)
> +               cb = &dispatcher->fallback;
> +       else
> +               cb = &dispatcher->queue_cbs[queue_id];
> +
> +       if (reg && cb->cb_fun != NULL)
> +               return -EEXIST;
> +
> +       if (!reg && cb->cb_fun == NULL)
> +               return -ENOENT;
> +
> +       *cb = (struct rte_event_dispatcher_cb) {
> +               .cb_fun = cb_fun,
> +               .cb_data = cb_data
> +       };
> +
> +       return 0;
> +}
> +
> +int
> +rte_event_dispatcher_register(uint8_t id, uint8_t queue_id,
> +                             rte_event_dispatcher_cb_t cb_fun, void *cb_data)
> +{
> +       return red_cb_manage(id, queue_id, true, false, cb_fun, cb_data);
> +}
> +
> +int
> +rte_event_dispatcher_unregister(uint8_t id, uint8_t queue_id)
> +{
> +       return red_cb_manage(id, queue_id, false, false, NULL, NULL);
> +}
> +
> +int
> +rte_event_dispatcher_register_fallback(uint8_t id,
> +                                      rte_event_dispatcher_cb_t cb_fun,
> +                                      void *cb_data)
> +{
> +       return red_cb_manage(id, 0, true, true, cb_fun, cb_data);
> +}
> +
> +int
> +rte_event_dispatcher_unregister_fallback(uint8_t id)
> +{
> +       return red_cb_manage(id, 0, false, true, NULL, NULL);
> +}
> diff --git a/lib/librte_eventdev/rte_event_dispatcher.h b/lib/librte_eventdev/rte_event_dispatcher.h
> new file mode 100644
> index 0000000000..11f57571ab
> --- /dev/null
> +++ b/lib/librte_eventdev/rte_event_dispatcher.h
> @@ -0,0 +1,251 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + * Copyright(c) 2021 Ericsson AB
> + */
> +
> +#ifndef __RTE_EVENT_DISPATCHER_H__
> +#define __RTE_EVENT_DISPATCHER_H__
> +
> +/**
> + * @file
> + *
> + * RTE Event Dispatcher
> + *
> + */
> +
> +#ifdef __cplusplus
> +extern "C" {
> +#endif
> +
> +#include <rte_eventdev.h>
> +
> +/**
> + * Function prototype for dispatcher callbacks.
> + *
> + * @param events
> + *  Pointer to an array of events.
> + *
> + * @param num
> + *  The number of events in the @p events array.
> + *
> + * @param cb_data
> + *  The pointer supplied by the application in
> + *  rte_event_dispatcher_register() or
> + *  rte_event_dispatcher_register_fallback().
> + */
> +
> +typedef void (*rte_event_dispatcher_cb_t)(struct rte_event *events,
> +                                         uint16_t num, void *cb_data);
> +
> +/**
> + * Create an event dispatcher with the specified id.
> + *
> + * @param id
> + *  An application-specified, unique (across all event dispatcher
> + *  instances) identifier.
> + *
> + * @param event_dev_id
> + *  The identifier of the event device from which this event dispatcher
> + *  will dequeue events.
> + *
> + * @return
> + *   - 0: Success
> + *   - <0: Error code on failure
> + */
> +__rte_experimental
> +int
> +rte_event_dispatcher_create(uint8_t id, uint8_t event_dev_id);
> +
> +/**
> + * Frees an event dispatcher with the specified id.
> + *
> + * @param id
> + *  The event dispatcher identifier.
> + *
> + * @return
> + *   - 0: Success
> + *   - <0: Error code on failure
> + */
> +__rte_experimental
> +int
> +rte_event_dispatcher_free(uint8_t id);
> +
> +/**
> + * Retrieve the service identifier of the event dispatcher.
> + *
> + * @param id
> + *  The event dispatcher identifier.
> + *
> + * @param [out] service_id
> + *  A pointer to a caller-supplied buffer where the event dispatcher's
> + *  service id will be stored.
> + *
> + * @return
> + *  - 0: Success
> + *  - <0: Error code on failure.
> + */
> +__rte_experimental
> +int
> +rte_event_dispatcher_service_id_get(uint8_t id, uint32_t *service_id);
> +
> +/**
> + * Binds an event device port to a specific lcore on the specified
> + * event dispatcher.
> + *
> + * This function configures an event dispatcher to dequeue events from
> + * an event device port (as specified by @p event_port_id) when its
> + * service function is run on a particular lcore (as specified by @p
> + * lcore_id).
> + *
> + * Multiple event device ports may be bound to the same lcore. A
> + * particular port may only be bound to one lcore.
> + *
> + * If the event dispatcher service is mapped (with
> + * rte_service_map_lcore_set()) to a lcore for which no ports are
> + * bound, the service function will be a no-operation.
> + *
> + * @param id
> + *  The event dispatcher identifier.
> + *
> + * @param event_port_id
> + *  The event device port identifier.
> + *
> + * @param batch_size
> + *  The batch size to use in rte_event_dequeue_burst(), for the
> + *  configured event device port and lcore.
> + *
> + * @param timeout
> + *  The timeout parameter to use in rte_event_dequeue_burst(), for the
> + *  configured event device port and lcore.
> + *
> + * @return
> + *  - 0: Success
> + *  - <0: Error code on failure.
> + */
> +__rte_experimental
> +int
> +rte_event_dispatcher_bind_port_to_lcore(uint8_t id, uint8_t event_port_id,
> +                                       uint16_t batch_size, uint64_t timeout,
> +                                       unsigned int lcore_id);
> +
> +/**
> + * Unbind an event device port from a specific lcore.
> + *
> + * @param id
> + *  The event dispatcher identifier.
> + *
> + * @param event_port_id
> + *  The event device port identifier.
> + *
> + * @return
> + *  - 0: Success
> + *  - <0: Error code on failure.
> + */
> +__rte_experimental
> +int
> +rte_event_dispatcher_unbind_port_from_lcore(uint8_t id, uint8_t event_port_id,
> +                                           unsigned int lcore_id);
> +
> +/**
> + * Register a callback function for the specified queue identifier.
> + *
> + * At most one callback may be registered per queue id.
> + *
> + * The same callback function may be registered for multiple queue ids.
> + *
> + * For each callback invocation, events belonging to a single queue id
> + * will be dispatched.
> + *
> + * @param id
> + *  The event dispatcher identifier.
> + *
> + * @param queue_id
> + *  The event device queue id for which @p cb_fun should be called.
> + *
> + * @param cb_fun
> + *  The callback function.
> + *
> + * @param cb_data
> + *  A pointer to some application-specific opaque data (or NULL),
> + *  which is supplied back to the application in the callback.
> + *
> + * @return
> + *  - 0: Success
> + *  - <0: Error code on failure.
> + */
> +__rte_experimental
> +int
> +rte_event_dispatcher_register(uint8_t id, uint8_t queue_id,
> +                             rte_event_dispatcher_cb_t cb_fun, void *cb_data);
> +
> +/**
> + * Unregister a callback function for the specified queue identifier.
> + *
> + * @param id
> + *  The event dispatcher identifier.
> + *
> + * @param queue_id
> + *  The event device queue id for which the callback should be removed.
> + *
> + * @return
> + *  - 0: Success
> + *  - <0: Error code on failure.
> + */
> +__rte_experimental
> +int
> +rte_event_dispatcher_unregister(uint8_t id, uint8_t queue_id);
> +
> +/**
> + * Register a fallback callback function for the specified event
> + * dispatcher.
> + *
> + * Only events for which no queue-specific callback function is
> + * registered will be dispatched to the @p cb_fun callback.
> + *
> + * At most one fallback callback function may be registered.
> + *
> + * For each callback invocation, only events belonging to a single
> + * queue id will be included.
> + *
> + * If the event dispatcher encounters an event with a queue id for
> + * which the application has not registered any specific callback, and
> + * there is also no fallback configured, the event will be dropped.
> + *
> + * @param id
> + *  The event dispatcher identifier.
> + *
> + * @param cb_fun
> + *  The fallback callback function.
> + *
> + * @param cb_data
> + *  A pointer to some application-specific opaque data (or NULL),
> + *  which is supplied back to the application in the callback.
> + *
> + * @return
> + *  - 0: Success
> + *  - <0: Error code on failure.
> + */
> +__rte_experimental
> +int
> +rte_event_dispatcher_register_fallback(uint8_t id,
> +                                      rte_event_dispatcher_cb_t cb_fun,
> +                                      void *cb_data);
> +
> +/**
> + * Unregister the fallback callback function.
> + *
> + * @param id
> + *  The event dispatcher identifier.
> + *
> + * @return
> + *  - 0: Success
> + *  - <0: Error code on failure.
> + */
> +__rte_experimental
> +int
> +rte_event_dispatcher_unregister_fallback(uint8_t id);
> +
> +#ifdef __cplusplus
> +}
> +#endif
> +
> +#endif /* __RTE_EVENT_DISPATCHER_H__ */
> diff --git a/lib/librte_eventdev/rte_eventdev_version.map b/lib/librte_eventdev/rte_eventdev_version.map
> index 91a62cd077..dcb887601b 100644
> --- a/lib/librte_eventdev/rte_eventdev_version.map
> +++ b/lib/librte_eventdev/rte_eventdev_version.map
> @@ -134,4 +134,14 @@ EXPERIMENTAL {
>         __rte_eventdev_trace_crypto_adapter_queue_pair_del;
>         __rte_eventdev_trace_crypto_adapter_start;
>         __rte_eventdev_trace_crypto_adapter_stop;
> +
> +       rte_event_dispatcher_create;
> +       rte_event_dispatcher_free;
> +       rte_event_dispatcher_service_id_get;
> +       rte_event_dispatcher_bind_port_to_lcore;
> +       rte_event_dispatcher_unbind_port_from_lcore;
> +       rte_event_dispatcher_register;
> +       rte_event_dispatcher_unregister;
> +       rte_event_dispatcher_register_fallback;
> +       rte_event_dispatcher_unregister_fallback;
>  };
> --
> 2.25.1
>
  
Mattias Rönnblom Feb. 26, 2021, 7:48 a.m. UTC | #3
On 2021-02-22 16:28, Luca Boccassi wrote:
> On Thu, 2021-02-18 at 19:30 +0100, Mattias Rönnblom wrote:
>> The purpose of the event dispatcher is primarily to decouple different
>> parts of an application (e.g., processing pipeline stages), which
>> share the same underlying event device.
>>
>> The event dispatcher replaces the conditional logic (often, a switch
>> statement) that typically follows an event device dequeue operation,
>> where events are dispatched to different parts of the application
>> based on the destination queue id.
>>
>> The concept is similar to a UNIX file descriptor event loop library.
>> Instead of tying callback functions to fds as for example libevent
>> does, the event dispatcher binds callbacks to queue ids.
>>
>> An event dispatcher is configured to dequeue events from a specific
>> event device, and ties into the service core framework, to do its (and
>> the application's) work.
>>
>> The event dispatcher provides a convenient way for an eventdev-based
>> application to use service cores for application-level processing, and
>> thus for sharing those cores with other DPDK services.
>>
>> Signed-off-by: Mattias Rönnblom <mattias.ronnblom@ericsson.com>
>> ---
>>   lib/librte_eventdev/Makefile                 |   2 +
>>   lib/librte_eventdev/meson.build              |   6 +-
>>   lib/librte_eventdev/rte_event_dispatcher.c   | 420 +++++++++++++++++++
>>   lib/librte_eventdev/rte_event_dispatcher.h   | 251 +++++++++++
>>   lib/librte_eventdev/rte_eventdev_version.map |  10 +
>>   5 files changed, 687 insertions(+), 2 deletions(-)
>>   create mode 100644 lib/librte_eventdev/rte_event_dispatcher.c
>>   create mode 100644 lib/librte_eventdev/rte_event_dispatcher.h
> Hi,
>
> Is this intended to be used by applications or by PMDs? If the former,
> then IMHO the interface should really be based around (or allow using)
> FDs, so that it can be polled. Applications normally have more event
> sources than just DPDK.
>
It's for applications. File descriptors might be involved in the API 
somehow, but I think for most applications, they are too costly. I mean, 
the whole point of DPDK is to avoid the operating system overhead.
  
Mattias Rönnblom Feb. 26, 2021, 8:01 a.m. UTC | #4
On 2021-02-25 13:32, Jerin Jacob wrote:
> On Fri, Feb 19, 2021 at 12:00 AM Mattias Rönnblom
> <mattias.ronnblom@ericsson.com> wrote:
>> The purpose of the event dispatcher is primarily to decouple different
>> parts of an application (e.g., processing pipeline stages), which
>> share the same underlying event device.
>>
>> The event dispatcher replaces the conditional logic (often, a switch
>> statement) that typically follows an event device dequeue operation,
>> where events are dispatched to different parts of the application
>> based on the destination queue id.
> # If the device has all-types queues[1], this RFC would restrict the
> application to using a queue ONLY as a stage. A stage can also be a
> queue type. How can we abstract this in this model?


"All queue type" is about scheduling policy. I would think that would be 
independent of the "logical endpoint" of the event (i.e., the queue id). 
I feel like I'm missing something here.


> # Also, I think it may make sense to add this type of infrastructure as
> helper functions, as these are built on top of existing APIs, i.e., no
> support is required from the driver to establish this model. IMO, if we
> need to add such support as one fixed set of functionality, we could have
> helper APIs to express a certain usage of eventdev, rather than defining
> this as the only way to do it. I think a helper function can be used as
> an abstraction to define this kind of model.
>
> # Also, there is function pointer overhead, and the implementation
> aggregates the events. That may not always be "the" optimized model,
> compared with a switch case in the application.


Sure, but what else can one do in a reasonably generic framework?


If you are very sensitive to those 20 clock cycles, or whatever a
function pointer call costs, you won't use this library. Or you will,
and use static linking and LTO to get rid of that overhead.


Probably, you have a few queues, not many. Probably, your dequeue bursts
are large if the system load is high (and otherwise, you don't care
about efficiency). Then, you will have at least a couple of events per
function call.
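
The amortization argument above can be made concrete with a small
stand-alone sketch. It is not DPDK code and not the RFC's
implementation: struct ev, ev_cb_t, dispatch() and count_cb() are
hypothetical stand-ins, and only the idea of handing each run of
consecutive same-queue events to a callback in a single call is taken
from the patch.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for struct rte_event; only the queue id matters. */
struct ev {
	uint8_t queue_id;
};

typedef void (*ev_cb_t)(const struct ev *events, uint16_t num, void *cb_data);

/* Records how often the callback runs and how many events it is handed. */
struct stats {
	unsigned int calls;
	unsigned int events;
};

static void
count_cb(const struct ev *events, uint16_t num, void *cb_data)
{
	struct stats *s = cb_data;

	(void)events;
	s->calls++;
	s->events += num;
}

/*
 * Hand each run of consecutive events with the same queue id to the
 * callback in one call, so that one indirect call covers the whole run.
 */
static void
dispatch(const struct ev *events, uint16_t num, ev_cb_t cb, void *cb_data)
{
	uint16_t start = 0;

	assert(cb != NULL);

	while (start < num) {
		uint16_t end = start + 1;

		while (end < num &&
		       events[end].queue_id == events[start].queue_id)
			end++;

		cb(&events[start], (uint16_t)(end - start), cb_data);
		start = end;
	}
}
```

With a burst of eight events whose queue ids are 0, 0, 0, 1, 1, 1, 1, 0,
dispatch() makes three callback calls for eight events, so the
indirect-call cost is paid once per run rather than once per event.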


>
> [1]
> See RTE_EVENT_DEV_CAP_QUEUE_ALL_TYPES in
> https://doc.dpdk.org/guides/prog_guide/eventdev.html
>
>
  
Jerin Jacob March 7, 2021, 1:04 p.m. UTC | #5
On Fri, Feb 26, 2021 at 1:31 PM Mattias Rönnblom
<mattias.ronnblom@ericsson.com> wrote:
>
> On 2021-02-25 13:32, Jerin Jacob wrote:
> > On Fri, Feb 19, 2021 at 12:00 AM Mattias Rönnblom
> > <mattias.ronnblom@ericsson.com> wrote:
> >> The purpose of the event dispatcher is primarily to decouple different
> >> parts of an application (e.g., processing pipeline stages), which
> >> share the same underlying event device.
> >>
> >> The event dispatcher replaces the conditional logic (often, a switch
> >> statement) that typically follows an event device dequeue operation,
> >> where events are dispatched to different parts of the application
> >> based on the destination queue id.
> > # If the device has all-types queues[1], this RFC would restrict the
> > application to using a queue ONLY as a stage. A stage can also be a
> > queue type. How can we abstract this in this model?
>
>
> "All queue type" is about scheduling policy. I would think that would be
> independent of the "logical endpoint" of the event (i.e., the queue id).
> I feel like I'm missing something here.

Each queue type can also be represented as a stage.
For example, if the system has only one queue, the typical IPsec
outbound stages can be:
Q0(Ordered)(SA lookup) -> Q0(Atomic)(Sequence number update) ->
Q0(Ordered)(Crypto operation) -> Q0(Atomic)(Send on wire)
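
A small stand-alone sketch of the single-queue pipeline described
above, under stated assumptions: struct ev, enum sched, enum stage,
stage_sched() and advance() are hypothetical and are not eventdev
types. The stage is carried in the event itself; in a real application
it might live in a field such as sub_event_type, but that choice is an
assumption here, not something stated in this thread.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical scheduling types, mirroring ordered/atomic in the text. */
enum sched { SCHED_ORDERED, SCHED_ATOMIC };

/* Hypothetical stages of the IPsec outbound example. */
enum stage {
	STAGE_SA_LOOKUP,
	STAGE_SEQ_UPDATE,
	STAGE_CRYPTO,
	STAGE_TX,
	STAGE_DONE
};

/* Hypothetical stand-in for struct rte_event. */
struct ev {
	uint8_t queue_id;  /* always 0 here: the device has one queue */
	enum sched sched;  /* scheduling type for the next pass through Q0 */
	enum stage stage;  /* which stage should handle the event next */
};

/* Scheduling type each stage asks for, following the example above. */
static enum sched
stage_sched(enum stage stage)
{
	switch (stage) {
	case STAGE_SEQ_UPDATE:
	case STAGE_TX:
		return SCHED_ATOMIC;
	default:
		return SCHED_ORDERED;
	}
}

/* Finish the current stage and re-target the event at the next one. */
static void
advance(struct ev *e)
{
	assert(e->stage != STAGE_DONE);

	e->stage = (enum stage)(e->stage + 1);
	if (e->stage != STAGE_DONE)
		e->sched = stage_sched(e->stage);
	/* queue_id is deliberately left untouched: every stage uses Q0. */
}
```

The queue id never changes while the event moves through all four
stages, and the ordered and atomic types each occur twice, so neither
the queue id nor the scheduling type alone identifies the stage.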

>
>
> > # Also, I think it may make sense to add this type of infrastructure as
> > helper functions, as these are built on top of existing APIs, i.e., no
> > support is required from the driver to establish this model. IMO, if we
> > need to add such support as one fixed set of functionality, we could have
> > helper APIs to express a certain usage of eventdev, rather than defining
> > this as the only way to do it. I think a helper function can be used as
> > an abstraction to define this kind of model.
> >
> > # Also, there is function pointer overhead, and the implementation
> > aggregates the events. That may not always be "the" optimized model,
> > compared with a switch case in the application.
>
>
> Sure, but what else can one do in a reasonably generic framework?
>
>
> If you are very sensitive to those 20 clock cycles, or whatever a
> function pointer call costs, you won't use this library. Or you will,
> and use static linking and LTO to get rid of that overhead.
>
>
> Probably, you have a few queues, not many. Probably, your dequeue bursts
> are large if the system load is high (and otherwise, you don't care
> about efficiency). Then, you will have at least a couple of events per
> function call.

I am fine with this library, and with exposing it through function
pointers, if someone needs a "helper" function to model the system
around this logic.

This RFC looks good to me in general. I would suggest the following:

- Make these helper functions, i.e., if someone chooses to write the
stages in this way, it can be enabled through this helper function.
Calling it a helper function signals that this is one way to do it,
but NOT THE ONLY WAY.
- Abstract a stage as a queue (which the patch already does) and, for
the all-types queue case, as each type within the queue.
- Enhance test-eventdev to showcase the functionality and performance
of these helpers.

Thanks for the RFC.

>
>
> >
> > [1]
> > See RTE_EVENT_DEV_CAP_QUEUE_ALL_TYPES in
> > https://doc.dpdk.org/guides/prog_guide/eventdev.html
> >
> >
> >> The concept is similar to a UNIX file descriptor event loop library.
> >> Instead of tying callback functions to fds as for example libevent
> >> does, the event dispatcher binds callbacks to queue ids.
> >>
> >> An event dispatcher is configured to dequeue events from a specific
> >> event device, and ties into the service core framework, to do its (and
> >> the application's) work.
> >>
> >> The event dispatcher provides a convenient way for an eventdev-based
> >> application to use service cores for application-level processing, and
> >> thus for sharing those cores with other DPDK services.
> >>
> >> Signed-off-by: Mattias Rönnblom <mattias.ronnblom@ericsson.com>
> >> ---
> >>   lib/librte_eventdev/Makefile                 |   2 +
> >>   lib/librte_eventdev/meson.build              |   6 +-
> >>   lib/librte_eventdev/rte_event_dispatcher.c   | 420 +++++++++++++++++++
> >>   lib/librte_eventdev/rte_event_dispatcher.h   | 251 +++++++++++
> >>   lib/librte_eventdev/rte_eventdev_version.map |  10 +
> >>   5 files changed, 687 insertions(+), 2 deletions(-)
> >>   create mode 100644 lib/librte_eventdev/rte_event_dispatcher.c
> >>   create mode 100644 lib/librte_eventdev/rte_event_dispatcher.h
> >>
> >> diff --git a/lib/librte_eventdev/Makefile b/lib/librte_eventdev/Makefile
> >> index 0715256bb4..614d53af1b 100644
> >> --- a/lib/librte_eventdev/Makefile
> >> +++ b/lib/librte_eventdev/Makefile
> >> @@ -26,6 +26,7 @@ SRCS-y += rte_event_eth_rx_adapter.c
> >>   SRCS-y += rte_event_timer_adapter.c
> >>   SRCS-y += rte_event_crypto_adapter.c
> >>   SRCS-y += rte_event_eth_tx_adapter.c
> >> +SRCS-y += rte_event_dispatcher.c
> >>
> >>   # export include files
> >>   SYMLINK-y-include += rte_eventdev.h
> >> @@ -40,6 +41,7 @@ SYMLINK-y-include += rte_event_timer_adapter.h
> >>   SYMLINK-y-include += rte_event_timer_adapter_pmd.h
> >>   SYMLINK-y-include += rte_event_crypto_adapter.h
> >>   SYMLINK-y-include += rte_event_eth_tx_adapter.h
> >> +SYMLINK-y-include += rte_event_dispatcher.h
> >>
> >>   # versioning export map
> >>   EXPORT_MAP := rte_eventdev_version.map
> >> diff --git a/lib/librte_eventdev/meson.build b/lib/librte_eventdev/meson.build
> >> index d1f25ee5ca..2ca81983b5 100644
> >> --- a/lib/librte_eventdev/meson.build
> >> +++ b/lib/librte_eventdev/meson.build
> >> @@ -13,7 +13,8 @@ sources = files('rte_eventdev.c',
> >>                  'rte_event_eth_rx_adapter.c',
> >>                  'rte_event_timer_adapter.c',
> >>                  'rte_event_crypto_adapter.c',
> >> -               'rte_event_eth_tx_adapter.c')
> >> +               'rte_event_eth_tx_adapter.c',
> >> +               'rte_event_dispatcher.c')
> >>   headers = files('rte_eventdev.h',
> >>                  'rte_eventdev_pmd.h',
> >>                  'rte_eventdev_pmd_pci.h',
> >> @@ -25,5 +26,6 @@ headers = files('rte_eventdev.h',
> >>                  'rte_event_timer_adapter.h',
> >>                  'rte_event_timer_adapter_pmd.h',
> >>                  'rte_event_crypto_adapter.h',
> >> -               'rte_event_eth_tx_adapter.h')
> >> +               'rte_event_eth_tx_adapter.h',
> >> +               'rte_event_dispatcher.h')
> >>   deps += ['ring', 'ethdev', 'hash', 'mempool', 'mbuf', 'timer', 'cryptodev']
> >> diff --git a/lib/librte_eventdev/rte_event_dispatcher.c b/lib/librte_eventdev/rte_event_dispatcher.c
> >> new file mode 100644
> >> index 0000000000..1c7e55a752
> >> --- /dev/null
> >> +++ b/lib/librte_eventdev/rte_event_dispatcher.c
> >> @@ -0,0 +1,420 @@
> >> +/* SPDX-License-Identifier: BSD-3-Clause
> >> + * Copyright(c) 2021 Ericsson AB
> >> + */
> >> +
> >> +#include <stdbool.h>
> >> +#include <stdint.h>
> >> +
> >> +#include <rte_lcore.h>
> >> +#include <rte_service_component.h>
> >> +#include <rte_eventdev_pmd.h>
> >> +
> >> +#include <rte_event_dispatcher.h>
> >> +
> >> +#define RED_MAX_PORTS_PER_LCORE (4)
> >> +
> >> +struct rte_event_dispatcher_lcore_port {
> >> +       uint8_t port_id;
> >> +       uint16_t batch_size;
> >> +       uint64_t timeout;
> >> +};
> >> +
> >> +struct rte_event_dispatcher_lcore {
> >> +       uint8_t num_ports;
> >> +       struct rte_event_dispatcher_lcore_port ports[RED_MAX_PORTS_PER_LCORE];
> >> +};
> >> +
> >> +struct rte_event_dispatcher_cb {
> >> +       rte_event_dispatcher_cb_t cb_fun;
> >> +       void *cb_data;
> >> +};
> >> +
> >> +struct rte_event_dispatcher {
> >> +       uint8_t id;
> >> +       uint8_t event_dev_id;
> >> +       int socket_id;
> >> +       uint32_t service_id;
> >> +       struct rte_event_dispatcher_lcore lcores[RTE_MAX_LCORE];
> >> +       struct rte_event_dispatcher_cb queue_cbs[UINT8_MAX + 1];
> >> +       struct rte_event_dispatcher_cb fallback;
> >> +};
> >> +
> >> +static struct rte_event_dispatcher *dispatchers[UINT8_MAX + 1];
> >> +
> >> +static bool
> >> +red_has_dispatcher(uint8_t id)
> >> +{
> >> +       return dispatchers[id] != NULL;
> >> +}
> >> +
> >> +static struct rte_event_dispatcher *
> >> +red_get_dispatcher(uint8_t id)
> >> +{
> >> +       return dispatchers[id];
> >> +}
> >> +
> >> +static void
> >> +red_set_dispatcher(uint8_t id, struct rte_event_dispatcher *dispatcher)
> >> +{
> >> +       dispatchers[id] = dispatcher;
> >> +}
> >> +
> >> +#define RED_VALID_ID_OR_RET_EINVAL(id)                                 \
> >> +       do {                                                            \
> >> +               if (unlikely(!red_has_dispatcher(id))) {                \
> >> +                       RTE_EDEV_LOG_ERR("Invalid dispatcher id %d\n", id); \
> >> +                       return -EINVAL;                                 \
> >> +               }                                                       \
> >> +       } while (0)
> >> +
> >> +static struct rte_event_dispatcher_cb *
> >> +red_lookup_cb(struct rte_event_dispatcher *dispatcher, uint8_t queue_id)
> >> +{
> >> +       struct rte_event_dispatcher_cb *cb = &dispatcher->queue_cbs[queue_id];
> >> +
> >> +       if (unlikely(cb->cb_fun == NULL))
> >> +               cb = &dispatcher->fallback;
> >> +
> >> +       return cb;
> >> +}
> >> +
> >> +static void
> >> +red_dispatch_events(struct rte_event_dispatcher *dispatcher,
> >> +                   struct rte_event *events, uint16_t num_events)
> >> +{
> >> +       uint16_t cb_start;
> >> +       uint16_t cb_len;
> >> +
> >> +       for (cb_start = 0; cb_start < num_events; cb_start += cb_len) {
> >> +               uint16_t cb_end = cb_start;
> >> +               uint8_t queue_id = events[cb_start].queue_id;
> >> +               struct rte_event_dispatcher_cb *cb;
> >> +
> >> +               while (++cb_end < num_events &&
> >> +                      events[cb_end].queue_id == queue_id)
> >> +                       ;
> >> +
> >> +               cb_len = cb_end - cb_start;
> >> +
> >> +               cb = red_lookup_cb(dispatcher, queue_id);
> >> +
> >> +               if (unlikely(cb->cb_fun == NULL)) {
> >> +                       RTE_EDEV_LOG_ERR("Attempted to dispatch %d events "
> >> +                                        "for queue id %d, but no queue or "
> >> +                                        "fallback cb were configured\n",
> >> +                                        cb_len, queue_id);
> >> +                       continue;
> >> +               }
> >> +
> >> +               cb->cb_fun(&events[cb_start], cb_len, cb->cb_data);
> >> +       }
> >> +}
> >> +
> >> +static void
> >> +red_port_dequeue(struct rte_event_dispatcher *dispatcher,
> >> +                struct rte_event_dispatcher_lcore_port *port)
> >> +{
> >> +       uint16_t batch_size = port->batch_size;
> >> +       struct rte_event events[batch_size];
> >> +       uint16_t n;
> >> +
> >> +       n = rte_event_dequeue_burst(dispatcher->event_dev_id, port->port_id,
> >> +                                   events, batch_size, port->timeout);
> >> +
> >> +       red_dispatch_events(dispatcher, events, n);
> >> +}
> >> +
> >> +static int32_t
> >> +red_lcore_process(void *userdata)
> >> +{
> >> +       uint16_t i;
> >> +       struct rte_event_dispatcher *dispatcher = userdata;
> >> +       unsigned int lcore_id = rte_lcore_id();
> >> +       struct rte_event_dispatcher_lcore *lcore =
> >> +               &dispatcher->lcores[lcore_id];
> >> +
> >> +       for (i = 0; i < lcore->num_ports; i++) {
> >> +               struct rte_event_dispatcher_lcore_port *port =
> >> +                       &lcore->ports[i];
> >> +
> >> +               red_port_dequeue(dispatcher, port);
> >> +       }
> >> +
> >> +       return 0;
> >> +}
> >> +
> >> +static int
> >> +red_service_runstate_set(uint32_t service_id, int state)
> >> +{
> >> +       int rc;
> >> +
> >> +       rc = rte_service_component_runstate_set(service_id, state);
> >> +
> >> +       if (rc)
> >> +               RTE_EDEV_LOG_ERR("Error %d occurred while setting service "
> >> +                                "component run state to %d\n", rc, state);
> >> +
> >> +       return rc;
> >> +}
> >> +
> >> +static int
> >> +red_service_register(struct rte_event_dispatcher *dispatcher)
> >> +{
> >> +       struct rte_service_spec service = {
> >> +               .callback = red_lcore_process,
> >> +               .callback_userdata = dispatcher,
> >> +               .capabilities = RTE_SERVICE_CAP_MT_SAFE,
> >> +               .socket_id = dispatcher->socket_id
> >> +       };
> >> +       int rc;
> >> +
> >> +       snprintf(service.name, RTE_SERVICE_NAME_MAX - 1, "red_%d",
> >> +                dispatcher->id);
> >> +
> >> +       rc = rte_service_component_register(&service, &dispatcher->service_id);
> >> +
> >> +       if (rc) {
> >> +               RTE_EDEV_LOG_ERR("Registration of event dispatcher service "
> >> +                                "%s failed: %d\n", service.name, rc);
> >> +               return rc;
> >> +       }
> >> +       rc = red_service_runstate_set(dispatcher->service_id, 1);
> >> +
> >> +       if (rc)
> >> +               rte_service_component_unregister(dispatcher->service_id);
> >> +
> >> +       return rc;
> >> +}
> >> +
> >> +static int
> >> +red_service_unregister(struct rte_event_dispatcher *dispatcher)
> >> +{
> >> +       int rc;
> >> +
> >> +       rc = red_service_runstate_set(dispatcher->service_id, 0);
> >> +
> >> +       if (rc)
> >> +               return rc;
> >> +
> >> +       rc = rte_service_component_unregister(dispatcher->service_id);
> >> +
> >> +       if (rc)
> >> +               RTE_EDEV_LOG_ERR("Unregistration of event dispatcher service "
> >> +                                "failed with error code %d\n", rc);
> >> +
> >> +       return rc;
> >> +}
> >> +
> >> +int
> >> +rte_event_dispatcher_create(uint8_t id, uint8_t event_dev_id)
> >> +{
> >> +       int socket_id;
> >> +       struct rte_event_dispatcher *dispatcher;
> >> +       int rc;
> >> +
> >> +       if (red_has_dispatcher(id)) {
> >> +               RTE_EDEV_LOG_ERR("Dispatcher with id %d already exists\n",
> >> +                                id);
> >> +               return -EEXIST;
> >> +       }
> >> +
> >> +       socket_id = rte_event_dev_socket_id(event_dev_id);
> >> +
> >> +       dispatcher =
> >> +               rte_malloc_socket("event dispatcher",
> >> +                                 sizeof(struct rte_event_dispatcher),
> >> +                                 RTE_CACHE_LINE_SIZE, socket_id);
> >> +
> >> +       if (dispatcher == NULL) {
> >> +               RTE_EDEV_LOG_ERR("Unable to allocate memory for event "
> >> +                                "dispatcher\n");
> >> +               return -ENOMEM;
> >> +       }
> >> +
> >> +       *dispatcher = (struct rte_event_dispatcher) {
> >> +               .id = id,
> >> +               .event_dev_id = event_dev_id,
> >> +               .socket_id = socket_id
> >> +       };
> >> +
> >> +       rc = red_service_register(dispatcher);
> >> +
> >> +       if (rc < 0) {
> >> +               rte_free(dispatcher);
> >> +               return rc;
> >> +       }
> >> +
> >> +       red_set_dispatcher(id, dispatcher);
> >> +
> >> +       return 0;
> >> +}
> >> +
> >> +int
> >> +rte_event_dispatcher_free(uint8_t id)
> >> +{
> >> +       struct rte_event_dispatcher *dispatcher;
> >> +       int rc;
> >> +
> >> +       RED_VALID_ID_OR_RET_EINVAL(id);
> >> +       dispatcher = red_get_dispatcher(id);
> >> +
> >> +       rc = red_service_unregister(dispatcher);
> >> +
> >> +       if (rc)
> >> +               return rc;
> >> +
> >> +       red_set_dispatcher(id, NULL);
> >> +
> >> +       rte_free(dispatcher);
> >> +
> >> +       return 0;
> >> +}
> >> +
> >> +int
> >> +rte_event_dispatcher_service_id_get(uint8_t id, uint32_t *service_id)
> >> +{
> >> +       struct rte_event_dispatcher *dispatcher;
> >> +
> >> +       RED_VALID_ID_OR_RET_EINVAL(id);
> >> +       dispatcher = red_get_dispatcher(id);
> >> +
> >> +       *service_id = dispatcher->service_id;
> >> +
> >> +       return 0;
> >> +}
> >> +
> >> +static int16_t
> >> +lcore_port_index(struct rte_event_dispatcher_lcore *lcore,
> >> +                uint8_t event_port_id)
> >> +{
> >> +       uint16_t i;
> >> +
> >> +       for (i = 0; i < lcore->num_ports; i++) {
> >> +               struct rte_event_dispatcher_lcore_port *port =
> >> +                       &lcore->ports[i];
> >> +               if (port->port_id == event_port_id)
> >> +                       return i;
> >> +       }
> >> +
> >> +       return -1;
> >> +}
> >> +
> >> +int
> >> +rte_event_dispatcher_bind_port_to_lcore(uint8_t id, uint8_t event_port_id,
> >> +                                       uint16_t batch_size, uint64_t timeout,
> >> +                                       unsigned int lcore_id)
> >> +{
> >> +       struct rte_event_dispatcher *dispatcher;
> >> +       struct rte_event_dispatcher_lcore *lcore;
> >> +       struct rte_event_dispatcher_lcore_port *port;
> >> +
> >> +       RED_VALID_ID_OR_RET_EINVAL(id);
> >> +       dispatcher = red_get_dispatcher(id);
> >> +
> >> +       lcore = &dispatcher->lcores[lcore_id];
> >> +
> >> +       if (lcore->num_ports == RED_MAX_PORTS_PER_LCORE)
> >> +               return -ENOMEM;
> >> +
> >> +       if (lcore_port_index(lcore, event_port_id) >= 0)
> >> +               return -EEXIST;
> >> +
> >> +       port = &lcore->ports[lcore->num_ports];
> >> +
> >> +       *port = (struct rte_event_dispatcher_lcore_port) {
> >> +               .port_id = event_port_id,
> >> +               .batch_size = batch_size,
> >> +               .timeout = timeout
> >> +       };
> >> +
> >> +       lcore->num_ports++;
> >> +
> >> +       return 0;
> >> +}
> >> +
> >> +int
> >> +rte_event_dispatcher_unbind_port_from_lcore(uint8_t id, uint8_t event_port_id,
> >> +                                           unsigned int lcore_id)
> >> +{
> >> +       struct rte_event_dispatcher *dispatcher;
> >> +       struct rte_event_dispatcher_lcore *lcore;
> >> +       int16_t port_idx;
> >> +       struct rte_event_dispatcher_lcore_port *port;
> >> +       struct rte_event_dispatcher_lcore_port *last;
> >> +
> >> +       RED_VALID_ID_OR_RET_EINVAL(id);
> >> +       dispatcher = red_get_dispatcher(id);
> >> +
> >> +       lcore = &dispatcher->lcores[lcore_id];
> >> +
> >> +       port_idx = lcore_port_index(lcore, event_port_id);
> >> +
> >> +       if (port_idx < 0)
> >> +               return -ENOENT;
> >> +
> >> +       port = &lcore->ports[port_idx];
> >> +       last = &lcore->ports[lcore->num_ports - 1];
> >> +
> >> +       if (port != last)
> >> +               *port = *last;
> >> +
> >> +       lcore->num_ports--;
> >> +
> >> +       return 0;
> >> +}
> >> +
> >> +static int
> >> +red_cb_manage(uint8_t id, uint8_t queue_id, bool reg, bool fallback,
> >> +          rte_event_dispatcher_cb_t cb_fun, void *cb_data)
> >> +{
> >> +       struct rte_event_dispatcher *dispatcher;
> >> +       struct rte_event_dispatcher_cb *cb;
> >> +
> >> +       RED_VALID_ID_OR_RET_EINVAL(id);
> >> +       dispatcher = red_get_dispatcher(id);
> >> +
> >> +       if (fallback)
> >> +               cb = &dispatcher->fallback;
> >> +       else
> >> +               cb = &dispatcher->queue_cbs[queue_id];
> >> +
> >> +       if (reg && cb->cb_fun != NULL)
> >> +               return -EEXIST;
> >> +
> >> +       if (!reg && cb->cb_fun == NULL)
> >> +               return -ENOENT;
> >> +
> >> +       *cb = (struct rte_event_dispatcher_cb) {
> >> +               .cb_fun = cb_fun,
> >> +               .cb_data = cb_data
> >> +       };
> >> +
> >> +       return 0;
> >> +}
> >> +
> >> +int
> >> +rte_event_dispatcher_register(uint8_t id, uint8_t queue_id,
> >> +                             rte_event_dispatcher_cb_t cb_fun, void *cb_data)
> >> +{
> >> +       return red_cb_manage(id, queue_id, true, false, cb_fun, cb_data);
> >> +}
> >> +
> >> +int
> >> +rte_event_dispatcher_unregister(uint8_t id, uint8_t queue_id)
> >> +{
> >> +       return red_cb_manage(id, queue_id, false, false, NULL, NULL);
> >> +}
> >> +
> >> +int
> >> +rte_event_dispatcher_register_fallback(uint8_t id,
> >> +                                      rte_event_dispatcher_cb_t cb_fun,
> >> +                                      void *cb_data)
> >> +{
> >> +       return red_cb_manage(id, 0, true, true, cb_fun, cb_data);
> >> +}
> >> +
> >> +int
> >> +rte_event_dispatcher_unregister_fallback(uint8_t id)
> >> +{
> >> +       return red_cb_manage(id, 0, false, true, NULL, NULL);
> >> +}
> >> diff --git a/lib/librte_eventdev/rte_event_dispatcher.h b/lib/librte_eventdev/rte_event_dispatcher.h
> >> new file mode 100644
> >> index 0000000000..11f57571ab
> >> --- /dev/null
> >> +++ b/lib/librte_eventdev/rte_event_dispatcher.h
> >> @@ -0,0 +1,251 @@
> >> +/* SPDX-License-Identifier: BSD-3-Clause
> >> + * Copyright(c) 2021 Ericsson AB
> >> + */
> >> +
> >> +#ifndef __RTE_EVENT_DISPATCHER_H__
> >> +#define __RTE_EVENT_DISPATCHER_H__
> >> +
> >> +/**
> >> + * @file
> >> + *
> >> + * RTE Event Dispatcher
> >> + *
> >> + */
> >> +
> >> +#ifdef __cplusplus
> >> +extern "C" {
> >> +#endif
> >> +
> >> +#include <rte_eventdev.h>
> >> +
> >> +/**
> >> + * Function prototype for dispatcher callbacks.
> >> + *
> >> + * @param events
> >> + *  Pointer to an array of events.
> >> + *
> >> + * @param num
> >> + *  The number of events in the @p events array.
> >> + *
> >> + * @param cb_data
> >> + *  The pointer supplied by the application in
> >> + *  rte_event_dispatcher_register() or
> >> + *  rte_event_dispatcher_register_fallback().
> >> + */
> >> +
> >> +typedef void (*rte_event_dispatcher_cb_t)(struct rte_event *events,
> >> +                                         uint16_t num, void *cb_data);
> >> +
> >> +/**
> >> + * Create an event dispatcher with the specified id.
> >> + *
> >> + * @param id
> >> + *  An application-specified, unique (across all event dispatcher
> >> + *  instances) identifier.
> >> + *
> >> + * @param event_dev_id
> >> + *  The identifier of the event device from which this event dispatcher
> >> + *  will dequeue events.
> >> + *
> >> + * @return
> >> + *   - 0: Success
> >> + *   - <0: Error code on failure
> >> + */
> >> +__rte_experimental
> >> +int
> >> +rte_event_dispatcher_create(uint8_t id, uint8_t event_dev_id);
> >> +
> >> +/**
> >> + * Frees an event dispatcher with the specified id.
> >> + *
> >> + * @param id
> >> + *  The event dispatcher identifier.
> >> + *
> >> + * @return
> >> + *   - 0: Success
> >> + *   - <0: Error code on failure
> >> + */
> >> +__rte_experimental
> >> +int
> >> +rte_event_dispatcher_free(uint8_t id);
> >> +
> >> +/**
> >> + * Retrieve the service identifier of the event dispatcher.
> >> + *
> >> + * @param id
> >> + *  The event dispatcher identifier.
> >> + *
> >> + * @param [out] service_id
> >> + *  A pointer to a caller-supplied buffer where the event dispatcher's
> >> + *  service id will be stored.
> >> + *
> >> + * @return
> >> + *  - 0: Success
> >> + *  - <0: Error code on failure.
> >> + */
> >> +__rte_experimental
> >> +int
> >> +rte_event_dispatcher_service_id_get(uint8_t id, uint32_t *service_id);
> >> +
> >> +/**
> >> + * Binds an event device port to a specific lcore on the specified
> >> + * event dispatcher.
> >> + *
> >> + * This function configures an event dispatcher to dequeue events from
> >> + * an event device port (as specified by @p event_port_id), in case
> >> + * its service function is run on particular lcore (as specified by @p
> >> + * lcore_id).
> >> + *
> >> + * Multiple event device ports may be bound to the same lcore. A
> >> + * particular port may only be bound to one lcore.
> >> + *
> >> + * If the event dispatcher service is mapped (with
> >> + * rte_service_map_lcore_set()) to a lcore for which no ports are
> >> + * bound, the service function will be a no-operation.
> >> + *
> >> + * @param id
> >> + *  The event dispatcher identifier.
> >> + *
> >> + * @param event_port_id
> >> + *  The event device port identifier.
> >> + *
> >> + * @param batch_size
> >> + *  The batch size to use in rte_event_dequeue_burst(), for the
> >> + *  configured event device port and lcore.
> >> + *
> >> + * @param timeout
> >> + *  The timeout parameter to use in rte_event_dequeue_burst(), for the
> >> + *  configured event device port and lcore.
> >> + *
> >> + * @return
> >> + *  - 0: Success
> >> + *  - <0: Error code on failure.
> >> + */
> >> +__rte_experimental
> >> +int
> >> +rte_event_dispatcher_bind_port_to_lcore(uint8_t id, uint8_t event_port_id,
> >> +                                       uint16_t batch_size, uint64_t timeout,
> >> +                                       unsigned int lcore_id);
> >> +
> >> +/**
> >> + * Unbind an event device port from a specific lcore.
> >> + *
> >> + * @param id
> >> + *  The event dispatcher identifier.
> >> + *
> >> + * @param event_port_id
> >> + *  The event device port identifier.
> >> + *
> >> + * @return
> >> + *  - 0: Success
> >> + *  - <0: Error code on failure.
> >> + */
> >> +__rte_experimental
> >> +int
> >> +rte_event_dispatcher_unbind_port_from_lcore(uint8_t id, uint8_t event_port_id,
> >> +                                           unsigned int lcore_id);
> >> +
> >> +/**
> >> + * Register a callback function for the specified queue identifier.
> >> + *
> >> + * At most one callback may be registered per queue id.
> >> + *
> >> + * The same callback function may be registered for multiple queue ids.
> >> + *
> >> + * For each callback invocation, only events belonging to a single
> >> + * queue id will be dispatched.
> >> + *
> >> + * @param id
> >> + *  The event dispatcher identifier.
> >> + *
> >> + * @param queue_id
> >> + *  The event device queue id for which @p cb_fun should be called.
> >> + *
> >> + * @param cb_fun
> >> + *  The callback function.
> >> + *
> >> + * @param cb_data
> >> + *  A pointer to some application-specific opaque data (or NULL),
> >> + *  which is supplied back to the application in the callback.
> >> + *
> >> + * @return
> >> + *  - 0: Success
> >> + *  - <0: Error code on failure.
> >> + */
> >> +__rte_experimental
> >> +int
> >> +rte_event_dispatcher_register(uint8_t id, uint8_t queue_id,
> >> +                             rte_event_dispatcher_cb_t cb_fun, void *cb_data);
> >> +
> >> +/**
> >> + * Unregister a callback function for the specified queue identifier.
> >> + *
> >> + * @param id
> >> + *  The event dispatcher identifier.
> >> + *
> >> + * @param queue_id
> >> + *  The event device queue id for which the callback should be removed.
> >> + *
> >> + * @return
> >> + *  - 0: Success
> >> + *  - <0: Error code on failure.
> >> + */
> >> +__rte_experimental
> >> +int
> >> +rte_event_dispatcher_unregister(uint8_t id, uint8_t queue_id);
> >> +
> >> +/**
> >> + * Register a fallback callback function for the specified event
> >> + * dispatcher.
> >> + *
> >> + * Only events for which no queue-specific callback function has been
> >> + * registered will be dispatched to the @p cb_fun callback.
> >> + *
> >> + * At most one fallback callback function may be registered.
> >> + *
> >> + * For each callback invocation, only events belonging to a single
> >> + * queue id will be included.
> >> + *
> >> + * If the event dispatcher encounters an event with a queue id for
> >> + * which the application has not registered any specific callback, and
> >> + * there is also no fallback configured, the event will be dropped.
> >> + *
> >> + * @param id
> >> + *  The event dispatcher identifier.
> >> + *
> >> + * @param cb_fun
> >> + *  The fallback callback function.
> >> + *
> >> + * @param cb_data
> >> + *  A pointer to some application-specific opaque data (or NULL),
> >> + *  which is supplied back to the application in the callback.
> >> + *
> >> + * @return
> >> + *  - 0: Success
> >> + *  - <0: Error code on failure.
> >> + */
> >> +__rte_experimental
> >> +int
> >> +rte_event_dispatcher_register_fallback(uint8_t id,
> >> +                                      rte_event_dispatcher_cb_t cb_fun,
> >> +                                      void *cb_data);
> >> +
> >> +/**
> >> + * Unregister the fallback callback function.
> >> + *
> >> + * @param id
> >> + *  The event dispatcher identifier.
> >> + *
> >> + * @return
> >> + *  - 0: Success
> >> + *  - <0: Error code on failure.
> >> + */
> >> +__rte_experimental
> >> +int
> >> +rte_event_dispatcher_unregister_fallback(uint8_t id);
> >> +
> >> +#ifdef __cplusplus
> >> +}
> >> +#endif
> >> +
> >> +#endif /* __RTE_EVENT_DISPATCHER_H__ */
> >> diff --git a/lib/librte_eventdev/rte_eventdev_version.map b/lib/librte_eventdev/rte_eventdev_version.map
> >> index 91a62cd077..dcb887601b 100644
> >> --- a/lib/librte_eventdev/rte_eventdev_version.map
> >> +++ b/lib/librte_eventdev/rte_eventdev_version.map
> >> @@ -134,4 +134,14 @@ EXPERIMENTAL {
> >>          __rte_eventdev_trace_crypto_adapter_queue_pair_del;
> >>          __rte_eventdev_trace_crypto_adapter_start;
> >>          __rte_eventdev_trace_crypto_adapter_stop;
> >> +
> >> +       rte_event_dispatcher_create;
> >> +       rte_event_dispatcher_free;
> >> +       rte_event_dispatcher_service_id_get;
> >> +       rte_event_dispatcher_bind_port_to_lcore;
> >> +       rte_event_dispatcher_unbind_port_from_lcore;
> >> +       rte_event_dispatcher_register;
> >> +       rte_event_dispatcher_unregister;
> >> +       rte_event_dispatcher_register_fallback;
> >> +       rte_event_dispatcher_unregister_fallback;
> >>   };
> >> --
> >> 2.25.1
> >>
>
  
Mattias Rönnblom March 15, 2021, 2:44 p.m. UTC | #6
On 2021-03-07 14:04, Jerin Jacob wrote:
> On Fri, Feb 26, 2021 at 1:31 PM Mattias Rönnblom
> <mattias.ronnblom@ericsson.com> wrote:
>> On 2021-02-25 13:32, Jerin Jacob wrote:
>>> On Fri, Feb 19, 2021 at 12:00 AM Mattias Rönnblom
>>> <mattias.ronnblom@ericsson.com> wrote:
>>>> The purpose of the event dispatcher is primarily to decouple different
>>>> parts of an application (e.g., processing pipeline stages), which
>>>> share the same underlying event device.
>>>>
>>>> The event dispatcher replaces the conditional logic (often, a switch
>>>> statement) that typically follows an event device dequeue operation,
>>>> where events are dispatched to different parts of the application
>>>> based on the destination queue id.
>>> # If the device has all type queue[1] this RFC would restrict to
>>> use queue ONLY as stage. A stage can be a Queue Type also.
>>> How we can abstract this in this model?
>>
>> "All queue type" is about scheduling policy. I would think that would be
>> independent of the "logical endpoint" of the event (i.e., the queue id).
>> I feel like I'm missing something here.
> Each queue type also can be represented as a stage.
> For example, If the system has only one queue, the Typical IPsec
> outbound stages can be
> Q0-Ordered(For SA lookup) -> Q0(Atomic)(For Sequence number update) ->
> Q0(Ordered)(Crypto operation) -> Q0(Atomic)(Send on wire)


OK, this makes sense.


Would such an application want to add a callback
per-queue-per-sched-type, or just per-sched-type? In your example, if
you had a queue Q1 as well, would you want the option to have
different callbacks for atomic-type events on Q0 and Q1?
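
To make the per-queue-per-sched-type option concrete, here is a minimal sketch of a callback table indexed by both queue id and scheduling type. It is not part of the RFC. The names (struct event, stage_table, stage_register(), stage_dispatch_one()) and the enum values are hypothetical stand-ins, chosen so the example builds without DPDK.

```c
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-ins for the eventdev definitions (values illustrative). */
enum sched_type {
	SCHED_ORDERED = 0,
	SCHED_ATOMIC = 1,
	SCHED_PARALLEL = 2,
	SCHED_NUM_TYPES
};

struct event {
	uint8_t queue_id;
	uint8_t sched_type;
};

typedef void (*stage_cb_t)(struct event *events, uint16_t num, void *cb_data);

struct stage_cb {
	stage_cb_t fun;
	void *data;
};

#define MAX_QUEUES 256 /* one slot for every possible uint8_t queue id */

struct stage_table {
	struct stage_cb cbs[MAX_QUEUES][SCHED_NUM_TYPES];
};

/* Register a callback for one (queue id, sched type) pair. */
static int
stage_register(struct stage_table *t, uint8_t queue_id, uint8_t sched_type,
	       stage_cb_t fun, void *data)
{
	if (sched_type >= SCHED_NUM_TYPES || fun == NULL)
		return -1;
	if (t->cbs[queue_id][sched_type].fun != NULL)
		return -1; /* slot already taken */

	t->cbs[queue_id][sched_type] = (struct stage_cb) {
		.fun = fun,
		.data = data
	};

	return 0;
}

/* Deliver a single event; returns -1 if no callback is registered. */
static int
stage_dispatch_one(struct stage_table *t, struct event *ev)
{
	struct stage_cb *cb;

	if (ev->sched_type >= SCHED_NUM_TYPES)
		return -1;

	cb = &t->cbs[ev->queue_id][ev->sched_type];
	if (cb->fun == NULL)
		return -1;

	cb->fun(ev, 1, cb->data);

	return 0;
}

/* Example callback: counts the events it receives. */
static void
count_cb(struct event *events, uint16_t num, void *cb_data)
{
	(void)events;
	*(unsigned int *)cb_data += num;
}
```

With such a table, atomic-type events on Q0 and Q1 can have different callbacks. A per-sched-type-only scheme would collapse the first dimension.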


Would you want to dispatch based on anything else in the event? You 
could basically do it on any field (flow id, priority, etc.), but is 
there some other field that's commonly used to denote a processing stage?
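
If dispatching on arbitrary fields turns out to be useful, one possible shape is a list of handlers, each with its own match predicate. The sketch below is an assumption for discussion only, not what the RFC implements. All names are hypothetical, and treating flow id 0 as "control" is an invented convention.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical, reduced event: just the fields one might match on. */
struct event {
	uint32_t flow_id;
	uint8_t queue_id;
	uint8_t sched_type;
	uint8_t priority;
};

typedef bool (*match_fun_t)(const struct event *ev, void *data);
typedef void (*process_fun_t)(struct event *ev, void *data);

struct handler {
	match_fun_t match;
	process_fun_t process;
	void *data;
};

#define MAX_HANDLERS 8

struct matcher {
	struct handler handlers[MAX_HANDLERS];
	unsigned int num;
};

/* Append a handler; earlier handlers take precedence over later ones. */
static int
matcher_add(struct matcher *m, match_fun_t match, process_fun_t process,
	    void *data)
{
	if (m->num == MAX_HANDLERS || match == NULL || process == NULL)
		return -1;

	m->handlers[m->num++] = (struct handler) {
		.match = match,
		.process = process,
		.data = data
	};

	return 0;
}

/* Returns the index of the handler that took the event, or -1 if none did. */
static int
matcher_dispatch(struct matcher *m, struct event *ev)
{
	unsigned int i;

	for (i = 0; i < m->num; i++) {
		struct handler *h = &m->handlers[i];

		if (h->match(ev, h->data)) {
			h->process(ev, h->data);
			return (int)i;
		}
	}

	return -1;
}

/* Example predicates: flow id 0 is treated as "control" (invented convention). */
static bool
match_control_flow(const struct event *ev, void *data)
{
	(void)data;
	return ev->flow_id == 0;
}

static bool
match_any(const struct event *ev, void *data)
{
	(void)ev;
	(void)data;
	return true;
}

/* Example handler: counts the events it processes. */
static void
count_one(struct event *ev, void *data)
{
	(void)ev;
	(*(unsigned int *)data)++;
}
```

The linear scan costs more per event than the array lookup on queue id, so this trades dispatch speed for flexibility.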


>>
>>> # Also, I think, it may make sense to add this type of infrastructure as
>>> helper functions as these are built on top of existing APIs i.e There
>>> is no support
>>> required from the driver to establish this model. IMO, If we need to
>>> add such support as
>>> one fixed set of functionality, we could have helper APIs to express a certain
>>> usage of eventdev, rather than defining that as the only way to do this.
>>> I think, A helper function can be used to as abstraction to define
>>> this kind of model.
>>>
>>> # Also, There is function pointer overhead and aggregating the events
>>> in implementation,
>>> That may be not always "the" optimized model of making it work vs switch case in
>>> application.
>>
>> Sure, but what to do in a reasonable generic framework?
>>
>>
>> If you are very sensitive to that 20 cc or whatever function pointer
>> call, you won't use this library. Or you will, and use static linking
>> and LTO to get rid of that overhead.
>>
>>
>> Probably, you have a few queues, not many. Probably, your dequeue bursts
>> are large, if the system load is high (and otherwise, you don't care
>> about efficiency). Then, you will have at least a couple of events per
>> function call.
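
The amortization argument quoted above can be illustrated with a stand-alone sketch of the grouping that red_dispatch_events() performs in the patch: consecutive events with the same queue id are handed to the callback in one call. The struct event type and the dispatch_in_runs() and sum_cb() names here are hypothetical, reduced versions used only for illustration.

```c
#include <stdint.h>

/* Hypothetical, reduced event carrying only the queue id. */
struct event {
	uint8_t queue_id;
};

typedef void (*burst_cb_t)(struct event *events, uint16_t num, void *cb_data);

/*
 * Split a dequeued burst into runs of consecutive events sharing a
 * queue id and invoke the callback once per run. Returns the number of
 * callback invocations made.
 */
static uint16_t
dispatch_in_runs(struct event *events, uint16_t num_events, burst_cb_t cb,
		 void *cb_data)
{
	uint16_t start = 0;
	uint16_t calls = 0;

	while (start < num_events) {
		uint16_t end = start + 1;

		while (end < num_events &&
		       events[end].queue_id == events[start].queue_id)
			end++;

		cb(&events[start], (uint16_t)(end - start), cb_data);
		calls++;

		start = end;
	}

	return calls;
}

/* Example callback: accumulates the number of events delivered. */
static void
sum_cb(struct event *events, uint16_t num, void *cb_data)
{
	(void)events;
	*(unsigned int *)cb_data += num;
}
```

A burst with queue ids 0,0,0,1,1,0,2,2 results in four callback invocations for eight events. How well this amortizes the indirect call depends on how the event device interleaves queues within a burst.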
> I am fine with this library and exposing it as a function pointer if
> someone needs to
> have a "helper" function to model the system around this logic.
>
> This RFC looks good to me in general. I would suggest to make it as
>
> - Helper functions, i.e., if someone chooses to write the stages in
> this way, it can be enabled through this helper function.
> By choosing as helper function it depicts, this is one way to do the
> stuff but the NOT ONLY WAY.
> - Abstract stages as a queue(which already added in the patch) and
> each type in the queue for all type queue cases.
> - Enhance test-eventdev to showcase the functionality and performance
> of these helpers.
>
> Thanks for the RFC.
>
>>
>>> [1]
>>> See RTE_EVENT_DEV_CAP_QUEUE_ALL_TYPES in
>>> https://doc.dpdk.org/guides/prog_guide/eventdev.html
>>>
>>>
>>>> The concept is similar to a UNIX file descriptor event loop library.
>>>> Instead of tying callback functions to fds as for example libevent
>>>> does, the event dispatcher binds callbacks to queue ids.
>>>>
>>>> An event dispatcher is configured to dequeue events from a specific
>>>> event device, and ties into the service core framework, to do its (and
>>>> the application's) work.
>>>>
>>>> The event dispatcher provides a convenient way for an eventdev-based
>>>> application to use service cores for application-level processing, and
>>>> thus for sharing those cores with other DPDK services.
>>>>
>>>> Signed-off-by: Mattias Rönnblom <mattias.ronnblom@ericsson.com>
>>>> ---
>>>>    lib/librte_eventdev/Makefile                 |   2 +
>>>>    lib/librte_eventdev/meson.build              |   6 +-
>>>>    lib/librte_eventdev/rte_event_dispatcher.c   | 420 +++++++++++++++++++
>>>>    lib/librte_eventdev/rte_event_dispatcher.h   | 251 +++++++++++
>>>>    lib/librte_eventdev/rte_eventdev_version.map |  10 +
>>>>    5 files changed, 687 insertions(+), 2 deletions(-)
>>>>    create mode 100644 lib/librte_eventdev/rte_event_dispatcher.c
>>>>    create mode 100644 lib/librte_eventdev/rte_event_dispatcher.h
>>>>
>>>> diff --git a/lib/librte_eventdev/Makefile b/lib/librte_eventdev/Makefile
>>>> index 0715256bb4..614d53af1b 100644
>>>> --- a/lib/librte_eventdev/Makefile
>>>> +++ b/lib/librte_eventdev/Makefile
>>>> @@ -26,6 +26,7 @@ SRCS-y += rte_event_eth_rx_adapter.c
>>>>    SRCS-y += rte_event_timer_adapter.c
>>>>    SRCS-y += rte_event_crypto_adapter.c
>>>>    SRCS-y += rte_event_eth_tx_adapter.c
>>>> +SRCS-y += rte_event_dispatcher.c
>>>>
>>>>    # export include files
>>>>    SYMLINK-y-include += rte_eventdev.h
>>>> @@ -40,6 +41,7 @@ SYMLINK-y-include += rte_event_timer_adapter.h
>>>>    SYMLINK-y-include += rte_event_timer_adapter_pmd.h
>>>>    SYMLINK-y-include += rte_event_crypto_adapter.h
>>>>    SYMLINK-y-include += rte_event_eth_tx_adapter.h
>>>> +SYMLINK-y-include += rte_event_dispatcher.h
>>>>
>>>>    # versioning export map
>>>>    EXPORT_MAP := rte_eventdev_version.map
>>>> diff --git a/lib/librte_eventdev/meson.build b/lib/librte_eventdev/meson.build
>>>> index d1f25ee5ca..2ca81983b5 100644
>>>> --- a/lib/librte_eventdev/meson.build
>>>> +++ b/lib/librte_eventdev/meson.build
>>>> @@ -13,7 +13,8 @@ sources = files('rte_eventdev.c',
>>>>                   'rte_event_eth_rx_adapter.c',
>>>>                   'rte_event_timer_adapter.c',
>>>>                   'rte_event_crypto_adapter.c',
>>>> -               'rte_event_eth_tx_adapter.c')
>>>> +               'rte_event_eth_tx_adapter.c',
>>>> +               'rte_event_dispatcher.c')
>>>>    headers = files('rte_eventdev.h',
>>>>                   'rte_eventdev_pmd.h',
>>>>                   'rte_eventdev_pmd_pci.h',
>>>> @@ -25,5 +26,6 @@ headers = files('rte_eventdev.h',
>>>>                   'rte_event_timer_adapter.h',
>>>>                   'rte_event_timer_adapter_pmd.h',
>>>>                   'rte_event_crypto_adapter.h',
>>>> -               'rte_event_eth_tx_adapter.h')
>>>> +               'rte_event_eth_tx_adapter.h',
>>>> +               'rte_event_dispatcher.h')
>>>>    deps += ['ring', 'ethdev', 'hash', 'mempool', 'mbuf', 'timer', 'cryptodev']
>>>> diff --git a/lib/librte_eventdev/rte_event_dispatcher.c b/lib/librte_eventdev/rte_event_dispatcher.c
>>>> new file mode 100644
>>>> index 0000000000..1c7e55a752
>>>> --- /dev/null
>>>> +++ b/lib/librte_eventdev/rte_event_dispatcher.c
>>>> @@ -0,0 +1,420 @@
>>>> +/* SPDX-License-Identifier: BSD-3-Clause
>>>> + * Copyright(c) 2021 Ericsson AB
>>>> + */
>>>> +
>>>> +#include <stdbool.h>
>>>> +#include <stdint.h>
>>>> +
>>>> +#include <rte_lcore.h>
>>>> +#include <rte_service_component.h>
>>>> +#include <rte_eventdev_pmd.h>
>>>> +
>>>> +#include <rte_event_dispatcher.h>
>>>> +
>>>> +#define RED_MAX_PORTS_PER_LCORE (4)
>>>> +
>>>> +struct rte_event_dispatcher_lcore_port {
>>>> +       uint8_t port_id;
>>>> +       uint16_t batch_size;
>>>> +       uint64_t timeout;
>>>> +};
>>>> +
>>>> +struct rte_event_dispatcher_lcore {
>>>> +       uint8_t num_ports;
>>>> +       struct rte_event_dispatcher_lcore_port ports[RED_MAX_PORTS_PER_LCORE];
>>>> +};
>>>> +
>>>> +struct rte_event_dispatcher_cb {
>>>> +       rte_event_dispatcher_cb_t cb_fun;
>>>> +       void *cb_data;
>>>> +};
>>>> +
>>>> +struct rte_event_dispatcher {
>>>> +       uint8_t id;
>>>> +       uint8_t event_dev_id;
>>>> +       int socket_id;
>>>> +       uint32_t service_id;
>>>> +       struct rte_event_dispatcher_lcore lcores[RTE_MAX_LCORE];
>>>> +       struct rte_event_dispatcher_cb queue_cbs[UINT8_MAX + 1];
>>>> +       struct rte_event_dispatcher_cb fallback;
>>>> +};
>>>> +
>>>> +static struct rte_event_dispatcher *dispatchers[UINT8_MAX + 1];
>>>> +
>>>> +static bool
>>>> +red_has_dispatcher(uint8_t id)
>>>> +{
>>>> +       return dispatchers[id] != NULL;
>>>> +}
>>>> +
>>>> +static struct rte_event_dispatcher *
>>>> +red_get_dispatcher(uint8_t id)
>>>> +{
>>>> +       return dispatchers[id];
>>>> +}
>>>> +
>>>> +static void
>>>> +red_set_dispatcher(uint8_t id, struct rte_event_dispatcher *dispatcher)
>>>> +{
>>>> +       dispatchers[id] = dispatcher;
>>>> +}
>>>> +
>>>> +#define RED_VALID_ID_OR_RET_EINVAL(id)                                 \
>>>> +       do {                                                            \
>>>> +               if (unlikely(!red_has_dispatcher(id))) {                \
>>>> +                       RTE_EDEV_LOG_ERR("Invalid dispatcher id %d\n", id); \
>>>> +                       return -EINVAL;                                 \
>>>> +               }                                                       \
>>>> +       } while (0)
>>>> +
>>>> +static struct rte_event_dispatcher_cb *
>>>> +red_lookup_cb(struct rte_event_dispatcher *dispatcher, uint8_t queue_id)
>>>> +{
>>>> +       struct rte_event_dispatcher_cb *cb = &dispatcher->queue_cbs[queue_id];
>>>> +
>>>> +       if (unlikely(cb->cb_fun == NULL))
>>>> +           cb = &dispatcher->fallback;
>>>> +
>>>> +       return cb;
>>>> +}
>>>> +
>>>> +static void
>>>> +red_dispatch_events(struct rte_event_dispatcher *dispatcher,
>>>> +                   struct rte_event *events, uint16_t num_events)
>>>> +{
>>>> +       uint16_t cb_start;
>>>> +       uint16_t cb_len;
>>>> +
>>>> +       for (cb_start = 0; cb_start < num_events; cb_start += cb_len) {
>>>> +               uint16_t cb_end = cb_start;
>>>> +               uint8_t queue_id = events[cb_start].queue_id;
>>>> +               struct rte_event_dispatcher_cb *cb;
>>>> +
>>>> +               while (++cb_end < num_events &&
>>>> +                      events[cb_end].queue_id == queue_id)
>>>> +                       ;
>>>> +
>>>> +               cb_len = cb_end - cb_start;
>>>> +
>>>> +               cb = red_lookup_cb(dispatcher, queue_id);
>>>> +
>>>> +               if (unlikely(cb->cb_fun == NULL)) {
>>>> +                       RTE_EDEV_LOG_ERR("Attempted to dispatch %d events "
>>>> +                                        "for queue id %d, but no queue or "
>>>> +                                        "fallback cb were configured\n",
>>>> +                                        cb_len, queue_id);
>>>> +                       continue;
>>>> +               }
>>>> +
>>>> +               cb->cb_fun(&events[cb_start], cb_len, cb->cb_data);
>>>> +       }
>>>> +}
>>>> +
>>>> +static void
>>>> +red_port_dequeue(struct rte_event_dispatcher *dispatcher,
>>>> +                struct rte_event_dispatcher_lcore_port *port)
>>>> +{
>>>> +       uint16_t batch_size = port->batch_size;
>>>> +       struct rte_event events[batch_size];
>>>> +       uint16_t n;
>>>> +
>>>> +       n = rte_event_dequeue_burst(dispatcher->event_dev_id, port->port_id,
>>>> +                                   events, batch_size, port->timeout);
>>>> +
>>>> +       red_dispatch_events(dispatcher, events, n);
>>>> +}
>>>> +
>>>> +static int32_t
>>>> +red_lcore_process(void *userdata)
>>>> +{
>>>> +       uint16_t i;
>>>> +       struct rte_event_dispatcher *dispatcher = userdata;
>>>> +       unsigned int lcore_id = rte_lcore_id();
>>>> +       struct rte_event_dispatcher_lcore *lcore =
>>>> +               &dispatcher->lcores[lcore_id];
>>>> +
>>>> +       for (i = 0; i < lcore->num_ports; i++) {
>>>> +               struct rte_event_dispatcher_lcore_port *port =
>>>> +                       &lcore->ports[i];
>>>> +
>>>> +               red_port_dequeue(dispatcher, port);
>>>> +       }
>>>> +
>>>> +       return 0;
>>>> +}
>>>> +
>>>> +static int
>>>> +red_service_runstate_set(uint32_t service_id, int state)
>>>> +{
>>>> +       int rc;
>>>> +
>>>> +       rc = rte_service_component_runstate_set(service_id, state);
>>>> +
>>>> +       if (rc)
>>>> +               RTE_EDEV_LOG_ERR("Error %d occurred while setting service "
>>>> +                                "component run state to %d\n", rc, state);
>>>> +
>>>> +       return rc;
>>>> +}
>>>> +
>>>> +static int
>>>> +red_service_register(struct rte_event_dispatcher *dispatcher)
>>>> +{
>>>> +       struct rte_service_spec service = {
>>>> +               .callback = red_lcore_process,
>>>> +               .callback_userdata = dispatcher,
>>>> +               .capabilities = RTE_SERVICE_CAP_MT_SAFE,
>>>> +               .socket_id = dispatcher->socket_id
>>>> +       };
>>>> +       int rc;
>>>> +
>>>> +       snprintf(service.name, RTE_SERVICE_NAME_MAX - 1, "red_%d",
>>>> +                dispatcher->id);
>>>> +
>>>> +       rc = rte_service_component_register(&service, &dispatcher->service_id);
>>>> +
>>>> +       if (rc)
>>>> +               RTE_EDEV_LOG_ERR("Registration of event dispatcher service "
>>>> +                                "%s failed with error code %d\n",
>>>> +                                service.name, rc);
>>>> +
>>>> +       rc = red_service_runstate_set(dispatcher->service_id, 1);
>>>> +
>>>> +       if (rc)
>>>> +               rte_service_component_unregister(dispatcher->service_id);
>>>> +
>>>> +       return rc;
>>>> +}
>>>> +
>>>> +static int
>>>> +red_service_unregister(struct rte_event_dispatcher *dispatcher)
>>>> +{
>>>> +       int rc;
>>>> +
>>>> +       rc = red_service_runstate_set(dispatcher->service_id, 0);
>>>> +
>>>> +       if (rc)
>>>> +               return rc;
>>>> +
>>>> +       rc = rte_service_component_unregister(dispatcher->service_id);
>>>> +
>>>> +       if (rc)
>>>> +               RTE_EDEV_LOG_ERR("Unregistration of event dispatcher service "
>>>> +                                "failed with error code %d\n", rc);
>>>> +
>>>> +       return rc;
>>>> +}
>>>> +
>>>> +int
>>>> +rte_event_dispatcher_create(uint8_t id, uint8_t event_dev_id)
>>>> +{
>>>> +       int socket_id;
>>>> +       struct rte_event_dispatcher *dispatcher;
>>>> +       int rc;
>>>> +
>>>> +       if (red_has_dispatcher(id)) {
>>>> +               RTE_EDEV_LOG_ERR("Dispatcher with id %d already exists\n",
>>>> +                                id);
>>>> +               return -EEXIST;
>>>> +       }
>>>> +
>>>> +       socket_id = rte_event_dev_socket_id(event_dev_id);
>>>> +
>>>> +       dispatcher =
>>>> +               rte_malloc_socket("event dispatcher",
>>>> +                                 sizeof(struct rte_event_dispatcher),
>>>> +                                 RTE_CACHE_LINE_SIZE, socket_id);
>>>> +
>>>> +       if (dispatcher == NULL) {
>>>> +               RTE_EDEV_LOG_ERR("Unable to allocate memory for event "
>>>> +                                "dispatcher\n");
>>>> +               return -ENOMEM;
>>>> +       }
>>>> +
>>>> +       *dispatcher = (struct rte_event_dispatcher) {
>>>> +               .id = id,
>>>> +               .event_dev_id = event_dev_id,
>>>> +               .socket_id = socket_id
>>>> +       };
>>>> +
>>>> +       rc = red_service_register(dispatcher);
>>>> +
>>>> +       if (rc < 0) {
>>>> +               rte_free(dispatcher);
>>>> +               return rc;
>>>> +       }
>>>> +
>>>> +       red_set_dispatcher(id, dispatcher);
>>>> +
>>>> +       return 0;
>>>> +}
>>>> +
>>>> +int
>>>> +rte_event_dispatcher_free(uint8_t id)
>>>> +{
>>>> +       struct rte_event_dispatcher *dispatcher;
>>>> +       int rc;
>>>> +
>>>> +       RED_VALID_ID_OR_RET_EINVAL(id);
>>>> +       dispatcher = red_get_dispatcher(id);
>>>> +
>>>> +       rc = red_service_unregister(dispatcher);
>>>> +
>>>> +       if (rc)
>>>> +               return rc;
>>>> +
>>>> +       red_set_dispatcher(id, NULL);
>>>> +
>>>> +       rte_free(dispatcher);
>>>> +
>>>> +       return 0;
>>>> +}
>>>> +
>>>> +int
>>>> +rte_event_dispatcher_service_id_get(uint8_t id, uint32_t *service_id)
>>>> +{
>>>> +       struct rte_event_dispatcher *dispatcher;
>>>> +
>>>> +       RED_VALID_ID_OR_RET_EINVAL(id);
>>>> +       dispatcher = red_get_dispatcher(id);
>>>> +
>>>> +       *service_id = dispatcher->service_id;
>>>> +
>>>> +       return 0;
>>>> +}
>>>> +
>>>> +static int16_t
>>>> +lcore_port_index(struct rte_event_dispatcher_lcore *lcore,
>>>> +                uint8_t event_port_id)
>>>> +{
>>>> +       uint16_t i;
>>>> +
>>>> +       for (i = 0; i < lcore->num_ports; i++) {
>>>> +               struct rte_event_dispatcher_lcore_port *port =
>>>> +                       &lcore->ports[i];
>>>> +               if (port->port_id == event_port_id)
>>>> +                       return i;
>>>> +       }
>>>> +
>>>> +       return -1;
>>>> +}
>>>> +
>>>> +int
>>>> +rte_event_dispatcher_bind_port_to_lcore(uint8_t id, uint8_t event_port_id,
>>>> +                                       uint16_t batch_size, uint64_t timeout,
>>>> +                                       unsigned int lcore_id)
>>>> +{
>>>> +       struct rte_event_dispatcher *dispatcher;
>>>> +       struct rte_event_dispatcher_lcore *lcore;
>>>> +       struct rte_event_dispatcher_lcore_port *port;
>>>> +
>>>> +       RED_VALID_ID_OR_RET_EINVAL(id);
>>>> +       dispatcher = red_get_dispatcher(id);
>>>> +
>>>> +       lcore = &dispatcher->lcores[lcore_id];
>>>> +
>>>> +       if (lcore->num_ports == RED_MAX_PORTS_PER_LCORE)
>>>> +               return -ENOMEM;
>>>> +
>>>> +       if (lcore_port_index(lcore, event_port_id) >= 0)
>>>> +               return -EEXIST;
>>>> +
>>>> +       port = &lcore->ports[lcore->num_ports];
>>>> +
>>>> +       *port = (struct rte_event_dispatcher_lcore_port) {
>>>> +               .port_id = event_port_id,
>>>> +               .batch_size = batch_size,
>>>> +               .timeout = timeout
>>>> +       };
>>>> +
>>>> +       lcore->num_ports++;
>>>> +
>>>> +       return 0;
>>>> +}
>>>> +
>>>> +int
>>>> +rte_event_dispatcher_unbind_port_from_lcore(uint8_t id, uint8_t event_port_id,
>>>> +                                           unsigned int lcore_id)
>>>> +{
>>>> +       struct rte_event_dispatcher *dispatcher;
>>>> +       struct rte_event_dispatcher_lcore *lcore;
>>>> +       int16_t port_idx;
>>>> +       struct rte_event_dispatcher_lcore_port *port;
>>>> +       struct rte_event_dispatcher_lcore_port *last;
>>>> +
>>>> +       RED_VALID_ID_OR_RET_EINVAL(id);
>>>> +       dispatcher = red_get_dispatcher(id);
>>>> +
>>>> +       lcore = &dispatcher->lcores[lcore_id];
>>>> +
>>>> +       port_idx = lcore_port_index(lcore, event_port_id);
>>>> +
>>>> +       if (port_idx < 0)
>>>> +               return -ENOENT;
>>>> +
>>>> +       port = &lcore->ports[port_idx];
>>>> +       last = &lcore->ports[lcore->num_ports - 1];
>>>> +
>>>> +       if (port != last)
>>>> +               *port = *last;
>>>> +
>>>> +       lcore->num_ports--;
>>>> +
>>>> +       return 0;
>>>> +}
>>>> +
>>>> +static int
>>>> +red_cb_manage(uint8_t id, uint8_t queue_id, bool reg, bool fallback,
>>>> +          rte_event_dispatcher_cb_t cb_fun, void *cb_data)
>>>> +{
>>>> +       struct rte_event_dispatcher *dispatcher;
>>>> +       struct rte_event_dispatcher_cb *cb;
>>>> +
>>>> +       RED_VALID_ID_OR_RET_EINVAL(id);
>>>> +       dispatcher = red_get_dispatcher(id);
>>>> +
>>>> +       if (fallback)
>>>> +               cb = &dispatcher->fallback;
>>>> +       else
>>>> +               cb = &dispatcher->queue_cbs[queue_id];
>>>> +
>>>> +       if (reg && cb->cb_fun != NULL)
>>>> +               return -EEXIST;
>>>> +
>>>> +       if (!reg && cb->cb_fun == NULL)
>>>> +               return -ENOENT;
>>>> +
>>>> +       *cb = (struct rte_event_dispatcher_cb) {
>>>> +               .cb_fun = cb_fun,
>>>> +               .cb_data = cb_data
>>>> +       };
>>>> +
>>>> +       return 0;
>>>> +}
>>>> +
>>>> +int
>>>> +rte_event_dispatcher_register(uint8_t id, uint8_t queue_id,
>>>> +                             rte_event_dispatcher_cb_t cb_fun, void *cb_data)
>>>> +{
>>>> +       return red_cb_manage(id, queue_id, true, false, cb_fun, cb_data);
>>>> +}
>>>> +
>>>> +int
>>>> +rte_event_dispatcher_unregister(uint8_t id, uint8_t queue_id)
>>>> +{
>>>> +       return red_cb_manage(id, queue_id, false, false, NULL, NULL);
>>>> +}
>>>> +
>>>> +int
>>>> +rte_event_dispatcher_register_fallback(uint8_t id,
>>>> +                                      rte_event_dispatcher_cb_t cb_fun,
>>>> +                                      void *cb_data)
>>>> +{
>>>> +       return red_cb_manage(id, 0, true, true, cb_fun, cb_data);
>>>> +}
>>>> +
>>>> +int
>>>> +rte_event_dispatcher_unregister_fallback(uint8_t id)
>>>> +{
>>>> +       return red_cb_manage(id, 0, false, true, NULL, NULL);
>>>> +}
>>>> diff --git a/lib/librte_eventdev/rte_event_dispatcher.h b/lib/librte_eventdev/rte_event_dispatcher.h
>>>> new file mode 100644
>>>> index 0000000000..11f57571ab
>>>> --- /dev/null
>>>> +++ b/lib/librte_eventdev/rte_event_dispatcher.h
>>>> @@ -0,0 +1,251 @@
>>>> +/* SPDX-License-Identifier: BSD-3-Clause
>>>> + * Copyright(c) 2021 Ericsson AB
>>>> + */
>>>> +
>>>> +#ifndef __RTE_EVENT_DISPATCHER_H__
>>>> +#define __RTE_EVENT_DISPATCHER_H__
>>>> +
>>>> +/**
>>>> + * @file
>>>> + *
>>>> + * RTE Event Dispatcher
>>>> + *
>>>> + */
>>>> +
>>>> +#ifdef __cplusplus
>>>> +extern "C" {
>>>> +#endif
>>>> +
>>>> +#include <rte_eventdev.h>
>>>> +
>>>> +/**
>>>> + * Function prototype for dispatcher callbacks.
>>>> + *
>>>> + * @param events
>>>> + *  Pointer to an array of events.
>>>> + *
>>>> + * @param num
>>>> + *  The number of events in the @p events array.
>>>> + *
>>>> + * @param cb_data
>>>> + *  The pointer supplied by the application in
>>>> + *  rte_event_dispatcher_register() or
>>>> + *  rte_event_dispatcher_register_fallback().
>>>> + */
>>>> +
>>>> +typedef void (*rte_event_dispatcher_cb_t)(struct rte_event *events,
>>>> +                                         uint16_t num, void *cb_data);
>>>> +
>>>> +/**
>>>> + * Create an event dispatcher with the specified id.
>>>> + *
>>>> + * @param id
>>>> + *  An application-specified, unique (across all event dispatcher
>>>> + *  instances) identifier.
>>>> + *
>>>> + * @param event_dev_id
>>>> + *  The identifier of the event device from which this event dispatcher
>>>> + *  will dequeue events.
>>>> + *
>>>> + * @return
>>>> + *   - 0: Success
>>>> + *   - <0: Error code on failure
>>>> + */
>>>> +__rte_experimental
>>>> +int
>>>> +rte_event_dispatcher_create(uint8_t id, uint8_t event_dev_id);
>>>> +
>>>> +/**
>>>> + * Frees an event dispatcher with the specified id.
>>>> + *
>>>> + * @param id
>>>> + *  The event dispatcher identifier.
>>>> + *
>>>> + * @return
>>>> + *   - 0: Success
>>>> + *   - <0: Error code on failure
>>>> + */
>>>> +__rte_experimental
>>>> +int
>>>> +rte_event_dispatcher_free(uint8_t id);
>>>> +
>>>> +/**
>>>> + * Retrieve the service identifier of the event dispatcher.
>>>> + *
>>>> + * @param id
>>>> + *  The event dispatcher identifier.
>>>> + *
>>>> + * @param [out] service_id
>>>> + *  A pointer to a caller-supplied buffer where the event dispatcher's
>>>> + *  service id will be stored.
>>>> + *
>>>> + * @return
>>>> + *  - 0: Success
>>>> + *  - <0: Error code on failure.
>>>> + */
>>>> +__rte_experimental
>>>> +int
>>>> +rte_event_dispatcher_service_id_get(uint8_t id, uint32_t *service_id);
>>>> +
>>>> +/**
>>>> + * Binds an event device port to a specific lcore on the specified
>>>> + * event dispatcher.
>>>> + *
>>>> + * This function configures an event dispatcher to dequeue events from
>>>> + * an event device port (as specified by @p event_port_id), in case
>>>> + * its service function is run on a particular lcore (as specified by @p
>>>> + * lcore_id).
>>>> + *
>>>> + * Multiple event device ports may be bound to the same lcore. A
>>>> + * particular port may only be bound to one lcore.
>>>> + *
>>>> + * If the event dispatcher service is mapped (with
>>>> + * rte_service_map_lcore_set()) to a lcore for which no ports are
>>>> + * bound, the service function will be a no-operation.
>>>> + *
>>>> + * @param id
>>>> + *  The event dispatcher identifier.
>>>> + *
>>>> + * @param event_port_id
>>>> + *  The event device port identifier.
>>>> + *
>>>> + * @param batch_size
>>>> + *  The batch size to use in rte_event_dequeue_burst(), for the
>>>> + *  configured event device port and lcore.
>>>> + *
>>>> + * @param timeout
>>>> + *  The timeout parameter to use in rte_event_dequeue_burst(), for the
>>>> + *  configured event device port and lcore.
>>>> + *
>>>> + * @return
>>>> + *  - 0: Success
>>>> + *  - <0: Error code on failure.
>>>> + */
>>>> +__rte_experimental
>>>> +int
>>>> +rte_event_dispatcher_bind_port_to_lcore(uint8_t id, uint8_t event_port_id,
>>>> +                                       uint16_t batch_size, uint64_t timeout,
>>>> +                                       unsigned int lcore_id);
>>>> +
>>>> +/**
>>>> + * Unbind an event device port from a specific lcore.
>>>> + *
>>>> + * @param id
>>>> + *  The event dispatcher identifier.
>>>> + *
>>>> + * @param event_port_id
>>>> + *  The event device port identifier.
>>>> + *
>>>> + * @return
>>>> + *  - 0: Success
>>>> + *  - <0: Error code on failure.
>>>> + */
>>>> +__rte_experimental
>>>> +int
>>>> +rte_event_dispatcher_unbind_port_from_lcore(uint8_t id, uint8_t event_port_id,
>>>> +                                           unsigned int lcore_id);
>>>> +
>>>> +/**
>>>> + * Register a callback function for the specified queue identifier.
>>>> + *
>>>> + * At most one callback may be registered per queue id.
>>>> + *
>>>> + * The same callback function may be registered for multiple queue ids.
>>>> + *
>>>> + * For each callback invocation, events belonging to a single queue id
>>>> + * will be dispatched.
>>>> + *
>>>> + * @param id
>>>> + *  The event dispatcher identifier.
>>>> + *
>>>> + * @param queue_id
>>>> + *  The event device queue id for which @p cb_fun should be called.
>>>> + *
>>>> + * @param cb_fun
>>>> + *  The callback function.
>>>> + *
>>>> + * @param cb_data
>>>> + *  A pointer to some application-specific opaque data (or NULL),
>>>> + *  which is supplied back to the application in the callback.
>>>> + *
>>>> + * @return
>>>> + *  - 0: Success
>>>> + *  - <0: Error code on failure.
>>>> + */
>>>> +__rte_experimental
>>>> +int
>>>> +rte_event_dispatcher_register(uint8_t id, uint8_t queue_id,
>>>> +                             rte_event_dispatcher_cb_t cb_fun, void *cb_data);
>>>> +
>>>> +/**
>>>> + * Unregister a callback function for the specified queue identifier.
>>>> + *
>>>> + * @param id
>>>> + *  The event dispatcher identifier.
>>>> + *
>>>> + * @param queue_id
>>>> + *  The event device queue id for which the callback should be removed.
>>>> + *
>>>> + * @return
>>>> + *  - 0: Success
>>>> + *  - <0: Error code on failure.
>>>> + */
>>>> +__rte_experimental
>>>> +int
>>>> +rte_event_dispatcher_unregister(uint8_t id, uint8_t queue_id);
>>>> +
>>>> +/**
>>>> + * Register a fallback callback function for the specified event
>>>> + * dispatcher.
>>>> + *
>>>> + * Only events for which no queue-specific callback function is
>>>> + * registered will be dispatched to the @p cb_fun callback.
>>>> + *
>>>> + * At most one fallback callback function may be registered.
>>>> + *
>>>> + * For each callback invocation, only events belonging to a single
>>>> + * queue id will be included.
>>>> + *
>>>> + * If the event dispatcher encounters an event with a queue id for
>>>> + * which the application has not registered any specific callback, and
>>>> + * there is also no fallback configured, the event will be dropped.
>>>> + *
>>>> + * @param id
>>>> + *  The event dispatcher identifier.
>>>> + *
>>>> + * @param cb_fun
>>>> + *  The fallback callback function.
>>>> + *
>>>> + * @param cb_data
>>>> + *  A pointer to some application-specific opaque data (or NULL),
>>>> + *  which is supplied back to the application in the callback.
>>>> + *
>>>> + * @return
>>>> + *  - 0: Success
>>>> + *  - <0: Error code on failure.
>>>> + */
>>>> +__rte_experimental
>>>> +int
>>>> +rte_event_dispatcher_register_fallback(uint8_t id,
>>>> +                                      rte_event_dispatcher_cb_t cb_fun,
>>>> +                                      void *cb_data);
>>>> +
>>>> +/**
>>>> + * Unregister the fallback callback function.
>>>> + *
>>>> + * @param id
>>>> + *  The event dispatcher identifier.
>>>> + *
>>>> + * @return
>>>> + *  - 0: Success
>>>> + *  - <0: Error code on failure.
>>>> + */
>>>> +__rte_experimental
>>>> +int
>>>> +rte_event_dispatcher_unregister_fallback(uint8_t id);
>>>> +
>>>> +#ifdef __cplusplus
>>>> +}
>>>> +#endif
>>>> +
>>>> +#endif /* __RTE_EVENT_DISPATCHER_H__ */
>>>> diff --git a/lib/librte_eventdev/rte_eventdev_version.map b/lib/librte_eventdev/rte_eventdev_version.map
>>>> index 91a62cd077..dcb887601b 100644
>>>> --- a/lib/librte_eventdev/rte_eventdev_version.map
>>>> +++ b/lib/librte_eventdev/rte_eventdev_version.map
>>>> @@ -134,4 +134,14 @@ EXPERIMENTAL {
>>>>           __rte_eventdev_trace_crypto_adapter_queue_pair_del;
>>>>           __rte_eventdev_trace_crypto_adapter_start;
>>>>           __rte_eventdev_trace_crypto_adapter_stop;
>>>> +
>>>> +       rte_event_dispatcher_create;
>>>> +       rte_event_dispatcher_free;
>>>> +       rte_event_dispatcher_service_id_get;
>>>> +       rte_event_dispatcher_bind_port_to_lcore;
>>>> +       rte_event_dispatcher_unbind_port_from_lcore;
>>>> +       rte_event_dispatcher_register;
>>>> +       rte_event_dispatcher_unregister;
>>>> +       rte_event_dispatcher_register_fallback;
>>>> +       rte_event_dispatcher_unregister_fallback;
>>>>    };
>>>> --
>>>> 2.25.1
>>>>
  
Van Haaren, Harry March 15, 2021, 3 p.m. UTC | #7
> -----Original Message-----
> From: dev <dev-bounces@dpdk.org> On Behalf Of Mattias Rönnblom
> Sent: Monday, March 15, 2021 2:45 PM
> To: Jerin Jacob <jerinjacobk@gmail.com>
> Cc: Jerin Jacob <jerinj@marvell.com>; dpdk-dev <dev@dpdk.org>; Richardson,
> Bruce <bruce.richardson@intel.com>
> Subject: Re: [dpdk-dev] [RFC] eventdev: introduce event dispatcher
> 
> On 2021-03-07 14:04, Jerin Jacob wrote:
> > On Fri, Feb 26, 2021 at 1:31 PM Mattias Rönnblom
> > <mattias.ronnblom@ericsson.com> wrote:
> >> On 2021-02-25 13:32, Jerin Jacob wrote:
> >>> On Fri, Feb 19, 2021 at 12:00 AM Mattias Rönnblom
> >>> <mattias.ronnblom@ericsson.com> wrote:
> >>>> The purpose of the event dispatcher is primarily to decouple different
> >>>> parts of an application (e.g., processing pipeline stages), which
> >>>> share the same underlying event device.
> >>>>
> >>>> The event dispatcher replaces the conditional logic (often, a switch
> >>>> statement) that typically follows an event device dequeue operation,
> >>>> where events are dispatched to different parts of the application
> >>>> based on the destination queue id.
> >>> # If the device has all type queue[1] this RFC would restrict to
> >>> use queue ONLY as stage. A stage can be a Queue Type also.
> >>> How we can abstract this in this model?
> >>
> >> "All queue type" is about scheduling policy. I would think that would be
> >> independent of the "logical endpoint" of the event (i.e., the queue id).
> >> I feel like I'm missing something here.
> > Each queue type also can be represented as a stage.
> > For example, If the system has only one queue, the Typical IPsec
> > outbound stages can be
> > Q0-Ordered(For SA lookup) -> Q0(Atomic)(For Sequence number update) ->
> > Q0(Ordered)(Crypto operation)->Q0(Atomic)(Send on wire)
> 
> 
> OK, this makes sense.
> 
> 
> Would such an application want to add a callback
> per-queue-per-sched-type, or just per-sched-type? In your example, if
> you would have a queue Q1 as well, would want to have the option to have
> different callbacks for atomic-type events on Q0 and Q1?
> 
> 
> Would you want to dispatch based on anything else in the event? You
> could basically do it on any field (flow id, priority, etc.), but is
> there some other field that's commonly used to denote a processing stage?

I expect that struct rte_event::event_type and sub_event_type would regularly
be used to split out different types of "things" that would be handled separately.

Overall, I think we could imagine the Queue number, Queue Scheduling type (Re-Ordered, Atomic),
Event type, sub event type, Flow-ID... all contributing somehow to which function to execute in a given situation.

As a somewhat extreme example to prove a point:
An RX core might use rte_flow rules to split traffic into some arbitrary grouping, and
then the rte_event::flow_id could be used to select the function-pointer to jump to handle it?

I like the *concept* of having a table of func-ptrs, and removing a switch() in that way,
but I'm not sure that DPDK Eventdev APIs are the right place for it. I think Jerin already
suggested the "helper function" concept, which seems a good idea to allow optional usage.
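For illustration only, here is a minimal sketch of the table-of-func-ptrs concept mentioned above, with a lookup on the queue id taking the place of a switch(). It is not code from the RFC: struct ev is a hypothetical stand-in for struct rte_event, and handler_set(), dispatch() and count_event() are made-up names.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for struct rte_event; only the fields used here. */
struct ev {
	uint8_t queue_id;
	uint32_t flow_id;
};

typedef void (*ev_handler_t)(const struct ev *ev, void *data);

struct handler_entry {
	ev_handler_t fun;
	void *data;
};

/* One slot per possible uint8_t queue id (0..255). */
#define NUM_QUEUE_IDS (UINT8_MAX + 1)

static struct handler_entry handlers[NUM_QUEUE_IDS];

/* Install (or, with fun == NULL, remove) the handler for a queue id. */
static void
handler_set(uint8_t queue_id, ev_handler_t fun, void *data)
{
	handlers[queue_id] = (struct handler_entry) { .fun = fun, .data = data };
}

/*
 * Table lookup instead of switch (ev->queue_id) { case ...: }.
 * Returns the number of events that had no handler installed.
 */
static unsigned int
dispatch(const struct ev *evs, unsigned int num)
{
	unsigned int unhandled = 0;
	unsigned int i;

	assert(evs != NULL || num == 0);

	for (i = 0; i < num; i++) {
		const struct handler_entry *h = &handlers[evs[i].queue_id];

		if (h->fun == NULL) {
			unhandled++;
			continue;
		}
		h->fun(&evs[i], h->data);
	}

	return unhandled;
}

/* Example handler: counts the events it is handed. */
static void
count_event(const struct ev *ev, void *data)
{
	unsigned int *count = data;

	(void)ev;
	(*count)++;
}
```

Keying the table on a different event field would mostly change the index expression and the table size, which is where the question of how generic such a helper can be made comes in.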

To be clear, I'm not against upstreaming of such an event-dispatcher, but I'm not sure
it's possible to build it to be generic enough for all use-cases. Maybe focusing on an actual
use-case and driving the design from that is a good approach?


Regards, -Harry

<snip patch contents below>
  
Mattias Rönnblom March 22, 2021, 9:50 a.m. UTC | #8
On 2021-03-15 16:00, Van Haaren, Harry wrote:
>> -----Original Message-----
>> From: dev <dev-bounces@dpdk.org> On Behalf Of Mattias Rönnblom
>> Sent: Monday, March 15, 2021 2:45 PM
>> To: Jerin Jacob <jerinjacobk@gmail.com>
>> Cc: Jerin Jacob <jerinj@marvell.com>; dpdk-dev <dev@dpdk.org>; Richardson,
>> Bruce <bruce.richardson@intel.com>
>> Subject: Re: [dpdk-dev] [RFC] eventdev: introduce event dispatcher
>>
>> On 2021-03-07 14:04, Jerin Jacob wrote:
>>> On Fri, Feb 26, 2021 at 1:31 PM Mattias Rönnblom
>>> <mattias.ronnblom@ericsson.com> wrote:
>>>> On 2021-02-25 13:32, Jerin Jacob wrote:
>>>>> On Fri, Feb 19, 2021 at 12:00 AM Mattias Rönnblom
>>>>> <mattias.ronnblom@ericsson.com> wrote:
>>>>>> The purpose of the event dispatcher is primarily to decouple different
>>>>>> parts of an application (e.g., processing pipeline stages), which
>>>>>> share the same underlying event device.
>>>>>>
>>>>>> The event dispatcher replaces the conditional logic (often, a switch
>>>>>> statement) that typically follows an event device dequeue operation,
>>>>>> where events are dispatched to different parts of the application
>>>>>> based on the destination queue id.
>>>>> # If the device has all type queue[1] this RFC would restrict to
>>>>> use queue ONLY as stage. A stage can be a Queue Type also.
>>>>> How we can abstract this in this model?
>>>> "All queue type" is about scheduling policy. I would think that would be
>>>> independent of the "logical endpoint" of the event (i.e., the queue id).
>>>> I feel like I'm missing something here.
>>> Each queue type also can be represented as a stage.
>>> For example, If the system has only one queue, the Typical IPsec
>>> outbound stages can be
>>> Q0-Ordered(For SA lookup) -> Q0(Atomic)(For Sequence number update) ->
>>> Q0(Ordered)(Crypto operation)->Q0(Atomic)(Send on wire)
>>
>> OK, this makes sense.
>>
>>
>> Would such an application want to add a callback
>> per-queue-per-sched-type, or just per-sched-type? In your example, if
>> you would have a queue Q1 as well, would want to have the option to have
>> different callbacks for atomic-type events on Q0 and Q1?
>>
>>
>> Would you want to dispatch based on anything else in the event? You
>> could basically do it on any field (flow id, priority, etc.), but is
>> there some other field that's commonly used to denote a processing stage?
> I expect that struct rte_event::event_type and sub_event_type would regularly
> be used to split out different types of "things" that would be handled separately.
>
> Overall, I think we could imagine the Queue number, Queue Scheduling type (Re-Ordered, Atomic),
> Event type, sub event type, Flow-ID.. all contributing somehow to what function to execute in some situation.


Sure, and add to this list the contents of the mbuf (or other user payload).


What you should keep in mind (and maybe you did), is that the primary 
aim is to allow decoupling of different parts of an application (or even 
multiple applications), sharing an event device.


> As a somewhat extreme example to prove a point:
> An RX core might use rte_flow rules to split traffic into some arbitrary grouping, and
> then the rte_event::flow_id could be used to select the function-pointer to jump to handle it?
>
> I like the *concept* of having a table of func-ptrs, and removing a switch() in that way,
> but I'm not sure that DPDK Eventdev APIs are the right place for it. I think Jerin already
> suggested the "helper function" concept, which seems a good idea to allow optional usage.
>
> To be clear, I'm not against upstreaming of such an event-dispatcher, but I'm not sure
> it's possible to build it to be generic enough for all use-cases. Maybe focusing on an actual
> use-case and driving the design from that is a good approach?
>
The intention is that the event dispatcher is optional. For performance 
and/or flexibility, many applications would still use the normal event 
device enqueue and dequeue operations.


The event dispatcher is not supposed to cover all possible use cases in the
sense that it will be able to remove all conditional logic used to select
the function call which marks the beginning of the processing for an
event. The aim should be to cover most cases, where one set of events
goes to one software module, and another goes elsewhere. (The original
RFC design stems from an actual use case - but it's just one.)


An alternative design would be to turn the whole thing around, in the 
sense that instead of the application specifying which queue id/sched 
type/sub event type/etc goes to which callback, you split the callback 
in two: one callback to answer the question "is this your event" and 
another callback to actually dispatch the event (with the same signature 
as the callback in the RFC).
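A minimal sketch of what such a two-callback registration could look like. All names here (struct ev as a stand-in for struct rte_event, match_cb_t, process_cb_t, dispatcher_register(), dispatcher_dispatch(), the atomic_stage example) are hypothetical and are not part of the RFC or of DPDK.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for struct rte_event. */
struct ev {
	uint8_t queue_id;
	uint8_t sched_type;
};

/* "Is this your event?" */
typedef bool (*match_cb_t)(const struct ev *ev, void *data);
/* Same shape as the callback in the RFC: a burst of events plus user data. */
typedef void (*process_cb_t)(struct ev *evs, uint16_t num, void *data);

struct consumer {
	match_cb_t match;
	process_cb_t process;
	void *data;
};

#define MAX_CONSUMERS 8

struct dispatcher {
	struct consumer consumers[MAX_CONSUMERS];
	uint16_t num_consumers;
};

static int
dispatcher_register(struct dispatcher *d, match_cb_t match,
		    process_cb_t process, void *data)
{
	assert(match != NULL && process != NULL);

	if (d->num_consumers == MAX_CONSUMERS)
		return -1;

	d->consumers[d->num_consumers++] = (struct consumer) {
		.match = match, .process = process, .data = data
	};

	return 0;
}

/*
 * Offer each event to the consumers in registration order; the first one
 * whose match callback returns true gets it. Returns the number of events
 * nobody claimed.
 */
static uint16_t
dispatcher_dispatch(struct dispatcher *d, struct ev *evs, uint16_t num)
{
	uint16_t unclaimed = 0;
	uint16_t i, c;

	for (i = 0; i < num; i++) {
		for (c = 0; c < d->num_consumers; c++) {
			struct consumer *con = &d->consumers[c];

			if (con->match(&evs[i], con->data)) {
				con->process(&evs[i], 1, con->data);
				break;
			}
		}
		if (c == d->num_consumers)
			unclaimed++;
	}

	return unclaimed;
}

/* Example consumer: claims atomic-type events on one queue. */
struct atomic_stage {
	uint8_t queue_id;
	unsigned int seen;
};

#define SCHED_ATOMIC 1 /* illustrative value for this sketch only */

static bool
atomic_stage_match(const struct ev *ev, void *data)
{
	const struct atomic_stage *stage = data;

	return ev->queue_id == stage->queue_id &&
		ev->sched_type == SCHED_ATOMIC;
}

static void
atomic_stage_process(struct ev *evs, uint16_t num, void *data)
{
	struct atomic_stage *stage = data;

	(void)evs;
	stage->seen += num;
}
```

With this shape, the single-queue IPsec example from earlier in the thread could register one consumer per (queue, sched type) stage, at the cost of one or more match calls per event.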


If made completely generic (and thus removing any references made to queue
id in the API), it would require a handful (or maybe more) of callback
calls per event in the dispatcher. That sounds less than ideal from a
performance point of view.


If you kept the queue id as the basic arbiter of the event stream, the 
overhead should be reduced to something more manageable in most 
applications.


A side-effect of this scheme is that it provides an opportunity for the
dispatcher to sort the just-dequeued events into batches for the different
consumer callbacks, before dispatching them - basically for free. For
many applications, this should have a large upside in the form of 
improved cache locality and fewer branch mispredicts. To reap those 
benefits, batching on queue id should also be performed.
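
The batching could be sketched roughly as below: a first pass asks the
match callbacks and copies each event into a per-handler burst, and a
second pass makes one process call per non-empty burst. All names and
the fixed limits (MAX_HANDLERS, MAX_BURST) are assumptions made for
the example, struct ev stands in for struct rte_event, and a real
implementation may well avoid the copy.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define MAX_HANDLERS 4
#define MAX_BURST 32

/* Hypothetical stand-in for struct rte_event. */
struct ev {
	uint8_t queue_id;
	uint32_t seq;
};

typedef bool (*match_cb_t)(const struct ev *event, void *cb_data);
typedef void (*process_cb_t)(struct ev *events, uint16_t num, void *cb_data);

struct handler {
	match_cb_t match_fun;
	void *match_data;
	process_cb_t process_fun;
	void *process_data;
};

/* Returns the number of events no handler claimed. */
uint16_t
dispatch_batched(const struct handler *handlers, uint16_t num_handlers,
		 const struct ev *events, uint16_t num)
{
	struct ev bursts[MAX_HANDLERS][MAX_BURST];
	uint16_t burst_lens[MAX_HANDLERS] = { 0 };
	uint16_t unclaimed = 0;
	uint16_t i;
	uint16_t h;

	assert(num_handlers <= MAX_HANDLERS && num <= MAX_BURST);

	/* Pass 1: put each event in the burst of the first handler
	 * that claims it.
	 */
	for (i = 0; i < num; i++) {
		for (h = 0; h < num_handlers; h++) {
			if (handlers[h].match_fun(&events[i],
						  handlers[h].match_data)) {
				bursts[h][burst_lens[h]++] = events[i];
				break;
			}
		}

		if (h == num_handlers)
			unclaimed++;
	}

	/* Pass 2: one process call per handler with a non-empty burst. */
	for (h = 0; h < num_handlers; h++)
		if (burst_lens[h] > 0)
			handlers[h].process_fun(bursts[h], burst_lens[h],
						handlers[h].process_data);

	return unclaimed;
}

/* Example match callback: claims events whose queue id is *cb_data. */
bool
match_queue_id(const struct ev *event, void *cb_data)
{
	const uint8_t *queue_id = cb_data;

	return event->queue_id == *queue_id;
}

struct burst_stats {
	unsigned int calls;
	unsigned int events;
};

/* Example process callback: records call and event counts. */
void
record_burst(struct ev *events, uint16_t num, void *cb_data)
{
	struct burst_stats *stats = cb_data;

	(void)events;
	stats->calls++;
	stats->events += num;
}
```

Within one handler's burst the events keep their dequeue order, but
the relative order between events going to different handlers is not
preserved by this sketch.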


If there's interest, I could try to do an RFC of this alternative
approach as well, and have a look at the performance implications.

> Regards, -Harry
>
> <snip patch contents below>
  

Patch

diff --git a/lib/librte_eventdev/Makefile b/lib/librte_eventdev/Makefile
index 0715256bb4..614d53af1b 100644
--- a/lib/librte_eventdev/Makefile
+++ b/lib/librte_eventdev/Makefile
@@ -26,6 +26,7 @@  SRCS-y += rte_event_eth_rx_adapter.c
 SRCS-y += rte_event_timer_adapter.c
 SRCS-y += rte_event_crypto_adapter.c
 SRCS-y += rte_event_eth_tx_adapter.c
+SRCS-y += rte_event_dispatcher.c
 
 # export include files
 SYMLINK-y-include += rte_eventdev.h
@@ -40,6 +41,7 @@  SYMLINK-y-include += rte_event_timer_adapter.h
 SYMLINK-y-include += rte_event_timer_adapter_pmd.h
 SYMLINK-y-include += rte_event_crypto_adapter.h
 SYMLINK-y-include += rte_event_eth_tx_adapter.h
+SYMLINK-y-include += rte_event_dispatcher.h
 
 # versioning export map
 EXPORT_MAP := rte_eventdev_version.map
diff --git a/lib/librte_eventdev/meson.build b/lib/librte_eventdev/meson.build
index d1f25ee5ca..2ca81983b5 100644
--- a/lib/librte_eventdev/meson.build
+++ b/lib/librte_eventdev/meson.build
@@ -13,7 +13,8 @@  sources = files('rte_eventdev.c',
 		'rte_event_eth_rx_adapter.c',
 		'rte_event_timer_adapter.c',
 		'rte_event_crypto_adapter.c',
-		'rte_event_eth_tx_adapter.c')
+		'rte_event_eth_tx_adapter.c',
+		'rte_event_dispatcher.c')
 headers = files('rte_eventdev.h',
 		'rte_eventdev_pmd.h',
 		'rte_eventdev_pmd_pci.h',
@@ -25,5 +26,6 @@  headers = files('rte_eventdev.h',
 		'rte_event_timer_adapter.h',
 		'rte_event_timer_adapter_pmd.h',
 		'rte_event_crypto_adapter.h',
-		'rte_event_eth_tx_adapter.h')
+		'rte_event_eth_tx_adapter.h',
+		'rte_event_dispatcher.h')
 deps += ['ring', 'ethdev', 'hash', 'mempool', 'mbuf', 'timer', 'cryptodev']
diff --git a/lib/librte_eventdev/rte_event_dispatcher.c b/lib/librte_eventdev/rte_event_dispatcher.c
new file mode 100644
index 0000000000..1c7e55a752
--- /dev/null
+++ b/lib/librte_eventdev/rte_event_dispatcher.c
@@ -0,0 +1,420 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2021 Ericsson AB
+ */
+
+#include <stdbool.h>
+#include <stdint.h>
+
+#include <rte_lcore.h>
+#include <rte_service_component.h>
+#include <rte_eventdev_pmd.h>
+
+#include <rte_event_dispatcher.h>
+
+#define RED_MAX_PORTS_PER_LCORE (4)
+
+struct rte_event_dispatcher_lcore_port {
+	uint8_t port_id;
+	uint16_t batch_size;
+	uint64_t timeout;
+};
+
+struct rte_event_dispatcher_lcore {
+	uint8_t num_ports;
+	struct rte_event_dispatcher_lcore_port ports[RED_MAX_PORTS_PER_LCORE];
+};
+
+struct rte_event_dispatcher_cb {
+	rte_event_dispatcher_cb_t cb_fun;
+	void *cb_data;
+};
+
+struct rte_event_dispatcher {
+	uint8_t id;
+	uint8_t event_dev_id;
+	int socket_id;
+	uint32_t service_id;
+	struct rte_event_dispatcher_lcore lcores[RTE_MAX_LCORE];
+	struct rte_event_dispatcher_cb queue_cbs[UINT8_MAX + 1];
+	struct rte_event_dispatcher_cb fallback;
+};
+
+static struct rte_event_dispatcher *dispatchers[UINT8_MAX + 1];
+
+static bool
+red_has_dispatcher(uint8_t id)
+{
+	return dispatchers[id] != NULL;
+}
+
+static struct rte_event_dispatcher *
+red_get_dispatcher(uint8_t id)
+{
+	return dispatchers[id];
+}
+
+static void
+red_set_dispatcher(uint8_t id, struct rte_event_dispatcher *dispatcher)
+{
+	dispatchers[id] = dispatcher;
+}
+
+#define RED_VALID_ID_OR_RET_EINVAL(id)					\
+	do {								\
+		if (unlikely(!red_has_dispatcher(id))) {		\
+			RTE_EDEV_LOG_ERR("Invalid dispatcher id %d\n", id); \
+			return -EINVAL;					\
+		}							\
+	} while (0)
+
+static struct rte_event_dispatcher_cb *
+red_lookup_cb(struct rte_event_dispatcher *dispatcher, uint8_t queue_id)
+{
+	struct rte_event_dispatcher_cb *cb = &dispatcher->queue_cbs[queue_id];
+
+	if (unlikely(cb->cb_fun == NULL))
+		cb = &dispatcher->fallback;
+
+	return cb;
+}
+
+static void
+red_dispatch_events(struct rte_event_dispatcher *dispatcher,
+		    struct rte_event *events, uint16_t num_events)
+{
+	uint16_t cb_start;
+	uint16_t cb_len;
+
+	for (cb_start = 0; cb_start < num_events; cb_start += cb_len) {
+		uint16_t cb_end = cb_start;
+		uint8_t queue_id = events[cb_start].queue_id;
+		struct rte_event_dispatcher_cb *cb;
+
+		while (++cb_end < num_events &&
+		       events[cb_end].queue_id == queue_id)
+			;
+
+		cb_len = cb_end - cb_start;
+
+		cb = red_lookup_cb(dispatcher, queue_id);
+
+		if (unlikely(cb->cb_fun == NULL)) {
+			RTE_EDEV_LOG_ERR("Attempted to dispatch %d events "
+					 "for queue id %d, but no queue or "
+					 "fallback cb were configured\n",
+					 cb_len, queue_id);
+			continue;
+		}
+
+		cb->cb_fun(&events[cb_start], cb_len, cb->cb_data);
+	}
+}
+
+static void
+red_port_dequeue(struct rte_event_dispatcher *dispatcher,
+		 struct rte_event_dispatcher_lcore_port *port)
+{
+	uint16_t batch_size = port->batch_size;
+	struct rte_event events[batch_size];
+	uint16_t n;
+
+	n = rte_event_dequeue_burst(dispatcher->event_dev_id, port->port_id,
+				    events, batch_size, port->timeout);
+
+	red_dispatch_events(dispatcher, events, n);
+}
+
+static int32_t
+red_lcore_process(void *userdata)
+{
+	uint16_t i;
+	struct rte_event_dispatcher *dispatcher = userdata;
+	unsigned int lcore_id = rte_lcore_id();
+	struct rte_event_dispatcher_lcore *lcore =
+		&dispatcher->lcores[lcore_id];
+
+	for (i = 0; i < lcore->num_ports; i++) {
+		struct rte_event_dispatcher_lcore_port *port =
+			&lcore->ports[i];
+
+		red_port_dequeue(dispatcher, port);
+	}
+
+	return 0;
+}
+
+static int
+red_service_runstate_set(uint32_t service_id, int state)
+{
+	int rc;
+
+	rc = rte_service_component_runstate_set(service_id, state);
+
+	if (rc)
+		RTE_EDEV_LOG_ERR("Error %d occurred while setting service "
+				 "component run state to %d\n", rc, state);
+
+	return rc;
+}
+
+static int
+red_service_register(struct rte_event_dispatcher *dispatcher)
+{
+	struct rte_service_spec service = {
+		.callback = red_lcore_process,
+		.callback_userdata = dispatcher,
+		.capabilities = RTE_SERVICE_CAP_MT_SAFE,
+		.socket_id = dispatcher->socket_id
+	};
+	int rc;
+
+	snprintf(service.name, RTE_SERVICE_NAME_MAX - 1, "red_%d",
+		 dispatcher->id);
+
+	rc = rte_service_component_register(&service, &dispatcher->service_id);
+
+	if (rc)
+		RTE_EDEV_LOG_ERR("Registration of event dispatcher service "
+				 "%s failed with error code %d\n",
+				 service.name, rc);
+
+	rc = red_service_runstate_set(dispatcher->service_id, 1);
+
+	if (rc)
+		rte_service_component_unregister(dispatcher->service_id);
+
+	return rc;
+}
+
+static int
+red_service_unregister(struct rte_event_dispatcher *dispatcher)
+{
+	int rc;
+
+	rc = red_service_runstate_set(dispatcher->service_id, 0);
+
+	if (rc)
+		return rc;
+
+	rc = rte_service_component_unregister(dispatcher->service_id);
+
+	if (rc)
+		RTE_EDEV_LOG_ERR("Unregistration of event dispatcher service "
+				 "failed with error code %d\n", rc);
+
+	return rc;
+}
+
+int
+rte_event_dispatcher_create(uint8_t id, uint8_t event_dev_id)
+{
+	int socket_id;
+	struct rte_event_dispatcher *dispatcher;
+	int rc;
+
+	if (red_has_dispatcher(id)) {
+		RTE_EDEV_LOG_ERR("Dispatcher with id %d already exists\n",
+				 id);
+		return -EEXIST;
+	}
+
+	socket_id = rte_event_dev_socket_id(event_dev_id);
+
+	dispatcher =
+		rte_malloc_socket("event dispatcher",
+				  sizeof(struct rte_event_dispatcher),
+				  RTE_CACHE_LINE_SIZE, socket_id);
+
+	if (dispatcher == NULL) {
+		RTE_EDEV_LOG_ERR("Unable to allocate memory for event "
+				 "dispatcher\n");
+		return -ENOMEM;
+	}
+
+	*dispatcher = (struct rte_event_dispatcher) {
+		.id = id,
+		.event_dev_id = event_dev_id,
+		.socket_id = socket_id
+	};
+
+	rc = red_service_register(dispatcher);
+
+	if (rc < 0) {
+		rte_free(dispatcher);
+		return rc;
+	}
+
+	red_set_dispatcher(id, dispatcher);
+
+	return 0;
+}
+
+int
+rte_event_dispatcher_free(uint8_t id)
+{
+	struct rte_event_dispatcher *dispatcher;
+	int rc;
+
+	RED_VALID_ID_OR_RET_EINVAL(id);
+	dispatcher = red_get_dispatcher(id);
+
+	rc = red_service_unregister(dispatcher);
+
+	if (rc)
+		return rc;
+
+	red_set_dispatcher(id, NULL);
+
+	rte_free(dispatcher);
+
+	return 0;
+}
+
+int
+rte_event_dispatcher_service_id_get(uint8_t id, uint32_t *service_id)
+{
+	struct rte_event_dispatcher *dispatcher;
+
+	RED_VALID_ID_OR_RET_EINVAL(id);
+	dispatcher = red_get_dispatcher(id);
+
+	*service_id = dispatcher->service_id;
+
+	return 0;
+}
+
+static int16_t
+lcore_port_index(struct rte_event_dispatcher_lcore *lcore,
+		 uint8_t event_port_id)
+{
+	uint16_t i;
+
+	for (i = 0; i < lcore->num_ports; i++) {
+		struct rte_event_dispatcher_lcore_port *port =
+			&lcore->ports[i];
+		if (port->port_id == event_port_id)
+			return i;
+	}
+
+	return -1;
+}
+
+int
+rte_event_dispatcher_bind_port_to_lcore(uint8_t id, uint8_t event_port_id,
+					uint16_t batch_size, uint64_t timeout,
+					unsigned int lcore_id)
+{
+	struct rte_event_dispatcher *dispatcher;
+	struct rte_event_dispatcher_lcore *lcore;
+	struct rte_event_dispatcher_lcore_port *port;
+
+	RED_VALID_ID_OR_RET_EINVAL(id);
+	dispatcher = red_get_dispatcher(id);
+
+	lcore =	&dispatcher->lcores[lcore_id];
+
+	if (lcore->num_ports == RED_MAX_PORTS_PER_LCORE)
+		return -ENOMEM;
+
+	if (lcore_port_index(lcore, event_port_id) >= 0)
+		return -EEXIST;
+
+	port = &lcore->ports[lcore->num_ports];
+
+	*port = (struct rte_event_dispatcher_lcore_port) {
+		.port_id = event_port_id,
+		.batch_size = batch_size,
+		.timeout = timeout
+	};
+
+	lcore->num_ports++;
+
+	return 0;
+}
+
+int
+rte_event_dispatcher_unbind_port_from_lcore(uint8_t id, uint8_t event_port_id,
+					    unsigned int lcore_id)
+{
+	struct rte_event_dispatcher *dispatcher;
+	struct rte_event_dispatcher_lcore *lcore;
+	int16_t port_idx;
+	struct rte_event_dispatcher_lcore_port *port;
+	struct rte_event_dispatcher_lcore_port *last;
+
+	RED_VALID_ID_OR_RET_EINVAL(id);
+	dispatcher = red_get_dispatcher(id);
+
+	lcore =	&dispatcher->lcores[lcore_id];
+
+	port_idx = lcore_port_index(lcore, event_port_id);
+
+	if (port_idx < 0)
+		return -ENOENT;
+
+	port = &lcore->ports[port_idx];
+	last = &lcore->ports[lcore->num_ports - 1];
+
+	if (port != last)
+		*port = *last;
+
+	lcore->num_ports--;
+
+	return 0;
+}
+
+static int
+red_cb_manage(uint8_t id, uint8_t queue_id, bool reg, bool fallback,
+	   rte_event_dispatcher_cb_t cb_fun, void *cb_data)
+{
+	struct rte_event_dispatcher *dispatcher;
+	struct rte_event_dispatcher_cb *cb;
+
+	RED_VALID_ID_OR_RET_EINVAL(id);
+	dispatcher = red_get_dispatcher(id);
+
+	if (fallback)
+		cb = &dispatcher->fallback;
+	else
+		cb = &dispatcher->queue_cbs[queue_id];
+
+	if (reg && cb->cb_fun != NULL)
+		return -EEXIST;
+
+	if (!reg && cb->cb_fun == NULL)
+		return -ENOENT;
+
+	*cb = (struct rte_event_dispatcher_cb) {
+		.cb_fun = cb_fun,
+		.cb_data = cb_data
+	};
+
+	return 0;
+}
+
+int
+rte_event_dispatcher_register(uint8_t id, uint8_t queue_id,
+			      rte_event_dispatcher_cb_t cb_fun, void *cb_data)
+{
+	return red_cb_manage(id, queue_id, true, false, cb_fun, cb_data);
+}
+
+int
+rte_event_dispatcher_unregister(uint8_t id, uint8_t queue_id)
+{
+	return red_cb_manage(id, queue_id, false, false, NULL, NULL);
+}
+
+int
+rte_event_dispatcher_register_fallback(uint8_t id,
+				       rte_event_dispatcher_cb_t cb_fun,
+				       void *cb_data)
+{
+	return red_cb_manage(id, 0, true, true, cb_fun, cb_data);
+}
+
+int
+rte_event_dispatcher_unregister_fallback(uint8_t id)
+{
+	return red_cb_manage(id, 0, false, true, NULL, NULL);
+}
diff --git a/lib/librte_eventdev/rte_event_dispatcher.h b/lib/librte_eventdev/rte_event_dispatcher.h
new file mode 100644
index 0000000000..11f57571ab
--- /dev/null
+++ b/lib/librte_eventdev/rte_event_dispatcher.h
@@ -0,0 +1,251 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2021 Ericsson AB
+ */
+
+#ifndef __RTE_EVENT_DISPATCHER_H__
+#define __RTE_EVENT_DISPATCHER_H__
+
+/**
+ * @file
+ *
+ * RTE Event Dispatcher
+ *
+ */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <rte_eventdev.h>
+
+/**
+ * Function prototype for dispatcher callbacks.
+ *
+ * @param events
+ *  Pointer to an array of events.
+ *
+ * @param num
+ *  The number of events in the @p events array.
+ *
+ * @param cb_data
+ *  The pointer supplied by the application in
+ *  rte_event_dispatcher_register() or
+ *  rte_event_dispatcher_register_fallback().
+ */
+
+typedef void (*rte_event_dispatcher_cb_t)(struct rte_event *events,
+					  uint16_t num, void *cb_data);
+
+/**
+ * Create an event dispatcher with the specified id.
+ *
+ * @param id
+ *  An application-specified, unique (across all event dispatcher
+ *  instances) identifier.
+ *
+ * @param event_dev_id
+ *  The identifier of the event device from which this event dispatcher
+ *  will dequeue events.
+ *
+ * @return
+ *   - 0: Success
+ *   - <0: Error code on failure
+ */
+__rte_experimental
+int
+rte_event_dispatcher_create(uint8_t id, uint8_t event_dev_id);
+
+/**
+ * Frees an event dispatcher with the specified id.
+ *
+ * @param id
+ *  The event dispatcher identifier.
+ *
+ * @return
+ *   - 0: Success
+ *   - <0: Error code on failure
+ */
+__rte_experimental
+int
+rte_event_dispatcher_free(uint8_t id);
+
+/**
+ * Retrieve the service identifier of the event dispatcher.
+ *
+ * @param id
+ *  The event dispatcher identifier.
+ *
+ * @param [out] service_id
+ *  A pointer to a caller-supplied buffer where the event dispatcher's
+ *  service id will be stored.
+ *
+ * @return
+ *  - 0: Success
+ *  - <0: Error code on failure.
+ */
+__rte_experimental
+int
+rte_event_dispatcher_service_id_get(uint8_t id, uint32_t *service_id);
+
+/**
+ * Binds an event device port to a specific lcore on the specified
+ * event dispatcher.
+ *
+ * This function configures an event dispatcher to dequeue events from
+ * an event device port (as specified by @p event_port_id), in case
+ * its service function is run on a particular lcore (as specified by
+ * @p lcore_id).
+ *
+ * Multiple event device ports may be bound to the same lcore. A
+ * particular port may only be bound to one lcore.
+ *
+ * If the event dispatcher service is mapped (with
+ * rte_service_map_lcore_set()) to a lcore for which no ports are
+ * bound, the service function will be a no-operation.
+ *
+ * @param id
+ *  The event dispatcher identifier.
+ *
+ * @param event_port_id
+ *  The event device port identifier.
+ *
+ * @param batch_size
+ *  The batch size to use in rte_event_dequeue_burst(), for the
+ *  configured event device port and lcore.
+ *
+ * @param timeout
+ *  The timeout parameter to use in rte_event_dequeue_burst(), for the
+ *  configured event device port and lcore.
+ *
+ * @return
+ *  - 0: Success
+ *  - <0: Error code on failure.
+ */
+__rte_experimental
+int
+rte_event_dispatcher_bind_port_to_lcore(uint8_t id, uint8_t event_port_id,
+					uint16_t batch_size, uint64_t timeout,
+					unsigned int lcore_id);
+
+/**
+ * Unbind an event device port from a specific lcore.
+ *
+ * @param id
+ *  The event dispatcher identifier.
+ *
+ * @param event_port_id
+ *  The event device port identifier.
+ *
+ * @return
+ *  - 0: Success
+ *  - <0: Error code on failure.
+ */
+__rte_experimental
+int
+rte_event_dispatcher_unbind_port_from_lcore(uint8_t id, uint8_t event_port_id,
+					    unsigned int lcore_id);
+
+/**
+ * Register a callback function for the specified queue identifier.
+ *
+ * At most one callback may be registered per queue id.
+ *
+ * The same callback function may be registered for multiple queue ids.
+ *
+ * For each callback invocation, events belonging to a single queue id
+ * will be dispatched.
+ *
+ * @param id
+ *  The event dispatcher identifier.
+ *
+ * @param queue_id
+ *  The event device queue id for which @p cb_fun should be called.
+ *
+ * @param cb_fun
+ *  The callback function.
+ *
+ * @param cb_data
+ *  A pointer to some application-specific opaque data (or NULL),
+ *  which is supplied back to the application in the callback.
+ *
+ * @return
+ *  - 0: Success
+ *  - <0: Error code on failure.
+ */
+__rte_experimental
+int
+rte_event_dispatcher_register(uint8_t id, uint8_t queue_id,
+			      rte_event_dispatcher_cb_t cb_fun, void *cb_data);
+
+/**
+ * Unregister a callback function for the specified queue identifier.
+ *
+ * @param id
+ *  The event dispatcher identifier.
+ *
+ * @param queue_id
+ *  The event device queue id for which the callback should be removed.
+ *
+ * @return
+ *  - 0: Success
+ *  - <0: Error code on failure.
+ */
+__rte_experimental
+int
+rte_event_dispatcher_unregister(uint8_t id, uint8_t queue_id);
+
+/**
+ * Register a fallback callback function for the specified event
+ * dispatcher.
+ *
+ * Only events for which no queue-specific callback function is
+ * registered will be dispatched to the @p cb_fun callback.
+ *
+ * At most one fallback callback function may be registered.
+ *
+ * For each callback invocation, only events belonging to a single
+ * queue id will be included.
+ *
+ * If the event dispatcher encounters an event with a queue id for
+ * which the application has not registered any specific callback, and
+ * there is also no fallback configured, the event will be dropped.
+ *
+ * @param id
+ *  The event dispatcher identifier.
+ *
+ * @param cb_fun
+ *  The fallback callback function.
+ *
+ * @param cb_data
+ *  A pointer to some application-specific opaque data (or NULL),
+ *  which is supplied back to the application in the callback.
+ *
+ * @return
+ *  - 0: Success
+ *  - <0: Error code on failure.
+ */
+__rte_experimental
+int
+rte_event_dispatcher_register_fallback(uint8_t id,
+				       rte_event_dispatcher_cb_t cb_fun,
+				       void *cb_data);
+
+/**
+ * Unregister the fallback callback function.
+ *
+ * @param id
+ *  The event dispatcher identifier.
+ *
+ * @return
+ *  - 0: Success
+ *  - <0: Error code on failure.
+ */
+__rte_experimental
+int
+rte_event_dispatcher_unregister_fallback(uint8_t id);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* __RTE_EVENT_DISPATCHER_H__ */
diff --git a/lib/librte_eventdev/rte_eventdev_version.map b/lib/librte_eventdev/rte_eventdev_version.map
index 91a62cd077..dcb887601b 100644
--- a/lib/librte_eventdev/rte_eventdev_version.map
+++ b/lib/librte_eventdev/rte_eventdev_version.map
@@ -134,4 +134,14 @@  EXPERIMENTAL {
 	__rte_eventdev_trace_crypto_adapter_queue_pair_del;
 	__rte_eventdev_trace_crypto_adapter_start;
 	__rte_eventdev_trace_crypto_adapter_stop;
+
+	rte_event_dispatcher_create;
+	rte_event_dispatcher_free;
+	rte_event_dispatcher_service_id_get;
+	rte_event_dispatcher_bind_port_to_lcore;
+	rte_event_dispatcher_unbind_port_from_lcore;
+	rte_event_dispatcher_register;
+	rte_event_dispatcher_unregister;
+	rte_event_dispatcher_register_fallback;
+	rte_event_dispatcher_unregister_fallback;
 };