From patchwork Thu Jun 25 18:48:30 2015 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Zoltan Kiss X-Patchwork-Id: 5797 Return-Path: X-Original-To: patchwork@dpdk.org Delivered-To: patchwork@dpdk.org Received: from [92.243.14.124] (localhost [IPv6:::1]) by dpdk.org (Postfix) with ESMTP id 1794FC702; Thu, 25 Jun 2015 20:48:49 +0200 (CEST) Received: from mail-wi0-f175.google.com (mail-wi0-f175.google.com [209.85.212.175]) by dpdk.org (Postfix) with ESMTP id 6564CC6FE for ; Thu, 25 Jun 2015 20:48:47 +0200 (CEST) Received: by wicnd19 with SMTP id nd19so26144590wic.1 for ; Thu, 25 Jun 2015 11:48:47 -0700 (PDT) X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20130820; h=x-gm-message-state:from:to:cc:subject:date:message-id; bh=9xqDh4jJE+L5Uw4uak60ecGYUdNByf+HcZoITwYLy/c=; b=A9171/CeTn7Kp5HBRDIaBwgAoEFe/K4y30Dl/7biV1u9e6JT8zgkIptgwfyMT/teSu 160M8V68PqkEi20V1A/LMAXJuaOvyfsIvDUVJgX67ISK7I3gtmxGNC5fILB6zICAVVd1 W73v0OzJjqso/bNp1mcvFn1+lF01VsOdz9mLmy28zr52LE8S8AUEcN7Q4BYnywsK7vf5 MKc7V3UcgVDNeoVGnKenwQIwrr5FMPgdlz9iu6o/UXhqksJrSmK0haFcYVBWh72yx1Zg TMKzTjdIVaSECRcsKJNej7SmD6dXkUhl8yUsMUh/OKfmERHdhOnb8vXnPP12Dg50iYGb X8iw== X-Gm-Message-State: ALoCoQkz1nYfwrEQs5tE3tq/iB8bsFospSWkaUKk8hR25HSsE/6kAOCTb4b/YDqw0CasImzyoE4W X-Received: by 10.180.90.228 with SMTP id bz4mr8300236wib.69.1435258127261; Thu, 25 Jun 2015 11:48:47 -0700 (PDT) Received: from localhost.localdomain ([90.152.119.35]) by mx.google.com with ESMTPSA id c2sm44053184wjf.18.2015.06.25.11.48.46 (version=TLSv1.2 cipher=ECDHE-RSA-AES128-SHA bits=128/128); Thu, 25 Jun 2015 11:48:46 -0700 (PDT) From: Zoltan Kiss To: dev@dpdk.org Date: Thu, 25 Jun 2015 19:48:30 +0100 Message-Id: <1435258110-17140-1-git-send-email-zoltan.kiss@linaro.org> X-Mailer: git-send-email 1.9.1 Subject: [dpdk-dev] [PATCH] mempool: improbe cache search X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: patches and discussions about DPDK List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dev-bounces@dpdk.org Sender: "dev" The current way has a few problems: - if cache->len < n, we copy our elements into the cache first, then into obj_table, that's unnecessary - if n >= cache_size (or the backfill fails), and we can't fulfil the request from the ring alone, we don't try to combine with the cache - if refill fails, we don't return anything, even if the ring has enough for our request This patch rewrites it severely: - at the first part of the function we only try the cache if cache->len < n - otherwise take our elements straight from the ring - if that fails but we have something in the cache, try to combine them - the refill happens at the end, and its failure doesn't modify our return value Signed-off-by: Zoltan Kiss --- lib/librte_mempool/rte_mempool.h | 63 +++++++++++++++++++++++++--------------- 1 file changed, 39 insertions(+), 24 deletions(-) diff --git a/lib/librte_mempool/rte_mempool.h b/lib/librte_mempool/rte_mempool.h index a8054e1..896946c 100644 --- a/lib/librte_mempool/rte_mempool.h +++ b/lib/librte_mempool/rte_mempool.h @@ -948,34 +948,14 @@ __mempool_get_bulk(struct rte_mempool *mp, void **obj_table, unsigned lcore_id = rte_lcore_id(); uint32_t cache_size = mp->cache_size; - /* cache is not enabled or single consumer */ + cache = &mp->local_cache[lcore_id]; + /* cache is not enabled or single consumer or not enough */ if (unlikely(cache_size == 0 || is_mc == 0 || - n >= cache_size || lcore_id >= RTE_MAX_LCORE)) + cache->len < n || lcore_id >= RTE_MAX_LCORE)) goto ring_dequeue; - cache = &mp->local_cache[lcore_id]; cache_objs = cache->objs; - /* Can this be satisfied from the cache? */ - if (cache->len < n) { - /* No. Backfill the cache first, and then fill from it */ - uint32_t req = n + (cache_size - cache->len); - - /* How many do we require i.e. number to fill the cache + the request */ - ret = rte_ring_mc_dequeue_bulk(mp->ring, &cache->objs[cache->len], req); - if (unlikely(ret < 0)) { - /* - * In the offchance that we are buffer constrained, - * where we are not able to allocate cache + n, go to - * the ring directly. If that fails, we are truly out of - * buffers. - */ - goto ring_dequeue; - } - - cache->len += req; - } - /* Now fill in the response ... */ for (index = 0, len = cache->len - 1; index < n; ++index, len--, obj_table++) *obj_table = cache_objs[len]; @@ -984,7 +964,8 @@ __mempool_get_bulk(struct rte_mempool *mp, void **obj_table, __MEMPOOL_STAT_ADD(mp, get_success, n); - return 0; + ret = 0; + goto cache_refill; ring_dequeue: #endif /* RTE_MEMPOOL_CACHE_MAX_SIZE > 0 */ @@ -995,11 +976,45 @@ ring_dequeue: else ret = rte_ring_sc_dequeue_bulk(mp->ring, obj_table, n); +#if RTE_MEMPOOL_CACHE_MAX_SIZE > 0 + if (ret < 0 && is_mc == 1 && cache->len > 0) { + uint32_t req = n - cache->len; + + ret = rte_ring_mc_dequeue_bulk(mp->ring, obj_table, req); + if (ret == 0) { + cache_objs = cache->objs; + obj_table += req; + for (index = 0; index < cache->len; + ++index, ++obj_table) + *obj_table = cache_objs[index]; + cache->len = 0; + } + } +#endif /* RTE_MEMPOOL_CACHE_MAX_SIZE > 0 */ + if (ret < 0) __MEMPOOL_STAT_ADD(mp, get_fail, n); else __MEMPOOL_STAT_ADD(mp, get_success, n); +#if RTE_MEMPOOL_CACHE_MAX_SIZE > 0 +cache_refill: + /* If previous dequeue was OK and we have less than n, start refill */ + if (ret == 0 && cache_size > 0 && cache->len < n) { + uint32_t req = cache_size - cache->len; + + cache_objs = cache->objs; + ret = rte_ring_mc_dequeue_bulk(mp->ring, + &cache->objs[cache->len], + req); + if (likely(ret == 0)) + cache->len += req; + else + /* Don't spoil the return value */ + ret = 0; + } +#endif /* RTE_MEMPOOL_CACHE_MAX_SIZE > 0 */ + return ret; }