diff mbox series

[v18,7/8] eal: implement functions for thread barrier management

Message ID 1636594425-9692-8-git-send-email-navasile@linux.microsoft.com (mailing list archive)
State Superseded
Delegated to: David Marchand
Headers show
Series eal: Add EAL API for threading | expand

Checks

Context Check Description
ci/checkpatch success coding style OK

Commit Message

Narcisa Ana Maria Vasile Nov. 11, 2021, 1:33 a.m. UTC
From: Narcisa Vasile <navasile@microsoft.com>

Add functions for barrier init, destroy, wait.

A portable type is used to represent a barrier identifier.
The rte_thread_barrier_wait() function returns the same value
on all platforms.

Add unit tests to verify that the barrier correctly
synchronizes all threads. Verify that the threads are unblocked
after the required number of threads have called barrier_wait().

Signed-off-by: Narcisa Vasile <navasile@microsoft.com>
---
 app/test/test_threads.c      | 49 +++++++++++++++++++++++++++++
 lib/eal/common/rte_thread.c  | 61 ++++++++++++++++++++++++++++++++++++
 lib/eal/include/rte_thread.h | 58 ++++++++++++++++++++++++++++++++++
 lib/eal/version.map          |  3 ++
 lib/eal/windows/rte_thread.c | 56 +++++++++++++++++++++++++++++++++
 5 files changed, 227 insertions(+)
diff mbox series

Patch

diff --git a/app/test/test_threads.c b/app/test/test_threads.c
index 9fcae34179..00f604ab7e 100644
--- a/app/test/test_threads.c
+++ b/app/test/test_threads.c
@@ -195,6 +195,54 @@  test_thread_detach(void)
 	return ret;
 }
 
+struct thread_context {
+	rte_thread_barrier *barrier;
+	int barrier_result;
+};
+
+static void *
+thread_loop_barrier(void *arg)
+{
+	struct thread_context *ctx = arg;
+
+	ctx->barrier_result = rte_thread_barrier_wait(ctx->barrier);
+	if (ctx->barrier_result > 0)
+		rte_log(RTE_LOG_DEBUG, threads_logtype_test, "Failed to wait at barrier!");
+
+	return NULL;
+}
+
+static int
+test_thread_barrier(void)
+{
+	rte_thread_t thread_id;
+	struct thread_context ctx;
+	rte_thread_barrier barrier;
+	int ret = 0;
+	int result = 0;
+
+	ret = rte_thread_barrier_init(&barrier, 2);
+	RTE_TEST_ASSERT(ret == 0, "Failed to initialize barrier!");
+
+	ctx.barrier = &barrier;
+	ret = rte_thread_create(&thread_id, NULL, thread_loop_barrier, &ctx);
+	RTE_TEST_ASSERT(ret == 0, "Failed to create thread!");
+
+	result = rte_thread_barrier_wait(&barrier);
+	RTE_TEST_ASSERT(result <= 0, "Failed to wait at the barrier!");
+
+	ret = rte_thread_join(thread_id, NULL);
+	RTE_TEST_ASSERT(ret == 0, "Failed to join threads!");
+
+	ret = rte_thread_barrier_destroy(&barrier);
+	RTE_TEST_ASSERT(ret == 0, "Failed to destroy barrier!");
+
+	RTE_TEST_ASSERT(ctx.barrier_result <= 0, "Child thread failed to wait at the barrier!");
+	RTE_TEST_ASSERT_NOT_EQUAL(ctx.barrier_result, result, "Threads were not blocked at the barrier!");
+
+	return 0;
+}
+
 static struct unit_test_suite threads_test_suite = {
 	.suite_name = "threads autotest",
 	.setup = NULL,
@@ -204,6 +252,7 @@  static struct unit_test_suite threads_test_suite = {
 			TEST_CASE(test_thread_attributes_affinity),
 			TEST_CASE(test_thread_attributes_priority),
 			TEST_CASE(test_thread_detach),
+			TEST_CASE(test_thread_barrier),
 			TEST_CASES_END()
 	}
 };
diff --git a/lib/eal/common/rte_thread.c b/lib/eal/common/rte_thread.c
index 910c39eb88..d30a8a7ca3 100644
--- a/lib/eal/common/rte_thread.c
+++ b/lib/eal/common/rte_thread.c
@@ -309,6 +309,67 @@  rte_thread_detach(rte_thread_t thread_id)
 	return pthread_detach((pthread_t)thread_id.opaque_id);
 }
 
+int
+rte_thread_barrier_init(rte_thread_barrier *barrier, int count)
+{
+	int ret = 0;
+	pthread_barrier_t *pthread_barrier = NULL;
+
+	RTE_VERIFY(barrier != NULL);
+	RTE_VERIFY(count > 0);
+
+	pthread_barrier = calloc(1, sizeof(*pthread_barrier));
+	if (pthread_barrier == NULL) {
+		RTE_LOG(DEBUG, EAL, "Unable to initialize barrier. Insufficient memory!\n");
+		ret = ENOMEM;
+		goto cleanup;
+	}
+	ret = pthread_barrier_init(pthread_barrier, NULL, count);
+	if (ret != 0) {
+		RTE_LOG(DEBUG, EAL, "Failed to init barrier, ret = %d\n", ret);
+		goto cleanup;
+	}
+
+	barrier->barrier_id = pthread_barrier;
+	pthread_barrier = NULL;
+
+cleanup:
+	free(pthread_barrier);
+	return ret;
+}
+
+int
+rte_thread_barrier_wait(rte_thread_barrier *barrier)
+{
+	int ret = 0;
+
+	RTE_VERIFY(barrier != NULL);
+	RTE_VERIFY(barrier->barrier_id != NULL);
+
+	ret = pthread_barrier_wait(barrier->barrier_id);
+	if (ret == PTHREAD_BARRIER_SERIAL_THREAD)
+		ret = RTE_THREAD_BARRIER_SERIAL_THREAD;
+
+	return ret;
+}
+
+int
+rte_thread_barrier_destroy(rte_thread_barrier *barrier)
+{
+	int ret = 0;
+
+	RTE_VERIFY(barrier != NULL);
+
+	ret = pthread_barrier_destroy(barrier->barrier_id);
+	if (ret != 0)
+		RTE_LOG(DEBUG, EAL, "Failed to destroy barrier: %d\n", ret);
+
+	free(barrier->barrier_id);
+	barrier->barrier_id = NULL;
+
+	return ret;
+}
+
 int
 rte_thread_key_create(rte_thread_key *key, void (*destructor)(void *))
 {
diff --git a/lib/eal/include/rte_thread.h b/lib/eal/include/rte_thread.h
index e841321819..7c84e32988 100644
--- a/lib/eal/include/rte_thread.h
+++ b/lib/eal/include/rte_thread.h
@@ -54,6 +54,18 @@  typedef struct {
 
 #endif /* RTE_HAS_CPUSET */
 
+/**
+ * Returned by rte_thread_barrier_wait() when call is successful.
+ */
+#define RTE_THREAD_BARRIER_SERIAL_THREAD -1
+
+/**
+ * Thread barrier representation.
+ */
+typedef struct rte_thread_barrier_tag {
+	void *barrier_id;  /**< barrrier identifier */
+} rte_thread_barrier;
+
 /**
  * TLS key type, an opaque pointer.
  */
@@ -302,6 +314,52 @@  __rte_experimental
 int rte_thread_set_priority(rte_thread_t thread_id,
 		enum rte_thread_priority priority);
 
+/**
+ * Initializes a synchronization barrier.
+ *
+ * @param barrier
+ *    A pointer that references the newly created 'barrier' object.
+ *
+ * @param count
+ *    The number of threads that must enter the barrier before
+ *    the threads can continue execution.
+ *
+ * @return
+ *   On success, return 0.
+ *   On failure, return a positive errno-style error number.
+ */
+__rte_experimental
+int rte_thread_barrier_init(rte_thread_barrier *barrier, int count);
+
+/**
+ * Causes the calling thread to wait at the synchronization barrier 'barrier'.
+ *
+ * @param barrier
+ *    The barrier used for synchronizing the threads.
+ *
+ * @return
+ *   Return RTE_THREAD_BARRIER_SERIAL_THREAD for the thread synchronized
+ *      at the barrier.
+ *   Return 0 for all other threads.
+ *   Return a positive errno-style error number, in case of failure.
+ */
+__rte_experimental
+int rte_thread_barrier_wait(rte_thread_barrier *barrier);
+
+/**
+ * Releases all resources used by a synchronization barrier
+ * and uninitializes it.
+ *
+ * @param barrier
+ *    The barrier to be destroyed.
+ *
+ * @return
+ *   On success, return 0.
+ *   On failure, return a positive errno-style error number.
+ */
+__rte_experimental
+int rte_thread_barrier_destroy(rte_thread_barrier *barrier);
+
 /**
  * Create a TLS data key visible to all threads in the process.
  * the created key is later used to get/set a value.
diff --git a/lib/eal/version.map b/lib/eal/version.map
index 0384a09fa2..06e5f82da2 100644
--- a/lib/eal/version.map
+++ b/lib/eal/version.map
@@ -427,6 +427,9 @@  EXPERIMENTAL {
 	rte_thread_attr_get_affinity;
 	rte_thread_attr_set_affinity;
 	rte_thread_attr_set_priority;
+	rte_thread_barrier_init;
+	rte_thread_barrier_wait;
+	rte_thread_barrier_destroy;
 	rte_thread_get_affinity_by_id;
 	rte_thread_set_affinity_by_id;
 	rte_thread_get_priority;
diff --git a/lib/eal/windows/rte_thread.c b/lib/eal/windows/rte_thread.c
index 669a68d6a8..3f72bbf716 100644
--- a/lib/eal/windows/rte_thread.c
+++ b/lib/eal/windows/rte_thread.c
@@ -504,6 +504,62 @@  rte_thread_detach(rte_thread_t thread_id)
 	return 0;
 }
 
+int
+rte_thread_barrier_init(rte_thread_barrier *barrier, int count)
+{
+	int ret = 0;
+	SYNCHRONIZATION_BARRIER *sync_barrier = NULL;
+
+	RTE_VERIFY(barrier != NULL);
+	RTE_VERIFY(count > 0);
+
+	sync_barrier = calloc(1, sizeof(*sync_barrier));
+	if (sync_barrier == NULL) {
+		RTE_LOG(DEBUG, EAL, "Unable to initialize barrier. Insufficient memory!\n");
+		ret = ENOMEM;
+		goto cleanup;
+	}
+	if (!InitializeSynchronizationBarrier(sync_barrier, count, -1)) {
+		ret = thread_log_last_error("InitializeSynchronizationBarrier()");
+		goto cleanup;
+	}
+
+	barrier->barrier_id = sync_barrier;
+	sync_barrier = NULL;
+
+cleanup:
+	free(sync_barrier);
+	return ret;
+}
+
+int
+rte_thread_barrier_wait(rte_thread_barrier *barrier)
+{
+	RTE_VERIFY(barrier != NULL);
+	RTE_VERIFY(barrier->barrier_id != NULL);
+
+	if (EnterSynchronizationBarrier(barrier->barrier_id,
+				SYNCHRONIZATION_BARRIER_FLAGS_BLOCK_ONLY)) {
+
+		return RTE_THREAD_BARRIER_SERIAL_THREAD;
+	}
+
+	return 0;
+}
+
+int
+rte_thread_barrier_destroy(rte_thread_barrier *barrier)
+{
+	RTE_VERIFY(barrier != NULL);
+
+	DeleteSynchronizationBarrier(barrier->barrier_id);
+
+	free(barrier->barrier_id);
+	barrier->barrier_id = NULL;
+
+	return 0;
+}
+
 int
 rte_thread_key_create(rte_thread_key *key,
 		__rte_unused void (*destructor)(void *))