@@ -195,6 +195,54 @@ test_thread_detach(void)
return ret;
}
+struct thread_context {
+ rte_thread_barrier *barrier;
+ int barrier_result;
+};
+
+static void *
+thread_loop_barrier(void *arg)
+{
+ struct thread_context *ctx = arg;
+
+ ctx->barrier_result = rte_thread_barrier_wait(ctx->barrier);
+ if (ctx->barrier_result > 0)
+ rte_log(RTE_LOG_DEBUG, threads_logtype_test, "Failed to wait at barrier!");
+
+ return NULL;
+}
+
+static int
+test_thread_barrier(void)
+{
+ rte_thread_t thread_id;
+ struct thread_context ctx;
+ rte_thread_barrier barrier;
+ int ret = 0;
+ int result = 0;
+
+ ret = rte_thread_barrier_init(&barrier, 2);
+ RTE_TEST_ASSERT(ret == 0, "Failed to initialize barrier!");
+
+ ctx.barrier = &barrier;
+ ret = rte_thread_create(&thread_id, NULL, thread_loop_barrier, &ctx);
+ RTE_TEST_ASSERT(ret == 0, "Failed to create thread!");
+
+ result = rte_thread_barrier_wait(&barrier);
+ RTE_TEST_ASSERT(result <= 0, "Failed to wait at the barrier!");
+
+ ret = rte_thread_join(thread_id, NULL);
+ RTE_TEST_ASSERT(ret == 0, "Failed to join threads!");
+
+ ret = rte_thread_barrier_destroy(&barrier);
+ RTE_TEST_ASSERT(ret == 0, "Failed to destroy barrier!");
+
+ RTE_TEST_ASSERT(ctx.barrier_result <= 0, "Child thread failed to wait at the barrier!");
+ RTE_TEST_ASSERT_NOT_EQUAL(ctx.barrier_result, result, "Threads were not blocked at the barrier!");
+
+ return 0;
+}
+
static struct unit_test_suite threads_test_suite = {
.suite_name = "threads autotest",
.setup = NULL,
@@ -204,6 +252,7 @@ static struct unit_test_suite threads_test_suite = {
TEST_CASE(test_thread_attributes_affinity),
TEST_CASE(test_thread_attributes_priority),
TEST_CASE(test_thread_detach),
+ TEST_CASE(test_thread_barrier),
TEST_CASES_END()
}
};
@@ -309,6 +309,67 @@ rte_thread_detach(rte_thread_t thread_id)
return pthread_detach((pthread_t)thread_id.opaque_id);
}
+int
+rte_thread_barrier_init(rte_thread_barrier *barrier, int count)
+{
+ int ret = 0;
+ pthread_barrier_t *pthread_barrier = NULL;
+
+ RTE_VERIFY(barrier != NULL);
+ RTE_VERIFY(count > 0);
+
+ pthread_barrier = calloc(1, sizeof(*pthread_barrier));
+ if (pthread_barrier == NULL) {
+ RTE_LOG(DEBUG, EAL, "Unable to initialize barrier. Insufficient memory!\n");
+ ret = ENOMEM;
+ goto cleanup;
+ }
+ ret = pthread_barrier_init(pthread_barrier, NULL, count);
+ if (ret != 0) {
+ RTE_LOG(DEBUG, EAL, "Failed to init barrier, ret = %d\n", ret);
+ goto cleanup;
+ }
+
+ barrier->barrier_id = pthread_barrier;
+ pthread_barrier = NULL;
+
+cleanup:
+ free(pthread_barrier);
+ return ret;
+}
+
+int
+rte_thread_barrier_wait(rte_thread_barrier *barrier)
+{
+ int ret = 0;
+
+ RTE_VERIFY(barrier != NULL);
+ RTE_VERIFY(barrier->barrier_id != NULL);
+
+ ret = pthread_barrier_wait(barrier->barrier_id);
+ if (ret == PTHREAD_BARRIER_SERIAL_THREAD)
+ ret = RTE_THREAD_BARRIER_SERIAL_THREAD;
+
+ return ret;
+}
+
+int
+rte_thread_barrier_destroy(rte_thread_barrier *barrier)
+{
+ int ret = 0;
+
+ RTE_VERIFY(barrier != NULL);
+
+ ret = pthread_barrier_destroy(barrier->barrier_id);
+ if (ret != 0)
+ RTE_LOG(DEBUG, EAL, "Failed to destroy barrier: %d\n", ret);
+
+ free(barrier->barrier_id);
+ barrier->barrier_id = NULL;
+
+ return ret;
+}
+
int
rte_thread_key_create(rte_thread_key *key, void (*destructor)(void *))
{
@@ -54,6 +54,18 @@ typedef struct {
#endif /* RTE_HAS_CPUSET */
+/**
+ * Returned by rte_thread_barrier_wait() when call is successful.
+ */
+#define RTE_THREAD_BARRIER_SERIAL_THREAD -1
+
+/**
+ * Thread barrier representation.
+ */
+typedef struct rte_thread_barrier_tag {
+ void *barrier_id; /**< barrrier identifier */
+} rte_thread_barrier;
+
/**
* TLS key type, an opaque pointer.
*/
@@ -302,6 +314,52 @@ __rte_experimental
int rte_thread_set_priority(rte_thread_t thread_id,
enum rte_thread_priority priority);
+/**
+ * Initializes a synchronization barrier.
+ *
+ * @param barrier
+ * A pointer that references the newly created 'barrier' object.
+ *
+ * @param count
+ * The number of threads that must enter the barrier before
+ * the threads can continue execution.
+ *
+ * @return
+ * On success, return 0.
+ * On failure, return a positive errno-style error number.
+ */
+__rte_experimental
+int rte_thread_barrier_init(rte_thread_barrier *barrier, int count);
+
+/**
+ * Causes the calling thread to wait at the synchronization barrier 'barrier'.
+ *
+ * @param barrier
+ * The barrier used for synchronizing the threads.
+ *
+ * @return
+ * Return RTE_THREAD_BARRIER_SERIAL_THREAD for the thread synchronized
+ * at the barrier.
+ * Return 0 for all other threads.
+ * Return a positive errno-style error number, in case of failure.
+ */
+__rte_experimental
+int rte_thread_barrier_wait(rte_thread_barrier *barrier);
+
+/**
+ * Releases all resources used by a synchronization barrier
+ * and uninitializes it.
+ *
+ * @param barrier
+ * The barrier to be destroyed.
+ *
+ * @return
+ * On success, return 0.
+ * On failure, return a positive errno-style error number.
+ */
+__rte_experimental
+int rte_thread_barrier_destroy(rte_thread_barrier *barrier);
+
/**
* Create a TLS data key visible to all threads in the process.
* the created key is later used to get/set a value.
@@ -427,6 +427,9 @@ EXPERIMENTAL {
rte_thread_attr_get_affinity;
rte_thread_attr_set_affinity;
rte_thread_attr_set_priority;
+ rte_thread_barrier_init;
+ rte_thread_barrier_wait;
+ rte_thread_barrier_destroy;
rte_thread_get_affinity_by_id;
rte_thread_set_affinity_by_id;
rte_thread_get_priority;
@@ -504,6 +504,62 @@ rte_thread_detach(rte_thread_t thread_id)
return 0;
}
+int
+rte_thread_barrier_init(rte_thread_barrier *barrier, int count)
+{
+ int ret = 0;
+ SYNCHRONIZATION_BARRIER *sync_barrier = NULL;
+
+ RTE_VERIFY(barrier != NULL);
+ RTE_VERIFY(count > 0);
+
+ sync_barrier = calloc(1, sizeof(*sync_barrier));
+ if (sync_barrier == NULL) {
+ RTE_LOG(DEBUG, EAL, "Unable to initialize barrier. Insufficient memory!\n");
+ ret = ENOMEM;
+ goto cleanup;
+ }
+ if (!InitializeSynchronizationBarrier(sync_barrier, count, -1)) {
+ ret = thread_log_last_error("InitializeSynchronizationBarrier()");
+ goto cleanup;
+ }
+
+ barrier->barrier_id = sync_barrier;
+ sync_barrier = NULL;
+
+cleanup:
+ free(sync_barrier);
+ return ret;
+}
+
+int
+rte_thread_barrier_wait(rte_thread_barrier *barrier)
+{
+ RTE_VERIFY(barrier != NULL);
+ RTE_VERIFY(barrier->barrier_id != NULL);
+
+ if (EnterSynchronizationBarrier(barrier->barrier_id,
+ SYNCHRONIZATION_BARRIER_FLAGS_BLOCK_ONLY)) {
+
+ return RTE_THREAD_BARRIER_SERIAL_THREAD;
+ }
+
+ return 0;
+}
+
+int
+rte_thread_barrier_destroy(rte_thread_barrier *barrier)
+{
+ RTE_VERIFY(barrier != NULL);
+
+ DeleteSynchronizationBarrier(barrier->barrier_id);
+
+ free(barrier->barrier_id);
+ barrier->barrier_id = NULL;
+
+ return 0;
+}
+
int
rte_thread_key_create(rte_thread_key *key,
__rte_unused void (*destructor)(void *))