[v3,9/9] mempool/bucket: handle non-EAL lcores

Message ID 20200622132531.21857-10-david.marchand@redhat.com (mailing list archive)
State Superseded, archived
Delegated to: Thomas Monjalon
Headers
Series Register non-EAL threads as lcore |

Checks

Context Check Description
ci/checkpatch success coding style OK
ci/travis-robot success Travis build: passed
ci/Intel-compilation success Compilation OK

Commit Message

David Marchand June 22, 2020, 1:25 p.m. UTC
  Convert to new lcore API to support non-EAL lcores.

Signed-off-by: David Marchand <david.marchand@redhat.com>
---
 drivers/mempool/bucket/rte_mempool_bucket.c | 131 ++++++++++++--------
 1 file changed, 82 insertions(+), 49 deletions(-)
  

Comments

Andrew Rybchenko June 23, 2020, 5:28 p.m. UTC | #1
On 6/22/20 4:25 PM, David Marchand wrote:
> Convert to new lcore API to support non-EAL lcores.
> 
> Signed-off-by: David Marchand <david.marchand@redhat.com>
> ---
>  drivers/mempool/bucket/rte_mempool_bucket.c | 131 ++++++++++++--------
>  1 file changed, 82 insertions(+), 49 deletions(-)
> 
> diff --git a/drivers/mempool/bucket/rte_mempool_bucket.c b/drivers/mempool/bucket/rte_mempool_bucket.c
> index 5ce1ef16fb..0b4f42d330 100644
> --- a/drivers/mempool/bucket/rte_mempool_bucket.c
> +++ b/drivers/mempool/bucket/rte_mempool_bucket.c
> @@ -55,6 +55,7 @@ struct bucket_data {
>  	struct rte_ring *shared_orphan_ring;
>  	struct rte_mempool *pool;
>  	unsigned int bucket_mem_size;
> +	void *lcore_callback_handle;
>  };
>  
>  static struct bucket_stack *
> @@ -345,6 +346,22 @@ bucket_dequeue_contig_blocks(struct rte_mempool *mp, void **first_obj_table,
>  	return 0;
>  }
>  
> +struct bucket_per_lcore_ctx {

The structure is not used in per-lcore init and uninit
functions. So, it is better to add _count to make it
count specified. I.e. bucket_count_per_lcore_ctx.

> +	const struct bucket_data *bd;
> +	unsigned int count;
> +};
> +
> +static int
> +count_per_lcore(unsigned int lcore_id, void *arg)
> +{
> +	struct bucket_per_lcore_ctx *ctx = arg;
> +
> +	ctx->count += ctx->bd->obj_per_bucket *
> +		ctx->bd->buckets[lcore_id]->top;
> +	ctx->count += rte_ring_count(ctx->bd->adoption_buffer_rings[lcore_id]);
> +	return 0;
> +}
> +
>  static void
>  count_underfilled_buckets(struct rte_mempool *mp,
>  			  void *opaque,
> @@ -373,23 +390,66 @@ count_underfilled_buckets(struct rte_mempool *mp,
>  static unsigned int
>  bucket_get_count(const struct rte_mempool *mp)
>  {
> -	const struct bucket_data *bd = mp->pool_data;
> -	unsigned int count =
> -		bd->obj_per_bucket * rte_ring_count(bd->shared_bucket_ring) +
> -		rte_ring_count(bd->shared_orphan_ring);
> -	unsigned int i;
> +	struct bucket_per_lcore_ctx ctx;

Just a nit, but I think that ctx is too generic.
(some time ago bucket_data bd was ctx in fact :) )
May be bplc? Up to you.

>  
> -	for (i = 0; i < RTE_MAX_LCORE; i++) {
> -		if (!rte_lcore_is_enabled(i))
> -			continue;
> -		count += bd->obj_per_bucket * bd->buckets[i]->top +
> -			rte_ring_count(bd->adoption_buffer_rings[i]);
> -	}
> +	ctx.bd = mp->pool_data;
> +	ctx.count = ctx.bd->obj_per_bucket *
> +		rte_ring_count(ctx.bd->shared_bucket_ring);
> +	ctx.count += rte_ring_count(ctx.bd->shared_orphan_ring);
>  
> +	rte_lcore_iterate(count_per_lcore, &ctx);
>  	rte_mempool_mem_iter((struct rte_mempool *)(uintptr_t)mp,
> -			     count_underfilled_buckets, &count);
> +			     count_underfilled_buckets, &ctx.count);
> +
> +	return ctx.count;
> +}
> +
> +static int
> +bucket_init_per_lcore(unsigned int lcore_id, void *arg)

It should be no bucket_ prefix here, or it should be bucket_
prefix above in count_per_lcore.

> +{
> +	char rg_name[RTE_RING_NAMESIZE];
> +	struct bucket_data *bd = arg;
> +	struct rte_mempool *mp;
> +	int rg_flags;
> +	int rc;
> +
> +	mp = bd->pool;
> +	bd->buckets[lcore_id] = bucket_stack_create(mp,
> +		mp->size / bd->obj_per_bucket);
> +	if (bd->buckets[lcore_id] == NULL)
> +		goto error;
> +
> +	rc = snprintf(rg_name, sizeof(rg_name), RTE_MEMPOOL_MZ_FORMAT ".a%u",
> +		mp->name, lcore_id);
> +	if (rc < 0 || rc >= (int)sizeof(rg_name))
> +		goto error;
> +
> +	rg_flags = RING_F_SC_DEQ;
> +	if (mp->flags & MEMPOOL_F_SP_PUT)
> +		rg_flags |= RING_F_SP_ENQ;
> +	if (mp->flags & MEMPOOL_F_SC_GET)
> +		rg_flags |= RING_F_SC_DEQ;

There is not point to have two above lines here, since
RING_F_SC_DEQ is always set.

> +	bd->adoption_buffer_rings[lcore_id] = rte_ring_create(rg_name,
> +		rte_align32pow2(mp->size + 1), mp->socket_id, rg_flags);
> +	if (bd->adoption_buffer_rings[lcore_id] == NULL)
> +		goto error;
>  
> -	return count;
> +	return 0;
> +error:
> +	rte_free(bd->buckets[lcore_id]);
> +	bd->buckets[lcore_id] = NULL;
> +	return -1;

Why does the API collapse all negative errnos into -1?
(I don't think it is critical, just want to know why).

> +}
> +
> +static void
> +bucket_uninit_per_lcore(unsigned int lcore_id, void *arg)

Same note about bucket_ prefix.
  
David Marchand June 26, 2020, 2:13 p.m. UTC | #2
On Tue, Jun 23, 2020 at 7:28 PM Andrew Rybchenko
<arybchenko@solarflare.com> wrote:
>
> On 6/22/20 4:25 PM, David Marchand wrote:
> > Convert to new lcore API to support non-EAL lcores.
> >
> > Signed-off-by: David Marchand <david.marchand@redhat.com>
> > ---
> >  drivers/mempool/bucket/rte_mempool_bucket.c | 131 ++++++++++++--------
> >  1 file changed, 82 insertions(+), 49 deletions(-)
> >
> > diff --git a/drivers/mempool/bucket/rte_mempool_bucket.c b/drivers/mempool/bucket/rte_mempool_bucket.c
> > index 5ce1ef16fb..0b4f42d330 100644
> > --- a/drivers/mempool/bucket/rte_mempool_bucket.c
> > +++ b/drivers/mempool/bucket/rte_mempool_bucket.c
> > @@ -55,6 +55,7 @@ struct bucket_data {
> >       struct rte_ring *shared_orphan_ring;
> >       struct rte_mempool *pool;
> >       unsigned int bucket_mem_size;
> > +     void *lcore_callback_handle;
> >  };
> >
> >  static struct bucket_stack *
> > @@ -345,6 +346,22 @@ bucket_dequeue_contig_blocks(struct rte_mempool *mp, void **first_obj_table,
> >       return 0;
> >  }
> >
> > +struct bucket_per_lcore_ctx {
>
> The structure is not used in per-lcore init and uninit
> functions. So, it is better to add _count to make it
> count specified. I.e. bucket_count_per_lcore_ctx.

Yes, and this aligns with its only user being renamed from
count_per_lcore to bucket_count_per_lcore.

>
> > +     const struct bucket_data *bd;
> > +     unsigned int count;
> > +};
> > +
> > +static int
> > +count_per_lcore(unsigned int lcore_id, void *arg)
> > +{
> > +     struct bucket_per_lcore_ctx *ctx = arg;
> > +
> > +     ctx->count += ctx->bd->obj_per_bucket *
> > +             ctx->bd->buckets[lcore_id]->top;
> > +     ctx->count += rte_ring_count(ctx->bd->adoption_buffer_rings[lcore_id]);
> > +     return 0;
> > +}
> > +
> >  static void
> >  count_underfilled_buckets(struct rte_mempool *mp,
> >                         void *opaque,
> > @@ -373,23 +390,66 @@ count_underfilled_buckets(struct rte_mempool *mp,
> >  static unsigned int
> >  bucket_get_count(const struct rte_mempool *mp)
> >  {
> > -     const struct bucket_data *bd = mp->pool_data;
> > -     unsigned int count =
> > -             bd->obj_per_bucket * rte_ring_count(bd->shared_bucket_ring) +
> > -             rte_ring_count(bd->shared_orphan_ring);
> > -     unsigned int i;
> > +     struct bucket_per_lcore_ctx ctx;
>
> Just a nit, but I think that ctx is too generic.
> (some time ago bucket_data bd was ctx in fact :) )
> May be bplc? Up to you.

Ack.


>
> >
> > -     for (i = 0; i < RTE_MAX_LCORE; i++) {
> > -             if (!rte_lcore_is_enabled(i))
> > -                     continue;
> > -             count += bd->obj_per_bucket * bd->buckets[i]->top +
> > -                     rte_ring_count(bd->adoption_buffer_rings[i]);
> > -     }
> > +     ctx.bd = mp->pool_data;
> > +     ctx.count = ctx.bd->obj_per_bucket *
> > +             rte_ring_count(ctx.bd->shared_bucket_ring);
> > +     ctx.count += rte_ring_count(ctx.bd->shared_orphan_ring);
> >
> > +     rte_lcore_iterate(count_per_lcore, &ctx);
> >       rte_mempool_mem_iter((struct rte_mempool *)(uintptr_t)mp,
> > -                          count_underfilled_buckets, &count);
> > +                          count_underfilled_buckets, &ctx.count);
> > +
> > +     return ctx.count;
> > +}
> > +
> > +static int
> > +bucket_init_per_lcore(unsigned int lcore_id, void *arg)
>
> It should be no bucket_ prefix here, or it should be bucket_
> prefix above in count_per_lcore.

As mentioned before, ack.


>
> > +{
> > +     char rg_name[RTE_RING_NAMESIZE];
> > +     struct bucket_data *bd = arg;
> > +     struct rte_mempool *mp;
> > +     int rg_flags;
> > +     int rc;
> > +
> > +     mp = bd->pool;
> > +     bd->buckets[lcore_id] = bucket_stack_create(mp,
> > +             mp->size / bd->obj_per_bucket);
> > +     if (bd->buckets[lcore_id] == NULL)
> > +             goto error;
> > +
> > +     rc = snprintf(rg_name, sizeof(rg_name), RTE_MEMPOOL_MZ_FORMAT ".a%u",
> > +             mp->name, lcore_id);
> > +     if (rc < 0 || rc >= (int)sizeof(rg_name))
> > +             goto error;
> > +
> > +     rg_flags = RING_F_SC_DEQ;
> > +     if (mp->flags & MEMPOOL_F_SP_PUT)
> > +             rg_flags |= RING_F_SP_ENQ;
> > +     if (mp->flags & MEMPOOL_F_SC_GET)
> > +             rg_flags |= RING_F_SC_DEQ;
>
> There is not point to have two above lines here, since
> RING_F_SC_DEQ is always set.

Ah yes, I did not realise when moving the code.


>
> > +     bd->adoption_buffer_rings[lcore_id] = rte_ring_create(rg_name,
> > +             rte_align32pow2(mp->size + 1), mp->socket_id, rg_flags);
> > +     if (bd->adoption_buffer_rings[lcore_id] == NULL)
> > +             goto error;
> >
> > -     return count;
> > +     return 0;
> > +error:
> > +     rte_free(bd->buckets[lcore_id]);
> > +     bd->buckets[lcore_id] = NULL;
> > +     return -1;
>
> Why does the API collapse all negative errnos into -1?
> (I don't think it is critical, just want to know why).

I collapsed everything as a single error as we have a partial idea of
what went wrong when calling this callback with all lcores at
registration.
We could get a specific error reported by this callback, but then we
would not know on which lcore (programmatically).
And in the end, all errors will summarize as a lack of resources, I do
not expect a need for input validation.

Maybe you have other use cases in mind?


Thanks for the review.
  
Andrew Rybchenko June 26, 2020, 2:34 p.m. UTC | #3
On 6/26/20 5:13 PM, David Marchand wrote:
> On Tue, Jun 23, 2020 at 7:28 PM Andrew Rybchenko
> <arybchenko@solarflare.com> wrote:
>>
>> On 6/22/20 4:25 PM, David Marchand wrote:
>>> Convert to new lcore API to support non-EAL lcores.
>>>
>>> Signed-off-by: David Marchand <david.marchand@redhat.com>
>>> ---

[snip]

>>> +     bd->adoption_buffer_rings[lcore_id] = rte_ring_create(rg_name,
>>> +             rte_align32pow2(mp->size + 1), mp->socket_id, rg_flags);
>>> +     if (bd->adoption_buffer_rings[lcore_id] == NULL)
>>> +             goto error;
>>>
>>> -     return count;
>>> +     return 0;
>>> +error:
>>> +     rte_free(bd->buckets[lcore_id]);
>>> +     bd->buckets[lcore_id] = NULL;
>>> +     return -1;
>>
>> Why does the API collapse all negative errnos into -1?
>> (I don't think it is critical, just want to know why).
> 
> I collapsed everything as a single error as we have a partial idea of
> what went wrong when calling this callback with all lcores at
> registration.
> We could get a specific error reported by this callback, but then we
> would not know on which lcore (programmatically).
> And in the end, all errors will summarize as a lack of resources, I do
> not expect a need for input validation.

Makes sense. Thanks for explanations.

> Maybe you have other use cases in mind?

No.
  

Patch

diff --git a/drivers/mempool/bucket/rte_mempool_bucket.c b/drivers/mempool/bucket/rte_mempool_bucket.c
index 5ce1ef16fb..0b4f42d330 100644
--- a/drivers/mempool/bucket/rte_mempool_bucket.c
+++ b/drivers/mempool/bucket/rte_mempool_bucket.c
@@ -55,6 +55,7 @@  struct bucket_data {
 	struct rte_ring *shared_orphan_ring;
 	struct rte_mempool *pool;
 	unsigned int bucket_mem_size;
+	void *lcore_callback_handle;
 };
 
 static struct bucket_stack *
@@ -345,6 +346,22 @@  bucket_dequeue_contig_blocks(struct rte_mempool *mp, void **first_obj_table,
 	return 0;
 }
 
+struct bucket_per_lcore_ctx {
+	const struct bucket_data *bd;
+	unsigned int count;
+};
+
+static int
+count_per_lcore(unsigned int lcore_id, void *arg)
+{
+	struct bucket_per_lcore_ctx *ctx = arg;
+
+	ctx->count += ctx->bd->obj_per_bucket *
+		ctx->bd->buckets[lcore_id]->top;
+	ctx->count += rte_ring_count(ctx->bd->adoption_buffer_rings[lcore_id]);
+	return 0;
+}
+
 static void
 count_underfilled_buckets(struct rte_mempool *mp,
 			  void *opaque,
@@ -373,23 +390,66 @@  count_underfilled_buckets(struct rte_mempool *mp,
 static unsigned int
 bucket_get_count(const struct rte_mempool *mp)
 {
-	const struct bucket_data *bd = mp->pool_data;
-	unsigned int count =
-		bd->obj_per_bucket * rte_ring_count(bd->shared_bucket_ring) +
-		rte_ring_count(bd->shared_orphan_ring);
-	unsigned int i;
+	struct bucket_per_lcore_ctx ctx;
 
-	for (i = 0; i < RTE_MAX_LCORE; i++) {
-		if (!rte_lcore_is_enabled(i))
-			continue;
-		count += bd->obj_per_bucket * bd->buckets[i]->top +
-			rte_ring_count(bd->adoption_buffer_rings[i]);
-	}
+	ctx.bd = mp->pool_data;
+	ctx.count = ctx.bd->obj_per_bucket *
+		rte_ring_count(ctx.bd->shared_bucket_ring);
+	ctx.count += rte_ring_count(ctx.bd->shared_orphan_ring);
 
+	rte_lcore_iterate(count_per_lcore, &ctx);
 	rte_mempool_mem_iter((struct rte_mempool *)(uintptr_t)mp,
-			     count_underfilled_buckets, &count);
+			     count_underfilled_buckets, &ctx.count);
+
+	return ctx.count;
+}
+
+static int
+bucket_init_per_lcore(unsigned int lcore_id, void *arg)
+{
+	char rg_name[RTE_RING_NAMESIZE];
+	struct bucket_data *bd = arg;
+	struct rte_mempool *mp;
+	int rg_flags;
+	int rc;
+
+	mp = bd->pool;
+	bd->buckets[lcore_id] = bucket_stack_create(mp,
+		mp->size / bd->obj_per_bucket);
+	if (bd->buckets[lcore_id] == NULL)
+		goto error;
+
+	rc = snprintf(rg_name, sizeof(rg_name), RTE_MEMPOOL_MZ_FORMAT ".a%u",
+		mp->name, lcore_id);
+	if (rc < 0 || rc >= (int)sizeof(rg_name))
+		goto error;
+
+	rg_flags = RING_F_SC_DEQ;
+	if (mp->flags & MEMPOOL_F_SP_PUT)
+		rg_flags |= RING_F_SP_ENQ;
+	if (mp->flags & MEMPOOL_F_SC_GET)
+		rg_flags |= RING_F_SC_DEQ;
+	bd->adoption_buffer_rings[lcore_id] = rte_ring_create(rg_name,
+		rte_align32pow2(mp->size + 1), mp->socket_id, rg_flags);
+	if (bd->adoption_buffer_rings[lcore_id] == NULL)
+		goto error;
 
-	return count;
+	return 0;
+error:
+	rte_free(bd->buckets[lcore_id]);
+	bd->buckets[lcore_id] = NULL;
+	return -1;
+}
+
+static void
+bucket_uninit_per_lcore(unsigned int lcore_id, void *arg)
+{
+	struct bucket_data *bd = arg;
+
+	rte_ring_free(bd->adoption_buffer_rings[lcore_id]);
+	bd->adoption_buffer_rings[lcore_id] = NULL;
+	rte_free(bd->buckets[lcore_id]);
+	bd->buckets[lcore_id] = NULL;
 }
 
 static int
@@ -399,7 +459,6 @@  bucket_alloc(struct rte_mempool *mp)
 	int rc = 0;
 	char rg_name[RTE_RING_NAMESIZE];
 	struct bucket_data *bd;
-	unsigned int i;
 	unsigned int bucket_header_size;
 	size_t pg_sz;
 
@@ -429,36 +488,17 @@  bucket_alloc(struct rte_mempool *mp)
 	/* eventually this should be a tunable parameter */
 	bd->bucket_stack_thresh = (mp->size / bd->obj_per_bucket) * 4 / 3;
 
+	bd->lcore_callback_handle = rte_lcore_callback_register("bucket",
+		bucket_init_per_lcore, bucket_uninit_per_lcore, bd);
+	if (bd->lcore_callback_handle == NULL) {
+		rc = -ENOMEM;
+		goto no_mem_for_stacks;
+	}
+
 	if (mp->flags & MEMPOOL_F_SP_PUT)
 		rg_flags |= RING_F_SP_ENQ;
 	if (mp->flags & MEMPOOL_F_SC_GET)
 		rg_flags |= RING_F_SC_DEQ;
-
-	for (i = 0; i < RTE_MAX_LCORE; i++) {
-		if (!rte_lcore_is_enabled(i))
-			continue;
-		bd->buckets[i] =
-			bucket_stack_create(mp, mp->size / bd->obj_per_bucket);
-		if (bd->buckets[i] == NULL) {
-			rc = -ENOMEM;
-			goto no_mem_for_stacks;
-		}
-		rc = snprintf(rg_name, sizeof(rg_name),
-			      RTE_MEMPOOL_MZ_FORMAT ".a%u", mp->name, i);
-		if (rc < 0 || rc >= (int)sizeof(rg_name)) {
-			rc = -ENAMETOOLONG;
-			goto no_mem_for_stacks;
-		}
-		bd->adoption_buffer_rings[i] =
-			rte_ring_create(rg_name, rte_align32pow2(mp->size + 1),
-					mp->socket_id,
-					rg_flags | RING_F_SC_DEQ);
-		if (bd->adoption_buffer_rings[i] == NULL) {
-			rc = -rte_errno;
-			goto no_mem_for_stacks;
-		}
-	}
-
 	rc = snprintf(rg_name, sizeof(rg_name),
 		      RTE_MEMPOOL_MZ_FORMAT ".0", mp->name);
 	if (rc < 0 || rc >= (int)sizeof(rg_name)) {
@@ -498,11 +538,8 @@  bucket_alloc(struct rte_mempool *mp)
 	rte_ring_free(bd->shared_orphan_ring);
 cannot_create_shared_orphan_ring:
 invalid_shared_orphan_ring:
+	rte_lcore_callback_unregister(bd->lcore_callback_handle);
 no_mem_for_stacks:
-	for (i = 0; i < RTE_MAX_LCORE; i++) {
-		rte_free(bd->buckets[i]);
-		rte_ring_free(bd->adoption_buffer_rings[i]);
-	}
 	rte_free(bd);
 no_mem_for_data:
 	rte_errno = -rc;
@@ -512,16 +549,12 @@  bucket_alloc(struct rte_mempool *mp)
 static void
 bucket_free(struct rte_mempool *mp)
 {
-	unsigned int i;
 	struct bucket_data *bd = mp->pool_data;
 
 	if (bd == NULL)
 		return;
 
-	for (i = 0; i < RTE_MAX_LCORE; i++) {
-		rte_free(bd->buckets[i]);
-		rte_ring_free(bd->adoption_buffer_rings[i]);
-	}
+	rte_lcore_callback_unregister(bd->lcore_callback_handle);
 
 	rte_ring_free(bd->shared_orphan_ring);
 	rte_ring_free(bd->shared_bucket_ring);