diff mbox

[dpdk-dev] Add in_flight_bitmask so as to use full 32 bits of tag.

Message ID 1415615912-46212-1-git-send-email-jigsaw@gmail.com (mailing list archive)
State Superseded, archived
Headers show

Commit Message

Qinglai Xiao Nov. 10, 2014, 10:38 a.m. UTC
User application is advocated to set the newly introduced union field
mbuf->hash.usr as flow id, which is uint32_t.
With introduction of in_flight_bitmask, the whole 32 bits of tag can
be used.

Further more, this patch fixed the integer overflow when finding the
matched tags.

Note that currently librte_distributor supports up to 64 worker
threads. If more workers are needed, the size of in_flight_bitmask and
the algorithm of finding matched tag must be revised.

Signed-off-by: Qinglai Xiao <jigsaw@gmail.com>
---
 app/test/test_distributor.c              |   18 ++++++------
 app/test/test_distributor_perf.c         |    4 +-
 lib/librte_distributor/rte_distributor.c |   45 +++++++++++++++++++++--------
 lib/librte_distributor/rte_distributor.h |    3 ++
 lib/librte_mbuf/rte_mbuf.h               |    1 +
 5 files changed, 47 insertions(+), 24 deletions(-)

Comments

Bruce Richardson Nov. 10, 2014, 11:09 a.m. UTC | #1
On Mon, Nov 10, 2014 at 12:38:32PM +0200, Qinglai Xiao wrote:
> User application is advocated to set the newly introduced union field
> mbuf->hash.usr as flow id, which is uint32_t.
> With introduction of in_flight_bitmask, the whole 32 bits of tag can
> be used.
> 
> Further more, this patch fixed the integer overflow when finding the
> matched tags.
> 
> Note that currently librte_distributor supports up to 64 worker
> threads. If more workers are needed, the size of in_flight_bitmask and
> the algorithm of finding matched tag must be revised.
> 
> Signed-off-by: Qinglai Xiao <jigsaw@gmail.com>

Hi,

this would probably be better as two patches rather than one. One patch to
add the hash.usr field, and then use it in the distributor. The change to
the distributor to add the bit mask to allow all bits of the tag to be used
should then go as a separate patch.

Regards,
/Bruce


> ---
>  app/test/test_distributor.c              |   18 ++++++------
>  app/test/test_distributor_perf.c         |    4 +-
>  lib/librte_distributor/rte_distributor.c |   45 +++++++++++++++++++++--------
>  lib/librte_distributor/rte_distributor.h |    3 ++
>  lib/librte_mbuf/rte_mbuf.h               |    1 +
>  5 files changed, 47 insertions(+), 24 deletions(-)
> 
> diff --git a/app/test/test_distributor.c b/app/test/test_distributor.c
> index ce06436..9e8c06d 100644
> --- a/app/test/test_distributor.c
> +++ b/app/test/test_distributor.c
> @@ -120,7 +120,7 @@ sanity_test(struct rte_distributor *d, struct rte_mempool *p)
>  	/* now set all hash values in all buffers to zero, so all pkts go to the
>  	 * one worker thread */
>  	for (i = 0; i < BURST; i++)
> -		bufs[i]->hash.rss = 0;
> +		bufs[i]->hash.usr = 0;
>  
>  	rte_distributor_process(d, bufs, BURST);
>  	rte_distributor_flush(d);
> @@ -142,7 +142,7 @@ sanity_test(struct rte_distributor *d, struct rte_mempool *p)
>  	if (rte_lcore_count() >= 3) {
>  		clear_packet_count();
>  		for (i = 0; i < BURST; i++)
> -			bufs[i]->hash.rss = (i & 1) << 8;
> +			bufs[i]->hash.usr = (i & 1) << 8;
>  
>  		rte_distributor_process(d, bufs, BURST);
>  		rte_distributor_flush(d);
> @@ -167,7 +167,7 @@ sanity_test(struct rte_distributor *d, struct rte_mempool *p)
>  	 * so load gets distributed */
>  	clear_packet_count();
>  	for (i = 0; i < BURST; i++)
> -		bufs[i]->hash.rss = i;
> +		bufs[i]->hash.usr = i;
>  
>  	rte_distributor_process(d, bufs, BURST);
>  	rte_distributor_flush(d);
> @@ -199,7 +199,7 @@ sanity_test(struct rte_distributor *d, struct rte_mempool *p)
>  		return -1;
>  	}
>  	for (i = 0; i < BIG_BATCH; i++)
> -		many_bufs[i]->hash.rss = i << 2;
> +		many_bufs[i]->hash.usr = i << 2;
>  
>  	for (i = 0; i < BIG_BATCH/BURST; i++) {
>  		rte_distributor_process(d, &many_bufs[i*BURST], BURST);
> @@ -280,7 +280,7 @@ sanity_test_with_mbuf_alloc(struct rte_distributor *d, struct rte_mempool *p)
>  		while (rte_mempool_get_bulk(p, (void *)bufs, BURST) < 0)
>  			rte_distributor_process(d, NULL, 0);
>  		for (j = 0; j < BURST; j++) {
> -			bufs[j]->hash.rss = (i+j) << 1;
> +			bufs[j]->hash.usr = (i+j) << 1;
>  			rte_mbuf_refcnt_set(bufs[j], 1);
>  		}
>  
> @@ -359,7 +359,7 @@ sanity_test_with_worker_shutdown(struct rte_distributor *d,
>  	/* now set all hash values in all buffers to zero, so all pkts go to the
>  	 * one worker thread */
>  	for (i = 0; i < BURST; i++)
> -		bufs[i]->hash.rss = 0;
> +		bufs[i]->hash.usr = 0;
>  
>  	rte_distributor_process(d, bufs, BURST);
>  	/* at this point, we will have processed some packets and have a full
> @@ -372,7 +372,7 @@ sanity_test_with_worker_shutdown(struct rte_distributor *d,
>  		return -1;
>  	}
>  	for (i = 0; i < BURST; i++)
> -		bufs[i]->hash.rss = 0;
> +		bufs[i]->hash.usr = 0;
>  
>  	/* get worker zero to quit */
>  	zero_quit = 1;
> @@ -416,7 +416,7 @@ test_flush_with_worker_shutdown(struct rte_distributor *d,
>  	/* now set all hash values in all buffers to zero, so all pkts go to the
>  	 * one worker thread */
>  	for (i = 0; i < BURST; i++)
> -		bufs[i]->hash.rss = 0;
> +		bufs[i]->hash.usr = 0;
>  
>  	rte_distributor_process(d, bufs, BURST);
>  	/* at this point, we will have processed some packets and have a full
> @@ -488,7 +488,7 @@ quit_workers(struct rte_distributor *d, struct rte_mempool *p)
>  	zero_quit = 0;
>  	quit = 1;
>  	for (i = 0; i < num_workers; i++)
> -		bufs[i]->hash.rss = i << 1;
> +		bufs[i]->hash.usr = i << 1;
>  	rte_distributor_process(d, bufs, num_workers);
>  
>  	rte_mempool_put_bulk(p, (void *)bufs, num_workers);
> diff --git a/app/test/test_distributor_perf.c b/app/test/test_distributor_perf.c
> index b04864c..48ee344 100644
> --- a/app/test/test_distributor_perf.c
> +++ b/app/test/test_distributor_perf.c
> @@ -159,7 +159,7 @@ perf_test(struct rte_distributor *d, struct rte_mempool *p)
>  	}
>  	/* ensure we have different hash value for each pkt */
>  	for (i = 0; i < BURST; i++)
> -		bufs[i]->hash.rss = i;
> +		bufs[i]->hash.usr = i;
>  
>  	start = rte_rdtsc();
>  	for (i = 0; i < (1<<ITER_POWER); i++)
> @@ -198,7 +198,7 @@ quit_workers(struct rte_distributor *d, struct rte_mempool *p)
>  
>  	quit = 1;
>  	for (i = 0; i < num_workers; i++)
> -		bufs[i]->hash.rss = i << 1;
> +		bufs[i]->hash.usr = i << 1;
>  	rte_distributor_process(d, bufs, num_workers);
>  
>  	rte_mempool_put_bulk(p, (void *)bufs, num_workers);
> diff --git a/lib/librte_distributor/rte_distributor.c b/lib/librte_distributor/rte_distributor.c
> index 656ee5c..a84ea97 100644
> --- a/lib/librte_distributor/rte_distributor.c
> +++ b/lib/librte_distributor/rte_distributor.c
> @@ -92,7 +92,13 @@ struct rte_distributor {
>  	unsigned num_workers;                 /**< Number of workers polling */
>  
>  	uint32_t in_flight_tags[RTE_MAX_LCORE];
> -		/**< Tracks the tag being processed per core, 0 == no pkt */
> +		/**< Tracks the tag being processed per core */
> +	uint64_t in_flight_bitmask;
> +		/**< on/off bits for in-flight tags.
> +		 * Note that if RTE_MAX_LCORE is larger than 64 then
> +		 * the bitmask has to expand.
> +		 */
> +
>  	struct rte_distributor_backlog backlog[RTE_MAX_LCORE];
>  
>  	union rte_distributor_buffer bufs[RTE_MAX_LCORE];
> @@ -189,6 +195,7 @@ static inline void
>  handle_worker_shutdown(struct rte_distributor *d, unsigned wkr)
>  {
>  	d->in_flight_tags[wkr] = 0;
> +	d->in_flight_bitmask &= ~(1UL << wkr);
>  	d->bufs[wkr].bufptr64 = 0;
>  	if (unlikely(d->backlog[wkr].count != 0)) {
>  		/* On return of a packet, we need to move the
> @@ -211,7 +218,10 @@ handle_worker_shutdown(struct rte_distributor *d, unsigned wkr)
>  			pkts[i] = (void *)((uintptr_t)(bl->pkts[idx] >>
>  					RTE_DISTRIB_FLAG_BITS));
>  		}
> -		/* recursive call */
> +		/* recursive call.
> +		 * Note that the tags were set before first level call
> +		 * to rte_distributor_process.
> +		 */
>  		rte_distributor_process(d, pkts, i);
>  		bl->count = bl->start = 0;
>  	}
> @@ -242,6 +252,7 @@ process_returns(struct rte_distributor *d)
>  			else {
>  				d->bufs[wkr].bufptr64 = RTE_DISTRIB_GET_BUF;
>  				d->in_flight_tags[wkr] = 0;
> +				d->in_flight_bitmask &= ~(1UL << wkr);
>  			}
>  			oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
>  		} else if (data & RTE_DISTRIB_RETURN_BUF) {
> @@ -284,17 +295,21 @@ rte_distributor_process(struct rte_distributor *d,
>  			next_value = (((int64_t)(uintptr_t)next_mb)
>  					<< RTE_DISTRIB_FLAG_BITS);
>  			/*
> -			 * Set the low bit on the tag, so we can guarantee that
> -			 * we never store a tag value of zero. That means we can
> -			 * use the zero-value to indicate that no packet is
> -			 * being processed by a worker.
> +			 * User is advocated to set tag vaue for each
> +			 * mbuf before calling rte_distributor_process.
> +			 * User defined tags are used to identify flows,
> +			 * or sessions.
>  			 */
> -			new_tag = (next_mb->hash.rss | 1);
> +			new_tag = next_mb->hash.usr;
>  
> -			uint32_t match = 0;
> +			/*
> +			 * Note that if RTE_MAX_LCORE is larger than 64 then
> +			 * the size of match has to be expanded.
> +			 */
> +			uint64_t match = 0;
>  			unsigned i;
>  			/*
> -			 * to scan for a match use "xor" and "not" to get a 0/1
> +			 * To scan for a match use "xor" and "not" to get a 0/1
>  			 * value, then use shifting to merge to single "match"
>  			 * variable, where a one-bit indicates a match for the
>  			 * worker given by the bit-position
> @@ -303,9 +318,11 @@ rte_distributor_process(struct rte_distributor *d,
>  				match |= (!(d->in_flight_tags[i] ^ new_tag)
>  					<< i);
>  
> +			/* Only turned-on bits are considered as match */
> +			match &= d->in_flight_bitmask;
>  			if (match) {
>  				next_mb = NULL;
> -				unsigned worker = __builtin_ctz(match);
> +				unsigned worker = __builtin_ctzl(match);
>  				if (add_to_backlog(&d->backlog[worker],
>  						next_value) < 0)
>  					next_idx--;
> @@ -322,6 +339,7 @@ rte_distributor_process(struct rte_distributor *d,
>  			else {
>  				d->bufs[wkr].bufptr64 = next_value;
>  				d->in_flight_tags[wkr] = new_tag;
> +				d->in_flight_bitmask |= (1UL << wkr);
>  				next_mb = NULL;
>  			}
>  			oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
> @@ -379,11 +397,12 @@ rte_distributor_returned_pkts(struct rte_distributor *d,
>  static inline unsigned
>  total_outstanding(const struct rte_distributor *d)
>  {
> -	unsigned wkr, total_outstanding = 0;
> +	unsigned wkr, total_outstanding;
>  
> +	total_outstanding = __builtin_popcountl(d->in_flight_bitmask);
>  	for (wkr = 0; wkr < d->num_workers; wkr++)
> -		total_outstanding += d->backlog[wkr].count +
> -				!!(d->in_flight_tags[wkr]);
> +		total_outstanding += d->backlog[wkr].count;
> +
>  	return total_outstanding;
>  }
>  
> diff --git a/lib/librte_distributor/rte_distributor.h b/lib/librte_distributor/rte_distributor.h
> index ec0d74a..a29c506 100644
> --- a/lib/librte_distributor/rte_distributor.h
> +++ b/lib/librte_distributor/rte_distributor.h
> @@ -87,6 +87,9 @@ rte_distributor_create(const char *name, unsigned socket_id,
>   * Process a set of packets by distributing them among workers that request
>   * packets. The distributor will ensure that no two packets that have the
>   * same flow id, or tag, in the mbuf will be procesed at the same time.
> + * The user is advocated to set tag for each mbuf. If user doesn't set the
> + * tag, the tag value can be various values depending on driver implementation
> + * and configuration.
>   *
>   * This is not multi-thread safe and should only be called on a single lcore.
>   *
> diff --git a/lib/librte_mbuf/rte_mbuf.h b/lib/librte_mbuf/rte_mbuf.h
> index e8f9bfc..f5f8658 100644
> --- a/lib/librte_mbuf/rte_mbuf.h
> +++ b/lib/librte_mbuf/rte_mbuf.h
> @@ -185,6 +185,7 @@ struct rte_mbuf {
>  			uint16_t id;
>  		} fdir;           /**< Filter identifier if FDIR enabled */
>  		uint32_t sched;   /**< Hierarchical scheduler */
> +		uint32_t usr;	  /**< User defined tags. See @rte_distributor_process */
>  	} hash;                   /**< hash information */
>  
>  	/* second cache line - fields only used in slow path or on TX */
> -- 
> 1.7.1
>
Qinglai Xiao Nov. 10, 2014, 12:13 p.m. UTC | #2
OK thx Bruce, I will make 2 commits with one cover-letter.

On Mon, Nov 10, 2014 at 1:09 PM, Bruce Richardson <
bruce.richardson@intel.com> wrote:

> On Mon, Nov 10, 2014 at 12:38:32PM +0200, Qinglai Xiao wrote:
> > User application is advocated to set the newly introduced union field
> > mbuf->hash.usr as flow id, which is uint32_t.
> > With introduction of in_flight_bitmask, the whole 32 bits of tag can
> > be used.
> >
> > Further more, this patch fixed the integer overflow when finding the
> > matched tags.
> >
> > Note that currently librte_distributor supports up to 64 worker
> > threads. If more workers are needed, the size of in_flight_bitmask and
> > the algorithm of finding matched tag must be revised.
> >
> > Signed-off-by: Qinglai Xiao <jigsaw@gmail.com>
>
> Hi,
>
> this would probably be better as two patches rather than one. One patch to
> add the hash.usr field, and then use it in the distributor. The change to
> the distributor to add the bit mask to allow all bits of the tag to be used
> should then go as a separate patch.
>
> Regards,
> /Bruce
>
>
> > ---
> >  app/test/test_distributor.c              |   18 ++++++------
> >  app/test/test_distributor_perf.c         |    4 +-
> >  lib/librte_distributor/rte_distributor.c |   45
> +++++++++++++++++++++--------
> >  lib/librte_distributor/rte_distributor.h |    3 ++
> >  lib/librte_mbuf/rte_mbuf.h               |    1 +
> >  5 files changed, 47 insertions(+), 24 deletions(-)
> >
> > diff --git a/app/test/test_distributor.c b/app/test/test_distributor.c
> > index ce06436..9e8c06d 100644
> > --- a/app/test/test_distributor.c
> > +++ b/app/test/test_distributor.c
> > @@ -120,7 +120,7 @@ sanity_test(struct rte_distributor *d, struct
> rte_mempool *p)
> >       /* now set all hash values in all buffers to zero, so all pkts go
> to the
> >        * one worker thread */
> >       for (i = 0; i < BURST; i++)
> > -             bufs[i]->hash.rss = 0;
> > +             bufs[i]->hash.usr = 0;
> >
> >       rte_distributor_process(d, bufs, BURST);
> >       rte_distributor_flush(d);
> > @@ -142,7 +142,7 @@ sanity_test(struct rte_distributor *d, struct
> rte_mempool *p)
> >       if (rte_lcore_count() >= 3) {
> >               clear_packet_count();
> >               for (i = 0; i < BURST; i++)
> > -                     bufs[i]->hash.rss = (i & 1) << 8;
> > +                     bufs[i]->hash.usr = (i & 1) << 8;
> >
> >               rte_distributor_process(d, bufs, BURST);
> >               rte_distributor_flush(d);
> > @@ -167,7 +167,7 @@ sanity_test(struct rte_distributor *d, struct
> rte_mempool *p)
> >        * so load gets distributed */
> >       clear_packet_count();
> >       for (i = 0; i < BURST; i++)
> > -             bufs[i]->hash.rss = i;
> > +             bufs[i]->hash.usr = i;
> >
> >       rte_distributor_process(d, bufs, BURST);
> >       rte_distributor_flush(d);
> > @@ -199,7 +199,7 @@ sanity_test(struct rte_distributor *d, struct
> rte_mempool *p)
> >               return -1;
> >       }
> >       for (i = 0; i < BIG_BATCH; i++)
> > -             many_bufs[i]->hash.rss = i << 2;
> > +             many_bufs[i]->hash.usr = i << 2;
> >
> >       for (i = 0; i < BIG_BATCH/BURST; i++) {
> >               rte_distributor_process(d, &many_bufs[i*BURST], BURST);
> > @@ -280,7 +280,7 @@ sanity_test_with_mbuf_alloc(struct rte_distributor
> *d, struct rte_mempool *p)
> >               while (rte_mempool_get_bulk(p, (void *)bufs, BURST) < 0)
> >                       rte_distributor_process(d, NULL, 0);
> >               for (j = 0; j < BURST; j++) {
> > -                     bufs[j]->hash.rss = (i+j) << 1;
> > +                     bufs[j]->hash.usr = (i+j) << 1;
> >                       rte_mbuf_refcnt_set(bufs[j], 1);
> >               }
> >
> > @@ -359,7 +359,7 @@ sanity_test_with_worker_shutdown(struct
> rte_distributor *d,
> >       /* now set all hash values in all buffers to zero, so all pkts go
> to the
> >        * one worker thread */
> >       for (i = 0; i < BURST; i++)
> > -             bufs[i]->hash.rss = 0;
> > +             bufs[i]->hash.usr = 0;
> >
> >       rte_distributor_process(d, bufs, BURST);
> >       /* at this point, we will have processed some packets and have a
> full
> > @@ -372,7 +372,7 @@ sanity_test_with_worker_shutdown(struct
> rte_distributor *d,
> >               return -1;
> >       }
> >       for (i = 0; i < BURST; i++)
> > -             bufs[i]->hash.rss = 0;
> > +             bufs[i]->hash.usr = 0;
> >
> >       /* get worker zero to quit */
> >       zero_quit = 1;
> > @@ -416,7 +416,7 @@ test_flush_with_worker_shutdown(struct
> rte_distributor *d,
> >       /* now set all hash values in all buffers to zero, so all pkts go
> to the
> >        * one worker thread */
> >       for (i = 0; i < BURST; i++)
> > -             bufs[i]->hash.rss = 0;
> > +             bufs[i]->hash.usr = 0;
> >
> >       rte_distributor_process(d, bufs, BURST);
> >       /* at this point, we will have processed some packets and have a
> full
> > @@ -488,7 +488,7 @@ quit_workers(struct rte_distributor *d, struct
> rte_mempool *p)
> >       zero_quit = 0;
> >       quit = 1;
> >       for (i = 0; i < num_workers; i++)
> > -             bufs[i]->hash.rss = i << 1;
> > +             bufs[i]->hash.usr = i << 1;
> >       rte_distributor_process(d, bufs, num_workers);
> >
> >       rte_mempool_put_bulk(p, (void *)bufs, num_workers);
> > diff --git a/app/test/test_distributor_perf.c
> b/app/test/test_distributor_perf.c
> > index b04864c..48ee344 100644
> > --- a/app/test/test_distributor_perf.c
> > +++ b/app/test/test_distributor_perf.c
> > @@ -159,7 +159,7 @@ perf_test(struct rte_distributor *d, struct
> rte_mempool *p)
> >       }
> >       /* ensure we have different hash value for each pkt */
> >       for (i = 0; i < BURST; i++)
> > -             bufs[i]->hash.rss = i;
> > +             bufs[i]->hash.usr = i;
> >
> >       start = rte_rdtsc();
> >       for (i = 0; i < (1<<ITER_POWER); i++)
> > @@ -198,7 +198,7 @@ quit_workers(struct rte_distributor *d, struct
> rte_mempool *p)
> >
> >       quit = 1;
> >       for (i = 0; i < num_workers; i++)
> > -             bufs[i]->hash.rss = i << 1;
> > +             bufs[i]->hash.usr = i << 1;
> >       rte_distributor_process(d, bufs, num_workers);
> >
> >       rte_mempool_put_bulk(p, (void *)bufs, num_workers);
> > diff --git a/lib/librte_distributor/rte_distributor.c
> b/lib/librte_distributor/rte_distributor.c
> > index 656ee5c..a84ea97 100644
> > --- a/lib/librte_distributor/rte_distributor.c
> > +++ b/lib/librte_distributor/rte_distributor.c
> > @@ -92,7 +92,13 @@ struct rte_distributor {
> >       unsigned num_workers;                 /**< Number of workers
> polling */
> >
> >       uint32_t in_flight_tags[RTE_MAX_LCORE];
> > -             /**< Tracks the tag being processed per core, 0 == no pkt
> */
> > +             /**< Tracks the tag being processed per core */
> > +     uint64_t in_flight_bitmask;
> > +             /**< on/off bits for in-flight tags.
> > +              * Note that if RTE_MAX_LCORE is larger than 64 then
> > +              * the bitmask has to expand.
> > +              */
> > +
> >       struct rte_distributor_backlog backlog[RTE_MAX_LCORE];
> >
> >       union rte_distributor_buffer bufs[RTE_MAX_LCORE];
> > @@ -189,6 +195,7 @@ static inline void
> >  handle_worker_shutdown(struct rte_distributor *d, unsigned wkr)
> >  {
> >       d->in_flight_tags[wkr] = 0;
> > +     d->in_flight_bitmask &= ~(1UL << wkr);
> >       d->bufs[wkr].bufptr64 = 0;
> >       if (unlikely(d->backlog[wkr].count != 0)) {
> >               /* On return of a packet, we need to move the
> > @@ -211,7 +218,10 @@ handle_worker_shutdown(struct rte_distributor *d,
> unsigned wkr)
> >                       pkts[i] = (void *)((uintptr_t)(bl->pkts[idx] >>
> >                                       RTE_DISTRIB_FLAG_BITS));
> >               }
> > -             /* recursive call */
> > +             /* recursive call.
> > +              * Note that the tags were set before first level call
> > +              * to rte_distributor_process.
> > +              */
> >               rte_distributor_process(d, pkts, i);
> >               bl->count = bl->start = 0;
> >       }
> > @@ -242,6 +252,7 @@ process_returns(struct rte_distributor *d)
> >                       else {
> >                               d->bufs[wkr].bufptr64 =
> RTE_DISTRIB_GET_BUF;
> >                               d->in_flight_tags[wkr] = 0;
> > +                             d->in_flight_bitmask &= ~(1UL << wkr);
> >                       }
> >                       oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
> >               } else if (data & RTE_DISTRIB_RETURN_BUF) {
> > @@ -284,17 +295,21 @@ rte_distributor_process(struct rte_distributor *d,
> >                       next_value = (((int64_t)(uintptr_t)next_mb)
> >                                       << RTE_DISTRIB_FLAG_BITS);
> >                       /*
> > -                      * Set the low bit on the tag, so we can guarantee
> that
> > -                      * we never store a tag value of zero. That means
> we can
> > -                      * use the zero-value to indicate that no packet is
> > -                      * being processed by a worker.
> > +                      * User is advocated to set tag vaue for each
> > +                      * mbuf before calling rte_distributor_process.
> > +                      * User defined tags are used to identify flows,
> > +                      * or sessions.
> >                        */
> > -                     new_tag = (next_mb->hash.rss | 1);
> > +                     new_tag = next_mb->hash.usr;
> >
> > -                     uint32_t match = 0;
> > +                     /*
> > +                      * Note that if RTE_MAX_LCORE is larger than 64
> then
> > +                      * the size of match has to be expanded.
> > +                      */
> > +                     uint64_t match = 0;
> >                       unsigned i;
> >                       /*
> > -                      * to scan for a match use "xor" and "not" to get
> a 0/1
> > +                      * To scan for a match use "xor" and "not" to get
> a 0/1
> >                        * value, then use shifting to merge to single
> "match"
> >                        * variable, where a one-bit indicates a match for
> the
> >                        * worker given by the bit-position
> > @@ -303,9 +318,11 @@ rte_distributor_process(struct rte_distributor *d,
> >                               match |= (!(d->in_flight_tags[i] ^ new_tag)
> >                                       << i);
> >
> > +                     /* Only turned-on bits are considered as match */
> > +                     match &= d->in_flight_bitmask;
> >                       if (match) {
> >                               next_mb = NULL;
> > -                             unsigned worker = __builtin_ctz(match);
> > +                             unsigned worker = __builtin_ctzl(match);
> >                               if (add_to_backlog(&d->backlog[worker],
> >                                               next_value) < 0)
> >                                       next_idx--;
> > @@ -322,6 +339,7 @@ rte_distributor_process(struct rte_distributor *d,
> >                       else {
> >                               d->bufs[wkr].bufptr64 = next_value;
> >                               d->in_flight_tags[wkr] = new_tag;
> > +                             d->in_flight_bitmask |= (1UL << wkr);
> >                               next_mb = NULL;
> >                       }
> >                       oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
> > @@ -379,11 +397,12 @@ rte_distributor_returned_pkts(struct
> rte_distributor *d,
> >  static inline unsigned
> >  total_outstanding(const struct rte_distributor *d)
> >  {
> > -     unsigned wkr, total_outstanding = 0;
> > +     unsigned wkr, total_outstanding;
> >
> > +     total_outstanding = __builtin_popcountl(d->in_flight_bitmask);
> >       for (wkr = 0; wkr < d->num_workers; wkr++)
> > -             total_outstanding += d->backlog[wkr].count +
> > -                             !!(d->in_flight_tags[wkr]);
> > +             total_outstanding += d->backlog[wkr].count;
> > +
> >       return total_outstanding;
> >  }
> >
> > diff --git a/lib/librte_distributor/rte_distributor.h
> b/lib/librte_distributor/rte_distributor.h
> > index ec0d74a..a29c506 100644
> > --- a/lib/librte_distributor/rte_distributor.h
> > +++ b/lib/librte_distributor/rte_distributor.h
> > @@ -87,6 +87,9 @@ rte_distributor_create(const char *name, unsigned
> socket_id,
> >   * Process a set of packets by distributing them among workers that
> request
> >   * packets. The distributor will ensure that no two packets that have
> the
> >   * same flow id, or tag, in the mbuf will be procesed at the same time.
> > + * The user is advocated to set tag for each mbuf. If user doesn't set
> the
> > + * tag, the tag value can be various values depending on driver
> implementation
> > + * and configuration.
> >   *
> >   * This is not multi-thread safe and should only be called on a single
> lcore.
> >   *
> > diff --git a/lib/librte_mbuf/rte_mbuf.h b/lib/librte_mbuf/rte_mbuf.h
> > index e8f9bfc..f5f8658 100644
> > --- a/lib/librte_mbuf/rte_mbuf.h
> > +++ b/lib/librte_mbuf/rte_mbuf.h
> > @@ -185,6 +185,7 @@ struct rte_mbuf {
> >                       uint16_t id;
> >               } fdir;           /**< Filter identifier if FDIR enabled */
> >               uint32_t sched;   /**< Hierarchical scheduler */
> > +             uint32_t usr;     /**< User defined tags. See
> @rte_distributor_process */
> >       } hash;                   /**< hash information */
> >
> >       /* second cache line - fields only used in slow path or on TX */
> > --
> > 1.7.1
> >
>
diff mbox

Patch

diff --git a/app/test/test_distributor.c b/app/test/test_distributor.c
index ce06436..9e8c06d 100644
--- a/app/test/test_distributor.c
+++ b/app/test/test_distributor.c
@@ -120,7 +120,7 @@  sanity_test(struct rte_distributor *d, struct rte_mempool *p)
 	/* now set all hash values in all buffers to zero, so all pkts go to the
 	 * one worker thread */
 	for (i = 0; i < BURST; i++)
-		bufs[i]->hash.rss = 0;
+		bufs[i]->hash.usr = 0;
 
 	rte_distributor_process(d, bufs, BURST);
 	rte_distributor_flush(d);
@@ -142,7 +142,7 @@  sanity_test(struct rte_distributor *d, struct rte_mempool *p)
 	if (rte_lcore_count() >= 3) {
 		clear_packet_count();
 		for (i = 0; i < BURST; i++)
-			bufs[i]->hash.rss = (i & 1) << 8;
+			bufs[i]->hash.usr = (i & 1) << 8;
 
 		rte_distributor_process(d, bufs, BURST);
 		rte_distributor_flush(d);
@@ -167,7 +167,7 @@  sanity_test(struct rte_distributor *d, struct rte_mempool *p)
 	 * so load gets distributed */
 	clear_packet_count();
 	for (i = 0; i < BURST; i++)
-		bufs[i]->hash.rss = i;
+		bufs[i]->hash.usr = i;
 
 	rte_distributor_process(d, bufs, BURST);
 	rte_distributor_flush(d);
@@ -199,7 +199,7 @@  sanity_test(struct rte_distributor *d, struct rte_mempool *p)
 		return -1;
 	}
 	for (i = 0; i < BIG_BATCH; i++)
-		many_bufs[i]->hash.rss = i << 2;
+		many_bufs[i]->hash.usr = i << 2;
 
 	for (i = 0; i < BIG_BATCH/BURST; i++) {
 		rte_distributor_process(d, &many_bufs[i*BURST], BURST);
@@ -280,7 +280,7 @@  sanity_test_with_mbuf_alloc(struct rte_distributor *d, struct rte_mempool *p)
 		while (rte_mempool_get_bulk(p, (void *)bufs, BURST) < 0)
 			rte_distributor_process(d, NULL, 0);
 		for (j = 0; j < BURST; j++) {
-			bufs[j]->hash.rss = (i+j) << 1;
+			bufs[j]->hash.usr = (i+j) << 1;
 			rte_mbuf_refcnt_set(bufs[j], 1);
 		}
 
@@ -359,7 +359,7 @@  sanity_test_with_worker_shutdown(struct rte_distributor *d,
 	/* now set all hash values in all buffers to zero, so all pkts go to the
 	 * one worker thread */
 	for (i = 0; i < BURST; i++)
-		bufs[i]->hash.rss = 0;
+		bufs[i]->hash.usr = 0;
 
 	rte_distributor_process(d, bufs, BURST);
 	/* at this point, we will have processed some packets and have a full
@@ -372,7 +372,7 @@  sanity_test_with_worker_shutdown(struct rte_distributor *d,
 		return -1;
 	}
 	for (i = 0; i < BURST; i++)
-		bufs[i]->hash.rss = 0;
+		bufs[i]->hash.usr = 0;
 
 	/* get worker zero to quit */
 	zero_quit = 1;
@@ -416,7 +416,7 @@  test_flush_with_worker_shutdown(struct rte_distributor *d,
 	/* now set all hash values in all buffers to zero, so all pkts go to the
 	 * one worker thread */
 	for (i = 0; i < BURST; i++)
-		bufs[i]->hash.rss = 0;
+		bufs[i]->hash.usr = 0;
 
 	rte_distributor_process(d, bufs, BURST);
 	/* at this point, we will have processed some packets and have a full
@@ -488,7 +488,7 @@  quit_workers(struct rte_distributor *d, struct rte_mempool *p)
 	zero_quit = 0;
 	quit = 1;
 	for (i = 0; i < num_workers; i++)
-		bufs[i]->hash.rss = i << 1;
+		bufs[i]->hash.usr = i << 1;
 	rte_distributor_process(d, bufs, num_workers);
 
 	rte_mempool_put_bulk(p, (void *)bufs, num_workers);
diff --git a/app/test/test_distributor_perf.c b/app/test/test_distributor_perf.c
index b04864c..48ee344 100644
--- a/app/test/test_distributor_perf.c
+++ b/app/test/test_distributor_perf.c
@@ -159,7 +159,7 @@  perf_test(struct rte_distributor *d, struct rte_mempool *p)
 	}
 	/* ensure we have different hash value for each pkt */
 	for (i = 0; i < BURST; i++)
-		bufs[i]->hash.rss = i;
+		bufs[i]->hash.usr = i;
 
 	start = rte_rdtsc();
 	for (i = 0; i < (1<<ITER_POWER); i++)
@@ -198,7 +198,7 @@  quit_workers(struct rte_distributor *d, struct rte_mempool *p)
 
 	quit = 1;
 	for (i = 0; i < num_workers; i++)
-		bufs[i]->hash.rss = i << 1;
+		bufs[i]->hash.usr = i << 1;
 	rte_distributor_process(d, bufs, num_workers);
 
 	rte_mempool_put_bulk(p, (void *)bufs, num_workers);
diff --git a/lib/librte_distributor/rte_distributor.c b/lib/librte_distributor/rte_distributor.c
index 656ee5c..a84ea97 100644
--- a/lib/librte_distributor/rte_distributor.c
+++ b/lib/librte_distributor/rte_distributor.c
@@ -92,7 +92,13 @@  struct rte_distributor {
 	unsigned num_workers;                 /**< Number of workers polling */
 
 	uint32_t in_flight_tags[RTE_MAX_LCORE];
-		/**< Tracks the tag being processed per core, 0 == no pkt */
+		/**< Tracks the tag being processed per core */
+	uint64_t in_flight_bitmask;
+		/**< on/off bits for in-flight tags.
+		 * Note that if RTE_MAX_LCORE is larger than 64 then
+		 * the bitmask has to expand.
+		 */
+
 	struct rte_distributor_backlog backlog[RTE_MAX_LCORE];
 
 	union rte_distributor_buffer bufs[RTE_MAX_LCORE];
@@ -189,6 +195,7 @@  static inline void
 handle_worker_shutdown(struct rte_distributor *d, unsigned wkr)
 {
 	d->in_flight_tags[wkr] = 0;
+	d->in_flight_bitmask &= ~(1UL << wkr);
 	d->bufs[wkr].bufptr64 = 0;
 	if (unlikely(d->backlog[wkr].count != 0)) {
 		/* On return of a packet, we need to move the
@@ -211,7 +218,10 @@  handle_worker_shutdown(struct rte_distributor *d, unsigned wkr)
 			pkts[i] = (void *)((uintptr_t)(bl->pkts[idx] >>
 					RTE_DISTRIB_FLAG_BITS));
 		}
-		/* recursive call */
+		/* recursive call.
+		 * Note that the tags were set before first level call
+		 * to rte_distributor_process.
+		 */
 		rte_distributor_process(d, pkts, i);
 		bl->count = bl->start = 0;
 	}
@@ -242,6 +252,7 @@  process_returns(struct rte_distributor *d)
 			else {
 				d->bufs[wkr].bufptr64 = RTE_DISTRIB_GET_BUF;
 				d->in_flight_tags[wkr] = 0;
+				d->in_flight_bitmask &= ~(1UL << wkr);
 			}
 			oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
 		} else if (data & RTE_DISTRIB_RETURN_BUF) {
@@ -284,17 +295,21 @@  rte_distributor_process(struct rte_distributor *d,
 			next_value = (((int64_t)(uintptr_t)next_mb)
 					<< RTE_DISTRIB_FLAG_BITS);
 			/*
-			 * Set the low bit on the tag, so we can guarantee that
-			 * we never store a tag value of zero. That means we can
-			 * use the zero-value to indicate that no packet is
-			 * being processed by a worker.
+			 * User is advocated to set tag vaue for each
+			 * mbuf before calling rte_distributor_process.
+			 * User defined tags are used to identify flows,
+			 * or sessions.
 			 */
-			new_tag = (next_mb->hash.rss | 1);
+			new_tag = next_mb->hash.usr;
 
-			uint32_t match = 0;
+			/*
+			 * Note that if RTE_MAX_LCORE is larger than 64 then
+			 * the size of match has to be expanded.
+			 */
+			uint64_t match = 0;
 			unsigned i;
 			/*
-			 * to scan for a match use "xor" and "not" to get a 0/1
+			 * To scan for a match use "xor" and "not" to get a 0/1
 			 * value, then use shifting to merge to single "match"
 			 * variable, where a one-bit indicates a match for the
 			 * worker given by the bit-position
@@ -303,9 +318,11 @@  rte_distributor_process(struct rte_distributor *d,
 				match |= (!(d->in_flight_tags[i] ^ new_tag)
 					<< i);
 
+			/* Only turned-on bits are considered as match */
+			match &= d->in_flight_bitmask;
 			if (match) {
 				next_mb = NULL;
-				unsigned worker = __builtin_ctz(match);
+				unsigned worker = __builtin_ctzl(match);
 				if (add_to_backlog(&d->backlog[worker],
 						next_value) < 0)
 					next_idx--;
@@ -322,6 +339,7 @@  rte_distributor_process(struct rte_distributor *d,
 			else {
 				d->bufs[wkr].bufptr64 = next_value;
 				d->in_flight_tags[wkr] = new_tag;
+				d->in_flight_bitmask |= (1UL << wkr);
 				next_mb = NULL;
 			}
 			oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
@@ -379,11 +397,12 @@  rte_distributor_returned_pkts(struct rte_distributor *d,
 static inline unsigned
 total_outstanding(const struct rte_distributor *d)
 {
-	unsigned wkr, total_outstanding = 0;
+	unsigned wkr, total_outstanding;
 
+	total_outstanding = __builtin_popcountl(d->in_flight_bitmask);
 	for (wkr = 0; wkr < d->num_workers; wkr++)
-		total_outstanding += d->backlog[wkr].count +
-				!!(d->in_flight_tags[wkr]);
+		total_outstanding += d->backlog[wkr].count;
+
 	return total_outstanding;
 }
 
diff --git a/lib/librte_distributor/rte_distributor.h b/lib/librte_distributor/rte_distributor.h
index ec0d74a..a29c506 100644
--- a/lib/librte_distributor/rte_distributor.h
+++ b/lib/librte_distributor/rte_distributor.h
@@ -87,6 +87,9 @@  rte_distributor_create(const char *name, unsigned socket_id,
  * Process a set of packets by distributing them among workers that request
  * packets. The distributor will ensure that no two packets that have the
  * same flow id, or tag, in the mbuf will be procesed at the same time.
+ * The user is advocated to set tag for each mbuf. If user doesn't set the
+ * tag, the tag value can be various values depending on driver implementation
+ * and configuration.
  *
  * This is not multi-thread safe and should only be called on a single lcore.
  *
diff --git a/lib/librte_mbuf/rte_mbuf.h b/lib/librte_mbuf/rte_mbuf.h
index e8f9bfc..f5f8658 100644
--- a/lib/librte_mbuf/rte_mbuf.h
+++ b/lib/librte_mbuf/rte_mbuf.h
@@ -185,6 +185,7 @@  struct rte_mbuf {
 			uint16_t id;
 		} fdir;           /**< Filter identifier if FDIR enabled */
 		uint32_t sched;   /**< Hierarchical scheduler */
+		uint32_t usr;	  /**< User defined tags. See @rte_distributor_process */
 	} hash;                   /**< hash information */
 
 	/* second cache line - fields only used in slow path or on TX */