This commit adds an implementation of the lock-free stack push, pop, and
length functions that uses the __atomic builtins, for systems that benefit
from their finer-grained memory ordering control.
Signed-off-by: Gage Eads <gage.eads@intel.com>
---
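Reviewer note (not part of the commit message): the push path publishes new
elements with a release-ordered 128-bit head CAS, and the pop path reads with
acquire ordering so that element contents are visible before they are
dereferenced. The standalone sketch below illustrates that release/acquire
pairing with the same __atomic builtins; every name in it (node, publish,
consume) is made up for illustration and does not appear in the patch.

#include <stdio.h>

struct node {
	struct node *next;
	int value;
};

static struct node *head;

static void
publish(struct node *n, int value)
{
	struct node *old = __atomic_load_n(&head, __ATOMIC_RELAXED);

	n->value = value;

	do {
		n->next = old;
		/* Release: the writes to n above are visible before the new
		 * head value can be observed by another thread.
		 */
	} while (!__atomic_compare_exchange_n(&head, &old, n,
					      0 /* strong */,
					      __ATOMIC_RELEASE,
					      __ATOMIC_RELAXED));
}

static struct node *
consume(void)
{
	/* Acquire: pairs with the release CAS in publish(), so the node's
	 * fields are valid once this load returns a non-NULL pointer.
	 */
	return __atomic_load_n(&head, __ATOMIC_ACQUIRE);
}

int
main(void)
{
	static struct node n;
	struct node *p;

	publish(&n, 42);
	p = consume();
	if (p != NULL)
		printf("top value: %d\n", p->value);
	return 0;
}
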
lib/librte_stack/Makefile | 3 +-
lib/librte_stack/meson.build | 3 +-
lib/librte_stack/rte_stack.h | 4 +
lib/librte_stack/rte_stack_c11_mem.h | 175 +++++++++++++++++++++++++++++++++++
4 files changed, 183 insertions(+), 2 deletions(-)
create mode 100644 lib/librte_stack/rte_stack_c11_mem.h
--- a/lib/librte_stack/Makefile
+++ b/lib/librte_stack/Makefile
@@ -19,6 +19,7 @@ SRCS-$(CONFIG_RTE_LIBRTE_STACK) := rte_stack.c
# install includes
SYMLINK-$(CONFIG_RTE_LIBRTE_STACK)-include := rte_stack.h \
- rte_stack_generic.h
+ rte_stack_generic.h \
+ rte_stack_c11_mem.h
include $(RTE_SDK)/mk/rte.lib.mk
--- a/lib/librte_stack/meson.build
+++ b/lib/librte_stack/meson.build
@@ -6,4 +6,5 @@ allow_experimental_apis = true
version = 1
sources = files('rte_stack.c')
headers = files('rte_stack.h',
- 'rte_stack_generic.h')
+ 'rte_stack_generic.h',
+ 'rte_stack_c11_mem.h')
--- a/lib/librte_stack/rte_stack.h
+++ b/lib/librte_stack/rte_stack.h
@@ -91,7 +91,11 @@ struct rte_stack {
*/
#define RTE_STACK_F_LF 0x0001
+#ifdef RTE_USE_C11_MEM_MODEL
+#include "rte_stack_c11_mem.h"
+#else
#include "rte_stack_generic.h"
+#endif
/**
* @internal Push several objects on the lock-free stack (MT-safe).
new file mode 100644
--- /dev/null
+++ b/lib/librte_stack/rte_stack_c11_mem.h
@@ -0,0 +1,175 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2019 Intel Corporation
+ */
+
+#ifndef _RTE_STACK_C11_MEM_H_
+#define _RTE_STACK_C11_MEM_H_
+
+#include <rte_branch_prediction.h>
+#include <rte_prefetch.h>
+
+static __rte_always_inline unsigned int
+rte_stack_lf_len(struct rte_stack *s)
+{
+ /* stack_lf_push() and stack_lf_pop() do not update the list's contents
+ * and stack_lf->len atomically, which can cause the list to appear
+ * shorter than it actually is if this function is called while other
+ * threads are modifying the list.
+ *
+ * However, given the inherently approximate nature of the get_count
+ * callback -- even if the list and its size were updated atomically,
+ * the size could change between when get_count executes and when the
+ * value is returned to the caller -- this is acceptable.
+ *
+ * The stack_lf->len updates are placed such that the list may appear to
+ * have fewer elements than it does, but will never appear to have more
+ * elements. If the mempool is near-empty to the point that this is a
+ * concern, the user should consider increasing the mempool size.
+ */
+ return (unsigned int)__atomic_load_n(&s->stack_lf.used.len.cnt,
+ __ATOMIC_RELAXED);
+}
+
+static __rte_always_inline void
+__rte_stack_lf_push(struct rte_stack_lf_list *list,
+ struct rte_stack_lf_elem *first,
+ struct rte_stack_lf_elem *last,
+ unsigned int num)
+{
+#ifndef RTE_ARCH_X86_64
+ RTE_SET_USED(first);
+ RTE_SET_USED(last);
+ RTE_SET_USED(list);
+ RTE_SET_USED(num);
+#else
+ struct rte_stack_lf_head old_head;
+ int success;
+
+ old_head = list->head;
+
+ do {
+ struct rte_stack_lf_head new_head;
+
+ /* Swing the top pointer to the first element in the list and
+ * make the last element point to the old top.
+ */
+ new_head.top = first;
+ new_head.cnt = old_head.cnt + 1;
+
+ last->next = old_head.top;
+
+ /* Use the release memmodel to ensure the writes to the LF LIFO
+ * elements are visible before the head pointer write.
+ */
+ success = rte_atomic128_cmp_exchange(
+ (rte_int128_t *)&list->head,
+ (rte_int128_t *)&old_head,
+ (rte_int128_t *)&new_head,
+ 1, __ATOMIC_RELEASE,
+ __ATOMIC_RELAXED);
+ } while (success == 0);
+
+ /* Ensure the stack modifications are not reordered with respect
+ * to the LIFO len update.
+ */
+ __atomic_add_fetch(&list->len.cnt, num, __ATOMIC_RELEASE);
+#endif
+}
+
+static __rte_always_inline struct rte_stack_lf_elem *
+__rte_stack_lf_pop(struct rte_stack_lf_list *list,
+ unsigned int num,
+ void **obj_table,
+ struct rte_stack_lf_elem **last)
+{
+#ifndef RTE_ARCH_X86_64
+ RTE_SET_USED(obj_table);
+ RTE_SET_USED(last);
+ RTE_SET_USED(list);
+ RTE_SET_USED(num);
+
+ return NULL;
+#else
+ struct rte_stack_lf_head old_head;
+ int success;
+
+ /* Reserve num elements, if available */
+ while (1) {
+ uint64_t len = __atomic_load_n(&list->len.cnt,
+ __ATOMIC_ACQUIRE);
+
+ /* Does the list contain enough elements? */
+ if (unlikely(len < num))
+ return NULL;
+
+ if (__atomic_compare_exchange_n(&list->len.cnt,
+ &len, len - num,
+ 0, __ATOMIC_RELAXED,
+ __ATOMIC_RELAXED))
+ break;
+ }
+
+#ifndef RTE_ARCH_X86_64
+	/* Use the acquire memmodel to ensure the reads of the LF LIFO elements
+	 * are properly ordered with respect to the head pointer read.
+	 *
+	 * Note that for aarch64, GCC's implementation of __atomic_load_16 in
+	 * libatomic uses locks, and so this load should be replaced by a new
+	 * function (e.g. "rte_atomic128_load()").
+	 */
+ __atomic_load((volatile __int128 *)&list->head,
+ &old_head,
+ __ATOMIC_ACQUIRE);
+#else
+ /* x86-64 does not require an atomic load here; if a torn read occurs,
+ * the CAS will fail and set old_head to the correct/latest value.
+ */
+ old_head = list->head;
+#endif
+
+ /* Pop num elements */
+ do {
+ struct rte_stack_lf_head new_head;
+ struct rte_stack_lf_elem *tmp;
+ unsigned int i;
+
+ rte_prefetch0(old_head.top);
+
+ tmp = old_head.top;
+
+		/* Traverse the list to find the new head. A next pointer will
+		 * either point to another element or NULL; if a thread
+		 * encounters an element that has already been popped, the CAS
+		 * will fail.
+		 */
+ for (i = 0; i < num && tmp != NULL; i++) {
+ rte_prefetch0(tmp->next);
+ if (obj_table)
+ obj_table[i] = tmp->data;
+ if (last)
+ *last = tmp;
+ tmp = tmp->next;
+ }
+
+ /* If NULL was encountered, the list was modified while
+ * traversing it. Retry.
+ */
+ if (i != num)
+ continue;
+
+ new_head.top = tmp;
+ new_head.cnt = old_head.cnt + 1;
+
+ success = rte_atomic128_cmp_exchange(
+ (rte_int128_t *)&list->head,
+ (rte_int128_t *)&old_head,
+ (rte_int128_t *)&new_head,
+ 1, __ATOMIC_ACQUIRE,
+ __ATOMIC_ACQUIRE);
+ } while (success == 0);
+
+ return old_head.top;
+#endif
+}
+
+#endif /* _RTE_STACK_C11_MEM_H_ */
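
(Not part of the patch.) For completeness, a rough usage sketch of the
lock-free variant, assuming the rte_stack API introduced earlier in this
series (rte_stack_create()/rte_stack_push()/rte_stack_pop() with the
RTE_STACK_F_LF flag, x86_64-only at this point). When the build defines
RTE_USE_C11_MEM_MODEL, the header added above supplies the underlying
push/pop/len primitives. Error handling is abbreviated.

#include <stdio.h>

#include <rte_eal.h>
#include <rte_lcore.h>
#include <rte_stack.h>

int
main(int argc, char **argv)
{
	struct rte_stack *s;
	int a = 1, b = 2, c = 3, d = 4;
	void *objs[4] = { &a, &b, &c, &d };
	void *popped[4];
	unsigned int n;

	if (rte_eal_init(argc, argv) < 0)
		return -1;

	/* RTE_STACK_F_LF requests the lock-free implementation; with
	 * RTE_USE_C11_MEM_MODEL defined, rte_stack_c11_mem.h provides the
	 * push/pop/len primitives used underneath.
	 */
	s = rte_stack_create("lf_example", 64, rte_socket_id(),
			     RTE_STACK_F_LF);
	if (s == NULL)
		return -1;

	n = rte_stack_push(s, objs, 4);   /* returns the number pushed */
	n = rte_stack_pop(s, popped, n);  /* returns the number popped */
	printf("popped %u objects\n", n);

	rte_stack_free(s);
	return 0;
}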