From patchwork Sun Sep 8 22:49:48 2019
Content-Type: text/plain; charset="utf-8"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
X-Patchwork-Submitter: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
X-Patchwork-Id: 58960
X-Patchwork-Delegate: david.marchand@redhat.com
From: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
To: honnappa.nagarahalli@arm.com, konstantin.ananyev@intel.com
Cc: dev@dpdk.org, stable@dpdk.org
Date: Sun, 8 Sep 2019 17:49:48 -0500
Message-Id: <20190908224949.34851-7-honnappa.nagarahalli@arm.com>
X-Mailer: git-send-email 2.17.1
In-Reply-To: <20190908224949.34851-1-honnappa.nagarahalli@arm.com>
References: <20190908224949.34851-1-honnappa.nagarahalli@arm.com>
Subject: [dpdk-dev] [PATCH 6/7] lib/rcu: add least acknowledged token optimization

When the rte_rcu_qsbr_check API is called, it is possible to calculate
the least valued token acknowledged by all the readers. The next time
the API is called, the readers' token counters do not need to be
scanned if the token being queried is less than or equal to the least
acknowledged token. This avoids cache line bouncing between the readers
and the writer.
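For illustration, the writer-side flow that benefits from this change looks
roughly as follows. This is a minimal sketch, not part of the patch: only
the rte_rcu_qsbr_* calls are real API; the helper name, the 'entry'
parameter and the use of free() are hypothetical, and reader
registration/error handling are omitted.

#include <stdlib.h>
#include <rte_rcu_qsbr.h>

/* Hypothetical reclaim helper: 'v' is an initialized rte_rcu_qsbr
 * variable shared with the readers, 'entry' an element that has
 * already been unlinked from the shared data structure.
 */
static void
writer_reclaim(struct rte_rcu_qsbr *v, void *entry)
{
	/* Advance the token counter and remember the new value. */
	uint64_t token = rte_rcu_qsbr_start(v);

	/* Block until every registered reader has passed through a
	 * quiescent state after 'token' was generated. With this patch,
	 * the scan also caches the least acknowledged token in
	 * v->acked_token.
	 */
	rte_rcu_qsbr_check(v, token, true);

	/* No reader can still hold a reference to 'entry'. */
	free(entry);
}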
Fixes: 64994b56cfd7 ("rcu: add RCU library supporting QSBR mechanism")
Cc: stable@dpdk.org

Signed-off-by: Honnappa Nagarahalli
Reviewed-by: Gavin Hu
---
 lib/librte_rcu/rte_rcu_qsbr.c |  4 ++++
 lib/librte_rcu/rte_rcu_qsbr.h | 42 +++++++++++++++++++++++++++++++++++
 2 files changed, 46 insertions(+)

diff --git a/lib/librte_rcu/rte_rcu_qsbr.c b/lib/librte_rcu/rte_rcu_qsbr.c
index ce7f93dd3..c9ca66aaa 100644
--- a/lib/librte_rcu/rte_rcu_qsbr.c
+++ b/lib/librte_rcu/rte_rcu_qsbr.c
@@ -73,6 +73,7 @@ rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads)
 		__RTE_QSBR_THRID_ARRAY_ELM_SIZE) /
 		__RTE_QSBR_THRID_ARRAY_ELM_SIZE;
 	v->token = __RTE_QSBR_CNT_INIT;
+	v->acked_token = __RTE_QSBR_CNT_INIT - 1;
 
 	return 0;
 }
@@ -245,6 +246,9 @@ rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v)
 	fprintf(f, "  Token = %"PRIu64"\n",
 		__atomic_load_n(&v->token, __ATOMIC_ACQUIRE));
 
+	fprintf(f, "  Least Acknowledged Token = %"PRIu64"\n",
+		__atomic_load_n(&v->acked_token, __ATOMIC_ACQUIRE));
+
 	fprintf(f, "Quiescent State Counts for readers:\n");
 	for (i = 0; i < v->num_elems; i++) {
 		bmap = __atomic_load_n(__RTE_QSBR_THRID_ARRAY_ELM(v, i),
diff --git a/lib/librte_rcu/rte_rcu_qsbr.h b/lib/librte_rcu/rte_rcu_qsbr.h
index c80f15c00..3f445ba6c 100644
--- a/lib/librte_rcu/rte_rcu_qsbr.h
+++ b/lib/librte_rcu/rte_rcu_qsbr.h
@@ -83,6 +83,7 @@ struct rte_rcu_qsbr_cnt {
 
 #define __RTE_QSBR_CNT_THR_OFFLINE 0
 #define __RTE_QSBR_CNT_INIT 1
+#define __RTE_QSBR_CNT_MAX ((uint64_t)~0)
 
 /* RTE Quiescent State variable structure.
  * This structure has two elements that vary in size based on the
@@ -93,6 +94,10 @@ struct rte_rcu_qsbr_cnt {
 struct rte_rcu_qsbr {
 	uint64_t token __rte_cache_aligned;
 	/**< Counter to allow for multiple concurrent quiescent state queries */
+	uint64_t acked_token;
+	/**< Least token acked by all the threads in the last call to
+	 * rte_rcu_qsbr_check API.
+	 */
 
 	uint32_t num_elems __rte_cache_aligned;
 	/**< Number of elements in the thread ID array */
@@ -472,6 +477,7 @@ __rte_rcu_qsbr_check_selective(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
 	uint64_t bmap;
 	uint64_t c;
 	uint64_t *reg_thread_id;
+	uint64_t acked_token = __RTE_QSBR_CNT_MAX;
 
 	for (i = 0, reg_thread_id = __RTE_QSBR_THRID_ARRAY_ELM(v, 0);
 		i < v->num_elems;
@@ -493,6 +499,7 @@ __rte_rcu_qsbr_check_selective(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
 			__RTE_RCU_DP_LOG(DEBUG,
 				"%s: status: token = %"PRIu64", wait = %d, Thread QS cnt = %"PRIu64", Thread ID = %d",
 				__func__, t, wait, c, id+j);
+
 			/* Counter is not checked for wrap-around condition
 			 * as it is a 64b counter.
 			 */
@@ -512,10 +519,25 @@ __rte_rcu_qsbr_check_selective(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
 				continue;
 			}
 
+			/* This thread is in quiescent state. Use the counter
+			 * to find the least acknowledged token among all the
+			 * readers.
+			 */
+			if (c != __RTE_QSBR_CNT_THR_OFFLINE && acked_token > c)
+				acked_token = c;
+
 			bmap &= ~(1UL << j);
 		}
 	}
 
+	/* All readers are checked, update least acknowledged token.
+	 * There might be multiple writers trying to update this. There is
+	 * no need to update this very accurately using compare-and-swap.
+	 */
+	if (acked_token != __RTE_QSBR_CNT_MAX)
+		__atomic_store_n(&v->acked_token, acked_token,
+			__ATOMIC_RELAXED);
+
 	return 1;
 }
 
@@ -528,6 +550,7 @@ __rte_rcu_qsbr_check_all(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
 	uint32_t i;
 	struct rte_rcu_qsbr_cnt *cnt;
 	uint64_t c;
+	uint64_t acked_token = __RTE_QSBR_CNT_MAX;
 
 	for (i = 0, cnt = v->qsbr_cnt; i < v->max_threads; i++, cnt++) {
 		__RTE_RCU_DP_LOG(DEBUG,
@@ -538,6 +561,7 @@ __rte_rcu_qsbr_check_all(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
 		__RTE_RCU_DP_LOG(DEBUG,
 			"%s: status: token = %"PRIu64", wait = %d, Thread QS cnt = %"PRIu64", Thread ID = %d",
 			__func__, t, wait, c, i);
+
 		/* Counter is not checked for wrap-around condition
 		 * as it is a 64b counter.
 		 */
@@ -550,8 +574,22 @@ __rte_rcu_qsbr_check_all(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
 			rte_pause();
 		}
+
+		/* This thread is in quiescent state. Use the counter to find
+		 * the least acknowledged token among all the readers.
+		 */
+		if (likely(c != __RTE_QSBR_CNT_THR_OFFLINE && acked_token > c))
+			acked_token = c;
 	}
 
+	/* All readers are checked, update least acknowledged token.
+	 * There might be multiple writers trying to update this. There is
+	 * no need to update this very accurately using compare-and-swap.
+	 */
+	if (acked_token != __RTE_QSBR_CNT_MAX)
+		__atomic_store_n(&v->acked_token, acked_token,
+			__ATOMIC_RELAXED);
+
 	return 1;
 }
 
@@ -595,6 +633,10 @@ rte_rcu_qsbr_check(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
 {
 	RTE_ASSERT(v != NULL);
 
+	/* Check if all the readers have already acknowledged this token */
+	if (likely(t <= v->acked_token))
+		return 1;
+
 	if (likely(v->num_threads == v->max_threads))
 		return __rte_rcu_qsbr_check_all(v, t, wait);
 	else
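To make the new scan easier to follow, here is a self-contained model of
what __rte_rcu_qsbr_check_selective() and __rte_rcu_qsbr_check_all() now
compute, with the per-thread counters flattened into a plain array; all
names in this sketch are hypothetical stand-ins, not DPDK API:

#include <stdint.h>

#define CNT_OFFLINE 0             /* models __RTE_QSBR_CNT_THR_OFFLINE */
#define CNT_MAX ((uint64_t)~0)    /* models __RTE_QSBR_CNT_MAX */

/* Find the least token acknowledged by the online readers and, if at
 * least one reader was online, cache it in *acked_token.
 */
static void
update_acked_token(const uint64_t *cnt, uint32_t n, uint64_t *acked_token)
{
	uint64_t acked = CNT_MAX;
	uint32_t i;

	for (i = 0; i < n; i++) {
		/* Offline readers report CNT_OFFLINE; they acknowledge
		 * implicitly and must not drag the minimum down to 0.
		 */
		if (cnt[i] != CNT_OFFLINE && cnt[i] < acked)
			acked = cnt[i];
	}

	if (acked != CNT_MAX)
		*acked_token = acked;
}

A relaxed store (and no compare-and-swap) is sufficient here because the
reader counters only move forward: a racing writer can at worst publish a
slightly stale minimum, which costs one unnecessary full scan on a later
check but can never make a check return early incorrectly.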
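The fast path added to rte_rcu_qsbr_check pays off when several grace
periods are in flight. A sketch under the same assumptions and includes as
the first sketch above (the helper and element names are hypothetical; it
assumes at least one reader stays online so the full scan refreshes
v->acked_token):

static void
writer_two_grace_periods(struct rte_rcu_qsbr *v, void *a, void *b)
{
	uint64_t t1 = rte_rcu_qsbr_start(v);	/* element 'a' unlinked */
	uint64_t t2 = rte_rcu_qsbr_start(v);	/* element 'b' unlinked */

	/* Full scan of the reader counters; on success every online
	 * reader has acknowledged t2, so v->acked_token becomes >= t2.
	 */
	rte_rcu_qsbr_check(v, t2, true);
	free(b);

	/* t1 < t2 <= v->acked_token: this returns 1 from the new
	 * 't <= v->acked_token' fast path without touching any
	 * reader's counter cache line.
	 */
	rte_rcu_qsbr_check(v, t1, false);
	free(a);
}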