[RFC,v3,3/5] lib/rcu: add dynamic memory allocation capability

Message ID 20190222070427.22866-4-honnappa.nagarahalli@arm.com (mailing list archive)
State Superseded, archived
Delegated to: Thomas Monjalon
Headers
Series rcu: add RCU library supporting QSBR mechanism |

Checks

Context Check Description
ci/checkpatch warning coding style issues
ci/Intel-compilation success Compilation OK

Commit Message

Honnappa Nagarahalli Feb. 22, 2019, 7:04 a.m. UTC
  rte_rcu_qsbr_get_memsize API is introduced. This will allow the user
to controll the amount of memory used based on the maximum
number of threads present in the application.

Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
---
 lib/librte_rcu/rte_rcu_qsbr.c |  51 ++++++++++++---
 lib/librte_rcu/rte_rcu_qsbr.h | 118 +++++++++++++++++++++-------------
 2 files changed, 118 insertions(+), 51 deletions(-)
  

Patch

diff --git a/lib/librte_rcu/rte_rcu_qsbr.c b/lib/librte_rcu/rte_rcu_qsbr.c
index 3c2577ee2..02464fdba 100644
--- a/lib/librte_rcu/rte_rcu_qsbr.c
+++ b/lib/librte_rcu/rte_rcu_qsbr.c
@@ -21,11 +21,39 @@ 
 
 #include "rte_rcu_qsbr.h"
 
+/* Get the memory size of QSBR variable */
+unsigned int __rte_experimental
+rte_rcu_qsbr_get_memsize(uint32_t max_threads)
+{
+	int n;
+	ssize_t sz;
+
+	RTE_ASSERT(max_threads == 0);
+
+	sz = sizeof(struct rte_rcu_qsbr);
+
+	/* Add the size of the registered thread ID bitmap array */
+	n = RTE_ALIGN(max_threads, RTE_QSBR_THRID_ARRAY_ELM_SIZE);
+	sz += RTE_QSBR_THRID_ARRAY_SIZE(n);
+
+	/* Add the size of quiescent state counter array */
+	sz += sizeof(struct rte_rcu_qsbr_cnt) * max_threads;
+
+	return RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
+}
+
 /* Initialize a quiescent state variable */
 void __rte_experimental
-rte_rcu_qsbr_init(struct rte_rcu_qsbr *v)
+rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads)
 {
-	memset(v, 0, sizeof(struct rte_rcu_qsbr));
+	RTE_ASSERT(v == NULL);
+
+	memset(v, 0, rte_rcu_qsbr_get_memsize(max_threads));
+	v->m_threads = max_threads;
+	v->ma_threads = RTE_ALIGN(max_threads, RTE_QSBR_THRID_ARRAY_ELM_SIZE);
+
+	v->num_elems = v->ma_threads/RTE_QSBR_THRID_ARRAY_ELM_SIZE;
+	v->thrid_array_size = RTE_QSBR_THRID_ARRAY_SIZE(v->ma_threads);
 }
 
 /* Dump the details of a single quiescent state variable to a file. */
@@ -39,9 +67,15 @@  rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v)
 
 	fprintf(f, "\nQuiescent State Variable @%p\n", v);
 
+	fprintf(f, "  QS variable memory size = %u\n",
+				rte_rcu_qsbr_get_memsize(v->m_threads));
+	fprintf(f, "  Given # max threads = %u\n", v->m_threads);
+	fprintf(f, "  Adjusted # max threads = %u\n", v->ma_threads);
+
 	fprintf(f, "  Registered thread ID mask = 0x");
-	for (i = 0; i < RTE_QSBR_BIT_MAP_ELEMS; i++)
-		fprintf(f, "%lx", __atomic_load_n(&v->reg_thread_id[i],
+	for (i = 0; i < v->num_elems; i++)
+		fprintf(f, "%lx", __atomic_load_n(
+					RTE_QSBR_THRID_ARRAY_ELM(v, i),
 					__ATOMIC_ACQUIRE));
 	fprintf(f, "\n");
 
@@ -49,14 +83,15 @@  rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v)
 			__atomic_load_n(&v->token, __ATOMIC_ACQUIRE));
 
 	fprintf(f, "Quiescent State Counts for readers:\n");
-	for (i = 0; i < RTE_QSBR_BIT_MAP_ELEMS; i++) {
-		bmap = __atomic_load_n(&v->reg_thread_id[i],
+	for (i = 0; i < v->num_elems; i++) {
+		bmap = __atomic_load_n(RTE_QSBR_THRID_ARRAY_ELM(v, i),
 					__ATOMIC_ACQUIRE);
 		while (bmap) {
 			t = __builtin_ctzl(bmap);
 			fprintf(f, "thread ID = %d, count = %lu\n", t,
-				__atomic_load_n(&v->w[i].cnt,
-						__ATOMIC_RELAXED));
+				__atomic_load_n(
+					&RTE_QSBR_CNT_ARRAY_ELM(v, i)->cnt,
+					__ATOMIC_RELAXED));
 			bmap &= ~(1UL << t);
 		}
 	}
diff --git a/lib/librte_rcu/rte_rcu_qsbr.h b/lib/librte_rcu/rte_rcu_qsbr.h
index 53e00488b..21fa2c198 100644
--- a/lib/librte_rcu/rte_rcu_qsbr.h
+++ b/lib/librte_rcu/rte_rcu_qsbr.h
@@ -29,46 +29,71 @@  extern "C" {
 #include <rte_lcore.h>
 #include <rte_debug.h>
 
-/**< Maximum number of reader threads supported. */
-#define RTE_RCU_MAX_THREADS 128
-
-#if !RTE_IS_POWER_OF_2(RTE_RCU_MAX_THREADS)
-#error RTE_RCU_MAX_THREADS must be a power of 2
-#endif
-
-/**< Number of array elements required for the bit-map */
-#define RTE_QSBR_BIT_MAP_ELEMS (RTE_RCU_MAX_THREADS/(sizeof(uint64_t) * 8))
-
-/* Thread IDs are stored as a bitmap of 64b element array. Given thread id
- * needs to be converted to index into the array and the id within
- * the array element.
+/* Registered thread IDs are stored as a bitmap of 64b element array.
+ * Given thread id needs to be converted to index into the array and
+ * the id within the array element.
+ */
+/* Thread ID array size
+ * @param ma_threads
+ *   num of threads aligned to 64
  */
-#define RTE_QSBR_THR_INDEX_SHIFT 6
-#define RTE_QSBR_THR_ID_MASK 0x3f
+#define RTE_QSBR_THRID_ARRAY_SIZE(ma_threads) \
+	RTE_ALIGN((ma_threads) >> 3, RTE_CACHE_LINE_SIZE)
+#define RTE_QSBR_THRID_ARRAY_ELM_SIZE (sizeof(uint64_t) * 8)
+#define RTE_QSBR_THRID_ARRAY_ELM(v, i) ((uint64_t *)(v + 1) + i)
+#define RTE_QSBR_THRID_INDEX_SHIFT 6
+#define RTE_QSBR_THRID_MASK 0x3f
 
 /* Worker thread counter */
 struct rte_rcu_qsbr_cnt {
 	uint64_t cnt; /**< Quiescent state counter. */
 } __rte_cache_aligned;
 
+#define RTE_QSBR_CNT_ARRAY_ELM(v, i) ((struct rte_rcu_qsbr_cnt *) \
+	((uint8_t *)(v + 1) + v->thrid_array_size) + i)
+
 /**
  * RTE thread Quiescent State structure.
+ * The following data, which is dependent on the maximum number of
+ * threads using this variable, is stored in memory immediately
+ * following this structure.
+ *
+ * 1) registered thread ID bitmap array
+ *    This is a uint64_t array enough to hold 'ma_threads' number
+ *    of thread IDs.
+ * 2) quiescent state counter array
+ *    This is an array of 'struct rte_rcu_qsbr_cnt' with
+ *    'm_threads' number of elements.
  */
 struct rte_rcu_qsbr {
-	uint64_t reg_thread_id[RTE_QSBR_BIT_MAP_ELEMS] __rte_cache_aligned;
-	/**< Registered reader thread IDs - reader threads reporting
-	 * on this QS variable represented in a bit map.
-	 */
-
 	uint64_t token __rte_cache_aligned;
 	/**< Counter to allow for multiple simultaneous QS queries */
 
-	struct rte_rcu_qsbr_cnt w[RTE_RCU_MAX_THREADS] __rte_cache_aligned;
-	/**< QS counter for each reader thread, counts upto
-	 * current value of token.
-	 */
+	uint32_t thrid_array_size __rte_cache_aligned;
+	/**< Registered thread ID bitmap array size in bytes */
+	uint32_t num_elems;
+	/**< Number of elements in the thread ID array */
+
+	uint32_t m_threads;
+	/**< Maximum number of threads this RCU variable will use */
+	uint32_t ma_threads;
+	/**< Maximum number of threads aligned to 32 */
 } __rte_cache_aligned;
 
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Return the size of the memory occupied by a Quiescent State (QS) variable.
+ *
+ * @param max_threads
+ *   Maximum number of threads reporting QS on this variable.
+ * @return
+ *   Size of memory in bytes required for this QS variable.
+ */
+unsigned int __rte_experimental
+rte_rcu_qsbr_get_memsize(uint32_t max_threads);
+
 /**
  * @warning
  * @b EXPERIMENTAL: this API may change without prior notice
@@ -77,10 +102,12 @@  struct rte_rcu_qsbr {
  *
  * @param v
  *   QS variable
+ * @param max_threads
+ *   Maximum number of threads reporting QS on this variable.
  *
  */
 void __rte_experimental
-rte_rcu_qsbr_init(struct rte_rcu_qsbr *v);
+rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads);
 
 /**
  * @warning
@@ -108,24 +135,25 @@  rte_rcu_qsbr_register_thread(struct rte_rcu_qsbr *v, unsigned int thread_id)
 {
 	unsigned int i, id;
 
-	RTE_ASSERT(v == NULL || thread_id >= RTE_RCU_MAX_THREADS);
+	RTE_ASSERT(v == NULL || thread_id >= v->max_threads);
 
-	id = thread_id & RTE_QSBR_THR_ID_MASK;
-	i = thread_id >> RTE_QSBR_THR_INDEX_SHIFT;
+	id = thread_id & RTE_QSBR_THRID_MASK;
+	i = thread_id >> RTE_QSBR_THRID_INDEX_SHIFT;
 
 	/* Worker thread has to count the quiescent states
 	 * only from the current value of token.
 	 * __atomic_store_n(cnt, __ATOMIC_RELAXED) is used to ensure
 	 * 'cnt' (64b) is accessed atomically.
 	 */
-	__atomic_store_n(&v->w[thread_id].cnt,
+	__atomic_store_n(&RTE_QSBR_CNT_ARRAY_ELM(v, thread_id)->cnt,
 		__atomic_load_n(&v->token, __ATOMIC_ACQUIRE),
 		__ATOMIC_RELAXED);
 
 	/* Release the store to initial TQS count so that readers
 	 * can use it immediately after this function returns.
 	 */
-	__atomic_fetch_or(&v->reg_thread_id[i], 1UL << id, __ATOMIC_RELEASE);
+	__atomic_fetch_or(RTE_QSBR_THRID_ARRAY_ELM(v, i),
+		1UL << id, __ATOMIC_RELEASE);
 }
 
 /**
@@ -151,16 +179,16 @@  rte_rcu_qsbr_unregister_thread(struct rte_rcu_qsbr *v, unsigned int thread_id)
 {
 	unsigned int i, id;
 
-	RTE_ASSERT(v == NULL || thread_id >= RTE_RCU_MAX_THREADS);
+	RTE_ASSERT(v == NULL || thread_id >= v->max_threads);
 
-	id = thread_id & RTE_QSBR_THR_ID_MASK;
-	i = thread_id >> RTE_QSBR_THR_INDEX_SHIFT;
+	id = thread_id & RTE_QSBR_THRID_MASK;
+	i = thread_id >> RTE_QSBR_THRID_INDEX_SHIFT;
 
 	/* Make sure the removal of the thread from the list of
 	 * reporting threads is visible before the thread
 	 * does anything else.
 	 */
-	__atomic_fetch_and(&v->reg_thread_id[i],
+	__atomic_fetch_and(RTE_QSBR_THRID_ARRAY_ELM(v, i),
 				~(1UL << id), __ATOMIC_RELEASE);
 }
 
@@ -212,7 +240,7 @@  rte_rcu_qsbr_update(struct rte_rcu_qsbr *v, unsigned int thread_id)
 {
 	uint64_t t;
 
-	RTE_ASSERT(v == NULL || thread_id >= RTE_RCU_MAX_THREADS);
+	RTE_ASSERT(v == NULL || thread_id >= v->max_threads);
 
 	/* Load the token before the reader thread loads any other
 	 * (lock-free) data structure. This ensures that updates
@@ -228,8 +256,10 @@  rte_rcu_qsbr_update(struct rte_rcu_qsbr *v, unsigned int thread_id)
 	 * Copy the current token value. This will end grace period
 	 * of multiple concurrent writers.
 	 */
-	if (__atomic_load_n(&v->w[thread_id].cnt, __ATOMIC_RELAXED) != t)
-		__atomic_store_n(&v->w[thread_id].cnt, t, __ATOMIC_RELAXED);
+	if (__atomic_load_n(&RTE_QSBR_CNT_ARRAY_ELM(v, thread_id)->cnt,
+				__ATOMIC_RELAXED) != t)
+		__atomic_store_n(&RTE_QSBR_CNT_ARRAY_ELM(v, thread_id)->cnt,
+				 t, __ATOMIC_RELAXED);
 }
 
 /**
@@ -268,18 +298,20 @@  rte_rcu_qsbr_check(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
 		/* Load the current registered thread bit map before
 		 * loading the reader thread quiescent state counters.
 		 */
-		bmap = __atomic_load_n(&v->reg_thread_id[i], __ATOMIC_ACQUIRE);
-		id = i << RTE_QSBR_THR_INDEX_SHIFT;
+		bmap = __atomic_load_n(RTE_QSBR_THRID_ARRAY_ELM(v, i),
+				__ATOMIC_ACQUIRE);
+		id = i << RTE_QSBR_THRID_INDEX_SHIFT;
 
 		while (bmap) {
 			j = __builtin_ctzl(bmap);
 
-/* printf ("Status check: token = %lu, wait = %d, Bit Map = 0x%x, Thread ID = %d\n", t, wait, bmap, id+j); */
+/* printf ("Status check: token = %lu, wait = %d, Bit Map = 0x%lx, Thread ID = %d\n", t, wait, bmap, id+j); */
 			/* __atomic_load_n(cnt, __ATOMIC_RELAXED)
 			 * is used to ensure 'cnt' (64b) is accessed
 			 * atomically.
 			 */
-			if (unlikely(__atomic_load_n(&v->w[id + j].cnt,
+			if (unlikely(__atomic_load_n(
+					&RTE_QSBR_CNT_ARRAY_ELM(v, id + j)->cnt,
 					__ATOMIC_RELAXED) < t)) {
 
 /* printf ("Status not in QS: token = %lu, Wait = %d, Thread QS cnt = %lu, Thread ID = %d\n", t, wait, RTE_QSBR_CNT_ARRAY_ELM(v, id + j)->cnt, id+j); */
@@ -292,7 +324,7 @@  rte_rcu_qsbr_check(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
 				 * Re-read the bitmap.
 				 */
 				bmap = __atomic_load_n(
-						&v->reg_thread_id[i],
+						RTE_QSBR_THRID_ARRAY_ELM(v, i),
 						__ATOMIC_ACQUIRE);
 
 				continue;
@@ -302,7 +334,7 @@  rte_rcu_qsbr_check(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
 		}
 
 		i++;
-	} while (i < RTE_QSBR_BIT_MAP_ELEMS);
+	} while (i < v->num_elems);
 
 	return 1;
 }