[v18,6/8] eal: add thread lifetime management

Message ID 1636594425-9692-7-git-send-email-navasile@linux.microsoft.com (mailing list archive)
State Superseded, archived
Delegated to: David Marchand
Headers
Series eal: Add EAL API for threading |

Checks

Context Check Description
ci/checkpatch warning coding style issues

Commit Message

Narcisa Ana Maria Vasile Nov. 11, 2021, 1:33 a.m. UTC
  From: Narcisa Vasile <navasile@microsoft.com>

Add functions for thread creation, joining, detaching.

The *rte_thread_create()* function can optionally receive
an rte_thread_attr_t object that will cause the thread to be
created with the affinity and priority described by the
attributes object. If no rte_thread_attr_t is passed (parameter is NULL),
the default affinity and priority are used.

On Windows, the function executed by a thread when the thread starts is
represeneted by a function pointer of type DWORD (*func) (void*).
On other platforms, the function pointer is a void* (*func) (void*).

Performing a cast between these two types of function pointers to
uniformize the API on all platforms may result in undefined behavior.
TO fix this issue, a wrapper that respects the signature required by
CreateThread() has been created on Windows.

Add unit tests:
 - verify rte_thread_self() correctly retrieves the thread id.
 - verify that affinity and priority can be set successfully.
 - verify that threads are created and cleaned up correctly.

Signed-off-by: Narcisa Vasile <navasile@microsoft.com>
---
 app/test/meson.build            |   2 +
 app/test/test_threads.c         | 217 ++++++++++++++++++++++++++++++++
 lib/eal/common/rte_thread.c     | 111 ++++++++++++++++
 lib/eal/include/rte_thread.h    |  55 ++++++++
 lib/eal/version.map             |   3 +
 lib/eal/windows/include/sched.h |   2 +-
 lib/eal/windows/rte_thread.c    | 134 ++++++++++++++++++++
 7 files changed, 523 insertions(+), 1 deletion(-)
 create mode 100644 app/test/test_threads.c
  

Patch

diff --git a/app/test/meson.build b/app/test/meson.build
index 96670c3504..9fd34459e9 100644
--- a/app/test/meson.build
+++ b/app/test/meson.build
@@ -146,6 +146,7 @@  test_sources = files(
         'test_tailq.c',
         'test_thash.c',
         'test_thash_perf.c',
+        'test_threads.c',
         'test_timer.c',
         'test_timer_perf.c',
         'test_timer_racecond.c',
@@ -287,6 +288,7 @@  fast_tests = [
         ['reorder_autotest', true],
         ['service_autotest', true],
         ['thash_autotest', true],
+        ['threads_autotest', true],
         ['trace_autotest', true],
 ]
 
diff --git a/app/test/test_threads.c b/app/test/test_threads.c
new file mode 100644
index 0000000000..9fcae34179
--- /dev/null
+++ b/app/test/test_threads.c
@@ -0,0 +1,217 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright (c) 2021 Microsoft.
+ */
+
+#include <rte_thread.h>
+#include <rte_debug.h>
+
+#include "test.h"
+
+#define THREADS_COUNT 20
+
+RTE_LOG_REGISTER(threads_logtype_test, test.threads, INFO);
+
+static void *
+thread_loop_self(void *arg)
+{
+	rte_thread_t *id = arg;
+
+	*id = rte_thread_self();
+
+	return NULL;
+}
+
+static int
+test_thread_self(void)
+{
+	rte_thread_t threads_ids[THREADS_COUNT];
+	rte_thread_t self_ids[THREADS_COUNT] = {};
+	int ret;
+	int i;
+
+	for (i = 0; i < THREADS_COUNT; ++i) {
+		ret = rte_thread_create(&threads_ids[i], NULL, thread_loop_self,
+				&self_ids[i]);
+		RTE_TEST_ASSERT(ret == 0, "Failed to create threads!");
+	}
+
+	for (i = 0; i < THREADS_COUNT; ++i) {
+		RTE_TEST_ASSERT(rte_thread_join(threads_ids[i], NULL) == 0, "Failed to join thread!");
+		RTE_TEST_ASSERT_EQUAL(threads_ids[i].opaque_id,
+				self_ids[i].opaque_id, "Unexpected thread id!");
+	}
+
+	return 0;
+}
+
+struct thread_affinity_ctx {
+	rte_cpuset_t *cpuset;
+	unsigned int result;
+};
+
+static void *
+thread_loop_attributes_affinity(void *arg)
+{
+	struct thread_affinity_ctx *ctx = arg;
+	rte_cpuset_t cpuset;
+	size_t i;
+
+	ctx->result = 0;
+
+	CPU_ZERO(&cpuset);
+	if (rte_thread_get_affinity_by_id(rte_thread_self(), &cpuset) != 0) {
+		ctx->result = 1;
+		rte_log(RTE_LOG_DEBUG, threads_logtype_test, "Failed to get thread affinity!");
+		return NULL;
+	}
+
+	/*
+	 * Check that the thread is not running on CPUs which were not
+	 * specified in the affinity mask. Note that the CPU mask
+	 * retrieved above can be different than the original mask specified
+	 * with rte_thread_attr_set_affinity(), since some CPUs may not be
+	 * available on the system.
+	 */
+	for (i = 0; i < CPU_SETSIZE; ++i) {
+		if (!CPU_ISSET(i, ctx->cpuset) && CPU_ISSET(i, &cpuset)) {
+			ctx->result = 1;
+			rte_log(RTE_LOG_DEBUG, threads_logtype_test, "CPU %zu should not be set for this thread!\n",
+					i);
+			return NULL;
+		}
+	}
+
+	return NULL;
+}
+
+static int
+test_thread_attributes_affinity(void)
+{
+	rte_thread_t threads_ids[THREADS_COUNT];
+	struct thread_affinity_ctx ctx[THREADS_COUNT] = {};
+	rte_thread_attr_t attr;
+	rte_cpuset_t cpuset;
+	size_t i;
+	int ret = 0;
+
+	ret = rte_thread_attr_init(&attr);
+	RTE_TEST_ASSERT(ret == 0, "Failed to initialize thread attributes!");
+
+	CPU_ZERO(&cpuset);
+	ret = rte_thread_get_affinity_by_id(rte_thread_self(), &cpuset);
+	RTE_TEST_ASSERT(ret == 0, "Failed to get main thread affinity!");
+
+	ret = rte_thread_attr_set_affinity(&attr, &cpuset);
+	RTE_TEST_ASSERT(ret == 0, "Failed to set thread attributes!");
+
+	for (i = 0; i < THREADS_COUNT; ++i) {
+		ctx[i].cpuset = &cpuset;
+		ret = rte_thread_create(&threads_ids[i], &attr,
+				thread_loop_attributes_affinity, &ctx[i]);
+		RTE_TEST_ASSERT(ret == 0, "Failed to create threads!");
+	}
+
+	for (i = 0; i < THREADS_COUNT; ++i) {
+		ret = rte_thread_join(threads_ids[i], NULL);
+		RTE_TEST_ASSERT(ret == 0, "Failed to join threads!");
+
+		RTE_TEST_ASSERT_EQUAL(ctx[i].result, 0, "Unexpected thread affinity!");
+	}
+
+	return ret;
+}
+
+static void *
+thread_loop_priority(void *arg)
+{
+	int ret;
+	enum rte_thread_priority priority;
+	int *result = arg;
+
+	*result = 1;
+	ret = rte_thread_get_priority(rte_thread_self(), &priority);
+	if (ret != 0 || priority != RTE_THREAD_PRIORITY_NORMAL)
+		*result = 2;
+
+	return NULL;
+}
+
+static int
+test_thread_attributes_priority(void)
+{
+	rte_thread_t threads_ids[THREADS_COUNT];
+	rte_thread_attr_t attr;
+	size_t i;
+	int ret = 0;
+	int results[THREADS_COUNT] = {};
+
+	ret = rte_thread_attr_init(&attr);
+	RTE_TEST_ASSERT(ret == 0, "Failed to initialize  thread attributes!");
+
+	ret = rte_thread_attr_set_priority(&attr, RTE_THREAD_PRIORITY_NORMAL);
+	RTE_TEST_ASSERT(ret == 0, "Failed to set thread priority!");
+
+	for (i = 0; i < THREADS_COUNT; ++i) {
+		ret = rte_thread_create(&threads_ids[i], &attr,
+				thread_loop_priority, &results[i]);
+		RTE_TEST_ASSERT(ret == 0, "Failed to create threads!");
+	}
+
+	for (i = 0; i < THREADS_COUNT; ++i) {
+		ret = rte_thread_join(threads_ids[i], NULL);
+		RTE_TEST_ASSERT(ret == 0, "Failed to join threads!");
+
+		RTE_TEST_ASSERT_EQUAL(results[i], 1, "Unexpected priority value!");
+	}
+
+	return ret;
+}
+
+static void *
+thread_loop_return(void *arg)
+{
+	RTE_SET_USED(arg);
+	return NULL;
+}
+
+static int
+test_thread_detach(void)
+{
+	rte_thread_t threads_ids[THREADS_COUNT];
+	size_t i;
+	int ret = 0;
+
+	for (i = 0; i < THREADS_COUNT; ++i) {
+		ret = rte_thread_create(&threads_ids[i], NULL,
+				thread_loop_return, NULL);
+		RTE_TEST_ASSERT(ret == 0, "Failed to create threads!");
+	}
+
+	for (i = 0; i < THREADS_COUNT; ++i) {
+		ret = rte_thread_detach(threads_ids[i]);
+		RTE_TEST_ASSERT(ret == 0, "Failed to detach thread!");
+	}
+
+	return ret;
+}
+
+static struct unit_test_suite threads_test_suite = {
+	.suite_name = "threads autotest",
+	.setup = NULL,
+	.teardown = NULL,
+	.unit_test_cases = {
+			TEST_CASE(test_thread_self),
+			TEST_CASE(test_thread_attributes_affinity),
+			TEST_CASE(test_thread_attributes_priority),
+			TEST_CASE(test_thread_detach),
+			TEST_CASES_END()
+	}
+};
+
+static int
+test_threads(void)
+{
+	return unit_test_suite_runner(&threads_test_suite);
+}
+
+REGISTER_TEST_COMMAND(threads_autotest, test_threads);
diff --git a/lib/eal/common/rte_thread.c b/lib/eal/common/rte_thread.c
index fc5d7c5b1a..910c39eb88 100644
--- a/lib/eal/common/rte_thread.c
+++ b/lib/eal/common/rte_thread.c
@@ -198,6 +198,117 @@  rte_thread_attr_set_priority(rte_thread_attr_t *thread_attr,
 	return 0;
 }
 
+int
+rte_thread_create(rte_thread_t *thread_id,
+		const rte_thread_attr_t *thread_attr,
+		rte_thread_func thread_func, void *args)
+{
+	int ret = 0;
+	pthread_attr_t attr;
+	pthread_attr_t *attrp = NULL;
+	struct sched_param param = {
+		.sched_priority = 0,
+	};
+	int policy = SCHED_OTHER;
+
+	if (thread_attr != NULL) {
+		ret = pthread_attr_init(&attr);
+		if (ret != 0) {
+			RTE_LOG(DEBUG, EAL, "pthread_attr_init failed\n");
+			goto cleanup;
+		}
+
+		attrp = &attr;
+
+		/*
+		 * Set the inherit scheduler parameter to explicit,
+		 * otherwise the priority attribute is ignored.
+		 */
+		ret = pthread_attr_setinheritsched(attrp,
+				PTHREAD_EXPLICIT_SCHED);
+		if (ret != 0) {
+			RTE_LOG(DEBUG, EAL, "pthread_attr_setinheritsched failed\n");
+			goto cleanup;
+		}
+
+
+/* Realtime priority can cause crashes on non-Windows platforms. */
+#ifndef RTE_EXEC_ENV_WINDOWS
+		if (thread_attr->priority ==
+				RTE_THREAD_PRIORITY_REALTIME_CRITICAL) {
+			ret = ENOTSUP;
+			goto cleanup;
+		}
+#endif
+		ret = thread_map_priority_to_os_value(thread_attr->priority,
+				&param.sched_priority, &policy);
+		if (ret != 0)
+			goto cleanup;
+
+		ret = pthread_attr_setschedpolicy(attrp, policy);
+		if (ret != 0) {
+			RTE_LOG(DEBUG, EAL, "pthread_attr_setschedpolicy failed\n");
+			goto cleanup;
+		}
+
+		ret = pthread_attr_setschedparam(attrp, &param);
+		if (ret != 0) {
+			RTE_LOG(DEBUG, EAL, "pthread_attr_setschedparam failed\n");
+			goto cleanup;
+		}
+	}
+
+	ret = pthread_create((pthread_t *)&thread_id->opaque_id, attrp,
+		thread_func, args);
+	if (ret != 0) {
+		RTE_LOG(DEBUG, EAL, "pthread_create failed\n");
+		goto cleanup;
+	}
+
+	if (thread_attr != NULL && CPU_COUNT(&thread_attr->cpuset) > 0) {
+		ret = rte_thread_set_affinity_by_id(*thread_id,
+			&thread_attr->cpuset);
+		if (ret != 0) {
+			RTE_LOG(DEBUG, EAL, "rte_thread_set_affinity_by_id failed\n");
+			goto cleanup;
+		}
+	}
+
+cleanup:
+	if (attrp != NULL)
+		pthread_attr_destroy(&attr);
+
+	return ret;
+}
+
+int
+rte_thread_join(rte_thread_t thread_id, unsigned long *value_ptr)
+{
+	int ret = 0;
+	void *res = NULL;
+	void **pres = NULL;
+
+	if (value_ptr != NULL)
+		pres = &res;
+
+	ret = pthread_join((pthread_t)thread_id.opaque_id, pres);
+	if (ret != 0) {
+		RTE_LOG(DEBUG, EAL, "pthread_join failed\n");
+		return ret;
+	}
+
+	if (value_ptr != NULL && *pres != NULL)
+		*value_ptr = *(unsigned long *)(*pres);
+
+	return 0;
+}
+
+int
+rte_thread_detach(rte_thread_t thread_id)
+{
+	return pthread_detach((pthread_t)thread_id.opaque_id);
+}
+
 int
 rte_thread_key_create(rte_thread_key *key, void (*destructor)(void *))
 {
diff --git a/lib/eal/include/rte_thread.h b/lib/eal/include/rte_thread.h
index 7077c9ce46..e841321819 100644
--- a/lib/eal/include/rte_thread.h
+++ b/lib/eal/include/rte_thread.h
@@ -31,6 +31,7 @@  typedef struct rte_thread_tag {
 	uintptr_t opaque_id; /**< thread identifier */
 } rte_thread_t;
 
+typedef void* (*rte_thread_func) (void *);
 /**
  * Thread priority values.
  */
@@ -211,6 +212,60 @@  int rte_thread_set_affinity(rte_cpuset_t *cpusetp);
  */
 void rte_thread_get_affinity(rte_cpuset_t *cpusetp);
 
+/**
+ * Create a new thread that will invoke the 'thread_func' routine.
+ *
+ * @param thread_id
+ *    A pointer that will store the id of the newly created thread.
+ *
+ * @param thread_attr
+ *    Attributes that are used at the creation of the new thread.
+ *
+ * @param thread_func
+ *    The routine that the new thread will invoke when starting execution.
+ *
+ * @param args
+ *    Arguments to be passed to the 'thread_func' routine.
+ *
+ * @return
+ *   On success, return 0.
+ *   On failure, return a positive errno-style error number.
+ */
+__rte_experimental
+int rte_thread_create(rte_thread_t *thread_id,
+		const rte_thread_attr_t *thread_attr,
+		rte_thread_func thread_func, void *args);
+
+/**
+ * Waits for the thread identified by 'thread_id' to terminate
+ *
+ * @param thread_id
+ *    The identifier of the thread.
+ *
+ * @param value_ptr
+ *    Stores the exit status of the thread.
+ *
+ * @return
+ *   On success, return 0.
+ *   On failure, return a positive errno-style error number.
+ */
+__rte_experimental
+int rte_thread_join(rte_thread_t thread_id, unsigned long *value_ptr);
+
+/**
+ * Indicate that the return value of the thread is not needed and
+ * all thread resources should be release when the thread terminates.
+ *
+ * @param thread_id
+ *    The id of the thread to be detached.
+ *
+ * @return
+ *   On success, return 0.
+ *   On failure, return a positive errno-style error number.
+ */
+__rte_experimental
+int rte_thread_detach(rte_thread_t thread_id);
+
 #endif /* RTE_HAS_CPUSET */
 
 /**
diff --git a/lib/eal/version.map b/lib/eal/version.map
index 5bc5a6cf76..0384a09fa2 100644
--- a/lib/eal/version.map
+++ b/lib/eal/version.map
@@ -431,6 +431,9 @@  EXPERIMENTAL {
 	rte_thread_set_affinity_by_id;
 	rte_thread_get_priority;
 	rte_thread_set_priority;
+	rte_thread_create;
+	rte_thread_join;
+	rte_thread_detach;
 };
 
 INTERNAL {
diff --git a/lib/eal/windows/include/sched.h b/lib/eal/windows/include/sched.h
index bc31cc8465..912fed12c2 100644
--- a/lib/eal/windows/include/sched.h
+++ b/lib/eal/windows/include/sched.h
@@ -44,7 +44,7 @@  typedef struct _rte_cpuset_s {
 	(1LL << _WHICH_BIT(b))) != 0LL)
 
 static inline int
-count_cpu(rte_cpuset_t *s)
+count_cpu(const rte_cpuset_t *s)
 {
 	unsigned int _i;
 	int count = 0;
diff --git a/lib/eal/windows/rte_thread.c b/lib/eal/windows/rte_thread.c
index 5c02a6eaff..669a68d6a8 100644
--- a/lib/eal/windows/rte_thread.c
+++ b/lib/eal/windows/rte_thread.c
@@ -14,6 +14,11 @@  struct eal_tls_key {
 	DWORD thread_index;
 };
 
+struct thread_routine_ctx {
+	rte_thread_func thread_func;
+	void *routine_args;
+};
+
 /* Translates the most common error codes related to threads */
 static int
 thread_translate_win32_error(DWORD error)
@@ -370,6 +375,135 @@  rte_thread_attr_set_priority(rte_thread_attr_t *thread_attr,
 	return 0;
 }
 
+static DWORD
+thread_func_wrapper(void *args)
+{
+	struct thread_routine_ctx *pctx = args;
+	struct thread_routine_ctx ctx;
+
+	ctx.thread_func = pctx->thread_func;
+	ctx.routine_args = pctx->routine_args;
+
+	free(pctx);
+
+	return (DWORD)(uintptr_t)ctx.thread_func(ctx.routine_args);
+}
+
+int
+rte_thread_create(rte_thread_t *thread_id,
+		  const rte_thread_attr_t *thread_attr,
+		  rte_thread_func thread_func, void *args)
+{
+	int ret = 0;
+	DWORD tid;
+	HANDLE thread_handle = NULL;
+	GROUP_AFFINITY thread_affinity;
+	struct thread_routine_ctx *ctx = NULL;
+
+	ctx = calloc(1, sizeof(*ctx));
+	if (ctx == NULL) {
+		RTE_LOG(DEBUG, EAL, "Insufficient memory for thread context allocations\n");
+		ret = ENOMEM;
+		goto cleanup;
+	}
+	ctx->routine_args = args;
+	ctx->thread_func = thread_func;
+
+	thread_handle = CreateThread(NULL, 0, thread_func_wrapper, ctx,
+		CREATE_SUSPENDED, &tid);
+	if (thread_handle == NULL) {
+		ret = thread_log_last_error("CreateThread()");
+		free(ctx);
+		goto cleanup;
+	}
+	thread_id->opaque_id = tid;
+
+	if (thread_attr != NULL) {
+		if (CPU_COUNT(&thread_attr->cpuset) > 0) {
+			ret = rte_convert_cpuset_to_affinity(
+							&thread_attr->cpuset,
+							&thread_affinity
+							);
+			if (ret != 0) {
+				RTE_LOG(DEBUG, EAL, "Unable to convert cpuset to thread affinity\n");
+				goto cleanup;
+			}
+
+			if (!SetThreadGroupAffinity(thread_handle,
+						    &thread_affinity, NULL)) {
+				ret = thread_log_last_error("SetThreadGroupAffinity()");
+				goto cleanup;
+			}
+		}
+		ret = rte_thread_set_priority(*thread_id,
+				thread_attr->priority);
+		if (ret != 0) {
+			RTE_LOG(DEBUG, EAL, "Unable to set thread priority\n");
+			goto cleanup;
+		}
+	}
+
+	if (ResumeThread(thread_handle) == (DWORD)-1) {
+		ret = thread_log_last_error("ResumeThread()");
+		goto cleanup;
+	}
+
+cleanup:
+	if (thread_handle != NULL) {
+		CloseHandle(thread_handle);
+		thread_handle = NULL;
+	}
+	return ret;
+}
+
+int
+rte_thread_join(rte_thread_t thread_id, unsigned long *value_ptr)
+{
+	HANDLE thread_handle;
+	DWORD result;
+	DWORD exit_code = 0;
+	BOOL err;
+	int ret = 0;
+
+	thread_handle = OpenThread(SYNCHRONIZE | THREAD_QUERY_INFORMATION,
+				   FALSE, thread_id.opaque_id);
+	if (thread_handle == NULL) {
+		ret = thread_log_last_error("OpenThread()");
+		goto cleanup;
+	}
+
+	result = WaitForSingleObject(thread_handle, INFINITE);
+	if (result != WAIT_OBJECT_0) {
+		ret = thread_log_last_error("WaitForSingleObject()");
+		goto cleanup;
+	}
+
+	if (value_ptr != NULL) {
+		err = GetExitCodeThread(thread_handle, &exit_code);
+		if (err == 0) {
+			ret = thread_log_last_error("GetExitCodeThread()");
+			goto cleanup;
+		}
+		*value_ptr = exit_code;
+	}
+
+cleanup:
+	if (thread_handle != NULL) {
+		CloseHandle(thread_handle);
+		thread_handle = NULL;
+	}
+
+	return ret;
+}
+
+int
+rte_thread_detach(rte_thread_t thread_id)
+{
+	/* No resources that need to be released. */
+	RTE_SET_USED(thread_id);
+	return 0;
+}
+
 int
 rte_thread_key_create(rte_thread_key *key,
 		__rte_unused void (*destructor)(void *))