[RFC,1/3] lib/coroutine: add coroutine library
Checks
Commit Message
This patch adds a coroutine library. The main elements are:
1. scheduler: a container of coroutines, which is responsible for
scheduling them.
2. coroutine: the minimum scheduling unit; it must be associated with one
scheduler.
In the coroutine callback, the application can invoke rte_co_yield() to
give up the CPU, and invoke rte_co_delay() to delay for the specified
number of microseconds.
Signed-off-by: Chengwen Feng <fengchengwen@huawei.com>
---
lib/coroutine/meson.build | 8 ++
lib/coroutine/rte_coroutine.c | 190 ++++++++++++++++++++++++++++++
lib/coroutine/rte_coroutine.h | 110 +++++++++++++++++
lib/coroutine/rte_coroutine_imp.h | 46 ++++++++
lib/coroutine/version.map | 11 ++
lib/meson.build | 1 +
6 files changed, 366 insertions(+)
create mode 100644 lib/coroutine/meson.build
create mode 100644 lib/coroutine/rte_coroutine.c
create mode 100644 lib/coroutine/rte_coroutine.h
create mode 100644 lib/coroutine/rte_coroutine_imp.h
create mode 100644 lib/coroutine/version.map
Comments
On 4/24/2023 2:02 PM, Chengwen Feng wrote:
> This patch adds coroutine library. The main elements are:
> 1. scheduler: container of coroutines, which is responsible for
> scheduling coroutine.
> 2. coroutine: Minimum scheduling unit, it should associated to one
> scheduler.
>
> In the coroutine callback, application could invoke rte_co_yield() to
> give up the CPU, and invoke rte_co_delay() to delay the specified
> microsecond.
>
> Signed-off-by: Chengwen Feng <fengchengwen@huawei.com>
<...>
> +++ b/lib/coroutine/rte_coroutine.c
> @@ -0,0 +1,190 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + * Copyright(c) 2023 HiSilicon Limited
> + */
> +
> +#include <pthread.h>
> +#include <stdio.h>
> +#include <stdlib.h>
> +#include <ucontext.h>
> +
> +#include <rte_alarm.h>
> +#include <rte_ring.h>
> +
> +#include "rte_coroutine.h"
> +#include "rte_coroutine_imp.h"
> +
> +#define FATAL(fmt, args...) printf("[FATAL] %s() %d: " fmt "\n", __func__, __LINE__, ##args)
> +
No 'printf' for logging please.
<...>
> +int
> +rte_schedule_run(struct rte_schedule *s)
> +{
> + struct rte_cocontext *co = NULL;
> + uintptr_t ptr;
> +
> + /* Set local thread variable as input argument. */
> + co_schedule = s;
> +
> + while (!rte_ring_empty(s->ring)) {
> + rte_ring_dequeue(s->ring, (void **)&co);
> + if (co->state == COROUTINE_READY) {
> + getcontext(&co->ctx);
> + co->ctx.uc_stack.ss_sp = co->stack;
> + co->ctx.uc_stack.ss_size = co->stack_sz;
> + co->ctx.uc_link = &s->main;
> + co->state = COROUTINE_RUNNING;
> + s->running = co;
> + ptr = (uintptr_t)co;
> + makecontext(&co->ctx, (void (*)(void))co_run_func, 2,
> + (uint32_t)ptr, (uint32_t)(ptr >> 32));
Why is the 'co' address passed as two 32-bit values? Why not just pass
its address directly?
Also, would it be possible to set the context to 'co->cb()' here and do the
cleanup after the context returns to 's->main' (below)?
> + swapcontext(&s->main, &co->ctx);
> + } else if (co->state == COROUTINE_SUSPEND) {
> + co->state = COROUTINE_RUNNING;
> + s->running = co;
> + swapcontext(&s->main, &co->ctx);
> + } else {
> + FATAL("invalid state!");
> + }
> + }
> +
> + while (s->yield_head != NULL) {
> + co = s->yield_head;
> + s->yield_head = co->yield_next;
> + if (co->state == COROUTINE_YIELD) {
> + co->state = COROUTINE_RUNNING;
> + s->running = co;
> + swapcontext(&s->main, &co->ctx);
> + } else {
> + FATAL("invalid yield state!");
> + }
> + }
> +
Since coroutines in the 'ready' state are stored in a ring, the same could
be done for coroutines in the 'yield' state instead of using a linked list,
to simplify the flow. Just a question; both work fine.
> + return 0;
> +}
> +
> +int
> +rte_co_create(struct rte_schedule *s, coroutine_callback_t cb, void *arg, uint32_t stack_sz)
> +{
> + struct rte_cocontext *co = calloc(1, sizeof(struct rte_cocontext));
> + int ret;
> + if (co == NULL)
> + return -ENOMEM;
> +
I guess the 's->max_coroutines' limit should be checked somewhere here.
new file mode 100644
@@ -0,0 +1,8 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2023 HiSilicon Limited.
+
+sources = files('rte_coroutine.c')
+headers = files('rte_coroutine.h')
+indirect_headers += files('rte_coroutine_imp.h')
+
+deps += ['ring']
new file mode 100644
@@ -0,0 +1,190 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2023 HiSilicon Limited
+ */
+
+#include <errno.h>
+#include <pthread.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <ucontext.h>
+
+#include <rte_alarm.h>
+#include <rte_log.h>
+#include <rte_ring.h>
+
+#include "rte_coroutine.h"
+#include "rte_coroutine_imp.h"
+
+/* Log type of this library, registered under the build-provided default name. */
+RTE_LOG_REGISTER_DEFAULT(coroutine_logtype, NOTICE);
+
+/*
+ * Report an internal error through the DPDK log framework rather than
+ * printing to stdout. The "[FATAL]" tag, function name and line number are
+ * kept so existing log searches still match. The macro only logs; it does
+ * not abort the process.
+ */
+#define FATAL(fmt, ...) \
+	rte_log(RTE_LOG_ERR, coroutine_logtype, \
+		"[FATAL] %s() %d: " fmt "\n", __func__, __LINE__, ##__VA_ARGS__)
+
+/*
+ * Scheduler bound to the calling thread. It is set by rte_schedule_run() and
+ * read by rte_co_yield() and rte_co_delay() to find the running coroutine.
+ */
+static __thread struct rte_schedule *co_schedule;
+
+/*
+ * Allocate a scheduler together with the ring that queues its runnable
+ * coroutines.
+ *
+ * @param name
+ *   Name handed to rte_ring_create(); must not be NULL.
+ * @param max_coroutines
+ *   Number of coroutines the ring must be able to hold; must not be zero.
+ *
+ * @return
+ *   The new scheduler, or NULL on invalid arguments, allocation failure or
+ *   ring creation failure.
+ */
+struct rte_schedule *
+rte_schedule_create(const char *name, uint32_t max_coroutines)
+{
+	struct rte_schedule *s;
+
+	/* A ring can be created neither without a name nor with zero slots. */
+	if (name == NULL || max_coroutines == 0)
+		return NULL;
+
+	s = calloc(1, sizeof(*s));
+	if (s == NULL)
+		return NULL;
+
+	/*
+	 * RING_F_EXACT_SZ makes the ring hold exactly max_coroutines entries.
+	 * Without it the count has to be a power of two and only count - 1
+	 * entries are usable, so the documented limit could not be honoured.
+	 * Only rte_schedule_run() dequeues, hence single-consumer mode.
+	 */
+	s->ring = rte_ring_create(name, max_coroutines, rte_socket_id(),
+				  RING_F_SC_DEQ | RING_F_EXACT_SZ);
+	if (s->ring == NULL) {
+		free(s);
+		return NULL;
+	}
+
+	s->max_coroutines = max_coroutines;
+
+	return s;
+}
+
+/*
+ * Entry point of every coroutine context.
+ *
+ * makecontext() passes int-sized arguments, so the coroutine pointer arrives
+ * split into two 32-bit halves and is reassembled here.
+ */
+static void
+co_run_func(uint32_t low, uint32_t hi)
+{
+	uintptr_t ptr = (uintptr_t)((uint64_t)low | ((uint64_t)hi << 32));
+	struct rte_cocontext *co = (struct rte_cocontext *)ptr;
+
+	co->cb(co->arg);
+
+	/*
+	 * This function is still executing on co->stack, so nothing may be
+	 * freed here. Mark the coroutine as finished instead; returning
+	 * switches to uc_link (the scheduler context), which then releases
+	 * the stack and the descriptor.
+	 */
+	co->state = COROUTINE_INVALID;
+}
+
+/*
+ * Switch from the scheduler context into @co and, once control comes back,
+ * release the coroutine if its callback has returned.
+ */
+static void
+co_resume(struct rte_schedule *s, struct rte_cocontext *co)
+{
+	co->state = COROUTINE_RUNNING;
+	s->running = co;
+	swapcontext(&s->main, &co->ctx);
+
+	/*
+	 * Back on the scheduler stack. A coroutine that yielded or started a
+	 * delay has changed its own state and cleared s->running; only a
+	 * finished one carries the marker set by co_run_func().
+	 */
+	if (co->state == COROUTINE_INVALID) {
+		s->running = NULL;
+		free(co->stack);
+		free(co);
+	}
+}
+
+/*
+ * Run the coroutines of scheduler @s on the calling thread.
+ *
+ * First drains the ring (new and delay-expired coroutines), then drains the
+ * yield list. Coroutines put back on the ring while the yield list is being
+ * drained are left for the next call.
+ *
+ * @return
+ *   0 on success, -EINVAL if @s is NULL.
+ */
+int
+rte_schedule_run(struct rte_schedule *s)
+{
+	struct rte_cocontext *co;
+	void *obj = NULL;
+	uint64_t ptr;
+
+	if (s == NULL)
+		return -EINVAL;
+
+	/* Let rte_co_yield()/rte_co_delay() find the scheduler of this thread. */
+	co_schedule = s;
+
+	while (!rte_ring_empty(s->ring)) {
+		/* Never act on a stale pointer if the dequeue fails. */
+		if (rte_ring_dequeue(s->ring, &obj) != 0)
+			break;
+		co = obj;
+
+		switch (co->state) {
+		case COROUTINE_READY:
+			/* First run: build a context that starts in co_run_func(). */
+			getcontext(&co->ctx);
+			co->ctx.uc_stack.ss_sp = co->stack;
+			co->ctx.uc_stack.ss_size = co->stack_sz;
+			co->ctx.uc_link = &s->main;
+			/* Widen first so the shift is defined with 32-bit pointers. */
+			ptr = (uintptr_t)co;
+			makecontext(&co->ctx, (void (*)(void))co_run_func, 2,
+				    (uint32_t)ptr, (uint32_t)(ptr >> 32));
+			co_resume(s, co);
+			break;
+		case COROUTINE_SUSPEND:
+			/* Delay expired: continue where rte_co_delay() left off. */
+			co_resume(s, co);
+			break;
+		default:
+			FATAL("invalid state!");
+			break;
+		}
+	}
+
+	while (s->yield_head != NULL) {
+		/* Unlink before resuming: the coroutine may yield again. */
+		co = s->yield_head;
+		s->yield_head = co->yield_next;
+		if (co->state == COROUTINE_YIELD)
+			co_resume(s, co);
+		else
+			FATAL("invalid yield state!");
+	}
+
+	return 0;
+}
+
+/*
+ * Create a coroutine and queue it on the ring of scheduler @s.
+ *
+ * @param s
+ *   Scheduler that will run the coroutine; must not be NULL.
+ * @param cb
+ *   Function executed by the coroutine; must not be NULL.
+ * @param arg
+ *   Opaque pointer passed to @cb.
+ * @param stack_sz
+ *   Stack size in bytes; values below MIN_STACK_SIZE are raised to it.
+ *
+ * @return
+ *   0 on success, -EINVAL on a NULL scheduler or callback, -ENOMEM on
+ *   allocation failure, otherwise the error of rte_ring_enqueue().
+ */
+int
+rte_co_create(struct rte_schedule *s, coroutine_callback_t cb, void *arg, uint32_t stack_sz)
+{
+	struct rte_cocontext *co;
+	int ret;
+
+	/* Reject arguments that would be dereferenced or called later. */
+	if (s == NULL || cb == NULL)
+		return -EINVAL;
+
+	co = calloc(1, sizeof(*co));
+	if (co == NULL)
+		return -ENOMEM;
+
+	co->owner = s;
+	co->state = COROUTINE_READY;
+	co->cb = cb;
+	co->arg = arg;
+
+	if (stack_sz < MIN_STACK_SIZE)
+		stack_sz = MIN_STACK_SIZE;
+	co->stack_sz = stack_sz;
+	co->stack = calloc(1, stack_sz);
+	if (co->stack == NULL) {
+		ret = -ENOMEM;
+		goto err_free_co;
+	}
+
+	/* A full ring is what enforces the scheduler's coroutine limit. */
+	ret = rte_ring_enqueue(s->ring, co);
+	if (ret != 0)
+		goto err_free_stack;
+
+	return 0;
+
+err_free_stack:
+	free(co->stack);
+err_free_co:
+	free(co);
+	return ret;
+}
+
+/* Append @co to the tail of the singly linked yield list of scheduler @s. */
+static inline void
+co_addto_yield_list(struct rte_schedule *s, struct rte_cocontext *co)
+{
+	co->yield_next = NULL;
+
+	/* An empty list gets a new head, otherwise chain behind the old tail. */
+	if (s->yield_head != NULL)
+		s->yield_tail->yield_next = co;
+	else
+		s->yield_head = co;
+
+	s->yield_tail = co;
+}
+
+/*
+ * Hand the CPU back to the scheduler of the calling thread.
+ *
+ * The running coroutine is marked as yielded, appended to the yield list and
+ * control switches to the scheduler context. If no scheduler is bound to the
+ * thread, or no coroutine is running, the call only logs and returns.
+ */
+void
+rte_co_yield(void)
+{
+	struct rte_schedule *sched = co_schedule;
+	struct rte_cocontext *self;
+
+	if (sched != NULL && sched->running != NULL) {
+		self = sched->running;
+		self->state = COROUTINE_YIELD;
+		sched->running = NULL;
+		co_addto_yield_list(sched, self);
+		/* Returns here when the scheduler resumes this coroutine. */
+		swapcontext(&self->ctx, &sched->main);
+		return;
+	}
+
+	if (sched == NULL) {
+		FATAL("thread co_schedule is NULL!");
+	} else {
+		FATAL("running is NULL!");
+	}
+}
+
+/*
+ * Alarm callback armed by rte_co_delay(): put the delayed coroutine back on
+ * the ring of its owning scheduler. A failed enqueue is only logged.
+ */
+static void
+co_delay_imp(void *arg)
+{
+	struct rte_cocontext *delayed = arg;
+
+	if (rte_ring_enqueue(delayed->owner->ring, delayed) != 0)
+		FATAL("enqueue failed!");
+}
+
+/*
+ * Suspend the running coroutine until an alarm of @us microseconds fires.
+ *
+ * The alarm callback re-queues the coroutine on its scheduler's ring. If no
+ * scheduler is bound to the thread, no coroutine is running, or the alarm
+ * cannot be armed, the call logs the problem and returns without suspending.
+ *
+ * @param us
+ *   Delay in microseconds, passed unchanged to rte_eal_alarm_set().
+ */
+void
+rte_co_delay(unsigned int us)
+{
+	struct rte_schedule *s = co_schedule;
+	struct rte_cocontext *co;
+	int ret;
+
+	if (s == NULL) {
+		FATAL("thread co_schedule is NULL!");
+		return;
+	}
+
+	co = s->running;
+	if (co == NULL) {
+		FATAL("running is NULL!");
+		return;
+	}
+
+	/*
+	 * Suspend only once the wake-up is guaranteed: if the alarm cannot be
+	 * armed, nothing would ever re-queue the coroutine and it would be
+	 * lost together with its stack.
+	 */
+	ret = rte_eal_alarm_set(us, co_delay_imp, co);
+	if (ret != 0) {
+		FATAL("set alarm failed, ret %d!", ret);
+		return;
+	}
+
+	/*
+	 * The callback only enqueues the coroutine; it is resumed by
+	 * rte_schedule_run(), presumably on this same thread and therefore
+	 * after the switch below has saved the context. TODO confirm that a
+	 * scheduler is never run from more than one thread.
+	 */
+	co->state = COROUTINE_SUSPEND;
+	s->running = NULL;
+	swapcontext(&co->ctx, &s->main);
+}
new file mode 100644
@@ -0,0 +1,110 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2023 HiSilicon Limited
+ */
+
+#ifndef RTE_COROUTINE_H
+#define RTE_COROUTINE_H
+
+#include <stdint.h>
+
+#include <rte_compat.h>
+
+/**
+ * Callback prototype of a coroutine.
+ *
+ * @param arg
+ *   The argument pointer that was supplied to rte_co_create().
+ */
+typedef void (*coroutine_callback_t)(void *arg);
+
+struct rte_schedule;
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice.
+ *
+ * Create a coroutine scheduler.
+ *
+ * The scheduler is a container of coroutines and is responsible for
+ * scheduling them.
+ *
+ * @param name
+ *   The unique name of the scheduler.
+ * @param max_coroutines
+ *   Maximum number of coroutines that can be processed by the scheduler.
+ *
+ * @return
+ *   Non-NULL on success. Otherwise NULL is returned.
+ */
+__rte_experimental
+struct rte_schedule *rte_schedule_create(const char *name, uint32_t max_coroutines);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice.
+ *
+ * Run the coroutine scheduler.
+ *
+ * This function schedules the coroutines associated with the scheduler and
+ * returns once both its run queue and its yield list are empty.
+ *
+ * @param s
+ *   The pointer to the scheduler.
+ *
+ * @return
+ *   0 on success. Otherwise a negative value is returned.
+ */
+__rte_experimental
+int rte_schedule_run(struct rte_schedule *s);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice.
+ *
+ * Create one coroutine.
+ *
+ * This function creates one coroutine that is associated with the target
+ * scheduler 's'.
+ *
+ * @param s
+ *   The pointer to the scheduler.
+ * @param cb
+ *   The callback function.
+ * @param arg
+ *   The argument passed to the callback function.
+ * @param stack_sz
+ *   The stack size, in bytes, of the coroutine. Values smaller than 8KB
+ *   (including zero) are raised to the 8KB minimum.
+ *
+ * @return
+ *   0 on success. Otherwise a negative value is returned.
+ */
+__rte_experimental
+int rte_co_create(struct rte_schedule *s, coroutine_callback_t cb, void *arg, uint32_t stack_sz);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice.
+ *
+ * Yield the CPU from the current coroutine.
+ *
+ * The calling coroutine gives up the CPU and returns control to its
+ * scheduler, which resumes it later from its yield list.
+ */
+__rte_experimental
+void rte_co_yield(void);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice.
+ *
+ * Delay the current coroutine.
+ *
+ * The calling coroutine is suspended and is queued for scheduling again once
+ * a timer of the given number of microseconds has expired.
+ *
+ * @param us
+ *   The delay in microseconds.
+ */
+__rte_experimental
+void rte_co_delay(unsigned int us);
+
+#endif /* RTE_COROUTINE_H */
new file mode 100644
@@ -0,0 +1,46 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2023 HiSilicon Limited
+ */
+
+#ifndef RTE_COROUTINE_IMP_H
+#define RTE_COROUTINE_IMP_H
+
+#include <stdbool.h>
+#include <stdint.h>
+#include <ucontext.h>
+
+#include "rte_coroutine.h"
+
+/** Smallest coroutine stack, in bytes; smaller requests are raised to it. */
+#define MIN_STACK_SIZE 8192
+
+/** Life-cycle states of a coroutine. */
+enum rte_costate {
+ COROUTINE_INVALID, /**< Zero value; not a schedulable state. */
+ COROUTINE_READY, /**< Created and queued on the ring, not yet started. */
+ COROUTINE_RUNNING, /**< Currently executing on its own context. */
+ COROUTINE_YIELD, /**< Gave up the CPU via rte_co_yield(); on the yield list. */
+ COROUTINE_SUSPEND, /**< Waiting for the alarm armed by rte_co_delay(). */
+};
+
+/** Descriptor of one coroutine. */
+struct rte_cocontext {
+ struct rte_schedule *owner; /**< Which scheduler this coroutine belongs. */
+ int state; /**< The current coroutine state. */
+ ucontext_t ctx; /**< Saved execution context of the coroutine. */
+
+ coroutine_callback_t cb; /**< The coroutine callback function. */
+ void *arg; /**< The coroutine callback function's input argument. */
+
+ struct rte_cocontext *yield_next; /**< Next coroutine on the yield list. */
+
+ void *stack; /**< The allocated stack pointer. */
+ uint32_t stack_sz; /**< The allocated stack size. */
+};
+
+/** Scheduler: owns the ring and the yield list of its coroutines. */
+struct rte_schedule {
+ uint32_t max_coroutines; /**< Max coroutines which this scheduler supports. */
+
+ ucontext_t main; /**< Scheduler context that coroutines switch back to. */
+ struct rte_cocontext *running; /**< Current running coroutine. */
+
+ struct rte_ring *ring; /**< Command ring for schedule. */
+
+ struct rte_cocontext *yield_head; /**< Yield coroutine list for schedule. */
+ struct rte_cocontext *yield_tail; /**< Last entry of the yield list. */
+};
+
+#endif /* RTE_COROUTINE_IMP_H */
new file mode 100644
@@ -0,0 +1,11 @@
+EXPERIMENTAL {
+ global:
+
+ rte_co_create;
+ rte_co_delay;
+ rte_co_yield;
+ rte_schedule_create;
+ rte_schedule_run;
+
+ local: *;
+};
@@ -64,6 +64,7 @@ libraries = [
'flow_classify', # flow_classify lib depends on pkt framework table lib
'graph',
'node',
+ 'coroutine',
]
optional_libs = [