[v5,09/15] graph: introduce stream moving cross cores
Commit Message
This patch introduces key functions to allow a worker thread to
enqueue and move streams of objects to the next nodes over
different cores.
Signed-off-by: Haiyue Wang <haiyue.wang@intel.com>
Signed-off-by: Cunming Liang <cunming.liang@intel.com>
Signed-off-by: Zhirun Yan <zhirun.yan@intel.com>
---
lib/graph/graph_private.h | 27 +++++
lib/graph/meson.build | 2 +-
lib/graph/rte_graph_model_dispatch.c | 145 +++++++++++++++++++++++++++
lib/graph/rte_graph_model_dispatch.h | 37 +++++++
lib/graph/version.map | 2 +
5 files changed, 212 insertions(+), 1 deletion(-)
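
Below is a minimal usage sketch of where these functions fit (illustration only, not part of the patch; node names are hypothetical and the worker-model enum name is assumed from earlier patches in this series):

#include <rte_graph_worker.h>
#include "rte_graph_model_dispatch.h"

static void
setup_dispatch_model(void)
{
	/* Select the dispatch worker model; the setter comes from an
	 * earlier patch of this series, the enum name is assumed. */
	rte_graph_worker_model_set(RTE_GRAPH_MODEL_MCORE_DISPATCH);

	/* Bind nodes to lcores. A stream enqueued from a node running
	 * on one lcore to a node bound to another lcore is moved through
	 * the per-graph work queue introduced by this patch. */
	rte_graph_model_dispatch_lcore_affinity_set("hypothetical_rx", 1);
	rte_graph_model_dispatch_lcore_affinity_set("hypothetical_worker", 2);
}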
Comments
> This patch introduces key functions to allow a worker thread to
> enqueue and move streams of objects to the next nodes over
> different cores.
>
> Signed-off-by: Haiyue Wang <haiyue.wang@intel.com>
> Signed-off-by: Cunming Liang <cunming.liang@intel.com>
> Signed-off-by: Zhirun Yan <zhirun.yan@intel.com>
> ---
> lib/graph/graph_private.h | 27 +++++
> lib/graph/meson.build | 2 +-
> lib/graph/rte_graph_model_dispatch.c | 145 +++++++++++++++++++++++++++
> lib/graph/rte_graph_model_dispatch.h | 37 +++++++
> lib/graph/version.map | 2 +
> 5 files changed, 212 insertions(+), 1 deletion(-)
>
> diff --git a/lib/graph/graph_private.h b/lib/graph/graph_private.h
> index b66b18ebbc..e1a2a4bfd8 100644
> --- a/lib/graph/graph_private.h
> +++ b/lib/graph/graph_private.h
> @@ -366,4 +366,31 @@ void graph_dump(FILE *f, struct graph *g);
> */
> void node_dump(FILE *f, struct node *n);
>
> +/**
> + * @internal
> + *
> + * Create the graph schedule work queue. All cloned graphs attached to the
> + * parent graph MUST be destroyed together, as a limitation of the fast
> + * schedule design.
> + *
> + * @param _graph
> + * The graph object
> + * @param _parent_graph
> + * The parent graph object which holds the run-queue head.
> + *
> + * @return
> + * - 0: Success.
> + * - <0: Graph schedule work queue related error.
> + */
> +int graph_sched_wq_create(struct graph *_graph, struct graph *_parent_graph);
> +
> +/**
> + * @internal
> + *
> + * Destroy the graph schedule work queue.
> + *
> + * @param _graph
> + * The graph object
> + */
> +void graph_sched_wq_destroy(struct graph *_graph);
> +
> #endif /* _RTE_GRAPH_PRIVATE_H_ */
> diff --git a/lib/graph/meson.build b/lib/graph/meson.build
> index c729d984b6..e21affa280 100644
> --- a/lib/graph/meson.build
> +++ b/lib/graph/meson.build
> @@ -20,4 +20,4 @@ sources = files(
> )
> headers = files('rte_graph.h', 'rte_graph_worker.h')
>
> -deps += ['eal', 'pcapng']
> +deps += ['eal', 'pcapng', 'mempool', 'ring']
> diff --git a/lib/graph/rte_graph_model_dispatch.c b/lib/graph/rte_graph_model_dispatch.c
> index 4a2f99496d..a300fefb85 100644
> --- a/lib/graph/rte_graph_model_dispatch.c
> +++ b/lib/graph/rte_graph_model_dispatch.c
> @@ -5,6 +5,151 @@
> #include "graph_private.h"
> #include "rte_graph_model_dispatch.h"
>
> +int
> +graph_sched_wq_create(struct graph *_graph, struct graph *_parent_graph)
> +{
> + struct rte_graph *parent_graph = _parent_graph->graph;
> + struct rte_graph *graph = _graph->graph;
> + unsigned int wq_size;
> +
> + wq_size = GRAPH_SCHED_WQ_SIZE(graph->nb_nodes);
> + wq_size = rte_align32pow2(wq_size + 1);
Hi Zhirun,
Should we introduce a new function `rte_graph_configure` that lets the
application control the ring size and mempool size of the work queue? We
could fall back to default values if nothing is configured.

rte_graph_configure should take a

struct rte_graph_config {
	struct {
		u64 rsvd[8];
	} rtc;
	struct {
		u16 wq_size;
		...
	} dispatch;
};

This will help future graph models to have their own configuration.
We can have a rte_graph_config_init() function to initialize the
rte_graph_config structure.
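
For instance, a rough sketch of what I mean (everything below is illustrative only, nothing here exists in the library today):

#include <stdint.h>
#include <string.h>

#define RTE_GRAPH_DISPATCH_WQ_SIZE_DFLT 64 /* assumed default */

struct rte_graph_config {
	struct {
		uint64_t rsvd[8];   /* reserved for the RTC model */
	} rtc;
	struct {
		uint16_t wq_size;   /* dispatch work-queue (ring/mempool) size */
	} dispatch;
};

/* Fill in defaults so that applications only override what they need. */
static inline void
rte_graph_config_init(struct rte_graph_config *cfg)
{
	memset(cfg, 0, sizeof(*cfg));
	cfg->dispatch.wq_size = RTE_GRAPH_DISPATCH_WQ_SIZE_DFLT;
}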
> +
> + graph->wq = rte_ring_create(graph->name, wq_size, graph->socket,
> + RING_F_SC_DEQ);
> + if (graph->wq == NULL)
> + SET_ERR_JMP(EIO, fail, "Failed to allocate graph WQ");
> +
> + graph->mp = rte_mempool_create(graph->name, wq_size,
> + sizeof(struct graph_sched_wq_node),
> + 0, 0, NULL, NULL, NULL, NULL,
> + graph->socket, MEMPOOL_F_SP_PUT);
> + if (graph->mp == NULL)
> + SET_ERR_JMP(EIO, fail_mp,
> + "Failed to allocate graph WQ schedule entry");
> +
> + graph->lcore_id = _graph->lcore_id;
> +
> + if (parent_graph->rq == NULL) {
> + parent_graph->rq = &parent_graph->rq_head;
> + SLIST_INIT(parent_graph->rq);
> + }
> +
> + graph->rq = parent_graph->rq;
> + SLIST_INSERT_HEAD(graph->rq, graph, rq_next);
> +
> + return 0;
> +
> +fail_mp:
> + rte_ring_free(graph->wq);
> + graph->wq = NULL;
> +fail:
> + return -rte_errno;
> +}
> +
> +void
> +graph_sched_wq_destroy(struct graph *_graph)
> +{
> + struct rte_graph *graph = _graph->graph;
> +
> + if (graph == NULL)
> + return;
> +
> + rte_ring_free(graph->wq);
> + graph->wq = NULL;
> +
> + rte_mempool_free(graph->mp);
> + graph->mp = NULL;
> +}
> +
> +static __rte_always_inline bool
> +__graph_sched_node_enqueue(struct rte_node *node, struct rte_graph *graph)
> +{
> + struct graph_sched_wq_node *wq_node;
> + uint16_t off = 0;
> + uint16_t size;
> +
> +submit_again:
> + if (rte_mempool_get(graph->mp, (void **)&wq_node) < 0)
> + goto fallback;
> +
> + size = RTE_MIN(node->idx, RTE_DIM(wq_node->objs));
> + wq_node->node_off = node->off;
> + wq_node->nb_objs = size;
> + rte_memcpy(wq_node->objs, &node->objs[off], size * sizeof(void *));
> +
> + while (rte_ring_mp_enqueue_bulk_elem(graph->wq, (void *)&wq_node,
> + sizeof(wq_node), 1, NULL) == 0)
> + rte_pause();
> +
> + off += size;
> + node->idx -= size;
> + if (node->idx > 0)
> + goto submit_again;
> +
> + return true;
> +
> +fallback:
> + if (off != 0)
> + memmove(&node->objs[0], &node->objs[off],
> + node->idx * sizeof(void *));
> +
> + return false;
> +}
> +
> +bool __rte_noinline
> +__rte_graph_sched_node_enqueue(struct rte_node *node,
> + struct rte_graph_rq_head *rq)
> +{
> + const unsigned int lcore_id = node->lcore_id;
> + struct rte_graph *graph;
> +
> + SLIST_FOREACH(graph, rq, rq_next)
> + if (graph->lcore_id == lcore_id)
> + break;
> +
> + return graph != NULL ? __graph_sched_node_enqueue(node, graph) : false;
> +}
> +
> +void
> +__rte_graph_sched_wq_process(struct rte_graph *graph)
> +{
> + struct graph_sched_wq_node *wq_node;
> + struct rte_mempool *mp = graph->mp;
> + struct rte_ring *wq = graph->wq;
> + uint16_t idx, free_space;
> + struct rte_node *node;
> + unsigned int i, n;
> + struct graph_sched_wq_node *wq_nodes[32];
> +
> + n = rte_ring_sc_dequeue_burst_elem(wq, wq_nodes, sizeof(wq_nodes[0]),
> + RTE_DIM(wq_nodes), NULL);
> + if (n == 0)
> + return;
> +
> + for (i = 0; i < n; i++) {
> + wq_node = wq_nodes[i];
> + node = RTE_PTR_ADD(graph, wq_node->node_off);
> + RTE_ASSERT(node->fence == RTE_GRAPH_FENCE);
> + idx = node->idx;
> + free_space = node->size - idx;
> +
> + if (unlikely(free_space < wq_node->nb_objs))
> + __rte_node_stream_alloc_size(graph, node, node->size + wq_node->nb_objs);
> +
> + memmove(&node->objs[idx], wq_node->objs, wq_node->nb_objs * sizeof(void *));
> + memset(wq_node->objs, 0, wq_node->nb_objs * sizeof(void *));
The memset should be avoided in the fastpath for better performance, as we anyway set wq_node->nb_objs to 0.
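i.e. the loop tail could simply become (sketch of the quoted code with the memset dropped):

	memmove(&node->objs[idx], wq_node->objs,
		wq_node->nb_objs * sizeof(void *));
	node->idx = idx + wq_node->nb_objs;

	__rte_node_process(graph, node);

	/* Resetting nb_objs is enough to mark the entry as consumed;
	 * clearing the objs array is redundant work. */
	wq_node->nb_objs = 0;
	node->idx = 0;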
> + node->idx = idx + wq_node->nb_objs;
> +
> + __rte_node_process(graph, node);
> +
> + wq_node->nb_objs = 0;
> + node->idx = 0;
> + }
> +
> + rte_mempool_put_bulk(mp, (void **)wq_nodes, n);
> +}
> +
> int
> rte_graph_model_dispatch_lcore_affinity_set(const char *name, unsigned int lcore_id)
> {
> diff --git a/lib/graph/rte_graph_model_dispatch.h b/lib/graph/rte_graph_model_dispatch.h
> index 179624e972..18fa7ce0ab 100644
> --- a/lib/graph/rte_graph_model_dispatch.h
> +++ b/lib/graph/rte_graph_model_dispatch.h
> @@ -14,12 +14,49 @@
> *
> * This API allows to set core affinity with the node.
> */
> +#include <rte_errno.h>
> +#include <rte_mempool.h>
> +#include <rte_memzone.h>
> +#include <rte_ring.h>
> +
> #include "rte_graph_worker_common.h"
>
> #ifdef __cplusplus
> extern "C" {
> #endif
>
> +#define GRAPH_SCHED_WQ_SIZE_MULTIPLIER 8
> +#define GRAPH_SCHED_WQ_SIZE(nb_nodes) \
> + ((typeof(nb_nodes))((nb_nodes) * GRAPH_SCHED_WQ_SIZE_MULTIPLIER))
> +
> +/**
> + * @internal
> + *
> + * Schedule the node to the right graph's work queue.
> + *
> + * @param node
> + * Pointer to the scheduled node object.
> + * @param rq
> + * Pointer to the scheduled run-queue for all graphs.
> + *
> + * @return
> + * True on success, false otherwise.
> + */
> +__rte_experimental
> +bool __rte_noinline __rte_graph_sched_node_enqueue(struct rte_node *node,
> + struct rte_graph_rq_head *rq);
> +
> +/**
> + * @internal
> + *
> + * Process all nodes (streams) in the graph's work queue.
> + *
> + * @param graph
> + * Pointer to the graph object.
> + */
> +__rte_experimental
> +void __rte_graph_sched_wq_process(struct rte_graph *graph);
> +
> /**
> * Set lcore affinity with the node.
> *
> diff --git a/lib/graph/version.map b/lib/graph/version.map
> index aaa86f66ed..d511133f39 100644
> --- a/lib/graph/version.map
> +++ b/lib/graph/version.map
> @@ -48,6 +48,8 @@ EXPERIMENTAL {
>
> rte_graph_worker_model_set;
> rte_graph_worker_model_get;
> + __rte_graph_sched_wq_process;
> + __rte_graph_sched_node_enqueue;
>
> rte_graph_model_dispatch_lcore_affinity_set;
>
> --
> 2.37.2
> -----Original Message-----
> From: Pavan Nikhilesh Bhagavatula <pbhagavatula@marvell.com>
> Sent: Thursday, April 27, 2023 10:53 PM
> To: Yan, Zhirun <zhirun.yan@intel.com>; dev@dpdk.org; Jerin Jacob
> Kollanukkaran <jerinj@marvell.com>; Kiran Kumar Kokkilagadda
> <kirankumark@marvell.com>; Nithin Kumar Dabilpuram
> <ndabilpuram@marvell.com>; stephen@networkplumber.org
> Cc: Liang, Cunming <cunming.liang@intel.com>; Wang, Haiyue
> <haiyue.wang@intel.com>
> Subject: RE: [EXT] [PATCH v5 09/15] graph: introduce stream moving cross cores
>
> > This patch introduces key functions to allow a worker thread to
> > enqueue and move streams of objects to the next nodes over different
> > cores.
> >
> > Signed-off-by: Haiyue Wang <haiyue.wang@intel.com>
> > Signed-off-by: Cunming Liang <cunming.liang@intel.com>
> > Signed-off-by: Zhirun Yan <zhirun.yan@intel.com>
> > ---
> > lib/graph/graph_private.h | 27 +++++
> > lib/graph/meson.build | 2 +-
> > lib/graph/rte_graph_model_dispatch.c | 145 +++++++++++++++++++++++++++
> > lib/graph/rte_graph_model_dispatch.h | 37 +++++++
> > lib/graph/version.map | 2 +
> > 5 files changed, 212 insertions(+), 1 deletion(-)
> >
> > diff --git a/lib/graph/rte_graph_model_dispatch.c b/lib/graph/rte_graph_model_dispatch.c
> > index 4a2f99496d..a300fefb85 100644
> > --- a/lib/graph/rte_graph_model_dispatch.c
> > +++ b/lib/graph/rte_graph_model_dispatch.c
> > @@ -5,6 +5,151 @@
> > #include "graph_private.h"
> > #include "rte_graph_model_dispatch.h"
> >
> > +int
> > +graph_sched_wq_create(struct graph *_graph, struct graph *_parent_graph)
> > +{
> > + struct rte_graph *parent_graph = _parent_graph->graph;
> > + struct rte_graph *graph = _graph->graph;
> > + unsigned int wq_size;
> > +
> > + wq_size = GRAPH_SCHED_WQ_SIZE(graph->nb_nodes);
> > + wq_size = rte_align32pow2(wq_size + 1);
>
> Hi Zhirun,
>
> Should we introduce a new function `rte_graph_configure` that lets the
> application control the ring size and mempool size of the work queue? We
> could fall back to default values if nothing is configured.
>
> rte_graph_configure should take a
>
> struct rte_graph_config {
> 	struct {
> 		u64 rsvd[8];
> 	} rtc;
> 	struct {
> 		u16 wq_size;
> 		...
> 	} dispatch;
> };
>
> This will help future graph models to have their own configuration.
>
> We can have a rte_graph_config_init() function to initialize the
> rte_graph_config structure.
>
Hi Pavan,
Thanks for your comments. I agree with you; it would be more friendly for users/developers.
And for the ring and mempool there are size limitations (the size must be a power of 2), so I
prefer to use u16 wq_size_max and u32 mp_size_max for users that have limited resources.
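
For example (sketch only; cfg and wq_size_max are illustrative names, not existing code):

	wq_size = GRAPH_SCHED_WQ_SIZE(graph->nb_nodes);
	wq_size = rte_align32pow2(wq_size + 1);
	/* Ring and mempool sizes must be a power of 2, so round the
	 * user-provided limit down before applying it. */
	if (cfg->dispatch.wq_size_max != 0)
		wq_size = RTE_MIN(wq_size,
				  rte_align32prevpow2(cfg->dispatch.wq_size_max));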
Patch

diff --git a/lib/graph/graph_private.h b/lib/graph/graph_private.h
index b66b18ebbc..e1a2a4bfd8 100644
--- a/lib/graph/graph_private.h
+++ b/lib/graph/graph_private.h
@@ -366,4 +366,31 @@ void graph_dump(FILE *f, struct graph *g);
*/
void node_dump(FILE *f, struct node *n);
+/**
+ * @internal
+ *
+ * Create the graph schedule work queue. All cloned graphs attached to the
+ * parent graph MUST be destroyed together, as a limitation of the fast schedule design.
+ *
+ * @param _graph
+ * The graph object
+ * @param _parent_graph
+ * The parent graph object which holds the run-queue head.
+ *
+ * @return
+ * - 0: Success.
+ * - <0: Graph schedule work queue related error.
+ */
+int graph_sched_wq_create(struct graph *_graph, struct graph *_parent_graph);
+
+/**
+ * @internal
+ *
+ * Destroy the graph schedule work queue.
+ *
+ * @param _graph
+ * The graph object
+ */
+void graph_sched_wq_destroy(struct graph *_graph);
+
#endif /* _RTE_GRAPH_PRIVATE_H_ */
diff --git a/lib/graph/meson.build b/lib/graph/meson.build
index c729d984b6..e21affa280 100644
--- a/lib/graph/meson.build
+++ b/lib/graph/meson.build
@@ -20,4 +20,4 @@ sources = files(
)
headers = files('rte_graph.h', 'rte_graph_worker.h')
-deps += ['eal', 'pcapng']
+deps += ['eal', 'pcapng', 'mempool', 'ring']
diff --git a/lib/graph/rte_graph_model_dispatch.c b/lib/graph/rte_graph_model_dispatch.c
index 4a2f99496d..a300fefb85 100644
--- a/lib/graph/rte_graph_model_dispatch.c
+++ b/lib/graph/rte_graph_model_dispatch.c
@@ -5,6 +5,151 @@
#include "graph_private.h"
#include "rte_graph_model_dispatch.h"
+int
+graph_sched_wq_create(struct graph *_graph, struct graph *_parent_graph)
+{
+ struct rte_graph *parent_graph = _parent_graph->graph;
+ struct rte_graph *graph = _graph->graph;
+ unsigned int wq_size;
+
+ wq_size = GRAPH_SCHED_WQ_SIZE(graph->nb_nodes);
+ wq_size = rte_align32pow2(wq_size + 1);
+
+ graph->wq = rte_ring_create(graph->name, wq_size, graph->socket,
+ RING_F_SC_DEQ);
+ if (graph->wq == NULL)
+ SET_ERR_JMP(EIO, fail, "Failed to allocate graph WQ");
+
+ graph->mp = rte_mempool_create(graph->name, wq_size,
+ sizeof(struct graph_sched_wq_node),
+ 0, 0, NULL, NULL, NULL, NULL,
+ graph->socket, MEMPOOL_F_SP_PUT);
+ if (graph->mp == NULL)
+ SET_ERR_JMP(EIO, fail_mp,
+ "Failed to allocate graph WQ schedule entry");
+
+ graph->lcore_id = _graph->lcore_id;
+
+ if (parent_graph->rq == NULL) {
+ parent_graph->rq = &parent_graph->rq_head;
+ SLIST_INIT(parent_graph->rq);
+ }
+
+ graph->rq = parent_graph->rq;
+ SLIST_INSERT_HEAD(graph->rq, graph, rq_next);
+
+ return 0;
+
+fail_mp:
+ rte_ring_free(graph->wq);
+ graph->wq = NULL;
+fail:
+ return -rte_errno;
+}
+
+void
+graph_sched_wq_destroy(struct graph *_graph)
+{
+ struct rte_graph *graph = _graph->graph;
+
+ if (graph == NULL)
+ return;
+
+ rte_ring_free(graph->wq);
+ graph->wq = NULL;
+
+ rte_mempool_free(graph->mp);
+ graph->mp = NULL;
+}
+
+static __rte_always_inline bool
+__graph_sched_node_enqueue(struct rte_node *node, struct rte_graph *graph)
+{
+ struct graph_sched_wq_node *wq_node;
+ uint16_t off = 0;
+ uint16_t size;
+
+submit_again:
+ if (rte_mempool_get(graph->mp, (void **)&wq_node) < 0)
+ goto fallback;
+
+ size = RTE_MIN(node->idx, RTE_DIM(wq_node->objs));
+ wq_node->node_off = node->off;
+ wq_node->nb_objs = size;
+ rte_memcpy(wq_node->objs, &node->objs[off], size * sizeof(void *));
+
+ while (rte_ring_mp_enqueue_bulk_elem(graph->wq, (void *)&wq_node,
+ sizeof(wq_node), 1, NULL) == 0)
+ rte_pause();
+
+ off += size;
+ node->idx -= size;
+ if (node->idx > 0)
+ goto submit_again;
+
+ return true;
+
+fallback:
+ if (off != 0)
+ memmove(&node->objs[0], &node->objs[off],
+ node->idx * sizeof(void *));
+
+ return false;
+}
+
+bool __rte_noinline
+__rte_graph_sched_node_enqueue(struct rte_node *node,
+ struct rte_graph_rq_head *rq)
+{
+ const unsigned int lcore_id = node->lcore_id;
+ struct rte_graph *graph;
+
+ SLIST_FOREACH(graph, rq, rq_next)
+ if (graph->lcore_id == lcore_id)
+ break;
+
+ return graph != NULL ? __graph_sched_node_enqueue(node, graph) : false;
+}
+
+void
+__rte_graph_sched_wq_process(struct rte_graph *graph)
+{
+ struct graph_sched_wq_node *wq_node;
+ struct rte_mempool *mp = graph->mp;
+ struct rte_ring *wq = graph->wq;
+ uint16_t idx, free_space;
+ struct rte_node *node;
+ unsigned int i, n;
+ struct graph_sched_wq_node *wq_nodes[32];
+
+ n = rte_ring_sc_dequeue_burst_elem(wq, wq_nodes, sizeof(wq_nodes[0]),
+ RTE_DIM(wq_nodes), NULL);
+ if (n == 0)
+ return;
+
+ for (i = 0; i < n; i++) {
+ wq_node = wq_nodes[i];
+ node = RTE_PTR_ADD(graph, wq_node->node_off);
+ RTE_ASSERT(node->fence == RTE_GRAPH_FENCE);
+ idx = node->idx;
+ free_space = node->size - idx;
+
+ if (unlikely(free_space < wq_node->nb_objs))
+ __rte_node_stream_alloc_size(graph, node, node->size + wq_node->nb_objs);
+
+ memmove(&node->objs[idx], wq_node->objs, wq_node->nb_objs * sizeof(void *));
+ memset(wq_node->objs, 0, wq_node->nb_objs * sizeof(void *));
+ node->idx = idx + wq_node->nb_objs;
+
+ __rte_node_process(graph, node);
+
+ wq_node->nb_objs = 0;
+ node->idx = 0;
+ }
+
+ rte_mempool_put_bulk(mp, (void **)wq_nodes, n);
+}
+
int
rte_graph_model_dispatch_lcore_affinity_set(const char *name, unsigned int lcore_id)
{
diff --git a/lib/graph/rte_graph_model_dispatch.h b/lib/graph/rte_graph_model_dispatch.h
index 179624e972..18fa7ce0ab 100644
--- a/lib/graph/rte_graph_model_dispatch.h
+++ b/lib/graph/rte_graph_model_dispatch.h
@@ -14,12 +14,49 @@
*
* This API allows to set core affinity with the node.
*/
+#include <rte_errno.h>
+#include <rte_mempool.h>
+#include <rte_memzone.h>
+#include <rte_ring.h>
+
#include "rte_graph_worker_common.h"
#ifdef __cplusplus
extern "C" {
#endif
+#define GRAPH_SCHED_WQ_SIZE_MULTIPLIER 8
+#define GRAPH_SCHED_WQ_SIZE(nb_nodes) \
+ ((typeof(nb_nodes))((nb_nodes) * GRAPH_SCHED_WQ_SIZE_MULTIPLIER))
+
+/**
+ * @internal
+ *
+ * Schedule the node to the right graph's work queue.
+ *
+ * @param node
+ * Pointer to the scheduled node object.
+ * @param rq
+ * Pointer to the scheduled run-queue for all graphs.
+ *
+ * @return
+ * True on success, false otherwise.
+ */
+__rte_experimental
+bool __rte_noinline __rte_graph_sched_node_enqueue(struct rte_node *node,
+ struct rte_graph_rq_head *rq);
+
+/**
+ * @internal
+ *
+ * Process all nodes (streams) in the graph's work queue.
+ *
+ * @param graph
+ * Pointer to the graph object.
+ */
+__rte_experimental
+void __rte_graph_sched_wq_process(struct rte_graph *graph);
+
/**
* Set lcore affinity with the node.
*
diff --git a/lib/graph/version.map b/lib/graph/version.map
index aaa86f66ed..d511133f39 100644
--- a/lib/graph/version.map
+++ b/lib/graph/version.map
@@ -48,6 +48,8 @@ EXPERIMENTAL {
rte_graph_worker_model_set;
rte_graph_worker_model_get;
+ __rte_graph_sched_wq_process;
+ __rte_graph_sched_node_enqueue;
rte_graph_model_dispatch_lcore_affinity_set;