@@ -146,6 +146,7 @@ test_sources = files(
'test_tailq.c',
'test_thash.c',
'test_thash_perf.c',
+ 'test_threads.c',
'test_timer.c',
'test_timer_perf.c',
'test_timer_racecond.c',
@@ -287,6 +288,7 @@ fast_tests = [
['reorder_autotest', true],
['service_autotest', true],
['thash_autotest', true],
+ ['threads_autotest', true],
['trace_autotest', true],
]
new file mode 100644
@@ -0,0 +1,217 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright (c) 2021 Microsoft.
+ */
+
+#include <rte_thread.h>
+#include <rte_debug.h>
+
+#include "test.h"
+
+#define THREADS_COUNT 20
+
+RTE_LOG_REGISTER(threads_logtype_test, test.threads, INFO);
+
+static void *
+thread_loop_self(void *arg)
+{
+ rte_thread_t *id = arg;
+
+ *id = rte_thread_self();
+
+ return NULL;
+}
+
+static int
+test_thread_self(void)
+{
+ rte_thread_t threads_ids[THREADS_COUNT];
+ rte_thread_t self_ids[THREADS_COUNT] = {};
+ int ret;
+ int i;
+
+ for (i = 0; i < THREADS_COUNT; ++i) {
+ ret = rte_thread_create(&threads_ids[i], NULL, thread_loop_self,
+ &self_ids[i]);
+ RTE_TEST_ASSERT(ret == 0, "Failed to create threads!");
+ }
+
+ for (i = 0; i < THREADS_COUNT; ++i) {
+ RTE_TEST_ASSERT(rte_thread_join(threads_ids[i], NULL) == 0, "Failed to join thread!");
+ RTE_TEST_ASSERT_EQUAL(threads_ids[i].opaque_id,
+ self_ids[i].opaque_id, "Unexpected thread id!");
+ }
+
+ return 0;
+}
+
+struct thread_affinity_ctx {
+ rte_cpuset_t *cpuset;
+ unsigned int result;
+};
+
+static void *
+thread_loop_attributes_affinity(void *arg)
+{
+ struct thread_affinity_ctx *ctx = arg;
+ rte_cpuset_t cpuset;
+ size_t i;
+
+ ctx->result = 0;
+
+ CPU_ZERO(&cpuset);
+ if (rte_thread_get_affinity_by_id(rte_thread_self(), &cpuset) != 0) {
+ ctx->result = 1;
+ rte_log(RTE_LOG_DEBUG, threads_logtype_test, "Failed to get thread affinity!");
+ return NULL;
+ }
+
+ /*
+ * Check that the thread is not running on CPUs which were not
+ * specified in the affinity mask. Note that the CPU mask
+ * retrieved above can be different than the original mask specified
+ * with rte_thread_attr_set_affinity(), since some CPUs may not be
+ * available on the system.
+ */
+ for (i = 0; i < CPU_SETSIZE; ++i) {
+ if (!CPU_ISSET(i, ctx->cpuset) && CPU_ISSET(i, &cpuset)) {
+ ctx->result = 1;
+ rte_log(RTE_LOG_DEBUG, threads_logtype_test, "CPU %zu should not be set for this thread!\n",
+ i);
+ return NULL;
+ }
+ }
+
+ return NULL;
+}
+
+static int
+test_thread_attributes_affinity(void)
+{
+ rte_thread_t threads_ids[THREADS_COUNT];
+ struct thread_affinity_ctx ctx[THREADS_COUNT] = {};
+ rte_thread_attr_t attr;
+ rte_cpuset_t cpuset;
+ size_t i;
+ int ret = 0;
+
+ ret = rte_thread_attr_init(&attr);
+ RTE_TEST_ASSERT(ret == 0, "Failed to initialize thread attributes!");
+
+ CPU_ZERO(&cpuset);
+ ret = rte_thread_get_affinity_by_id(rte_thread_self(), &cpuset);
+ RTE_TEST_ASSERT(ret == 0, "Failed to get main thread affinity!");
+
+ ret = rte_thread_attr_set_affinity(&attr, &cpuset);
+ RTE_TEST_ASSERT(ret == 0, "Failed to set thread attributes!");
+
+ for (i = 0; i < THREADS_COUNT; ++i) {
+ ctx[i].cpuset = &cpuset;
+ ret = rte_thread_create(&threads_ids[i], &attr,
+ thread_loop_attributes_affinity, &ctx[i]);
+ RTE_TEST_ASSERT(ret == 0, "Failed to create threads!");
+ }
+
+ for (i = 0; i < THREADS_COUNT; ++i) {
+ ret = rte_thread_join(threads_ids[i], NULL);
+ RTE_TEST_ASSERT(ret == 0, "Failed to join threads!");
+
+ RTE_TEST_ASSERT_EQUAL(ctx[i].result, 0, "Unexpected thread affinity!");
+ }
+
+ return ret;
+}
+
+static void *
+thread_loop_priority(void *arg)
+{
+ int ret;
+ enum rte_thread_priority priority;
+ int *result = arg;
+
+ *result = 1;
+ ret = rte_thread_get_priority(rte_thread_self(), &priority);
+ if (ret != 0 || priority != RTE_THREAD_PRIORITY_NORMAL)
+ *result = 2;
+
+ return NULL;
+}
+
+static int
+test_thread_attributes_priority(void)
+{
+ rte_thread_t threads_ids[THREADS_COUNT];
+ rte_thread_attr_t attr;
+ size_t i;
+ int ret = 0;
+ int results[THREADS_COUNT] = {};
+
+ ret = rte_thread_attr_init(&attr);
+ RTE_TEST_ASSERT(ret == 0, "Failed to initialize thread attributes!");
+
+ ret = rte_thread_attr_set_priority(&attr, RTE_THREAD_PRIORITY_NORMAL);
+ RTE_TEST_ASSERT(ret == 0, "Failed to set thread priority!");
+
+ for (i = 0; i < THREADS_COUNT; ++i) {
+ ret = rte_thread_create(&threads_ids[i], &attr,
+ thread_loop_priority, &results[i]);
+ RTE_TEST_ASSERT(ret == 0, "Failed to create threads!");
+ }
+
+ for (i = 0; i < THREADS_COUNT; ++i) {
+ ret = rte_thread_join(threads_ids[i], NULL);
+ RTE_TEST_ASSERT(ret == 0, "Failed to join threads!");
+
+ RTE_TEST_ASSERT_EQUAL(results[i], 1, "Unexpected priority value!");
+ }
+
+ return ret;
+}
+
+static void *
+thread_loop_return(void *arg)
+{
+ RTE_SET_USED(arg);
+ return NULL;
+}
+
+static int
+test_thread_detach(void)
+{
+ rte_thread_t threads_ids[THREADS_COUNT];
+ size_t i;
+ int ret = 0;
+
+ for (i = 0; i < THREADS_COUNT; ++i) {
+ ret = rte_thread_create(&threads_ids[i], NULL,
+ thread_loop_return, NULL);
+ RTE_TEST_ASSERT(ret == 0, "Failed to create threads!");
+ }
+
+ for (i = 0; i < THREADS_COUNT; ++i) {
+ ret = rte_thread_detach(threads_ids[i]);
+ RTE_TEST_ASSERT(ret == 0, "Failed to detach thread!");
+ }
+
+ return ret;
+}
+
+static struct unit_test_suite threads_test_suite = {
+ .suite_name = "threads autotest",
+ .setup = NULL,
+ .teardown = NULL,
+ .unit_test_cases = {
+ TEST_CASE(test_thread_self),
+ TEST_CASE(test_thread_attributes_affinity),
+ TEST_CASE(test_thread_attributes_priority),
+ TEST_CASE(test_thread_detach),
+ TEST_CASES_END()
+ }
+};
+
+static int
+test_threads(void)
+{
+ return unit_test_suite_runner(&threads_test_suite);
+}
+
+REGISTER_TEST_COMMAND(threads_autotest, test_threads);
@@ -198,6 +198,117 @@ rte_thread_attr_set_priority(rte_thread_attr_t *thread_attr,
return 0;
}
+int
+rte_thread_create(rte_thread_t *thread_id,
+ const rte_thread_attr_t *thread_attr,
+ rte_thread_func thread_func, void *args)
+{
+ int ret = 0;
+ pthread_attr_t attr;
+ pthread_attr_t *attrp = NULL;
+ struct sched_param param = {
+ .sched_priority = 0,
+ };
+ int policy = SCHED_OTHER;
+
+ if (thread_attr != NULL) {
+ ret = pthread_attr_init(&attr);
+ if (ret != 0) {
+ RTE_LOG(DEBUG, EAL, "pthread_attr_init failed\n");
+ goto cleanup;
+ }
+
+ attrp = &attr;
+
+ /*
+ * Set the inherit scheduler parameter to explicit,
+ * otherwise the priority attribute is ignored.
+ */
+ ret = pthread_attr_setinheritsched(attrp,
+ PTHREAD_EXPLICIT_SCHED);
+ if (ret != 0) {
+ RTE_LOG(DEBUG, EAL, "pthread_attr_setinheritsched failed\n");
+ goto cleanup;
+ }
+
+
+/* Realtime priority can cause crashes on non-Windows platforms. */
+#ifndef RTE_EXEC_ENV_WINDOWS
+ if (thread_attr->priority ==
+ RTE_THREAD_PRIORITY_REALTIME_CRITICAL) {
+ ret = ENOTSUP;
+ goto cleanup;
+ }
+#endif
+ ret = thread_map_priority_to_os_value(thread_attr->priority,
+ ¶m.sched_priority, &policy);
+ if (ret != 0)
+ goto cleanup;
+
+ ret = pthread_attr_setschedpolicy(attrp, policy);
+ if (ret != 0) {
+ RTE_LOG(DEBUG, EAL, "pthread_attr_setschedpolicy failed\n");
+ goto cleanup;
+ }
+
+ ret = pthread_attr_setschedparam(attrp, ¶m);
+ if (ret != 0) {
+ RTE_LOG(DEBUG, EAL, "pthread_attr_setschedparam failed\n");
+ goto cleanup;
+ }
+ }
+
+ ret = pthread_create((pthread_t *)&thread_id->opaque_id, attrp,
+ thread_func, args);
+ if (ret != 0) {
+ RTE_LOG(DEBUG, EAL, "pthread_create failed\n");
+ goto cleanup;
+ }
+
+ if (thread_attr != NULL && CPU_COUNT(&thread_attr->cpuset) > 0) {
+ ret = rte_thread_set_affinity_by_id(*thread_id,
+ &thread_attr->cpuset);
+ if (ret != 0) {
+ RTE_LOG(DEBUG, EAL, "rte_thread_set_affinity_by_id failed\n");
+ goto cleanup;
+ }
+ }
+
+cleanup:
+ if (attrp != NULL)
+ pthread_attr_destroy(&attr);
+
+ return ret;
+}
+
+int
+rte_thread_join(rte_thread_t thread_id, unsigned long *value_ptr)
+{
+ int ret = 0;
+ void *res = NULL;
+ void **pres = NULL;
+
+ if (value_ptr != NULL)
+ pres = &res;
+
+ ret = pthread_join((pthread_t)thread_id.opaque_id, pres);
+ if (ret != 0) {
+ RTE_LOG(DEBUG, EAL, "pthread_join failed\n");
+ return ret;
+ }
+
+ if (value_ptr != NULL && *pres != NULL)
+ *value_ptr = *(unsigned long *)(*pres);
+
+ return 0;
+}
+
+int
+rte_thread_detach(rte_thread_t thread_id)
+{
+ return pthread_detach((pthread_t)thread_id.opaque_id);
+}
+
int
rte_thread_key_create(rte_thread_key *key, void (*destructor)(void *))
{
@@ -31,6 +31,7 @@ typedef struct rte_thread_tag {
uintptr_t opaque_id; /**< thread identifier */
} rte_thread_t;
+typedef void* (*rte_thread_func) (void *);
/**
* Thread priority values.
*/
@@ -211,6 +212,60 @@ int rte_thread_set_affinity(rte_cpuset_t *cpusetp);
*/
void rte_thread_get_affinity(rte_cpuset_t *cpusetp);
+/**
+ * Create a new thread that will invoke the 'thread_func' routine.
+ *
+ * @param thread_id
+ * A pointer that will store the id of the newly created thread.
+ *
+ * @param thread_attr
+ * Attributes that are used at the creation of the new thread.
+ *
+ * @param thread_func
+ * The routine that the new thread will invoke when starting execution.
+ *
+ * @param args
+ * Arguments to be passed to the 'thread_func' routine.
+ *
+ * @return
+ * On success, return 0.
+ * On failure, return a positive errno-style error number.
+ */
+__rte_experimental
+int rte_thread_create(rte_thread_t *thread_id,
+ const rte_thread_attr_t *thread_attr,
+ rte_thread_func thread_func, void *args);
+
+/**
+ * Waits for the thread identified by 'thread_id' to terminate
+ *
+ * @param thread_id
+ * The identifier of the thread.
+ *
+ * @param value_ptr
+ * Stores the exit status of the thread.
+ *
+ * @return
+ * On success, return 0.
+ * On failure, return a positive errno-style error number.
+ */
+__rte_experimental
+int rte_thread_join(rte_thread_t thread_id, unsigned long *value_ptr);
+
+/**
+ * Indicate that the return value of the thread is not needed and
+ * all thread resources should be release when the thread terminates.
+ *
+ * @param thread_id
+ * The id of the thread to be detached.
+ *
+ * @return
+ * On success, return 0.
+ * On failure, return a positive errno-style error number.
+ */
+__rte_experimental
+int rte_thread_detach(rte_thread_t thread_id);
+
#endif /* RTE_HAS_CPUSET */
/**
@@ -431,6 +431,9 @@ EXPERIMENTAL {
rte_thread_set_affinity_by_id;
rte_thread_get_priority;
rte_thread_set_priority;
+ rte_thread_create;
+ rte_thread_join;
+ rte_thread_detach;
};
INTERNAL {
@@ -44,7 +44,7 @@ typedef struct _rte_cpuset_s {
(1LL << _WHICH_BIT(b))) != 0LL)
static inline int
-count_cpu(rte_cpuset_t *s)
+count_cpu(const rte_cpuset_t *s)
{
unsigned int _i;
int count = 0;
@@ -14,6 +14,11 @@ struct eal_tls_key {
DWORD thread_index;
};
+struct thread_routine_ctx {
+ rte_thread_func thread_func;
+ void *routine_args;
+};
+
/* Translates the most common error codes related to threads */
static int
thread_translate_win32_error(DWORD error)
@@ -370,6 +375,135 @@ rte_thread_attr_set_priority(rte_thread_attr_t *thread_attr,
return 0;
}
+static DWORD
+thread_func_wrapper(void *args)
+{
+ struct thread_routine_ctx *pctx = args;
+ struct thread_routine_ctx ctx;
+
+ ctx.thread_func = pctx->thread_func;
+ ctx.routine_args = pctx->routine_args;
+
+ free(pctx);
+
+ return (DWORD)(uintptr_t)ctx.thread_func(ctx.routine_args);
+}
+
+int
+rte_thread_create(rte_thread_t *thread_id,
+ const rte_thread_attr_t *thread_attr,
+ rte_thread_func thread_func, void *args)
+{
+ int ret = 0;
+ DWORD tid;
+ HANDLE thread_handle = NULL;
+ GROUP_AFFINITY thread_affinity;
+ struct thread_routine_ctx *ctx = NULL;
+
+ ctx = calloc(1, sizeof(*ctx));
+ if (ctx == NULL) {
+ RTE_LOG(DEBUG, EAL, "Insufficient memory for thread context allocations\n");
+ ret = ENOMEM;
+ goto cleanup;
+ }
+ ctx->routine_args = args;
+ ctx->thread_func = thread_func;
+
+ thread_handle = CreateThread(NULL, 0, thread_func_wrapper, ctx,
+ CREATE_SUSPENDED, &tid);
+ if (thread_handle == NULL) {
+ ret = thread_log_last_error("CreateThread()");
+ free(ctx);
+ goto cleanup;
+ }
+ thread_id->opaque_id = tid;
+
+ if (thread_attr != NULL) {
+ if (CPU_COUNT(&thread_attr->cpuset) > 0) {
+ ret = rte_convert_cpuset_to_affinity(
+ &thread_attr->cpuset,
+ &thread_affinity
+ );
+ if (ret != 0) {
+ RTE_LOG(DEBUG, EAL, "Unable to convert cpuset to thread affinity\n");
+ goto cleanup;
+ }
+
+ if (!SetThreadGroupAffinity(thread_handle,
+ &thread_affinity, NULL)) {
+ ret = thread_log_last_error("SetThreadGroupAffinity()");
+ goto cleanup;
+ }
+ }
+ ret = rte_thread_set_priority(*thread_id,
+ thread_attr->priority);
+ if (ret != 0) {
+ RTE_LOG(DEBUG, EAL, "Unable to set thread priority\n");
+ goto cleanup;
+ }
+ }
+
+ if (ResumeThread(thread_handle) == (DWORD)-1) {
+ ret = thread_log_last_error("ResumeThread()");
+ goto cleanup;
+ }
+
+cleanup:
+ if (thread_handle != NULL) {
+ CloseHandle(thread_handle);
+ thread_handle = NULL;
+ }
+ return ret;
+}
+
+int
+rte_thread_join(rte_thread_t thread_id, unsigned long *value_ptr)
+{
+ HANDLE thread_handle;
+ DWORD result;
+ DWORD exit_code = 0;
+ BOOL err;
+ int ret = 0;
+
+ thread_handle = OpenThread(SYNCHRONIZE | THREAD_QUERY_INFORMATION,
+ FALSE, thread_id.opaque_id);
+ if (thread_handle == NULL) {
+ ret = thread_log_last_error("OpenThread()");
+ goto cleanup;
+ }
+
+ result = WaitForSingleObject(thread_handle, INFINITE);
+ if (result != WAIT_OBJECT_0) {
+ ret = thread_log_last_error("WaitForSingleObject()");
+ goto cleanup;
+ }
+
+ if (value_ptr != NULL) {
+ err = GetExitCodeThread(thread_handle, &exit_code);
+ if (err == 0) {
+ ret = thread_log_last_error("GetExitCodeThread()");
+ goto cleanup;
+ }
+ *value_ptr = exit_code;
+ }
+
+cleanup:
+ if (thread_handle != NULL) {
+ CloseHandle(thread_handle);
+ thread_handle = NULL;
+ }
+
+ return ret;
+}
+
+int
+rte_thread_detach(rte_thread_t thread_id)
+{
+ /* No resources that need to be released. */
+ RTE_SET_USED(thread_id);
+ return 0;
+}
+
int
rte_thread_key_create(rte_thread_key *key,
__rte_unused void (*destructor)(void *))