[dpdk-dev] mempool: improbe cache search

Message ID: 1435258110-17140-1-git-send-email-zoltan.kiss@linaro.org (mailing list archive)
State: Changes Requested, archived

Commit Message

Zoltan Kiss June 25, 2015, 6:48 p.m. UTC
  The current approach has a few problems (see the sketch after this list):

- if cache->len < n, we copy our elements into the cache first, then
  into obj_table, that's unnecessary
- if n >= cache_size (or the backfill fails), and we can't fulfil the
  request from the ring alone, we don't try to combine with the cache
- if refill fails, we don't return anything, even if the ring has enough
  for our request
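
For reference, the cache path being replaced looks roughly like this (condensed
from the code removed in the diff below; the stats macros, ifdefs and error
comments are dropped):

    if (cache->len < n) {
        /* backfill: dequeue enough to fill the cache and cover the request */
        uint32_t req = n + (cache_size - cache->len);

        ret = rte_ring_mc_dequeue_bulk(mp->ring, &cache->objs[cache->len], req);
        if (ret < 0)
            /* all-or-nothing: fall back to the plain ring path;
             * whatever already sits in the cache stays unused */
            goto ring_dequeue;
        cache->len += req;
    }

    /* second copy: the n objects just pulled into the cache are copied
     * once more, this time into obj_table */
    for (index = 0, len = cache->len - 1; index < n; ++index, len--)
        obj_table[index] = cache->objs[len];
    cache->len -= n;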

This patch reworks it substantially (see the sketch after this list):
- in the first part of the function we only use the cache if it can serve
  the whole request (i.e. cache->len >= n)
- otherwise take our elements straight from the ring
- if that fails but we have something in the cache, try to combine them
- the refill happens at the end, and its failure doesn't modify our return
  value
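
A condensed sketch of the resulting control flow (the RTE_MEMPOOL_CACHE_MAX_SIZE
guards, stats macros and lcore checks are dropped; cache_enabled below is just
shorthand for those checks, not a real variable):

    /* 1) the cache alone can serve the request: copy straight into obj_table */
    if (is_mc && cache_enabled && cache->len >= n) {
        for (index = 0, len = cache->len - 1; index < n; ++index, len--)
            obj_table[index] = cache->objs[len];
        cache->len -= n;
        ret = 0;
        goto cache_refill;
    }

    /* 2) otherwise try to take everything straight from the ring */
    if (is_mc)
        ret = rte_ring_mc_dequeue_bulk(mp->ring, obj_table, n);
    else
        ret = rte_ring_sc_dequeue_bulk(mp->ring, obj_table, n);

    /* 3) the ring alone came up short: combine it with what the cache holds */
    if (ret < 0 && is_mc && cache->len > 0) {
        uint32_t req = n - cache->len;

        ret = rte_ring_mc_dequeue_bulk(mp->ring, obj_table, req);
        if (ret == 0) {
            for (index = 0; index < cache->len; ++index)
                obj_table[req + index] = cache->objs[index];
            cache->len = 0;
        }
    }

cache_refill:
    /* 4) refill the cache last; a refill failure must not change ret */
    if (ret == 0 && cache_enabled && cache->len < n) {
        uint32_t req = cache_size - cache->len;

        if (rte_ring_mc_dequeue_bulk(mp->ring, &cache->objs[cache->len], req) == 0)
            cache->len += req;
    }

    return ret;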

Signed-off-by: Zoltan Kiss <zoltan.kiss@linaro.org>
---
 lib/librte_mempool/rte_mempool.h | 63 +++++++++++++++++++++++++---------------
 1 file changed, 39 insertions(+), 24 deletions(-)
  

Comments

Olivier Matz June 30, 2015, 11:58 a.m. UTC | #1
Hi Zoltan,

On 06/25/2015 08:48 PM, Zoltan Kiss wrote:
> The current way has a few problems:
>
> - if cache->len < n, we copy our elements into the cache first, then
>    into obj_table, that's unnecessary
> - if n >= cache_size (or the backfill fails), and we can't fulfil the
>    request from the ring alone, we don't try to combine with the cache
> - if refill fails, we don't return anything, even if the ring has enough
>    for our request
>
> This patch rewrites it severely:
> - at the first part of the function we only try the cache if cache->len < n
> - otherwise take our elements straight from the ring
> - if that fails but we have something in the cache, try to combine them
> - the refill happens at the end, and its failure doesn't modify our return
>    value

Indeed, it looks easier to read that way. I checked the performance with
"mempool_perf_autotest" of app/test and it shows that there is no
regression (it is even slightly better in some test cases).

There is a small typo in the title: s/improbe/improve
Please see also a comment below.

>
> Signed-off-by: Zoltan Kiss <zoltan.kiss@linaro.org>
> ---
>   lib/librte_mempool/rte_mempool.h | 63 +++++++++++++++++++++++++---------------
>   1 file changed, 39 insertions(+), 24 deletions(-)
>
> diff --git a/lib/librte_mempool/rte_mempool.h b/lib/librte_mempool/rte_mempool.h
> index a8054e1..896946c 100644
> --- a/lib/librte_mempool/rte_mempool.h
> +++ b/lib/librte_mempool/rte_mempool.h
> @@ -948,34 +948,14 @@ __mempool_get_bulk(struct rte_mempool *mp, void **obj_table,
>   	unsigned lcore_id = rte_lcore_id();
>   	uint32_t cache_size = mp->cache_size;
>
> -	/* cache is not enabled or single consumer */
> +	cache = &mp->local_cache[lcore_id];
> +	/* cache is not enabled or single consumer or not enough */
>   	if (unlikely(cache_size == 0 || is_mc == 0 ||
> -		     n >= cache_size || lcore_id >= RTE_MAX_LCORE))
> +		     cache->len < n || lcore_id >= RTE_MAX_LCORE))
>   		goto ring_dequeue;
>
> -	cache = &mp->local_cache[lcore_id];
>   	cache_objs = cache->objs;
>
> -	/* Can this be satisfied from the cache? */
> -	if (cache->len < n) {
> -		/* No. Backfill the cache first, and then fill from it */
> -		uint32_t req = n + (cache_size - cache->len);
> -
> -		/* How many do we require i.e. number to fill the cache + the request */
> -		ret = rte_ring_mc_dequeue_bulk(mp->ring, &cache->objs[cache->len], req);
> -		if (unlikely(ret < 0)) {
> -			/*
> -			 * In the offchance that we are buffer constrained,
> -			 * where we are not able to allocate cache + n, go to
> -			 * the ring directly. If that fails, we are truly out of
> -			 * buffers.
> -			 */
> -			goto ring_dequeue;
> -		}
> -
> -		cache->len += req;
> -	}
> -
>   	/* Now fill in the response ... */
>   	for (index = 0, len = cache->len - 1; index < n; ++index, len--, obj_table++)
>   		*obj_table = cache_objs[len];
> @@ -984,7 +964,8 @@ __mempool_get_bulk(struct rte_mempool *mp, void **obj_table,
>
>   	__MEMPOOL_STAT_ADD(mp, get_success, n);
>
> -	return 0;
> +	ret = 0;
> +	goto cache_refill;
>
>   ring_dequeue:
>   #endif /* RTE_MEMPOOL_CACHE_MAX_SIZE > 0 */
> @@ -995,11 +976,45 @@ ring_dequeue:
>   	else
>   		ret = rte_ring_sc_dequeue_bulk(mp->ring, obj_table, n);
>
> +#if RTE_MEMPOOL_CACHE_MAX_SIZE > 0
> +	if (ret < 0 && is_mc == 1 && cache->len > 0) {

if (unlikely(ret < 0 && is_mc == 1 && cache->len > 0))  ?


> +		uint32_t req = n - cache->len;
> +
> +		ret = rte_ring_mc_dequeue_bulk(mp->ring, obj_table, req);
> +		if (ret == 0) {
> +			cache_objs = cache->objs;
> +			obj_table += req;
> +			for (index = 0; index < cache->len;
> +			     ++index, ++obj_table)
> +				*obj_table = cache_objs[index];
> +			cache->len = 0;
> +		}
> +	}
> +#endif /* RTE_MEMPOOL_CACHE_MAX_SIZE > 0 */
> +
>   	if (ret < 0)
>   		__MEMPOOL_STAT_ADD(mp, get_fail, n);
>   	else
>   		__MEMPOOL_STAT_ADD(mp, get_success, n);
>
> +#if RTE_MEMPOOL_CACHE_MAX_SIZE > 0
> +cache_refill:
> +	/* If previous dequeue was OK and we have less than n, start refill */
> +	if (ret == 0 && cache_size > 0 && cache->len < n) {

Not sure whether it's likely or unlikely there. I'd tend to say unlikely,
as the cache size is probably big compared to n most of the time.

I don't know if it would have a real performance impact though, but
I think it won't hurt.
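
Concretely, the hinted form under discussion would look like this (not part of
the posted patch, just the refill block from the diff with the hint added):

    if (unlikely(ret == 0 && cache_size > 0 && cache->len < n)) {
        uint32_t req = cache_size - cache->len;

        ret = rte_ring_mc_dequeue_bulk(mp->ring, &cache->objs[cache->len], req);
        if (likely(ret == 0))
            cache->len += req;
        else
            /* don't spoil the return value */
            ret = 0;
    }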


Regards,
Olivier


> +		uint32_t req = cache_size - cache->len;
> +
> +		cache_objs = cache->objs;
> +		ret = rte_ring_mc_dequeue_bulk(mp->ring,
> +					       &cache->objs[cache->len],
> +					       req);
> +		if (likely(ret == 0))
> +			cache->len += req;
> +		else
> +			/* Don't spoil the return value */
> +			ret = 0;
> +	}
> +#endif /* RTE_MEMPOOL_CACHE_MAX_SIZE > 0 */
> +
>   	return ret;
>   }
>
>
  
Zoltan Kiss June 30, 2015, 1:59 p.m. UTC | #2
On 30/06/15 12:58, Olivier MATZ wrote:
> Hi Zoltan,
>
> On 06/25/2015 08:48 PM, Zoltan Kiss wrote:
>> The current way has a few problems:
>>
>> - if cache->len < n, we copy our elements into the cache first, then
>>    into obj_table, that's unnecessary
>> - if n >= cache_size (or the backfill fails), and we can't fulfil the
>>    request from the ring alone, we don't try to combine with the cache
>> - if refill fails, we don't return anything, even if the ring has enough
>>    for our request
>>
>> This patch rewrites it severely:
>> - at the first part of the function we only try the cache if
>> cache->len < n
>> - otherwise take our elements straight from the ring
>> - if that fails but we have something in the cache, try to combine them
>> - the refill happens at the end, and its failure doesn't modify our
>> return
>>    value
>
> Indeed, it looks easier to read that way. I checked the performance with
> "mempool_perf_autotest" of app/test and it show that there is no
> regression (it is even slightly better in some test cases).
>
> There is a small typo in the title: s/improbe/improve

Yes, I'll fix that.


> Please see also a comment below.
>
>>
>> Signed-off-by: Zoltan Kiss <zoltan.kiss@linaro.org>
>> ---
>>   lib/librte_mempool/rte_mempool.h | 63
>> +++++++++++++++++++++++++---------------
>>   1 file changed, 39 insertions(+), 24 deletions(-)
>>
>> diff --git a/lib/librte_mempool/rte_mempool.h
>> b/lib/librte_mempool/rte_mempool.h
>> index a8054e1..896946c 100644
>> --- a/lib/librte_mempool/rte_mempool.h
>> +++ b/lib/librte_mempool/rte_mempool.h
>> @@ -948,34 +948,14 @@ __mempool_get_bulk(struct rte_mempool *mp, void
>> **obj_table,
>>       unsigned lcore_id = rte_lcore_id();
>>       uint32_t cache_size = mp->cache_size;
>>
>> -    /* cache is not enabled or single consumer */
>> +    cache = &mp->local_cache[lcore_id];
>> +    /* cache is not enabled or single consumer or not enough */
>>       if (unlikely(cache_size == 0 || is_mc == 0 ||
>> -             n >= cache_size || lcore_id >= RTE_MAX_LCORE))
>> +             cache->len < n || lcore_id >= RTE_MAX_LCORE))
>>           goto ring_dequeue;
>>
>> -    cache = &mp->local_cache[lcore_id];
>>       cache_objs = cache->objs;
>>
>> -    /* Can this be satisfied from the cache? */
>> -    if (cache->len < n) {
>> -        /* No. Backfill the cache first, and then fill from it */
>> -        uint32_t req = n + (cache_size - cache->len);
>> -
>> -        /* How many do we require i.e. number to fill the cache + the
>> request */
>> -        ret = rte_ring_mc_dequeue_bulk(mp->ring,
>> &cache->objs[cache->len], req);
>> -        if (unlikely(ret < 0)) {
>> -            /*
>> -             * In the offchance that we are buffer constrained,
>> -             * where we are not able to allocate cache + n, go to
>> -             * the ring directly. If that fails, we are truly out of
>> -             * buffers.
>> -             */
>> -            goto ring_dequeue;
>> -        }
>> -
>> -        cache->len += req;
>> -    }
>> -
>>       /* Now fill in the response ... */
>>       for (index = 0, len = cache->len - 1; index < n; ++index, len--,
>> obj_table++)
>>           *obj_table = cache_objs[len];
>> @@ -984,7 +964,8 @@ __mempool_get_bulk(struct rte_mempool *mp, void
>> **obj_table,
>>
>>       __MEMPOOL_STAT_ADD(mp, get_success, n);
>>
>> -    return 0;
>> +    ret = 0;
>> +    goto cache_refill;
>>
>>   ring_dequeue:
>>   #endif /* RTE_MEMPOOL_CACHE_MAX_SIZE > 0 */
>> @@ -995,11 +976,45 @@ ring_dequeue:
>>       else
>>           ret = rte_ring_sc_dequeue_bulk(mp->ring, obj_table, n);
>>
>> +#if RTE_MEMPOOL_CACHE_MAX_SIZE > 0
>> +    if (ret < 0 && is_mc == 1 && cache->len > 0) {
>
> if (unlikely(ret < 0 && is_mc == 1 && cache->len > 0))  ?

Ok

>
>
>> +        uint32_t req = n - cache->len;
>> +
>> +        ret = rte_ring_mc_dequeue_bulk(mp->ring, obj_table, req);
>> +        if (ret == 0) {
>> +            cache_objs = cache->objs;
>> +            obj_table += req;
>> +            for (index = 0; index < cache->len;
>> +                 ++index, ++obj_table)
>> +                *obj_table = cache_objs[index];
>> +            cache->len = 0;
>> +        }
>> +    }
>> +#endif /* RTE_MEMPOOL_CACHE_MAX_SIZE > 0 */
>> +
>>       if (ret < 0)
>>           __MEMPOOL_STAT_ADD(mp, get_fail, n);
>>       else
>>           __MEMPOOL_STAT_ADD(mp, get_success, n);
>>
>> +#if RTE_MEMPOOL_CACHE_MAX_SIZE > 0
>> +cache_refill:
>> +    /* If previous dequeue was OK and we have less than n, start
>> refill */
>> +    if (ret == 0 && cache_size > 0 && cache->len < n) {
>
> Not sure it's likely or unlikely there. I'll tend to say unlikely
> as the cache size is probably big compared to n most of the time.
>
> I don't know if it would have a real performance impact thought, but
> I think it won't hurt.

I think it's not obvious here which branch is taken more often on the 
hot path. I think it's better to follow the rule of thumb: if you are 
not confident about the likelihood, just don't use (un)likely and let 
the branch predictor decide at runtime.
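
For context, likely()/unlikely() only pass a static hint to the compiler via
__builtin_expect(); DPDK defines them in rte_branch_prediction.h along these
lines (a sketch, the exact definitions in the tree may differ slightly):

    #ifndef likely
    #define likely(x)    __builtin_expect(!!(x), 1)
    #endif

    #ifndef unlikely
    #define unlikely(x)    __builtin_expect(!!(x), 0)
    #endif

Leaving the hint out simply lets the compiler's heuristics and the CPU's
dynamic branch predictor decide, which is the behaviour argued for above.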


>
>
> Regards,
> Olivier
>
>
>> +        uint32_t req = cache_size - cache->len;
>> +
>> +        cache_objs = cache->objs;
>> +        ret = rte_ring_mc_dequeue_bulk(mp->ring,
>> +                           &cache->objs[cache->len],
>> +                           req);
>> +        if (likely(ret == 0))
>> +            cache->len += req;
>> +        else
>> +            /* Don't spoil the return value */
>> +            ret = 0;
>> +    }
>> +#endif /* RTE_MEMPOOL_CACHE_MAX_SIZE > 0 */
>> +
>>       return ret;
>>   }
>>
>>
>
  

Patch

diff --git a/lib/librte_mempool/rte_mempool.h b/lib/librte_mempool/rte_mempool.h
index a8054e1..896946c 100644
--- a/lib/librte_mempool/rte_mempool.h
+++ b/lib/librte_mempool/rte_mempool.h
@@ -948,34 +948,14 @@  __mempool_get_bulk(struct rte_mempool *mp, void **obj_table,
 	unsigned lcore_id = rte_lcore_id();
 	uint32_t cache_size = mp->cache_size;
 
-	/* cache is not enabled or single consumer */
+	cache = &mp->local_cache[lcore_id];
+	/* cache is not enabled or single consumer or not enough */
 	if (unlikely(cache_size == 0 || is_mc == 0 ||
-		     n >= cache_size || lcore_id >= RTE_MAX_LCORE))
+		     cache->len < n || lcore_id >= RTE_MAX_LCORE))
 		goto ring_dequeue;
 
-	cache = &mp->local_cache[lcore_id];
 	cache_objs = cache->objs;
 
-	/* Can this be satisfied from the cache? */
-	if (cache->len < n) {
-		/* No. Backfill the cache first, and then fill from it */
-		uint32_t req = n + (cache_size - cache->len);
-
-		/* How many do we require i.e. number to fill the cache + the request */
-		ret = rte_ring_mc_dequeue_bulk(mp->ring, &cache->objs[cache->len], req);
-		if (unlikely(ret < 0)) {
-			/*
-			 * In the offchance that we are buffer constrained,
-			 * where we are not able to allocate cache + n, go to
-			 * the ring directly. If that fails, we are truly out of
-			 * buffers.
-			 */
-			goto ring_dequeue;
-		}
-
-		cache->len += req;
-	}
-
 	/* Now fill in the response ... */
 	for (index = 0, len = cache->len - 1; index < n; ++index, len--, obj_table++)
 		*obj_table = cache_objs[len];
@@ -984,7 +964,8 @@  __mempool_get_bulk(struct rte_mempool *mp, void **obj_table,
 
 	__MEMPOOL_STAT_ADD(mp, get_success, n);
 
-	return 0;
+	ret = 0;
+	goto cache_refill;
 
 ring_dequeue:
 #endif /* RTE_MEMPOOL_CACHE_MAX_SIZE > 0 */
@@ -995,11 +976,45 @@  ring_dequeue:
 	else
 		ret = rte_ring_sc_dequeue_bulk(mp->ring, obj_table, n);
 
+#if RTE_MEMPOOL_CACHE_MAX_SIZE > 0
+	if (ret < 0 && is_mc == 1 && cache->len > 0) {
+		uint32_t req = n - cache->len;
+
+		ret = rte_ring_mc_dequeue_bulk(mp->ring, obj_table, req);
+		if (ret == 0) {
+			cache_objs = cache->objs;
+			obj_table += req;
+			for (index = 0; index < cache->len;
+			     ++index, ++obj_table)
+				*obj_table = cache_objs[index];
+			cache->len = 0;
+		}
+	}
+#endif /* RTE_MEMPOOL_CACHE_MAX_SIZE > 0 */
+
 	if (ret < 0)
 		__MEMPOOL_STAT_ADD(mp, get_fail, n);
 	else
 		__MEMPOOL_STAT_ADD(mp, get_success, n);
 
+#if RTE_MEMPOOL_CACHE_MAX_SIZE > 0
+cache_refill:
+	/* If previous dequeue was OK and we have less than n, start refill */
+	if (ret == 0 && cache_size > 0 && cache->len < n) {
+		uint32_t req = cache_size - cache->len;
+
+		cache_objs = cache->objs;
+		ret = rte_ring_mc_dequeue_bulk(mp->ring,
+					       &cache->objs[cache->len],
+					       req);
+		if (likely(ret == 0))
+			cache->len += req;
+		else
+			/* Don't spoil the return value */
+			ret = 0;
+	}
+#endif /* RTE_MEMPOOL_CACHE_MAX_SIZE > 0 */
+
 	return ret;
 }