[v9,06/10] eal: add thread lifetime management

Message ID 1622850274-6946-7-git-send-email-navasile@linux.microsoft.com (mailing list archive)
State Superseded, archived
Delegated to: David Marchand
Headers
Series eal: Add EAL API for threading |

Checks

Context Check Description
ci/checkpatch warning coding style issues

Commit Message

Narcisa Ana Maria Vasile June 4, 2021, 11:44 p.m. UTC
From: Narcisa Vasile <navasile@microsoft.com>

Add function for thread creation, join, canceling, detaching.

The *rte_thread_create()* function can optionally receive an rte_thread_attr_t
object that will cause the thread to be created with the affinity and priority
described by the attributes object. If no rte_thread_attr_t is passed
(parameter is NULL), the default affinity and priority are used.

On Windows, the function executed by a thread when the thread starts is
represeneted by a function pointer of type DWORD (*func) (void*).
On other platforms, the function pointer is a void* (*func) (void*).

Performing a cast between these two types of function pointers to
uniformize the API on all platforms may result in undefined behavior.
TO fix this issue, a wrapper that respects the signature required by
CreateThread() has been created on Windows.

Signed-off-by: Narcisa Vasile <navasile@microsoft.com>
---
 lib/eal/common/rte_thread.c  | 116 +++++++++++++++++++++++++
 lib/eal/include/rte_thread.h |  67 +++++++++++++++
 lib/eal/windows/rte_thread.c | 162 +++++++++++++++++++++++++++++++++++
 3 files changed, 345 insertions(+)
  

Comments

Dmitry Kozlyuk June 8, 2021, 11:04 p.m. UTC | #1
2021-06-04 16:44 (UTC-0700), Narcisa Ana Maria Vasile:
[...]
> diff --git a/lib/eal/include/rte_thread.h b/lib/eal/include/rte_thread.h
> index 5c54cd9d67..1d481b9ad5 100644
> --- a/lib/eal/include/rte_thread.h
> +++ b/lib/eal/include/rte_thread.h
> @@ -208,6 +208,73 @@ __rte_experimental
>  int rte_thread_attr_set_priority(rte_thread_attr_t *thread_attr,
>  		enum rte_thread_priority priority);
>  
> +/**
> + * Create a new thread that will invoke the 'thread_func' routine.
> + *
> + * @param thread_id
> + *    A pointer that will store the id of the newly created thread.
> + *
> + * @param thread_attr
> + *    Attributes that are used at the creation of the new thread.
> + *
> + * @param thread_func
> + *    The routine that the new thread will invoke when starting execution.
> + *
> + * @param args
> + *    Arguments to be passed to the 'thread_func' routine.
> + *
> + * @return
> + *   On success, return 0.
> + *   On failure, return a positive errno-style error number.
> + */
> +__rte_experimental
> +int rte_thread_create(rte_thread_t *thread_id,
> +		      const rte_thread_attr_t *thread_attr,
> +		      void *(*thread_func)(void *), void *args);

1. Thread function prototype is used at least in 4 places,
maybe give it a name, like `rte_thread_func`?

2. We can't easily support it's `void*` return type on Windows,
because it doesn't fit in DWORD. In `rte_thread_join` below you use `int`.
All `pthread_join` usages in DPDK ignore return value, but I'd rather keep it.
Do you think it's OK to stick to `int`?

[...]
> +/**
> + * Terminates a thread.
> + *
> + * @param thread_id
> + *    The id of the thread to be cancelled.
> + *
> + * @return
> + *   On success, return 0.
> + *   On failure, return a positive errno-style error number.
> + */
> +__rte_experimental
> +int rte_thread_cancel(rte_thread_t thread_id);

What do you think of making this function internal for now?
We don't want applications to rely on this prototype.
To hide it from Doxygen, `/*` comment or #ifndef __DOXYGEN__ can be used.
It is worth noting in commit message
that it's not implemented for Windows and why.

> +
> +/**
> + * Detaches a thread.

Please explain what it means, because detaching is a pthread concept.

> + *
> + * @param thread_id
> + *    The id of the thread to be cancelled.

Not "cancelled".

> + *
> + * @return
> + *   On success, return 0.
> + *   On failure, return a positive errno-style error number.
> + */
> +__rte_experimental
> +int rte_thread_detach(rte_thread_t thread_id);
> +
> +

Redundant empty line.

>  #ifdef RTE_HAS_CPUSET
>  
>  /**
> diff --git a/lib/eal/windows/rte_thread.c b/lib/eal/windows/rte_thread.c
> index 6dc3d575c0..5afdd54e15 100644
> --- a/lib/eal/windows/rte_thread.c
> +++ b/lib/eal/windows/rte_thread.c
> @@ -14,6 +14,11 @@ struct eal_tls_key {
>  	DWORD thread_index;
>  };
>  
> +struct thread_routine_ctx {
> +	void* (*start_routine) (void*);
> +	void *routine_args;
> +};
> +
>  /* Translates the most common error codes related to threads */
>  static int
>  thread_translate_win32_error(DWORD error)
> @@ -346,6 +351,163 @@ rte_thread_attr_set_priority(rte_thread_attr_t *thread_attr,
>  	return 0;
>  }
>  
> +static DWORD
> +thread_func_wrapper(void *args)
> +{
> +	struct thread_routine_ctx *pctx = args;
> +	intptr_t func_ret = 0;
> +	struct thread_routine_ctx ctx = { NULL, NULL };
> +
> +	ctx.start_routine = pctx->start_routine;
> +	ctx.routine_args = pctx->routine_args;
> +
> +	free(pctx);
> +
> +	func_ret = (intptr_t)ctx.start_routine(ctx.routine_args);
> +	return (DWORD)func_ret;
> +}
> +
> +int
> +rte_thread_create(rte_thread_t *thread_id,
> +		  const rte_thread_attr_t *thread_attr,
> +		  void *(*thread_func)(void *), void *args)
> +{
> +	int ret = 0;
> +	DWORD tid = 0;
> +	HANDLE thread_handle = NULL;
> +	GROUP_AFFINITY thread_affinity;
> +	struct thread_routine_ctx *ctx = NULL;
> +
> +	ctx = calloc(1, sizeof(*ctx));

Why use `calloc()` for a scalar?

[...]
> +int
> +rte_thread_cancel(rte_thread_t thread_id)
> +{
> +	int ret = 0;
> +	HANDLE thread_handle = NULL;
> +
> +	thread_handle = OpenThread(THREAD_TERMINATE, FALSE, thread_id.opaque_id);
> +	if (thread_handle == NULL) {
> +		ret = thread_log_last_error("OpenThread()");
> +		goto cleanup;
> +	}
> +
> +	/*
> +	 * TODO: Behavior is different between POSIX and Windows threads.
> +	 * POSIX threads wait for a cancellation point.
> +	 * Current Windows emulation kills thread at any point.
> +	 */
> +	ret = TerminateThread(thread_handle, 0);
> +	if (ret != 0) {
> +		ret = thread_log_last_error("TerminateThread()");
> +		goto cleanup;
> +	}
> +
> +cleanup:
> +	if (thread_handle != NULL) {
> +		CloseHandle(thread_handle);
> +		thread_handle = NULL;
> +	}
> +	return ret;
> +}

As we've discussed before, such implementation should never be used.
I suggest removing it for now, otherwise if someone enables the code
calling rte_thread_cancel() for Windows, it will almost certainly lead
to deadlock bugs.

> +
> +int
> +rte_thread_detach(rte_thread_t thread_id)
> +{
> +	(void)thread_id;

RTE_SET_USED/__rte_unused

> +	return ENOTSUP;
> +}
> +

It should return success as the thread is in detached state after the call.

>  int
>  rte_thread_key_create(rte_thread_key *key,
>  		__rte_unused void (*destructor)(void *))
  
Narcisa Ana Maria Vasile June 18, 2021, 9:41 p.m. UTC | #2
On Wed, Jun 09, 2021 at 02:04:09AM +0300, Dmitry Kozlyuk wrote:
> 2021-06-04 16:44 (UTC-0700), Narcisa Ana Maria Vasile:
> [...]
> > diff --git a/lib/eal/include/rte_thread.h b/lib/eal/include/rte_thread.h
> > index 5c54cd9d67..1d481b9ad5 100644
> > --- a/lib/eal/include/rte_thread.h
> > +++ b/lib/eal/include/rte_thread.h
> > +__rte_experimental
> > +int rte_thread_create(rte_thread_t *thread_id,
> > +		      const rte_thread_attr_t *thread_attr,
> > +		      void *(*thread_func)(void *), void *args);
> 
> 1. Thread function prototype is used at least in 4 places,
> maybe give it a name, like `rte_thread_func`?
> 
> 2. We can't easily support it's `void*` return type on Windows,
> because it doesn't fit in DWORD. In `rte_thread_join` below you use `int`.
> All `pthread_join` usages in DPDK ignore return value, but I'd rather keep it.
> Do you think it's OK to stick to `int`?
> 
Thank you, I agree that we should keep it. I've changed it to unsigned long
to fit with Windows's DWORD as well.

> [...]
> > +/**
> > + * Terminates a thread.
> > + *
> > + * @param thread_id
> > + *    The id of the thread to be cancelled.
> > + *
> > + * @return
> > + *   On success, return 0.
> > + *   On failure, return a positive errno-style error number.
> > + */
> > +__rte_experimental
> > +int rte_thread_cancel(rte_thread_t thread_id);
> 
> What do you think of making this function internal for now?
> We don't want applications to rely on this prototype.
> To hide it from Doxygen, `/*` comment or #ifndef __DOXYGEN__ can be used.
> It is worth noting in commit message
> that it's not implemented for Windows and why.
> 
 Thank you, I've removed it for now.
> > +
> > +	HANDLE thread_handle = NULL;
> > +	GROUP_AFFINITY thread_affinity;
> > +	struct thread_routine_ctx *ctx = NULL;
> > +
> > +	ctx = calloc(1, sizeof(*ctx));
> 
> Why use `calloc()` for a scalar?

ctx is pointer to struct that holds the thread function pointer and its arguments.
Did I misunderstand what you meant?
>
  
Dmitry Kozlyuk June 18, 2021, 10:48 p.m. UTC | #3
2021-06-18 14:41 (UTC-0700), Narcisa Ana Maria Vasile:
> On Wed, Jun 09, 2021 at 02:04:09AM +0300, Dmitry Kozlyuk wrote:
> > 2021-06-04 16:44 (UTC-0700), Narcisa Ana Maria Vasile:
[...]
> > > +
> > > +	HANDLE thread_handle = NULL;
> > > +	GROUP_AFFINITY thread_affinity;
> > > +	struct thread_routine_ctx *ctx = NULL;
> > > +
> > > +	ctx = calloc(1, sizeof(*ctx));  
> > 
> > Why use `calloc()` for a scalar?  
> 
> ctx is pointer to struct that holds the thread function pointer and its arguments.
> Did I misunderstand what you meant?

`calloc(size_t n, size_t size)` mainly exists for safe array allocations,
because multiplication in `malloc(n * size)` may overflow. You are allocating
a singular value, i. e. a scalar, so the choice of `calloc()` over `malloc()`
raises questions. Nevertheless, it is harmless and works correctly. Consider
it a nit.
  

Patch

diff --git a/lib/eal/common/rte_thread.c b/lib/eal/common/rte_thread.c
index 5cee19bb7d..84050d0f4c 100644
--- a/lib/eal/common/rte_thread.c
+++ b/lib/eal/common/rte_thread.c
@@ -149,6 +149,122 @@  rte_thread_attr_set_priority(rte_thread_attr_t *thread_attr,
 	return 0;
 }
 
+int
+rte_thread_create(rte_thread_t *thread_id,
+		  const rte_thread_attr_t *thread_attr,
+		  void *(*thread_func)(void *), void *args)
+{
+	int ret = 0;
+	pthread_attr_t attr;
+	pthread_attr_t *attrp = NULL;
+	struct sched_param param = {
+		.sched_priority = 0,
+	};
+	int policy = SCHED_OTHER;
+
+	if (thread_attr != NULL) {
+		ret = pthread_attr_init(&attr);
+		if (ret != 0) {
+			RTE_LOG(DEBUG, EAL, "pthread_attr_init failed\n");
+			goto cleanup;
+		}
+
+		attrp = &attr;
+
+		/*
+		 * Set the inherit scheduler parameter to explicit,
+		 * otherwise the priority attribute is ignored.
+		 */
+		ret = pthread_attr_setinheritsched(attrp,
+						   PTHREAD_EXPLICIT_SCHED);
+		if (ret != 0) {
+			RTE_LOG(DEBUG, EAL, "pthread_attr_setinheritsched failed\n");
+			goto cleanup;
+		}
+
+		/*
+		 * In case a realtime scheduling policy is requested,
+		 * the sched_priority parameter is set to the value stored in
+		 * thread_attr. Otherwise, for the default scheduling policy
+		 * (SCHED_OTHER) sched_priority needs to be initialized to 0.
+		 */
+		if (thread_attr->priority == RTE_THREAD_PRIORITY_REALTIME_CRITICAL) {
+			policy = SCHED_RR;
+			param.sched_priority = thread_attr->priority;
+		}
+
+		ret = pthread_attr_setschedpolicy(attrp, policy);
+		if (ret != 0) {
+			RTE_LOG(DEBUG, EAL, "pthread_attr_setschedpolicy failed\n");
+			goto cleanup;
+		}
+
+		ret = pthread_attr_setschedparam(attrp, &param);
+		if (ret != 0) {
+			RTE_LOG(DEBUG, EAL, "pthread_attr_setschedparam failed\n");
+			goto cleanup;
+		}
+
+		ret = pthread_attr_setaffinity_np(attrp,
+						  sizeof(thread_attr->cpuset),
+						  &thread_attr->cpuset);
+		if (ret != 0) {
+			RTE_LOG(DEBUG, EAL, "pthread_attr_setaffinity_np failed\n");
+			goto cleanup;
+		}
+	}
+
+	ret = pthread_create(&thread_id->opaque_id, attrp, thread_func, args);
+	if (ret != 0) {
+		RTE_LOG(DEBUG, EAL, "pthread_create failed\n");
+		goto cleanup;
+	}
+
+cleanup:
+	if (attrp != NULL)
+		pthread_attr_destroy(&attr);
+
+	return ret;
+}
+
+int
+rte_thread_join(rte_thread_t thread_id, int *value_ptr)
+{
+	int ret = 0;
+	void *res = NULL;
+	void **pres = NULL;
+
+	if (value_ptr != NULL)
+		pres = &res;
+
+	ret = pthread_join(thread_id.opaque_id, pres);
+	if (ret != 0) {
+		RTE_LOG(DEBUG, EAL, "pthread_join failed\n");
+		return ret;
+	}
+
+	if (pres != NULL)
+		*value_ptr = *(int *)(*pres);
+
+	return 0;
+}
+
+int rte_thread_cancel(rte_thread_t thread_id)
+{
+	/*
+	 * TODO: Behavior is different between POSIX and Windows threads.
+	 * POSIX threads wait for a cancellation point.
+	 * Current Windows emulation kills thread at any point.
+	 */
+	return pthread_cancel(thread_id.opaque_id);
+}
+
+int
+rte_thread_detach(rte_thread_t thread_id)
+{
+	return pthread_detach(thread_id.opaque_id);
+}
+
 int
 rte_thread_key_create(rte_thread_key *key, void (*destructor)(void *))
 {
diff --git a/lib/eal/include/rte_thread.h b/lib/eal/include/rte_thread.h
index 5c54cd9d67..1d481b9ad5 100644
--- a/lib/eal/include/rte_thread.h
+++ b/lib/eal/include/rte_thread.h
@@ -208,6 +208,73 @@  __rte_experimental
 int rte_thread_attr_set_priority(rte_thread_attr_t *thread_attr,
 		enum rte_thread_priority priority);
 
+/**
+ * Create a new thread that will invoke the 'thread_func' routine.
+ *
+ * @param thread_id
+ *    A pointer that will store the id of the newly created thread.
+ *
+ * @param thread_attr
+ *    Attributes that are used at the creation of the new thread.
+ *
+ * @param thread_func
+ *    The routine that the new thread will invoke when starting execution.
+ *
+ * @param args
+ *    Arguments to be passed to the 'thread_func' routine.
+ *
+ * @return
+ *   On success, return 0.
+ *   On failure, return a positive errno-style error number.
+ */
+__rte_experimental
+int rte_thread_create(rte_thread_t *thread_id,
+		      const rte_thread_attr_t *thread_attr,
+		      void *(*thread_func)(void *), void *args);
+
+/**
+ * Waits for the thread identified by 'thread_id' to terminate
+ *
+ * @param thread_id
+ *    The identifier of the thread.
+ *
+ * @param value_ptr
+ *    Stores the exit status of the thread.
+ *
+ * @return
+ *   On success, return 0.
+ *   On failure, return a positive errno-style error number.
+ */
+__rte_experimental
+int rte_thread_join(rte_thread_t thread_id, int *value_ptr);
+
+/**
+ * Terminates a thread.
+ *
+ * @param thread_id
+ *    The id of the thread to be cancelled.
+ *
+ * @return
+ *   On success, return 0.
+ *   On failure, return a positive errno-style error number.
+ */
+__rte_experimental
+int rte_thread_cancel(rte_thread_t thread_id);
+
+/**
+ * Detaches a thread.
+ *
+ * @param thread_id
+ *    The id of the thread to be cancelled.
+ *
+ * @return
+ *   On success, return 0.
+ *   On failure, return a positive errno-style error number.
+ */
+__rte_experimental
+int rte_thread_detach(rte_thread_t thread_id);
+
+
 #ifdef RTE_HAS_CPUSET
 
 /**
diff --git a/lib/eal/windows/rte_thread.c b/lib/eal/windows/rte_thread.c
index 6dc3d575c0..5afdd54e15 100644
--- a/lib/eal/windows/rte_thread.c
+++ b/lib/eal/windows/rte_thread.c
@@ -14,6 +14,11 @@  struct eal_tls_key {
 	DWORD thread_index;
 };
 
+struct thread_routine_ctx {
+	void* (*start_routine) (void*);
+	void *routine_args;
+};
+
 /* Translates the most common error codes related to threads */
 static int
 thread_translate_win32_error(DWORD error)
@@ -346,6 +351,163 @@  rte_thread_attr_set_priority(rte_thread_attr_t *thread_attr,
 	return 0;
 }
 
+static DWORD
+thread_func_wrapper(void *args)
+{
+	struct thread_routine_ctx *pctx = args;
+	intptr_t func_ret = 0;
+	struct thread_routine_ctx ctx = { NULL, NULL };
+
+	ctx.start_routine = pctx->start_routine;
+	ctx.routine_args = pctx->routine_args;
+
+	free(pctx);
+
+	func_ret = (intptr_t)ctx.start_routine(ctx.routine_args);
+	return (DWORD)func_ret;
+}
+
+int
+rte_thread_create(rte_thread_t *thread_id,
+		  const rte_thread_attr_t *thread_attr,
+		  void *(*thread_func)(void *), void *args)
+{
+	int ret = 0;
+	DWORD tid = 0;
+	HANDLE thread_handle = NULL;
+	GROUP_AFFINITY thread_affinity;
+	struct thread_routine_ctx *ctx = NULL;
+
+	ctx = calloc(1, sizeof(*ctx));
+	if (ctx == NULL) {
+		RTE_LOG(DEBUG, EAL, "Insufficient memory for thread context allocations\n");
+		ret = ENOMEM;
+		goto cleanup;
+	}
+	ctx->routine_args = args;
+	ctx->start_routine = thread_func;
+
+	thread_handle = CreateThread(NULL, 0, thread_func_wrapper, ctx,
+		CREATE_SUSPENDED, &tid);
+	if (thread_handle == NULL) {
+		ret = thread_log_last_error("CreateThread()");
+		goto cleanup;
+	}
+	thread_id->opaque_id = tid;
+
+	if (thread_attr != NULL) {
+		if (CPU_COUNT(&thread_attr->cpuset) > 0) {
+			ret = rte_convert_cpuset_to_affinity(&thread_attr->cpuset, &thread_affinity);
+			if (ret != 0) {
+				RTE_LOG(DEBUG, EAL, "Unable to convert cpuset to thread affinity\n");
+				goto cleanup;
+			}
+
+			if (!SetThreadGroupAffinity(thread_handle, &thread_affinity, NULL)) {
+				ret = thread_log_last_error("SetThreadGroupAffinity()");
+				goto cleanup;
+			}
+		}
+		ret = rte_thread_set_priority(*thread_id, thread_attr->priority);
+		if (ret != 0) {
+			RTE_LOG(DEBUG, EAL, "Unable to set thread priority\n");
+			goto cleanup;
+		}
+	}
+
+	if (ResumeThread(thread_handle) == (DWORD)-1) {
+		ret = thread_log_last_error("ResumeThread()");
+		goto cleanup;
+	}
+
+	return 0;
+
+cleanup:
+	if (thread_handle != NULL) {
+		CloseHandle(thread_handle);
+		thread_handle = NULL;
+	}
+	return ret;
+}
+
+int
+rte_thread_join(rte_thread_t thread_id, int *value_ptr)
+{
+	HANDLE thread_handle = NULL;
+	DWORD result = 0;
+	DWORD exit_code = 0;
+	BOOL err = 0;
+	int ret = 0;
+
+	thread_handle = OpenThread(SYNCHRONIZE | THREAD_QUERY_INFORMATION,
+				   FALSE, thread_id.opaque_id);
+	if (thread_handle == NULL) {
+		ret = thread_log_last_error("OpenThread()");
+		goto cleanup;
+	}
+
+	result = WaitForSingleObject(thread_handle, INFINITE);
+	if (result != WAIT_OBJECT_0) {
+		ret = thread_log_last_error("WaitForSingleObject()");
+		goto cleanup;
+	}
+
+	if (value_ptr != NULL) {
+		err = GetExitCodeThread(thread_handle, &exit_code);
+		if (err == 0) {
+			ret = thread_log_last_error("GetExitCodeThread()");
+			goto cleanup;
+		}
+		*value_ptr = exit_code;
+	}
+
+cleanup:
+	if (thread_handle != NULL) {
+		CloseHandle(thread_handle);
+		thread_handle = NULL;
+	}
+
+	return ret;
+}
+
+int
+rte_thread_cancel(rte_thread_t thread_id)
+{
+	int ret = 0;
+	HANDLE thread_handle = NULL;
+
+	thread_handle = OpenThread(THREAD_TERMINATE, FALSE, thread_id.opaque_id);
+	if (thread_handle == NULL) {
+		ret = thread_log_last_error("OpenThread()");
+		goto cleanup;
+	}
+
+	/*
+	 * TODO: Behavior is different between POSIX and Windows threads.
+	 * POSIX threads wait for a cancellation point.
+	 * Current Windows emulation kills thread at any point.
+	 */
+	ret = TerminateThread(thread_handle, 0);
+	if (ret != 0) {
+		ret = thread_log_last_error("TerminateThread()");
+		goto cleanup;
+	}
+
+cleanup:
+	if (thread_handle != NULL) {
+		CloseHandle(thread_handle);
+		thread_handle = NULL;
+	}
+	return ret;
+}
+
+int
+rte_thread_detach(rte_thread_t thread_id)
+{
+	(void)thread_id;
+	return ENOTSUP;
+}
+
 int
 rte_thread_key_create(rte_thread_key *key,
 		__rte_unused void (*destructor)(void *))