diff mbox series

回复:[PATCH v3 9/9] ring: add C11 memory model for new sync modes

Message ID 149fa735-0cd8-47e4-9c8c-47b066f3c609.jielong.zjl@antfin.com (mailing list archive)
State Not Applicable, archived
Headers show
Series 回复:[PATCH v3 9/9] ring: add C11 memory model for new sync modes | expand

Commit Message

周介龙 April 4, 2020, 2:16 p.m. UTC
Hi,
This patchset really helps my case a lot. 
We have a case which would have several hundreds of threads 
get/put concurrently on single mempool(ring-based) without cache. 
Even on a simplified test case in which 32 threads putting into mempool 
concurrently for 1024 times respectively, it works extremely slow and 
would be finished after tens of seconds. With new sync modes introduced 
by this patchset, the case could finish in 2816979 cycles(RTS)  and 
2279615(HTS) cycles.
It would be great if we could have simple APIs to enable these sync modes
for structures based on rte_ring like mempool.
------------------------------------------------------------------
发件人:Konstantin Ananyev <konstantin.ananyev@intel.com>
发送时间:2020年4月4日(星期六) 01:43
收件人:dev <dev@dpdk.org>
抄 送:honnappa.nagarahalli <honnappa.nagarahalli@arm.com>; david.marchand <david.marchand@redhat.com>; 周介龙(卓腾) <jielong.zjl@antfin.com>; Konstantin Ananyev <konstantin.ananyev@intel.com>
主 题:[PATCH v3 9/9] ring: add C11 memory model for new sync modes

Add C11 atomics based implementation for RTS and HTS
head/tail update primitivies.

Signed-off-by: Konstantin Ananyev <konstantin.ananyev@intel.com>
---
 lib/librte_ring/Makefile               |   4 +-
 lib/librte_ring/meson.build            |   2 +
 lib/librte_ring/rte_ring_hts.h         |   4 +
 lib/librte_ring/rte_ring_hts_c11_mem.h | 222 +++++++++++++++++++++++++
 lib/librte_ring/rte_ring_hts_elem.h    |   4 +
 lib/librte_ring/rte_ring_rts.h         |   4 +
 lib/librte_ring/rte_ring_rts_c11_mem.h | 198 ++++++++++++++++++++++
 lib/librte_ring/rte_ring_rts_elem.h    |   4 +
 8 files changed, 441 insertions(+), 1 deletion(-)
 create mode 100644 lib/librte_ring/rte_ring_hts_c11_mem.h
 create mode 100644 lib/librte_ring/rte_ring_rts_c11_mem.h
diff mbox series

Patch

diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile
index 5f8662737..927d105bf 100644
--- a/lib/librte_ring/Makefile
+++ b/lib/librte_ring/Makefile
@@ -22,9 +22,11 @@  SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h \
      rte_ring_hts.h \
      rte_ring_hts_elem.h \
      rte_ring_hts_generic.h \
+     rte_ring_hts_c11_mem.h \
      rte_ring_peek.h \
      rte_ring_rts.h \
      rte_ring_rts_elem.h \
-     rte_ring_rts_generic.h
+     rte_ring_rts_generic.h \
+     rte_ring_rts_c11_mem.h

 include $(RTE_SDK)/mk/rte.lib.mk
diff --git a/lib/librte_ring/meson.build b/lib/librte_ring/meson.build
index f5f84dc6e..f2e37a8e4 100644
--- a/lib/librte_ring/meson.build
+++ b/lib/librte_ring/meson.build
@@ -7,10 +7,12 @@  headers = files('rte_ring.h',
   'rte_ring_c11_mem.h',
   'rte_ring_generic.h',
   'rte_ring_hts.h',
+  'rte_ring_hts_c11_mem.h',
   'rte_ring_hts_elem.h',
   'rte_ring_hts_generic.h',
   'rte_ring_peek.h',
   'rte_ring_rts.h',
+  'rte_ring_rts_c11_mem.h',
   'rte_ring_rts_elem.h',
   'rte_ring_rts_generic.h')

diff --git a/lib/librte_ring/rte_ring_hts.h b/lib/librte_ring/rte_ring_hts.h
index 062d7be6c..ddaa47ff1 100644
--- a/lib/librte_ring/rte_ring_hts.h
+++ b/lib/librte_ring/rte_ring_hts.h
@@ -29,7 +29,11 @@ 
 extern "C" {
 #endif

+#ifdef RTE_USE_C11_MEM_MODEL
+#include <rte_ring_hts_c11_mem.h>
+#else
 #include <rte_ring_hts_generic.h>
+#endif

 /**
  * @internal Enqueue several objects on the HTS ring.
diff --git a/lib/librte_ring/rte_ring_hts_c11_mem.h b/lib/librte_ring/rte_ring_hts_c11_mem.h
new file mode 100644
index 000000000..0218d0e7d
--- /dev/null
+++ b/lib/librte_ring/rte_ring_hts_c11_mem.h
@@ -0,0 +1,222 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ *
+ * Copyright (c) 2010-2020 Intel Corporation
+ * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
+ * All rights reserved.
+ * Derived from FreeBSD's bufring.h
+ * Used as BSD-3 Licensed with permission from Kip Macy.
+ */
+
+#ifndef _RTE_RING_HTS_C11_MEM_H_
+#define _RTE_RING_HTS_C11_MEM_H_
+
+/**
+ * @file rte_ring_hts_c11_mem.h
+ * It is not recommended to include this file directly,
+ * include <rte_ring.h> instead.
+ * Contains internal helper functions for head/tail sync (HTS) ring mode.
+ * For more information please refer to <rte_ring_hts.h>.
+ */
+
+/**
+ * @internal get current tail value.
+ * Check that user didn't request to move tail above the head.
+ * In that situation:
+ * - return zero, that will cause abort any pending changes and
+ *   return head to its previous position.
+ * - throw an assert in debug mode.
+ */
+static __rte_always_inline uint32_t
+__rte_ring_hts_get_tail(struct rte_ring_hts_headtail *ht, uint32_t *tail,
+ uint32_t num)
+{
+ uint32_t n;
+ union rte_ring_ht_pos p;
+
+ p.raw = __atomic_load_n(&ht->ht.raw, __ATOMIC_RELAXED);
+ n = p.pos.head - p.pos.tail;
+
+ RTE_ASSERT(n >= num);
+ num = (n >= num) ? num : 0;
+
+ *tail = p.pos.tail;
+ return num;
+}
+
+/**
+ * @internal set new values for head and tail as one atomic 64 bit operation.
+ * Should be used only in conjunction with __rte_ring_hts_get_tail.
+ */
+static __rte_always_inline void
+__rte_ring_hts_set_head_tail(struct rte_ring_hts_headtail *ht, uint32_t tail,
+ uint32_t num, uint32_t enqueue)
+{
+ union rte_ring_ht_pos p;
+
+ RTE_SET_USED(enqueue);
+
+ p.pos.head = tail + num;
+ p.pos.tail = p.pos.head;
+
+ __atomic_store_n(&ht->ht.raw, p.raw, __ATOMIC_RELEASE);
+}
+
+static __rte_always_inline void
+__rte_ring_hts_update_tail(struct rte_ring_hts_headtail *ht, uint32_t num,
+ uint32_t enqueue)
+{
+ uint32_t tail;
+
+ num = __rte_ring_hts_get_tail(ht, &tail, num);
+ __rte_ring_hts_set_head_tail(ht, tail, num, enqueue);
+}
+
+/**
+ * @internal waits till tail will become equal to head.
+ * Means no writer/reader is active for that ring.
+ * Suppose to work as serialization point.
+ */
+static __rte_always_inline void
+__rte_ring_hts_head_wait(const struct rte_ring_hts_headtail *ht,
+  union rte_ring_ht_pos *p)
+{
+ p->raw = __atomic_load_n(&ht->ht.raw, __ATOMIC_ACQUIRE);
+
+ while (p->pos.head != p->pos.tail) {
+  rte_pause();
+  p->raw = __atomic_load_n(&ht->ht.raw, __ATOMIC_ACQUIRE);
+ }
+}
+
+/**
+ * @internal This function updates the producer head for enqueue
+ *
+ * @param r
+ *   A pointer to the ring structure
+ * @param is_sp
+ *   Indicates whether multi-producer path is needed or not
+ * @param n
+ *   The number of elements we will want to enqueue, i.e. how far should the
+ *   head be moved
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
+ *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
+ * @param old_head
+ *   Returns head value as it was before the move, i.e. where enqueue starts
+ * @param new_head
+ *   Returns the current/new head value i.e. where enqueue finishes
+ * @param free_entries
+ *   Returns the amount of free space in the ring BEFORE head was moved
+ * @return
+ *   Actual number of objects enqueued.
+ *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_hts_move_prod_head(struct rte_ring *r, unsigned int num,
+ enum rte_ring_queue_behavior behavior, uint32_t *old_head,
+ uint32_t *free_entries)
+{
+ uint32_t n;
+ union rte_ring_ht_pos np, op;
+
+ const uint32_t capacity = r->capacity;
+
+ do {
+  /* Reset n to the initial burst count */
+  n = num;
+
+  /* wait for tail to be equal to head, , acquire point */
+  __rte_ring_hts_head_wait(&r->hts_prod, &op);
+
+  /*
+   *  The subtraction is done between two unsigned 32bits value
+   * (the result is always modulo 32 bits even if we have
+   * *old_head > cons_tail). So 'free_entries' is always between 0
+   * and capacity (which is < size).
+   */
+  *free_entries = capacity + r->cons.tail - op.pos.head;
+
+  /* check that we have enough room in ring */
+  if (unlikely(n > *free_entries))
+   n = (behavior == RTE_RING_QUEUE_FIXED) ?
+     0 : *free_entries;
+
+  if (n == 0)
+   break;
+
+  np.pos.tail = op.pos.tail;
+  np.pos.head = op.pos.head + n;
+
+ } while (__atomic_compare_exchange_n(&r->hts_prod.ht.raw,
+   &op.raw, np.raw,
+   0, __ATOMIC_RELEASE, __ATOMIC_RELAXED) == 0);
+
+ *old_head = op.pos.head;
+ return n;
+}
+
+/**
+ * @internal This function updates the consumer head for dequeue
+ *
+ * @param r
+ *   A pointer to the ring structure
+ * @param is_sc
+ *   Indicates whether multi-consumer path is needed or not
+ * @param n
+ *   The number of elements we will want to enqueue, i.e. how far should the
+ *   head be moved
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
+ *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
+ * @param old_head
+ *   Returns head value as it was before the move, i.e. where dequeue starts
+ * @param new_head
+ *   Returns the current/new head value i.e. where dequeue finishes
+ * @param entries
+ *   Returns the number of entries in the ring BEFORE head was moved
+ * @return
+ *   - Actual number of objects dequeued.
+ *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_hts_move_cons_head(struct rte_ring *r, unsigned int num,
+ enum rte_ring_queue_behavior behavior, uint32_t *old_head,
+ uint32_t *entries)
+{
+ uint32_t n;
+ union rte_ring_ht_pos np, op;
+
+ /* move cons.head atomically */
+ do {
+  /* Restore n as it may change every loop */
+  n = num;
+
+  /* wait for tail to be equal to head */
+  __rte_ring_hts_head_wait(&r->hts_cons, &op);
+
+  /* The subtraction is done between two unsigned 32bits value
+   * (the result is always modulo 32 bits even if we have
+   * cons_head > prod_tail). So 'entries' is always between 0
+   * and size(ring)-1.
+   */
+  *entries = r->prod.tail - op.pos.head;
+
+  /* Set the actual entries for dequeue */
+  if (n > *entries)
+   n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
+
+  if (unlikely(n == 0))
+   break;
+
+  np.pos.tail = op.pos.tail;
+  np.pos.head = op.pos.head + n;
+
+ } while (__atomic_compare_exchange_n(&r->hts_cons.ht.raw,
+   &op.raw, np.raw,
+   0, __ATOMIC_RELEASE, __ATOMIC_RELAXED) == 0);
+
+ *old_head = op.pos.head;
+ return n;
+}
+
+#endif /* _RTE_RING_HTS_C11_MEM_H_ */
diff --git a/lib/librte_ring/rte_ring_hts_elem.h b/lib/librte_ring/rte_ring_hts_elem.h
index 34f0d121d..1e9a49c7a 100644
--- a/lib/librte_ring/rte_ring_hts_elem.h
+++ b/lib/librte_ring/rte_ring_hts_elem.h
@@ -24,7 +24,11 @@ 
 extern "C" {
 #endif

+#ifdef RTE_USE_C11_MEM_MODEL
+#include <rte_ring_hts_c11_mem.h>
+#else
 #include <rte_ring_hts_generic.h>
+#endif

 /**
  * @internal Enqueue several objects on the HTS ring.
diff --git a/lib/librte_ring/rte_ring_rts.h b/lib/librte_ring/rte_ring_rts.h
index 18404fe48..28b2d25f5 100644
--- a/lib/librte_ring/rte_ring_rts.h
+++ b/lib/librte_ring/rte_ring_rts.h
@@ -55,7 +55,11 @@ 
 extern "C" {
 #endif

+#ifdef RTE_USE_C11_MEM_MODEL
+#include <rte_ring_rts_c11_mem.h>
+#else
 #include <rte_ring_rts_generic.h>
+#endif

 /**
  * @internal Enqueue several objects on the RTS ring.
diff --git a/lib/librte_ring/rte_ring_rts_c11_mem.h b/lib/librte_ring/rte_ring_rts_c11_mem.h
new file mode 100644
index 000000000..b72901497
--- /dev/null
+++ b/lib/librte_ring/rte_ring_rts_c11_mem.h
@@ -0,0 +1,198 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ *
+ * Copyright (c) 2010-2017 Intel Corporation
+ * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
+ * All rights reserved.
+ * Derived from FreeBSD's bufring.h
+ * Used as BSD-3 Licensed with permission from Kip Macy.
+ */
+
+#ifndef _RTE_RING_RTS_C11_MEM_H_
+#define _RTE_RING_RTS_C11_MEM_H_
+
+/**
+ * @file rte_ring_rts_c11_mem.h
+ * It is not recommended to include this file directly,
+ * include <rte_ring.h> instead.
+ * Contains internal helper functions for Relaxed Tail Sync (RTS) ring mode.
+ * For more information please refer to <rte_ring_rts.h>.
+ */
+
+/**
+ * @internal This function updates tail values.
+ */
+static __rte_always_inline void
+__rte_ring_rts_update_tail(struct rte_ring_rts_headtail *ht)
+{
+ union rte_ring_ht_poscnt h, ot, nt;
+
+ /*
+  * If there are other enqueues/dequeues in progress that
+  * might preceded us, then don't update tail with new value.
+  */
+
+ ot.raw = __atomic_load_n(&ht->tail.raw, __ATOMIC_ACQUIRE);
+
+ do {
+  /* on 32-bit systems we have to do atomic read here */
+  h.raw = __atomic_load_n(&ht->head.raw, __ATOMIC_RELAXED);
+
+  nt.raw = ot.raw;
+  if (++nt.val.cnt == h.val.cnt)
+   nt.val.pos = h.val.pos;
+
+ } while (__atomic_compare_exchange_n(&ht->tail.raw, &ot.raw, nt.raw,
+   0, __ATOMIC_RELEASE, __ATOMIC_ACQUIRE) == 0);
+}
+
+/**
+ * @internal This function waits till head/tail distance wouldn't
+ * exceed pre-defined max value.
+ */
+static __rte_always_inline void
+__rte_ring_rts_head_wait(const struct rte_ring_rts_headtail *ht,
+ union rte_ring_ht_poscnt *h)
+{
+ uint32_t max;
+
+ max = ht->htd_max;
+ h->raw = __atomic_load_n(&ht->head.raw, __ATOMIC_ACQUIRE);
+
+ while (h->val.pos - ht->tail.val.pos > max) {
+  rte_pause();
+  h->raw = __atomic_load_n(&ht->head.raw, __ATOMIC_ACQUIRE);
+ }
+}
+
+/**
+ * @internal This function updates the producer head for enqueue.
+ *
+ * @param r
+ *   A pointer to the ring structure
+ * @param is_sp
+ *   Indicates whether multi-producer path is needed or not
+ * @param n
+ *   The number of elements we will want to enqueue, i.e. how far should the
+ *   head be moved
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
+ *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
+ * @param old_head
+ *   Returns head value as it was before the move, i.e. where enqueue starts
+ * @param new_head
+ *   Returns the current/new head value i.e. where enqueue finishes
+ * @param free_entries
+ *   Returns the amount of free space in the ring BEFORE head was moved
+ * @return
+ *   Actual number of objects enqueued.
+ *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline uint32_t
+__rte_ring_rts_move_prod_head(struct rte_ring *r, uint32_t num,
+ enum rte_ring_queue_behavior behavior, uint32_t *old_head,
+ uint32_t *free_entries)
+{
+ uint32_t n;
+ union rte_ring_ht_poscnt nh, oh;
+
+ const uint32_t capacity = r->capacity;
+
+ do {
+  /* Reset n to the initial burst count */
+  n = num;
+
+  /* read prod head (may spin on prod tail, acquire point) */
+  __rte_ring_rts_head_wait(&r->rts_prod, &oh);
+
+  /*
+   *  The subtraction is done between two unsigned 32bits value
+   * (the result is always modulo 32 bits even if we have
+   * *old_head > cons_tail). So 'free_entries' is always between 0
+   * and capacity (which is < size).
+   */
+  *free_entries = capacity + r->cons.tail - oh.val.pos;
+
+  /* check that we have enough room in ring */
+  if (unlikely(n > *free_entries))
+   n = (behavior == RTE_RING_QUEUE_FIXED) ?
+     0 : *free_entries;
+
+  if (n == 0)
+   break;
+
+  nh.val.pos = oh.val.pos + n;
+  nh.val.cnt = oh.val.cnt + 1;
+
+ } while (__atomic_compare_exchange_n(&r->rts_prod.head.raw,
+   &oh.raw, nh.raw,
+   0, __ATOMIC_RELEASE, __ATOMIC_RELAXED) == 0);
+
+ *old_head = oh.val.pos;
+ return n;
+}
+
+/**
+ * @internal This function updates the consumer head for dequeue
+ *
+ * @param r
+ *   A pointer to the ring structure
+ * @param is_sc
+ *   Indicates whether multi-consumer path is needed or not
+ * @param n
+ *   The number of elements we will want to enqueue, i.e. how far should the
+ *   head be moved
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
+ *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
+ * @param old_head
+ *   Returns head value as it was before the move, i.e. where dequeue starts
+ * @param new_head
+ *   Returns the current/new head value i.e. where dequeue finishes
+ * @param entries
+ *   Returns the number of entries in the ring BEFORE head was moved
+ * @return
+ *   - Actual number of objects dequeued.
+ *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_rts_move_cons_head(struct rte_ring *r, uint32_t num,
+ enum rte_ring_queue_behavior behavior, uint32_t *old_head,
+ uint32_t *entries)
+{
+ uint32_t n;
+ union rte_ring_ht_poscnt nh, oh;
+
+ /* move cons.head atomically */
+ do {
+  /* Restore n as it may change every loop */
+  n = num;
+
+  /* read cons head (may spin on cons tail, acquire point) */
+  __rte_ring_rts_head_wait(&r->rts_cons, &oh);
+
+  /* The subtraction is done between two unsigned 32bits value
+   * (the result is always modulo 32 bits even if we have
+   * cons_head > prod_tail). So 'entries' is always between 0
+   * and size(ring)-1.
+   */
+  *entries = r->prod.tail - oh.val.pos;
+
+  /* Set the actual entries for dequeue */
+  if (n > *entries)
+   n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
+
+  if (unlikely(n == 0))
+   break;
+
+  nh.val.pos = oh.val.pos + n;
+  nh.val.cnt = oh.val.cnt + 1;
+
+ } while (__atomic_compare_exchange_n(&r->rts_cons.head.raw,
+   &oh.raw, nh.raw,
+   1, __ATOMIC_RELEASE, __ATOMIC_RELAXED) == 0);
+
+ *old_head = oh.val.pos;
+ return n;
+}
+
+#endif /* _RTE_RING_RTS_C11_MEM_H_ */
diff --git a/lib/librte_ring/rte_ring_rts_elem.h b/lib/librte_ring/rte_ring_rts_elem.h
index 71a331b23..23d8aeec7 100644
--- a/lib/librte_ring/rte_ring_rts_elem.h
+++ b/lib/librte_ring/rte_ring_rts_elem.h
@@ -24,7 +24,11 @@ 
 extern "C" {
 #endif

+#ifdef RTE_USE_C11_MEM_MODEL
+#include <rte_ring_rts_c11_mem.h>
+#else
 #include <rte_ring_rts_generic.h>
+#endif

 /**
  * @internal Enqueue several objects on the RTS ring.