From patchwork Mon Nov 10 14:44:02 2014 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Qinglai Xiao X-Patchwork-Id: 1239 Return-Path: X-Original-To: patchwork@dpdk.org Delivered-To: patchwork@dpdk.org Received: from [92.243.14.124] (localhost [IPv6:::1]) by dpdk.org (Postfix) with ESMTP id 0333A7F2C; Mon, 10 Nov 2014 15:34:23 +0100 (CET) Received: from mail-la0-f50.google.com (mail-la0-f50.google.com [209.85.215.50]) by dpdk.org (Postfix) with ESMTP id 9E1EE7E9D for ; Mon, 10 Nov 2014 15:34:20 +0100 (CET) Received: by mail-la0-f50.google.com with SMTP id hs14so1521596lab.23 for ; Mon, 10 Nov 2014 06:44:06 -0800 (PST) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=20120113; h=from:to:cc:subject:date:message-id; bh=DXw73YKmuWn+Us+THRPRvGwO6LJIvHskPxl8vm109UY=; b=kDmi7B/ZTmWeoS7zYTDYPX3jjOS0wEgIASEwMFe9e+50jewaVVpXEo0x5+p6JhbXE7 hKIQ8UYXRjvopOQW2CjeEO9hrwLuR9jY1TYZtstxz91mDOgif9QgLMOp/9b9nI6eKW/e O9IMTq3o+1sn6jbPB7GVLmFM9yue6R/GHSorq4EVM1hqKZNKyW2fWpC556zJ3mgY26t/ acdBUcOsKL0f+nMlSOPoV1gAVY8MYIOsG5z7OXYD0rgK1q8rW1Jvf33+PHYmH9KgM1R5 gXh/9Eclc6PGBl4u2axalVKEtEE1pucmraGBBC3QuDqCJdgll4tz8y4gdKbDtLsD8+Ic 5YCQ== X-Received: by 10.112.63.133 with SMTP id g5mr28256986lbs.33.1415630645996; Mon, 10 Nov 2014 06:44:05 -0800 (PST) Received: from localhost.localdomain ([194.251.119.201]) by mx.google.com with ESMTPSA id b4sm5326894lak.28.2014.11.10.06.44.05 for (version=TLSv1.2 cipher=ECDHE-RSA-AES128-GCM-SHA256 bits=128/128); Mon, 10 Nov 2014 06:44:05 -0800 (PST) From: Qinglai Xiao To: dev@dpdk.org Date: Mon, 10 Nov 2014 16:44:02 +0200 Message-Id: <1415630642-3905-1-git-send-email-jigsaw@gmail.com> X-Mailer: git-send-email 1.7.1 Cc: Qinglai Xiao Subject: [dpdk-dev] [PATCH v3] Add in_flight_bitmask so as to use full 32 bits of tag. X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: patches and discussions about DPDK List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dev-bounces@dpdk.org Sender: "dev" With introduction of in_flight_bitmask, the whole 32 bits of tag can be used. Further more, this patch fixed the integer overflow when finding the matched tags. The maximum number workers is now defined as 64, which is length of double-word. The link between number of workers and RTE_MAX_LCORE is now removed. Compile time check is added to ensure the RTE_DISTRIB_MAX_WORKERS is less than or equal to size of double-word. Signed-off-by: Qinglai Xiao Acked-by: Bruce Richardson --- lib/librte_distributor/rte_distributor.c | 64 ++++++++++++++++++++++-------- lib/librte_distributor/rte_distributor.h | 4 ++ 2 files changed, 51 insertions(+), 17 deletions(-) diff --git a/lib/librte_distributor/rte_distributor.c b/lib/librte_distributor/rte_distributor.c index 3dfec4a..2c5d61c 100644 --- a/lib/librte_distributor/rte_distributor.c +++ b/lib/librte_distributor/rte_distributor.c @@ -62,6 +62,13 @@ #define RTE_DISTRIB_RETURNS_MASK (RTE_DISTRIB_MAX_RETURNS - 1) /** + * Maximum number of workers allowed. + * Be aware of increasing the limit, becaus it is limited by how we track + * in-flight tags. See @in_flight_bitmask and @rte_distributor_process + */ +#define RTE_DISTRIB_MAX_WORKERS 64 + +/** * Buffer structure used to pass the pointer data between cores. This is cache * line aligned, but to improve performance and prevent adjacent cache-line * prefetches of buffers for other workers, e.g. when worker 1's buffer is on @@ -91,11 +98,17 @@ struct rte_distributor { char name[RTE_DISTRIBUTOR_NAMESIZE]; /**< Name of the ring. */ unsigned num_workers; /**< Number of workers polling */ - uint32_t in_flight_tags[RTE_MAX_LCORE]; - /**< Tracks the tag being processed per core, 0 == no pkt */ - struct rte_distributor_backlog backlog[RTE_MAX_LCORE]; + uint32_t in_flight_tags[RTE_DISTRIB_MAX_WORKERS]; + /**< Tracks the tag being processed per core */ + uint64_t in_flight_bitmask; + /**< on/off bits for in-flight tags. + * Note that if RTE_DISTRIB_MAX_WORKERS is larger than 64 then + * the bitmask has to expand. + */ + + struct rte_distributor_backlog backlog[RTE_DISTRIB_MAX_WORKERS]; - union rte_distributor_buffer bufs[RTE_MAX_LCORE]; + union rte_distributor_buffer bufs[RTE_DISTRIB_MAX_WORKERS]; struct rte_distributor_returned_pkts returns; }; @@ -189,6 +202,7 @@ static inline void handle_worker_shutdown(struct rte_distributor *d, unsigned wkr) { d->in_flight_tags[wkr] = 0; + d->in_flight_bitmask &= ~(1UL << wkr); d->bufs[wkr].bufptr64 = 0; if (unlikely(d->backlog[wkr].count != 0)) { /* On return of a packet, we need to move the @@ -211,7 +225,10 @@ handle_worker_shutdown(struct rte_distributor *d, unsigned wkr) pkts[i] = (void *)((uintptr_t)(bl->pkts[idx] >> RTE_DISTRIB_FLAG_BITS)); } - /* recursive call */ + /* recursive call. + * Note that the tags were set before first level call + * to rte_distributor_process. + */ rte_distributor_process(d, pkts, i); bl->count = bl->start = 0; } @@ -242,6 +259,7 @@ process_returns(struct rte_distributor *d) else { d->bufs[wkr].bufptr64 = RTE_DISTRIB_GET_BUF; d->in_flight_tags[wkr] = 0; + d->in_flight_bitmask &= ~(1UL << wkr); } oldbuf = data >> RTE_DISTRIB_FLAG_BITS; } else if (data & RTE_DISTRIB_RETURN_BUF) { @@ -284,14 +302,18 @@ rte_distributor_process(struct rte_distributor *d, next_value = (((int64_t)(uintptr_t)next_mb) << RTE_DISTRIB_FLAG_BITS); /* - * Set the low bit on the tag, so we can guarantee that - * we never store a tag value of zero. That means we can - * use the zero-value to indicate that no packet is - * being processed by a worker. + * User is advocated to set tag vaue for each + * mbuf before calling rte_distributor_process. + * User defined tags are used to identify flows, + * or sessions. */ - new_tag = (next_mb->hash.usr | 1); + new_tag = next_mb->hash.usr; - uint32_t match = 0; + /* + * Note that if RTE_DISTRIB_MAX_WORKERS is larger than 64 + * then the size of match has to be expanded. + */ + uint64_t match = 0; unsigned i; /* * to scan for a match use "xor" and "not" to get a 0/1 @@ -303,9 +325,12 @@ rte_distributor_process(struct rte_distributor *d, match |= (!(d->in_flight_tags[i] ^ new_tag) << i); + /* Only turned-on bits are considered as match */ + match &= d->in_flight_bitmask; + if (match) { next_mb = NULL; - unsigned worker = __builtin_ctz(match); + unsigned worker = __builtin_ctzl(match); if (add_to_backlog(&d->backlog[worker], next_value) < 0) next_idx--; @@ -322,6 +347,7 @@ rte_distributor_process(struct rte_distributor *d, else { d->bufs[wkr].bufptr64 = next_value; d->in_flight_tags[wkr] = new_tag; + d->in_flight_bitmask |= (1UL << wkr); next_mb = NULL; } oldbuf = data >> RTE_DISTRIB_FLAG_BITS; @@ -379,11 +405,13 @@ rte_distributor_returned_pkts(struct rte_distributor *d, static inline unsigned total_outstanding(const struct rte_distributor *d) { - unsigned wkr, total_outstanding = 0; + unsigned wkr, total_outstanding; + + total_outstanding = __builtin_popcountl(d->in_flight_bitmask); for (wkr = 0; wkr < d->num_workers; wkr++) - total_outstanding += d->backlog[wkr].count + - !!(d->in_flight_tags[wkr]); + total_outstanding += d->backlog[wkr].count; + return total_outstanding; } @@ -423,9 +451,11 @@ rte_distributor_create(const char *name, /* compilation-time checks */ RTE_BUILD_BUG_ON((sizeof(*d) & CACHE_LINE_MASK) != 0); - RTE_BUILD_BUG_ON((RTE_MAX_LCORE & 7) != 0); + RTE_BUILD_BUG_ON((RTE_DISTRIB_MAX_WORKERS & 7) != 0); + RTE_BUILD_BUG_ON(RTE_DISTRIB_MAX_WORKERS > + sizeof(d->in_flight_bitmask) * CHAR_BIT); - if (name == NULL || num_workers >= RTE_MAX_LCORE) { + if (name == NULL || num_workers >= RTE_DISTRIB_MAX_WORKERS) { rte_errno = EINVAL; return NULL; } diff --git a/lib/librte_distributor/rte_distributor.h b/lib/librte_distributor/rte_distributor.h index ec0d74a..cc1d559 100644 --- a/lib/librte_distributor/rte_distributor.h +++ b/lib/librte_distributor/rte_distributor.h @@ -88,6 +88,10 @@ rte_distributor_create(const char *name, unsigned socket_id, * packets. The distributor will ensure that no two packets that have the * same flow id, or tag, in the mbuf will be procesed at the same time. * + * The user is advocated to set tag for each mbuf before calling this function. + * If user doesn't set the tag, the tag value can be various values depending on + * driver implementation and configuration. + * * This is not multi-thread safe and should only be called on a single lcore. * * @param d