[RFC,1/3] lib/coroutine: add coroutine library

Message ID 20230424130208.9517-2-fengchengwen@huawei.com (mailing list archive)
State Rejected, archived
Delegated to: Ferruh Yigit
Headers
Series introduce coroutine library |

Checks

Context Check Description
ci/checkpatch warning coding style issues

Commit Message

fengchengwen April 24, 2023, 1:02 p.m. UTC
  This patch adds coroutine library. The main elements are:
1. scheduler: container of coroutines, which is responsible for
scheduling coroutine.
2. coroutine: Minimum scheduling unit, it should associated to one
scheduler.

In the coroutine callback, application could invoke rte_co_yield() to
give up the CPU, and invoke rte_co_delay() to delay the specified
microsecond.

Signed-off-by: Chengwen Feng <fengchengwen@huawei.com>
---
 lib/coroutine/meson.build         |   8 ++
 lib/coroutine/rte_coroutine.c     | 190 ++++++++++++++++++++++++++++++
 lib/coroutine/rte_coroutine.h     | 110 +++++++++++++++++
 lib/coroutine/rte_coroutine_imp.h |  46 ++++++++
 lib/coroutine/version.map         |  11 ++
 lib/meson.build                   |   1 +
 6 files changed, 366 insertions(+)
 create mode 100644 lib/coroutine/meson.build
 create mode 100644 lib/coroutine/rte_coroutine.c
 create mode 100644 lib/coroutine/rte_coroutine.h
 create mode 100644 lib/coroutine/rte_coroutine_imp.h
 create mode 100644 lib/coroutine/version.map
  

Comments

Ferruh Yigit April 26, 2023, 11:28 a.m. UTC | #1
On 4/24/2023 2:02 PM, Chengwen Feng wrote:
> This patch adds coroutine library. The main elements are:
> 1. scheduler: container of coroutines, which is responsible for
> scheduling coroutine.
> 2. coroutine: Minimum scheduling unit, it should associated to one
> scheduler.
> 
> In the coroutine callback, application could invoke rte_co_yield() to
> give up the CPU, and invoke rte_co_delay() to delay the specified
> microsecond.
> 
> Signed-off-by: Chengwen Feng <fengchengwen@huawei.com>

<...>

> +++ b/lib/coroutine/rte_coroutine.c
> @@ -0,0 +1,190 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + * Copyright(c) 2023 HiSilicon Limited
> + */
> +
> +#include <pthread.h>
> +#include <stdio.h>
> +#include <stdlib.h>
> +#include <ucontext.h>
> +
> +#include <rte_alarm.h>
> +#include <rte_ring.h>
> +
> +#include "rte_coroutine.h"
> +#include "rte_coroutine_imp.h"
> +
> +#define FATAL(fmt, args...) printf("[FATAL] %s() %d: " fmt "\n", __func__, __LINE__, ##args)
> +

No 'printf' for logging please.

<...>

> +int
> +rte_schedule_run(struct rte_schedule *s)
> +{
> +	struct rte_cocontext *co = NULL;
> +	uintptr_t ptr;
> +
> +	/* Set local thread variable as input argument. */
> +	co_schedule = s;
> +
> +	while (!rte_ring_empty(s->ring)) {
> +		rte_ring_dequeue(s->ring, (void **)&co);
> +		if (co->state == COROUTINE_READY) {
> +			getcontext(&co->ctx);
> +			co->ctx.uc_stack.ss_sp = co->stack;
> +			co->ctx.uc_stack.ss_size = co->stack_sz;
> +			co->ctx.uc_link = &s->main;
> +			co->state = COROUTINE_RUNNING;
> +			s->running = co;
> +			ptr = (uintptr_t)co;
> +			makecontext(&co->ctx, (void (*)(void))co_run_func, 2,
> +				    (uint32_t)ptr, (uint32_t)(ptr >> 32));

Why passing 'co' address as two 32bit values? Why not just pass address
of it?

Also can it be possible to set context to 'co->cb()' here and do the
cleanup after context returned to 's->main' (below)?

> +			swapcontext(&s->main, &co->ctx);
> +		} else if (co->state == COROUTINE_SUSPEND) {
> +			co->state = COROUTINE_RUNNING;
> +			s->running = co;
> +			swapcontext(&s->main, &co->ctx);
> +		} else {
> +			FATAL("invalid state!");
> +		}
> +	}
> +
> +	while (s->yield_head != NULL) {
> +		co = s->yield_head;
> +		s->yield_head = co->yield_next;
> +		if (co->state == COROUTINE_YIELD) {
> +			co->state = COROUTINE_RUNNING;
> +			s->running = co;
> +			swapcontext(&s->main, &co->ctx);
> +		} else {
> +			FATAL("invalid yield state!");
> +		}
> +	}
> +

As coroutines in 'ready' state stored in a ring, same can be done for
the coroutines in 'yield' state, instead of using a linked list, to
simplify the flow. Just a question, both works fine.

> +	return 0;
> +}
> +
> +int
> +rte_co_create(struct rte_schedule *s, coroutine_callback_t cb, void *arg, uint32_t stack_sz)
> +{
> +	struct rte_cocontext *co = calloc(1, sizeof(struct rte_cocontext));
> +	int ret;
> +	if (co == NULL)
> +		return -ENOMEM;
> +

I guess somewhere here should check the 's->max_coroutines';
  

Patch

diff --git a/lib/coroutine/meson.build b/lib/coroutine/meson.build
new file mode 100644
index 0000000000..2064fb1909
--- /dev/null
+++ b/lib/coroutine/meson.build
@@ -0,0 +1,8 @@ 
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2023 HiSilicon Limited.
+
+sources = files('rte_coroutine.c')
+headers = files('rte_coroutine.h')
+indirect_headers += files('rte_coroutine_imp.h')
+
+deps += ['ring']
diff --git a/lib/coroutine/rte_coroutine.c b/lib/coroutine/rte_coroutine.c
new file mode 100644
index 0000000000..07c79fc901
--- /dev/null
+++ b/lib/coroutine/rte_coroutine.c
@@ -0,0 +1,190 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2023 HiSilicon Limited
+ */
+
+#include <pthread.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <ucontext.h>
+
+#include <rte_alarm.h>
+#include <rte_ring.h>
+
+#include "rte_coroutine.h"
+#include "rte_coroutine_imp.h"
+
+#define FATAL(fmt, args...) printf("[FATAL] %s() %d: " fmt "\n", __func__, __LINE__, ##args)
+
+static __thread struct rte_schedule *co_schedule;
+
+struct rte_schedule *
+rte_schedule_create(const char *name, uint32_t max_coroutines)
+{
+	struct rte_schedule *s = calloc(1, sizeof(struct rte_schedule));
+	if (s == NULL)
+		return NULL;
+
+	s->ring = rte_ring_create(name, max_coroutines, rte_socket_id(),
+				  RING_F_SC_DEQ);
+	if (s->ring == NULL) {
+		free(s);
+		return NULL;
+	}
+
+	s->max_coroutines = max_coroutines;
+
+	return s;
+}
+
+static void
+co_run_func(uint32_t low, uint32_t hi)
+{
+	uintptr_t ptr = (uint64_t)low | ((uint64_t)hi << 32);
+	struct rte_cocontext *co = (struct rte_cocontext *)ptr;
+	co->cb(co->arg);
+	/* Run complete, so free it. */
+	free(co->stack);
+	free(co);
+}
+
+int
+rte_schedule_run(struct rte_schedule *s)
+{
+	struct rte_cocontext *co = NULL;
+	uintptr_t ptr;
+
+	/* Set local thread variable as input argument. */
+	co_schedule = s;
+
+	while (!rte_ring_empty(s->ring)) {
+		rte_ring_dequeue(s->ring, (void **)&co);
+		if (co->state == COROUTINE_READY) {
+			getcontext(&co->ctx);
+			co->ctx.uc_stack.ss_sp = co->stack;
+			co->ctx.uc_stack.ss_size = co->stack_sz;
+			co->ctx.uc_link = &s->main;
+			co->state = COROUTINE_RUNNING;
+			s->running = co;
+			ptr = (uintptr_t)co;
+			makecontext(&co->ctx, (void (*)(void))co_run_func, 2,
+				    (uint32_t)ptr, (uint32_t)(ptr >> 32));
+			swapcontext(&s->main, &co->ctx);
+		} else if (co->state == COROUTINE_SUSPEND) {
+			co->state = COROUTINE_RUNNING;
+			s->running = co;
+			swapcontext(&s->main, &co->ctx);
+		} else {
+			FATAL("invalid state!");
+		}
+	}
+
+	while (s->yield_head != NULL) {
+		co = s->yield_head;
+		s->yield_head = co->yield_next;
+		if (co->state == COROUTINE_YIELD) {
+			co->state = COROUTINE_RUNNING;
+			s->running = co;
+			swapcontext(&s->main, &co->ctx);
+		} else {
+			FATAL("invalid yield state!");
+		}
+	}
+
+	return 0;
+}
+
+int
+rte_co_create(struct rte_schedule *s, coroutine_callback_t cb, void *arg, uint32_t stack_sz)
+{
+	struct rte_cocontext *co = calloc(1, sizeof(struct rte_cocontext));
+	int ret;
+	if (co == NULL)
+		return -ENOMEM;
+
+	co->owner = s;
+	co->state = COROUTINE_READY;
+	co->cb = cb;
+	co->arg = arg;
+	if (stack_sz < MIN_STACK_SIZE)
+		stack_sz = MIN_STACK_SIZE;
+	co->stack_sz = stack_sz;
+	co->stack = calloc(1, stack_sz);
+	if (co->stack == NULL) {
+		free(co);
+		return -ENOMEM;
+	}
+
+	ret = rte_ring_enqueue(s->ring, co);
+	if (ret != 0) {
+		free(co->stack);
+		free(co);
+	}
+
+	return ret;
+}
+
+static inline void
+co_addto_yield_list(struct rte_schedule *s, struct rte_cocontext *co)
+{
+	co->yield_next = NULL;
+	if (s->yield_head == NULL) {
+		s->yield_head = s->yield_tail = co;
+	} else {
+		s->yield_tail->yield_next = co;
+		s->yield_tail = co;
+	}
+}
+
+void
+rte_co_yield(void)
+{
+	struct rte_schedule *s = co_schedule;
+	struct rte_cocontext *co;
+	if (s == NULL) {
+		FATAL("thread co_schedule is NULL!");
+		return;
+	}
+	co = s->running;
+	if (co == NULL) {
+		FATAL("running is NULL!");
+		return;
+	}
+	co->state = COROUTINE_YIELD;
+	s->running = NULL;
+	co_addto_yield_list(s, co);
+	swapcontext(&co->ctx, &s->main);
+}
+
+static void
+co_delay_imp(void *arg)
+{
+	struct rte_cocontext *co = (struct rte_cocontext *)arg;
+	int ret;
+
+	ret = rte_ring_enqueue(co->owner->ring, (void *)co);
+	if (ret != 0)
+		FATAL("enqueue failed!");
+}
+
+void
+rte_co_delay(unsigned int us)
+{
+	struct rte_schedule *s = co_schedule;
+	struct rte_cocontext *co;
+
+	if (s == NULL) {
+		FATAL("thread co_schedule is NULL!");
+		return;
+	}
+
+	co = s->running;
+	if (co == NULL) {
+		FATAL("running is NULL!");
+		return;
+	}
+
+	rte_eal_alarm_set(us, co_delay_imp, (void *)co);
+	co->state = COROUTINE_SUSPEND;
+	s->running = NULL;
+	swapcontext(&co->ctx, &s->main);
+}
diff --git a/lib/coroutine/rte_coroutine.h b/lib/coroutine/rte_coroutine.h
new file mode 100644
index 0000000000..71ac9488b6
--- /dev/null
+++ b/lib/coroutine/rte_coroutine.h
@@ -0,0 +1,110 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2023 HiSilicon Limited
+ */
+
+#ifndef RTE_COROUTINE_H
+#define RTE_COROUTINE_H
+
+#include <stdint.h>
+
+#include <rte_compat.h>
+
+/**
+ * Callback prototype of coroutine.
+ *
+ * @param arg
+ *   An arg pointer coming from the caller.
+ */
+typedef void (*coroutine_callback_t)(void *arg);
+
+struct rte_schedule;
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice.
+ *
+ * Create coroutine scheduler.
+ *
+ * The scheduler is a coroutines container, which could schedule coroutine
+ * running.
+ *
+ * @param name
+ *   The unique name of scheduler.
+ * @param max_coroutines
+ *   Maximum number of coroutines that can be processed by the scheduler.
+ *
+ * @return
+ *   Non-NULL on success. Otherwise NULL value is returned.
+ */
+__rte_experimental
+struct rte_schedule *rte_schedule_create(const char *name, uint32_t max_coroutines);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice.
+ *
+ * Run the coroutine scheduler.
+ *
+ * This function will schedule all associated coroutine, it will return zero if
+ * no coroutines are active after scheduled.
+ *
+ * @param s
+ *   The pointer of the scheduler.
+ *
+ * @return
+ *   0 on success. Otherwise negative value is returned.
+ */
+__rte_experimental
+int rte_schedule_run(struct rte_schedule *s);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice.
+ *
+ * Create one coroutine.
+ *
+ * This function will create one coroutine which associated target scheduler
+ * 's'.
+ *
+ * @param s
+ *   The pointer of the scheduler.
+ * @param cb
+ *   The callback function.
+ * @param arg
+ *   The argument parameter for callback function.
+ * @param stack_sz
+ *   The stack size which associated to coroutine. The value zero indicates that
+ *   the default size (which is 8KB) is used.
+ *
+ * @return
+ *   0 on success. Otherwise negative value is returned.
+ */
+__rte_experimental
+int rte_co_create(struct rte_schedule *s, coroutine_callback_t cb, void *arg, uint32_t stack_sz);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice.
+ *
+ * Coroutine yield the CPU.
+ *
+ * This function yield the cpu of current coroutine.
+ */
+__rte_experimental
+void rte_co_yield(void);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice.
+ *
+ * Coroutine delay function.
+ *
+ * This function support delay microseconds of current coroutine.
+ *
+ * @param us
+ *   The time in microseconds which need delay.
+ */
+__rte_experimental
+void rte_co_delay(unsigned int us);
+
+#endif /* RTE_COROUTINE_H */
diff --git a/lib/coroutine/rte_coroutine_imp.h b/lib/coroutine/rte_coroutine_imp.h
new file mode 100644
index 0000000000..70b4f19670
--- /dev/null
+++ b/lib/coroutine/rte_coroutine_imp.h
@@ -0,0 +1,46 @@ 
+#ifndef RTE_COROUTINE_IMP_H
+#define RTE_COROUTINE_IMP_H
+
+#include <stdbool.h>
+#include <stdint.h>
+#include <ucontext.h>
+
+#include "rte_coroutine.h"
+
+#define MIN_STACK_SIZE	8192
+
+enum rte_costate {
+	COROUTINE_INVALID,
+	COROUTINE_READY,
+	COROUTINE_RUNNING,
+	COROUTINE_YIELD,
+	COROUTINE_SUSPEND,
+};
+
+struct rte_cocontext {
+	struct rte_schedule *owner; /**< Which scheduler this coroutine belongs. */
+	int state; /**< The current coroutine state. */
+	ucontext_t ctx;
+
+	coroutine_callback_t cb; /**< The coroutine callback function. */
+	void *arg; /**< The coroutine callback function's input argument. */
+
+	struct rte_cocontext *yield_next;
+
+	void *stack; /**< The allocated stack pointer. */
+	uint32_t stack_sz; /**< The allocated stack size. */
+};
+
+struct rte_schedule {
+	uint32_t max_coroutines; /**< Max coroutines which this scheduler supports. */
+
+	ucontext_t main;
+	struct rte_cocontext *running; /**< Current running coroutine. */
+
+	struct rte_ring *ring; /**< Command ring for schedule. */
+
+	struct rte_cocontext *yield_head; /**< Yield coroutine list for schedule. */
+	struct rte_cocontext *yield_tail;
+};
+
+#endif /* RTE_COROUTINE_IMP_H */
diff --git a/lib/coroutine/version.map b/lib/coroutine/version.map
new file mode 100644
index 0000000000..393b8979a6
--- /dev/null
+++ b/lib/coroutine/version.map
@@ -0,0 +1,11 @@ 
+EXPERIMENTAL {
+	global:
+
+	rte_co_create;
+	rte_co_delay;
+	rte_co_yield;
+	rte_schedule_create;
+	rte_schedule_run;
+
+	local: *;
+};
diff --git a/lib/meson.build b/lib/meson.build
index dc8aa4ac84..50e41f1511 100644
--- a/lib/meson.build
+++ b/lib/meson.build
@@ -64,6 +64,7 @@  libraries = [
         'flow_classify', # flow_classify lib depends on pkt framework table lib
         'graph',
         'node',
+        'coroutine',
 ]
 
 optional_libs = [