[dpdk-dev] mempool: improbe cache search
Commit Message
The current way has a few problems:
- if cache->len < n, we copy our elements into the cache first, then
into obj_table, that's unnecessary
- if n >= cache_size (or the backfill fails), and we can't fulfil the
request from the ring alone, we don't try to combine with the cache
- if refill fails, we don't return anything, even if the ring has enough
for our request
This patch reworks it substantially:
- in the first part of the function we only use the cache if it can serve
the whole request (cache->len >= n)
- otherwise take our elements straight from the ring
- if that fails but we have something in the cache, try to combine them
- the refill happens at the end, and its failure doesn't modify our return
value
Signed-off-by: Zoltan Kiss <zoltan.kiss@linaro.org>
---
lib/librte_mempool/rte_mempool.h | 63 +++++++++++++++++++++++++---------------
1 file changed, 39 insertions(+), 24 deletions(-)
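For reference, here is the same flow as a stand-alone toy model in plain C
(simplified, assumed types and names; it is not the DPDK code itself and
leaves out the is_mc/cache_size checks, the RTE_MEMPOOL_CACHE_MAX_SIZE guard
and the statistics):

#include <string.h>

#define TOY_CACHE_SIZE 8
#define TOY_RING_SIZE  64

struct toy_pool {
	void *ring[TOY_RING_SIZE];	/* stand-in for the rte_ring */
	unsigned ring_len;
	void *cache[TOY_CACHE_SIZE];	/* stand-in for the per-lcore cache */
	unsigned cache_len;
};

/* all-or-nothing bulk dequeue, like rte_ring_mc_dequeue_bulk() */
static int toy_ring_dequeue(struct toy_pool *p, void **objs, unsigned n)
{
	if (p->ring_len < n)
		return -1;
	p->ring_len -= n;
	memcpy(objs, &p->ring[p->ring_len], n * sizeof(*objs));
	return 0;
}

int toy_get_bulk(struct toy_pool *p, void **obj_table, unsigned n)
{
	unsigned i;
	int ret;

	if (p->cache_len >= n) {
		/* 1. the cache can serve the whole request: newest objects first */
		for (i = 0; i < n; i++)
			obj_table[i] = p->cache[p->cache_len - 1 - i];
		p->cache_len -= n;
		ret = 0;
		goto refill;
	}

	/* 2. otherwise take the objects straight from the ring */
	ret = toy_ring_dequeue(p, obj_table, n);

	/* 3. the ring alone couldn't do it: combine ring + cache */
	if (ret < 0 && p->cache_len > 0) {
		unsigned req = n - p->cache_len;

		ret = toy_ring_dequeue(p, obj_table, req);
		if (ret == 0) {
			for (i = 0; i < p->cache_len; i++)
				obj_table[req + i] = p->cache[i];
			p->cache_len = 0;
		}
	}

refill:
	/* 4. refill at the very end; a failed refill never changes ret */
	if (ret == 0 && p->cache_len < n) {
		unsigned req = TOY_CACHE_SIZE - p->cache_len;

		if (toy_ring_dequeue(p, &p->cache[p->cache_len], req) == 0)
			p->cache_len += req;
	}
	return ret;
}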
Comments
Hi Zoltan,
On 06/25/2015 08:48 PM, Zoltan Kiss wrote:
> The current way has a few problems:
>
> - if cache->len < n, we copy our elements into the cache first, then
> into obj_table, that's unnecessary
> - if n >= cache_size (or the backfill fails), and we can't fulfil the
> request from the ring alone, we don't try to combine with the cache
> - if refill fails, we don't return anything, even if the ring has enough
> for our request
>
> This patch reworks it substantially:
> - in the first part of the function we only use the cache if it can serve
> the whole request (cache->len >= n)
> - otherwise take our elements straight from the ring
> - if that fails but we have something in the cache, try to combine them
> - the refill happens at the end, and its failure doesn't modify our return
> value
Indeed, it looks easier to read that way. I checked the performance with
"mempool_perf_autotest" of app/test and it shows that there is no
regression (it is even slightly better in some test cases).
There is a small typo in the title: s/improbe/improve
Please see also a comment below.
>
> Signed-off-by: Zoltan Kiss <zoltan.kiss@linaro.org>
> ---
> lib/librte_mempool/rte_mempool.h | 63 +++++++++++++++++++++++++---------------
> 1 file changed, 39 insertions(+), 24 deletions(-)
>
> diff --git a/lib/librte_mempool/rte_mempool.h b/lib/librte_mempool/rte_mempool.h
> index a8054e1..896946c 100644
> --- a/lib/librte_mempool/rte_mempool.h
> +++ b/lib/librte_mempool/rte_mempool.h
> @@ -948,34 +948,14 @@ __mempool_get_bulk(struct rte_mempool *mp, void **obj_table,
> unsigned lcore_id = rte_lcore_id();
> uint32_t cache_size = mp->cache_size;
>
> - /* cache is not enabled or single consumer */
> + cache = &mp->local_cache[lcore_id];
> + /* cache is not enabled or single consumer or not enough */
> if (unlikely(cache_size == 0 || is_mc == 0 ||
> - n >= cache_size || lcore_id >= RTE_MAX_LCORE))
> + cache->len < n || lcore_id >= RTE_MAX_LCORE))
> goto ring_dequeue;
>
> - cache = &mp->local_cache[lcore_id];
> cache_objs = cache->objs;
>
> - /* Can this be satisfied from the cache? */
> - if (cache->len < n) {
> - /* No. Backfill the cache first, and then fill from it */
> - uint32_t req = n + (cache_size - cache->len);
> -
> - /* How many do we require i.e. number to fill the cache + the request */
> - ret = rte_ring_mc_dequeue_bulk(mp->ring, &cache->objs[cache->len], req);
> - if (unlikely(ret < 0)) {
> - /*
> - * In the offchance that we are buffer constrained,
> - * where we are not able to allocate cache + n, go to
> - * the ring directly. If that fails, we are truly out of
> - * buffers.
> - */
> - goto ring_dequeue;
> - }
> -
> - cache->len += req;
> - }
> -
> /* Now fill in the response ... */
> for (index = 0, len = cache->len - 1; index < n; ++index, len--, obj_table++)
> *obj_table = cache_objs[len];
> @@ -984,7 +964,8 @@ __mempool_get_bulk(struct rte_mempool *mp, void **obj_table,
>
> __MEMPOOL_STAT_ADD(mp, get_success, n);
>
> - return 0;
> + ret = 0;
> + goto cache_refill;
>
> ring_dequeue:
> #endif /* RTE_MEMPOOL_CACHE_MAX_SIZE > 0 */
> @@ -995,11 +976,45 @@ ring_dequeue:
> else
> ret = rte_ring_sc_dequeue_bulk(mp->ring, obj_table, n);
>
> +#if RTE_MEMPOOL_CACHE_MAX_SIZE > 0
> + if (ret < 0 && is_mc == 1 && cache->len > 0) {
if (unlikely(ret < 0 && is_mc == 1 && cache->len > 0)) ?
> + uint32_t req = n - cache->len;
> +
> + ret = rte_ring_mc_dequeue_bulk(mp->ring, obj_table, req);
> + if (ret == 0) {
> + cache_objs = cache->objs;
> + obj_table += req;
> + for (index = 0; index < cache->len;
> + ++index, ++obj_table)
> + *obj_table = cache_objs[index];
> + cache->len = 0;
> + }
> + }
> +#endif /* RTE_MEMPOOL_CACHE_MAX_SIZE > 0 */
> +
> if (ret < 0)
> __MEMPOOL_STAT_ADD(mp, get_fail, n);
> else
> __MEMPOOL_STAT_ADD(mp, get_success, n);
>
> +#if RTE_MEMPOOL_CACHE_MAX_SIZE > 0
> +cache_refill:
> + /* If previous dequeue was OK and we have less than n, start refill */
> + if (ret == 0 && cache_size > 0 && cache->len < n) {
Not sure whether it's likely or unlikely there. I'd tend to say unlikely,
as the cache size is probably big compared to n most of the time.
I don't know if it would have a real performance impact though, but
I think it won't hurt.
Regards,
Olivier
> + uint32_t req = cache_size - cache->len;
> +
> + cache_objs = cache->objs;
> + ret = rte_ring_mc_dequeue_bulk(mp->ring,
> + &cache->objs[cache->len],
> + req);
> + if (likely(ret == 0))
> + cache->len += req;
> + else
> + /* Don't spoil the return value */
> + ret = 0;
> + }
> +#endif /* RTE_MEMPOOL_CACHE_MAX_SIZE > 0 */
> +
> return ret;
> }
>
>
On 30/06/15 12:58, Olivier MATZ wrote:
> Hi Zoltan,
>
> On 06/25/2015 08:48 PM, Zoltan Kiss wrote:
>> The current way has a few problems:
>>
>> - if cache->len < n, we copy our elements into the cache first, then
>> into obj_table, that's unnecessary
>> - if n >= cache_size (or the backfill fails), and we can't fulfil the
>> request from the ring alone, we don't try to combine with the cache
>> - if refill fails, we don't return anything, even if the ring has enough
>> for our request
>>
>> This patch reworks it substantially:
>> - in the first part of the function we only use the cache if it can
>> serve the whole request (cache->len >= n)
>> - otherwise take our elements straight from the ring
>> - if that fails but we have something in the cache, try to combine them
>> - the refill happens at the end, and its failure doesn't modify our
>> return
>> value
>
> Indeed, it looks easier to read that way. I checked the performance with
> "mempool_perf_autotest" of app/test and it shows that there is no
> regression (it is even slightly better in some test cases).
>
> There is a small typo in the title: s/improbe/improve
Yes, I'll fix that.
> Please see also a comment below.
>
>>
>> Signed-off-by: Zoltan Kiss <zoltan.kiss@linaro.org>
>> ---
>> lib/librte_mempool/rte_mempool.h | 63
>> +++++++++++++++++++++++++---------------
>> 1 file changed, 39 insertions(+), 24 deletions(-)
>>
>> diff --git a/lib/librte_mempool/rte_mempool.h
>> b/lib/librte_mempool/rte_mempool.h
>> index a8054e1..896946c 100644
>> --- a/lib/librte_mempool/rte_mempool.h
>> +++ b/lib/librte_mempool/rte_mempool.h
>> @@ -948,34 +948,14 @@ __mempool_get_bulk(struct rte_mempool *mp, void
>> **obj_table,
>> unsigned lcore_id = rte_lcore_id();
>> uint32_t cache_size = mp->cache_size;
>>
>> - /* cache is not enabled or single consumer */
>> + cache = &mp->local_cache[lcore_id];
>> + /* cache is not enabled or single consumer or not enough */
>> if (unlikely(cache_size == 0 || is_mc == 0 ||
>> - n >= cache_size || lcore_id >= RTE_MAX_LCORE))
>> + cache->len < n || lcore_id >= RTE_MAX_LCORE))
>> goto ring_dequeue;
>>
>> - cache = &mp->local_cache[lcore_id];
>> cache_objs = cache->objs;
>>
>> - /* Can this be satisfied from the cache? */
>> - if (cache->len < n) {
>> - /* No. Backfill the cache first, and then fill from it */
>> - uint32_t req = n + (cache_size - cache->len);
>> -
>> - /* How many do we require i.e. number to fill the cache + the
>> request */
>> - ret = rte_ring_mc_dequeue_bulk(mp->ring,
>> &cache->objs[cache->len], req);
>> - if (unlikely(ret < 0)) {
>> - /*
>> - * In the offchance that we are buffer constrained,
>> - * where we are not able to allocate cache + n, go to
>> - * the ring directly. If that fails, we are truly out of
>> - * buffers.
>> - */
>> - goto ring_dequeue;
>> - }
>> -
>> - cache->len += req;
>> - }
>> -
>> /* Now fill in the response ... */
>> for (index = 0, len = cache->len - 1; index < n; ++index, len--,
>> obj_table++)
>> *obj_table = cache_objs[len];
>> @@ -984,7 +964,8 @@ __mempool_get_bulk(struct rte_mempool *mp, void
>> **obj_table,
>>
>> __MEMPOOL_STAT_ADD(mp, get_success, n);
>>
>> - return 0;
>> + ret = 0;
>> + goto cache_refill;
>>
>> ring_dequeue:
>> #endif /* RTE_MEMPOOL_CACHE_MAX_SIZE > 0 */
>> @@ -995,11 +976,45 @@ ring_dequeue:
>> else
>> ret = rte_ring_sc_dequeue_bulk(mp->ring, obj_table, n);
>>
>> +#if RTE_MEMPOOL_CACHE_MAX_SIZE > 0
>> + if (ret < 0 && is_mc == 1 && cache->len > 0) {
>
> if (unlikely(ret < 0 && is_mc == 1 && cache->len > 0)) ?
Ok
>
>
>> + uint32_t req = n - cache->len;
>> +
>> + ret = rte_ring_mc_dequeue_bulk(mp->ring, obj_table, req);
>> + if (ret == 0) {
>> + cache_objs = cache->objs;
>> + obj_table += req;
>> + for (index = 0; index < cache->len;
>> + ++index, ++obj_table)
>> + *obj_table = cache_objs[index];
>> + cache->len = 0;
>> + }
>> + }
>> +#endif /* RTE_MEMPOOL_CACHE_MAX_SIZE > 0 */
>> +
>> if (ret < 0)
>> __MEMPOOL_STAT_ADD(mp, get_fail, n);
>> else
>> __MEMPOOL_STAT_ADD(mp, get_success, n);
>>
>> +#if RTE_MEMPOOL_CACHE_MAX_SIZE > 0
>> +cache_refill:
>> + /* If previous dequeue was OK and we have less than n, start
>> refill */
>> + if (ret == 0 && cache_size > 0 && cache->len < n) {
>
> Not sure whether it's likely or unlikely there. I'd tend to say unlikely,
> as the cache size is probably big compared to n most of the time.
>
> I don't know if it would have a real performance impact though, but
> I think it won't hurt.
I think it's not obvious here which one happens more often on the hot
path, so it's better to follow the rule of thumb: if you are not
confident about the likelihood, just don't use (un)likely and let the
branch predictor decide at runtime.
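For what it's worth, the hint is purely a compile-time thing; roughly this
is what the macros boil down to (a sketch of what rte_branch_prediction.h
provides, the exact definitions there may differ slightly):

/* static hint to the compiler about the expected truth value of the
 * condition; it mainly influences code layout, not the CPU's runtime
 * branch predictor */
#ifndef likely
#define likely(x)	__builtin_expect((x), 1)
#endif
#ifndef unlikely
#define unlikely(x)	__builtin_expect((x), 0)
#endif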
>
>
> Regards,
> Olivier
>
>
>> + uint32_t req = cache_size - cache->len;
>> +
>> + cache_objs = cache->objs;
>> + ret = rte_ring_mc_dequeue_bulk(mp->ring,
>> + &cache->objs[cache->len],
>> + req);
>> + if (likely(ret == 0))
>> + cache->len += req;
>> + else
>> + /* Don't spoil the return value */
>> + ret = 0;
>> + }
>> +#endif /* RTE_MEMPOOL_CACHE_MAX_SIZE > 0 */
>> +
>> return ret;
>> }
>>
>>
>
@@ -948,34 +948,14 @@ __mempool_get_bulk(struct rte_mempool *mp, void **obj_table,
unsigned lcore_id = rte_lcore_id();
uint32_t cache_size = mp->cache_size;
- /* cache is not enabled or single consumer */
+ cache = &mp->local_cache[lcore_id];
+ /* cache is not enabled or single consumer or not enough */
if (unlikely(cache_size == 0 || is_mc == 0 ||
- n >= cache_size || lcore_id >= RTE_MAX_LCORE))
+ cache->len < n || lcore_id >= RTE_MAX_LCORE))
goto ring_dequeue;
- cache = &mp->local_cache[lcore_id];
cache_objs = cache->objs;
- /* Can this be satisfied from the cache? */
- if (cache->len < n) {
- /* No. Backfill the cache first, and then fill from it */
- uint32_t req = n + (cache_size - cache->len);
-
- /* How many do we require i.e. number to fill the cache + the request */
- ret = rte_ring_mc_dequeue_bulk(mp->ring, &cache->objs[cache->len], req);
- if (unlikely(ret < 0)) {
- /*
- * In the offchance that we are buffer constrained,
- * where we are not able to allocate cache + n, go to
- * the ring directly. If that fails, we are truly out of
- * buffers.
- */
- goto ring_dequeue;
- }
-
- cache->len += req;
- }
-
/* Now fill in the response ... */
for (index = 0, len = cache->len - 1; index < n; ++index, len--, obj_table++)
*obj_table = cache_objs[len];
@@ -984,7 +964,8 @@ __mempool_get_bulk(struct rte_mempool *mp, void **obj_table,
__MEMPOOL_STAT_ADD(mp, get_success, n);
- return 0;
+ ret = 0;
+ goto cache_refill;
ring_dequeue:
#endif /* RTE_MEMPOOL_CACHE_MAX_SIZE > 0 */
@@ -995,11 +976,45 @@ ring_dequeue:
else
ret = rte_ring_sc_dequeue_bulk(mp->ring, obj_table, n);
+#if RTE_MEMPOOL_CACHE_MAX_SIZE > 0
+ if (ret < 0 && is_mc == 1 && cache->len > 0) {
+ uint32_t req = n - cache->len;
+
+ ret = rte_ring_mc_dequeue_bulk(mp->ring, obj_table, req);
+ if (ret == 0) {
+ cache_objs = cache->objs;
+ obj_table += req;
+ for (index = 0; index < cache->len;
+ ++index, ++obj_table)
+ *obj_table = cache_objs[index];
+ cache->len = 0;
+ }
+ }
+#endif /* RTE_MEMPOOL_CACHE_MAX_SIZE > 0 */
+
if (ret < 0)
__MEMPOOL_STAT_ADD(mp, get_fail, n);
else
__MEMPOOL_STAT_ADD(mp, get_success, n);
+#if RTE_MEMPOOL_CACHE_MAX_SIZE > 0
+cache_refill:
+ /* If previous dequeue was OK and we have less than n, start refill */
+ if (ret == 0 && cache_size > 0 && cache->len < n) {
+ uint32_t req = cache_size - cache->len;
+
+ cache_objs = cache->objs;
+ ret = rte_ring_mc_dequeue_bulk(mp->ring,
+ &cache->objs[cache->len],
+ req);
+ if (likely(ret == 0))
+ cache->len += req;
+ else
+ /* Don't spoil the return value */
+ ret = 0;
+ }
+#endif /* RTE_MEMPOOL_CACHE_MAX_SIZE > 0 */
+
return ret;
}
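For completeness, this is the path that the public bulk API ends up in; a
minimal, hypothetical usage sketch ("test_pool" is an assumed pool name,
created elsewhere):

#include <rte_mempool.h>

#define BURST 32

/* hypothetical helper: grab a burst from an existing pool and give it back */
int burst_roundtrip(void)
{
	struct rte_mempool *mp = rte_mempool_lookup("test_pool");
	void *objs[BURST];

	if (mp == NULL)
		return -1;

	/* this ends up in __mempool_get_bulk() shown above */
	if (rte_mempool_get_bulk(mp, objs, BURST) < 0)
		return -1;	/* neither cache nor ring could satisfy the burst */

	/* ... use the objects ... */

	rte_mempool_put_bulk(mp, objs, BURST);
	return 0;
}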