[dpdk-dev] Add user defined tag calculation callback to librte_distributor.

Message ID 1415194237-1219-1-git-send-email-jigsaw@gmail.com (mailing list archive)
State RFC, archived
Headers

Commit Message

Qinglai Xiao Nov. 5, 2014, 1:30 p.m. UTC
  User defined tag calculation has access to mbuf.
Default tag is RSS hash result.

Signed-off-by: Qinglai Xiao <jigsaw@gmail.com>
---
 app/test/test_distributor.c              |    6 +++---
 app/test/test_distributor_perf.c         |    2 +-
 lib/librte_distributor/rte_distributor.c |   12 ++++++++++--
 lib/librte_distributor/rte_distributor.h |    7 ++++++-
 4 files changed, 20 insertions(+), 7 deletions(-)
  

Comments

Bruce Richardson Nov. 5, 2014, 2:27 p.m. UTC | #1
On Wed, Nov 05, 2014 at 03:30:37PM +0200, Qinglai Xiao wrote:
> User defined tag calculation has access to mbuf.
> Default tag is RSS hash result.
> 

Interesting idea.
Did you investigate was there any performance improvement or regression comparing 
whether the callback was called per-packet as packets were dequeued for distribution
(i.e. how you have things now in your patch), compared to calling
the callback in a loop to extract the tags for all packets initially? I suspect
there probably isn't much performance difference either way, but it may be worth
checking.
One other point, is that I think the callback to extract the tag should have
additional parameters - at least one, if not two. I would suggest that the
distributor pointer be passed in, as well as an arbitrary void * pointer.

Regards,
/Bruce

> Signed-off-by: Qinglai Xiao <jigsaw@gmail.com>
> ---
>  app/test/test_distributor.c              |    6 +++---
>  app/test/test_distributor_perf.c         |    2 +-
>  lib/librte_distributor/rte_distributor.c |   12 ++++++++++--
>  lib/librte_distributor/rte_distributor.h |    7 ++++++-
>  4 files changed, 20 insertions(+), 7 deletions(-)
> 
> diff --git a/app/test/test_distributor.c b/app/test/test_distributor.c
> index ce06436..6ea4943 100644
> --- a/app/test/test_distributor.c
> +++ b/app/test/test_distributor.c
> @@ -452,7 +452,7 @@ int test_error_distributor_create_name(void)
>  	char *name = NULL;
>  
>  	d = rte_distributor_create(name, rte_socket_id(),
> -			rte_lcore_count() - 1);
> +			rte_lcore_count() - 1, NULL);
>  	if (d != NULL || rte_errno != EINVAL) {
>  		printf("ERROR: No error on create() with NULL name param\n");
>  		return -1;
> @@ -467,7 +467,7 @@ int test_error_distributor_create_numworkers(void)
>  {
>  	struct rte_distributor *d = NULL;
>  	d = rte_distributor_create("test_numworkers", rte_socket_id(),
> -			RTE_MAX_LCORE + 10);
> +			RTE_MAX_LCORE + 10, NULL);
>  	if (d != NULL || rte_errno != EINVAL) {
>  		printf("ERROR: No error on create() with num_workers > MAX\n");
>  		return -1;
> @@ -515,7 +515,7 @@ test_distributor(void)
>  
>  	if (d == NULL) {
>  		d = rte_distributor_create("Test_distributor", rte_socket_id(),
> -				rte_lcore_count() - 1);
> +				rte_lcore_count() - 1, NULL);
>  		if (d == NULL) {
>  			printf("Error creating distributor\n");
>  			return -1;
> diff --git a/app/test/test_distributor_perf.c b/app/test/test_distributor_perf.c
> index b04864c..507e446 100644
> --- a/app/test/test_distributor_perf.c
> +++ b/app/test/test_distributor_perf.c
> @@ -227,7 +227,7 @@ test_distributor_perf(void)
>  
>  	if (d == NULL) {
>  		d = rte_distributor_create("Test_perf", rte_socket_id(),
> -				rte_lcore_count() - 1);
> +				rte_lcore_count() - 1, NULL);
>  		if (d == NULL) {
>  			printf("Error creating distributor\n");
>  			return -1;
> diff --git a/lib/librte_distributor/rte_distributor.c b/lib/librte_distributor/rte_distributor.c
> index 585ff88..78c92bd 100644
> --- a/lib/librte_distributor/rte_distributor.c
> +++ b/lib/librte_distributor/rte_distributor.c
> @@ -97,6 +97,7 @@ struct rte_distributor {
>  	union rte_distributor_buffer bufs[RTE_MAX_LCORE];
>  
>  	struct rte_distributor_returned_pkts returns;
> +	rte_distributor_tag_fn tag_cb;
>  };
>  
>  TAILQ_HEAD(rte_distributor_list, rte_distributor);
> @@ -267,6 +268,7 @@ rte_distributor_process(struct rte_distributor *d,
>  	struct rte_mbuf *next_mb = NULL;
>  	int64_t next_value = 0;
>  	uint32_t new_tag = 0;
> +	rte_distributor_tag_fn tag_cb = d->tag_cb;
>  	unsigned ret_start = d->returns.start,
>  			ret_count = d->returns.count;
>  
> @@ -282,7 +284,11 @@ rte_distributor_process(struct rte_distributor *d,
>  			next_mb = mbufs[next_idx++];
>  			next_value = (((int64_t)(uintptr_t)next_mb)
>  					<< RTE_DISTRIB_FLAG_BITS);
> -			new_tag = (next_mb->hash.rss | 1);
> +			if (tag_cb) {
> +				new_tag = tag_cb(next_mb);
> +			} else {
> +				new_tag = (next_mb->hash.rss | 1);
> +			}
>  
>  			uint32_t match = 0;
>  			unsigned i;
> @@ -401,7 +407,8 @@ rte_distributor_clear_returns(struct rte_distributor *d)
>  struct rte_distributor *
>  rte_distributor_create(const char *name,
>  		unsigned socket_id,
> -		unsigned num_workers)
> +		unsigned num_workers,
> +		rte_distributor_tag_fn tag_cb)
>  {
>  	struct rte_distributor *d;
>  	struct rte_distributor_list *distributor_list;
> @@ -435,6 +442,7 @@ rte_distributor_create(const char *name,
>  	d = mz->addr;
>  	snprintf(d->name, sizeof(d->name), "%s", name);
>  	d->num_workers = num_workers;
> +	d->tag_cb = tag_cb;
>  
>  	rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK);
>  	TAILQ_INSERT_TAIL(distributor_list, d, next);
> diff --git a/lib/librte_distributor/rte_distributor.h b/lib/librte_distributor/rte_distributor.h
> index ec0d74a..844d325 100644
> --- a/lib/librte_distributor/rte_distributor.h
> +++ b/lib/librte_distributor/rte_distributor.h
> @@ -52,6 +52,9 @@ extern "C" {
>  
>  struct rte_distributor;
>  
> +typedef uint32_t (*rte_distributor_tag_fn)(struct rte_mbuf *);
> +/**< User defined tag calculation function */
> +
>  /**
>   * Function to create a new distributor instance
>   *
> @@ -65,12 +68,14 @@ struct rte_distributor;
>   * @param num_workers
>   *   The maximum number of workers that will request packets from this
>   *   distributor
> + * @param tag_cb
> + *   The callback function for calculation of user defined tag.
>   * @return
>   *   The newly created distributor instance
>   */
>  struct rte_distributor *
>  rte_distributor_create(const char *name, unsigned socket_id,
> -		unsigned num_workers);
> +		unsigned num_workers, rte_distributor_tag_fn tag_cb);
>  
>  /*  *** APIS to be called on the distributor lcore ***  */
>  /*
> -- 
> 1.7.1
>
  
Ananyev, Konstantin Nov. 5, 2014, 3:13 p.m. UTC | #2
> -----Original Message-----
> From: dev [mailto:dev-bounces@dpdk.org] On Behalf Of Bruce Richardson
> Sent: Wednesday, November 05, 2014 2:28 PM
> To: Qinglai Xiao
> Cc: dev@dpdk.org
> Subject: Re: [dpdk-dev] [PATCH] Add user defined tag calculation callback to librte_distributor.
> 
> On Wed, Nov 05, 2014 at 03:30:37PM +0200, Qinglai Xiao wrote:
> > User defined tag calculation has access to mbuf.
> > Default tag is RSS hash result.
> >
> 
> Interesting idea.
> Did you investigate was there any performance improvement or regression comparing
> whether the callback was called per-packet as packets were dequeued for distribution
> (i.e. how you have things now in your patch), compared to calling
> the callback in a loop to extract the tags for all packets initially? I suspect
> there probably isn't much performance difference either way, but it may be worth
> checking.
> One other point, is that I think the callback to extract the tag should have
> additional parameters - at least one, if not two. I would suggest that the
> distributor pointer be passed in, as well as an arbitrary void * pointer.


Just wonder, why do you need a call-back?
Why not just make rte_distributor_process() to accept an extra parameter: array of tags?

rte_distributor_process(struct rte_distributor *d,
                struct rte_mbuf **mbufs, uint32_t *mbuf_tags, unsigned num_mbufs)

?


> 
> Regards,
> /Bruce
> 
> > Signed-off-by: Qinglai Xiao <jigsaw@gmail.com>
> > ---
> >  app/test/test_distributor.c              |    6 +++---
> >  app/test/test_distributor_perf.c         |    2 +-
> >  lib/librte_distributor/rte_distributor.c |   12 ++++++++++--
> >  lib/librte_distributor/rte_distributor.h |    7 ++++++-
> >  4 files changed, 20 insertions(+), 7 deletions(-)
> >
> > diff --git a/app/test/test_distributor.c b/app/test/test_distributor.c
> > index ce06436..6ea4943 100644
> > --- a/app/test/test_distributor.c
> > +++ b/app/test/test_distributor.c
> > @@ -452,7 +452,7 @@ int test_error_distributor_create_name(void)
> >  	char *name = NULL;
> >
> >  	d = rte_distributor_create(name, rte_socket_id(),
> > -			rte_lcore_count() - 1);
> > +			rte_lcore_count() - 1, NULL);
> >  	if (d != NULL || rte_errno != EINVAL) {
> >  		printf("ERROR: No error on create() with NULL name param\n");
> >  		return -1;
> > @@ -467,7 +467,7 @@ int test_error_distributor_create_numworkers(void)
> >  {
> >  	struct rte_distributor *d = NULL;
> >  	d = rte_distributor_create("test_numworkers", rte_socket_id(),
> > -			RTE_MAX_LCORE + 10);
> > +			RTE_MAX_LCORE + 10, NULL);
> >  	if (d != NULL || rte_errno != EINVAL) {
> >  		printf("ERROR: No error on create() with num_workers > MAX\n");
> >  		return -1;
> > @@ -515,7 +515,7 @@ test_distributor(void)
> >
> >  	if (d == NULL) {
> >  		d = rte_distributor_create("Test_distributor", rte_socket_id(),
> > -				rte_lcore_count() - 1);
> > +				rte_lcore_count() - 1, NULL);
> >  		if (d == NULL) {
> >  			printf("Error creating distributor\n");
> >  			return -1;
> > diff --git a/app/test/test_distributor_perf.c b/app/test/test_distributor_perf.c
> > index b04864c..507e446 100644
> > --- a/app/test/test_distributor_perf.c
> > +++ b/app/test/test_distributor_perf.c
> > @@ -227,7 +227,7 @@ test_distributor_perf(void)
> >
> >  	if (d == NULL) {
> >  		d = rte_distributor_create("Test_perf", rte_socket_id(),
> > -				rte_lcore_count() - 1);
> > +				rte_lcore_count() - 1, NULL);
> >  		if (d == NULL) {
> >  			printf("Error creating distributor\n");
> >  			return -1;
> > diff --git a/lib/librte_distributor/rte_distributor.c b/lib/librte_distributor/rte_distributor.c
> > index 585ff88..78c92bd 100644
> > --- a/lib/librte_distributor/rte_distributor.c
> > +++ b/lib/librte_distributor/rte_distributor.c
> > @@ -97,6 +97,7 @@ struct rte_distributor {
> >  	union rte_distributor_buffer bufs[RTE_MAX_LCORE];
> >
> >  	struct rte_distributor_returned_pkts returns;
> > +	rte_distributor_tag_fn tag_cb;
> >  };
> >
> >  TAILQ_HEAD(rte_distributor_list, rte_distributor);
> > @@ -267,6 +268,7 @@ rte_distributor_process(struct rte_distributor *d,
> >  	struct rte_mbuf *next_mb = NULL;
> >  	int64_t next_value = 0;
> >  	uint32_t new_tag = 0;
> > +	rte_distributor_tag_fn tag_cb = d->tag_cb;
> >  	unsigned ret_start = d->returns.start,
> >  			ret_count = d->returns.count;
> >
> > @@ -282,7 +284,11 @@ rte_distributor_process(struct rte_distributor *d,
> >  			next_mb = mbufs[next_idx++];
> >  			next_value = (((int64_t)(uintptr_t)next_mb)
> >  					<< RTE_DISTRIB_FLAG_BITS);
> > -			new_tag = (next_mb->hash.rss | 1);
> > +			if (tag_cb) {
> > +				new_tag = tag_cb(next_mb);
> > +			} else {
> > +				new_tag = (next_mb->hash.rss | 1);
> > +			}
> >
> >  			uint32_t match = 0;
> >  			unsigned i;
> > @@ -401,7 +407,8 @@ rte_distributor_clear_returns(struct rte_distributor *d)
> >  struct rte_distributor *
> >  rte_distributor_create(const char *name,
> >  		unsigned socket_id,
> > -		unsigned num_workers)
> > +		unsigned num_workers,
> > +		rte_distributor_tag_fn tag_cb)
> >  {
> >  	struct rte_distributor *d;
> >  	struct rte_distributor_list *distributor_list;
> > @@ -435,6 +442,7 @@ rte_distributor_create(const char *name,
> >  	d = mz->addr;
> >  	snprintf(d->name, sizeof(d->name), "%s", name);
> >  	d->num_workers = num_workers;
> > +	d->tag_cb = tag_cb;
> >
> >  	rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK);
> >  	TAILQ_INSERT_TAIL(distributor_list, d, next);
> > diff --git a/lib/librte_distributor/rte_distributor.h b/lib/librte_distributor/rte_distributor.h
> > index ec0d74a..844d325 100644
> > --- a/lib/librte_distributor/rte_distributor.h
> > +++ b/lib/librte_distributor/rte_distributor.h
> > @@ -52,6 +52,9 @@ extern "C" {
> >
> >  struct rte_distributor;
> >
> > +typedef uint32_t (*rte_distributor_tag_fn)(struct rte_mbuf *);
> > +/**< User defined tag calculation function */
> > +
> >  /**
> >   * Function to create a new distributor instance
> >   *
> > @@ -65,12 +68,14 @@ struct rte_distributor;
> >   * @param num_workers
> >   *   The maximum number of workers that will request packets from this
> >   *   distributor
> > + * @param tag_cb
> > + *   The callback function for calculation of user defined tag.
> >   * @return
> >   *   The newly created distributor instance
> >   */
> >  struct rte_distributor *
> >  rte_distributor_create(const char *name, unsigned socket_id,
> > -		unsigned num_workers);
> > +		unsigned num_workers, rte_distributor_tag_fn tag_cb);
> >
> >  /*  *** APIS to be called on the distributor lcore ***  */
> >  /*
> > --
> > 1.7.1
> >
  
Qinglai Xiao Nov. 5, 2014, 3:24 p.m. UTC | #3
Hi Konstantin,

I agree with you. A callback is not necessary. Pls refer to my previous
mail, which proposed a not-at-all-intrusive patch.
Pls let me know your concern.

thx &
rgds,
-qinglai

On Wed, Nov 5, 2014 at 5:13 PM, Ananyev, Konstantin <
konstantin.ananyev@intel.com> wrote:

>
>
> > -----Original Message-----
> > From: dev [mailto:dev-bounces@dpdk.org] On Behalf Of Bruce Richardson
> > Sent: Wednesday, November 05, 2014 2:28 PM
> > To: Qinglai Xiao
> > Cc: dev@dpdk.org
> > Subject: Re: [dpdk-dev] [PATCH] Add user defined tag calculation
> callback to librte_distributor.
> >
> > On Wed, Nov 05, 2014 at 03:30:37PM +0200, Qinglai Xiao wrote:
> > > User defined tag calculation has access to mbuf.
> > > Default tag is RSS hash result.
> > >
> >
> > Interesting idea.
> > Did you investigate was there any performance improvement or regression
> comparing
> > whether the callback was called per-packet as packets were dequeued for
> distribution
> > (i.e. how you have things now in your patch), compared to calling
> > the callback in a loop to extract the tags for all packets initially? I
> suspect
> > there probably isn't much performance difference either way, but it may
> be worth
> > checking.
> > One other point, is that I think the callback to extract the tag should
> have
> > additional parameters - at least one, if not two. I would suggest that
> the
> > distributor pointer be passed in, as well as an arbitrary void * pointer.
>
>
> Just wonder, why do you need a call-back?
> Why not just make rte_distributor_process() to accept an extra parameter:
> array of tags?
>
> rte_distributor_process(struct rte_distributor *d,
>                 struct rte_mbuf **mbufs, uint32_t *mbuf_tags, unsigned
> num_mbufs)
>
> ?
>
>
> >
> > Regards,
> > /Bruce
> >
> > > Signed-off-by: Qinglai Xiao <jigsaw@gmail.com>
> > > ---
> > >  app/test/test_distributor.c              |    6 +++---
> > >  app/test/test_distributor_perf.c         |    2 +-
> > >  lib/librte_distributor/rte_distributor.c |   12 ++++++++++--
> > >  lib/librte_distributor/rte_distributor.h |    7 ++++++-
> > >  4 files changed, 20 insertions(+), 7 deletions(-)
> > >
> > > diff --git a/app/test/test_distributor.c b/app/test/test_distributor.c
> > > index ce06436..6ea4943 100644
> > > --- a/app/test/test_distributor.c
> > > +++ b/app/test/test_distributor.c
> > > @@ -452,7 +452,7 @@ int test_error_distributor_create_name(void)
> > >     char *name = NULL;
> > >
> > >     d = rte_distributor_create(name, rte_socket_id(),
> > > -                   rte_lcore_count() - 1);
> > > +                   rte_lcore_count() - 1, NULL);
> > >     if (d != NULL || rte_errno != EINVAL) {
> > >             printf("ERROR: No error on create() with NULL name
> param\n");
> > >             return -1;
> > > @@ -467,7 +467,7 @@ int test_error_distributor_create_numworkers(void)
> > >  {
> > >     struct rte_distributor *d = NULL;
> > >     d = rte_distributor_create("test_numworkers", rte_socket_id(),
> > > -                   RTE_MAX_LCORE + 10);
> > > +                   RTE_MAX_LCORE + 10, NULL);
> > >     if (d != NULL || rte_errno != EINVAL) {
> > >             printf("ERROR: No error on create() with num_workers >
> MAX\n");
> > >             return -1;
> > > @@ -515,7 +515,7 @@ test_distributor(void)
> > >
> > >     if (d == NULL) {
> > >             d = rte_distributor_create("Test_distributor",
> rte_socket_id(),
> > > -                           rte_lcore_count() - 1);
> > > +                           rte_lcore_count() - 1, NULL);
> > >             if (d == NULL) {
> > >                     printf("Error creating distributor\n");
> > >                     return -1;
> > > diff --git a/app/test/test_distributor_perf.c
> b/app/test/test_distributor_perf.c
> > > index b04864c..507e446 100644
> > > --- a/app/test/test_distributor_perf.c
> > > +++ b/app/test/test_distributor_perf.c
> > > @@ -227,7 +227,7 @@ test_distributor_perf(void)
> > >
> > >     if (d == NULL) {
> > >             d = rte_distributor_create("Test_perf", rte_socket_id(),
> > > -                           rte_lcore_count() - 1);
> > > +                           rte_lcore_count() - 1, NULL);
> > >             if (d == NULL) {
> > >                     printf("Error creating distributor\n");
> > >                     return -1;
> > > diff --git a/lib/librte_distributor/rte_distributor.c
> b/lib/librte_distributor/rte_distributor.c
> > > index 585ff88..78c92bd 100644
> > > --- a/lib/librte_distributor/rte_distributor.c
> > > +++ b/lib/librte_distributor/rte_distributor.c
> > > @@ -97,6 +97,7 @@ struct rte_distributor {
> > >     union rte_distributor_buffer bufs[RTE_MAX_LCORE];
> > >
> > >     struct rte_distributor_returned_pkts returns;
> > > +   rte_distributor_tag_fn tag_cb;
> > >  };
> > >
> > >  TAILQ_HEAD(rte_distributor_list, rte_distributor);
> > > @@ -267,6 +268,7 @@ rte_distributor_process(struct rte_distributor *d,
> > >     struct rte_mbuf *next_mb = NULL;
> > >     int64_t next_value = 0;
> > >     uint32_t new_tag = 0;
> > > +   rte_distributor_tag_fn tag_cb = d->tag_cb;
> > >     unsigned ret_start = d->returns.start,
> > >                     ret_count = d->returns.count;
> > >
> > > @@ -282,7 +284,11 @@ rte_distributor_process(struct rte_distributor *d,
> > >                     next_mb = mbufs[next_idx++];
> > >                     next_value = (((int64_t)(uintptr_t)next_mb)
> > >                                     << RTE_DISTRIB_FLAG_BITS);
> > > -                   new_tag = (next_mb->hash.rss | 1);
> > > +                   if (tag_cb) {
> > > +                           new_tag = tag_cb(next_mb);
> > > +                   } else {
> > > +                           new_tag = (next_mb->hash.rss | 1);
> > > +                   }
> > >
> > >                     uint32_t match = 0;
> > >                     unsigned i;
> > > @@ -401,7 +407,8 @@ rte_distributor_clear_returns(struct
> rte_distributor *d)
> > >  struct rte_distributor *
> > >  rte_distributor_create(const char *name,
> > >             unsigned socket_id,
> > > -           unsigned num_workers)
> > > +           unsigned num_workers,
> > > +           rte_distributor_tag_fn tag_cb)
> > >  {
> > >     struct rte_distributor *d;
> > >     struct rte_distributor_list *distributor_list;
> > > @@ -435,6 +442,7 @@ rte_distributor_create(const char *name,
> > >     d = mz->addr;
> > >     snprintf(d->name, sizeof(d->name), "%s", name);
> > >     d->num_workers = num_workers;
> > > +   d->tag_cb = tag_cb;
> > >
> > >     rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK);
> > >     TAILQ_INSERT_TAIL(distributor_list, d, next);
> > > diff --git a/lib/librte_distributor/rte_distributor.h
> b/lib/librte_distributor/rte_distributor.h
> > > index ec0d74a..844d325 100644
> > > --- a/lib/librte_distributor/rte_distributor.h
> > > +++ b/lib/librte_distributor/rte_distributor.h
> > > @@ -52,6 +52,9 @@ extern "C" {
> > >
> > >  struct rte_distributor;
> > >
> > > +typedef uint32_t (*rte_distributor_tag_fn)(struct rte_mbuf *);
> > > +/**< User defined tag calculation function */
> > > +
> > >  /**
> > >   * Function to create a new distributor instance
> > >   *
> > > @@ -65,12 +68,14 @@ struct rte_distributor;
> > >   * @param num_workers
> > >   *   The maximum number of workers that will request packets from this
> > >   *   distributor
> > > + * @param tag_cb
> > > + *   The callback function for calculation of user defined tag.
> > >   * @return
> > >   *   The newly created distributor instance
> > >   */
> > >  struct rte_distributor *
> > >  rte_distributor_create(const char *name, unsigned socket_id,
> > > -           unsigned num_workers);
> > > +           unsigned num_workers, rte_distributor_tag_fn tag_cb);
> > >
> > >  /*  *** APIS to be called on the distributor lcore ***  */
> > >  /*
> > > --
> > > 1.7.1
> > >
>
  

Patch

diff --git a/app/test/test_distributor.c b/app/test/test_distributor.c
index ce06436..6ea4943 100644
--- a/app/test/test_distributor.c
+++ b/app/test/test_distributor.c
@@ -452,7 +452,7 @@  int test_error_distributor_create_name(void)
 	char *name = NULL;
 
 	d = rte_distributor_create(name, rte_socket_id(),
-			rte_lcore_count() - 1);
+			rte_lcore_count() - 1, NULL);
 	if (d != NULL || rte_errno != EINVAL) {
 		printf("ERROR: No error on create() with NULL name param\n");
 		return -1;
@@ -467,7 +467,7 @@  int test_error_distributor_create_numworkers(void)
 {
 	struct rte_distributor *d = NULL;
 	d = rte_distributor_create("test_numworkers", rte_socket_id(),
-			RTE_MAX_LCORE + 10);
+			RTE_MAX_LCORE + 10, NULL);
 	if (d != NULL || rte_errno != EINVAL) {
 		printf("ERROR: No error on create() with num_workers > MAX\n");
 		return -1;
@@ -515,7 +515,7 @@  test_distributor(void)
 
 	if (d == NULL) {
 		d = rte_distributor_create("Test_distributor", rte_socket_id(),
-				rte_lcore_count() - 1);
+				rte_lcore_count() - 1, NULL);
 		if (d == NULL) {
 			printf("Error creating distributor\n");
 			return -1;
diff --git a/app/test/test_distributor_perf.c b/app/test/test_distributor_perf.c
index b04864c..507e446 100644
--- a/app/test/test_distributor_perf.c
+++ b/app/test/test_distributor_perf.c
@@ -227,7 +227,7 @@  test_distributor_perf(void)
 
 	if (d == NULL) {
 		d = rte_distributor_create("Test_perf", rte_socket_id(),
-				rte_lcore_count() - 1);
+				rte_lcore_count() - 1, NULL);
 		if (d == NULL) {
 			printf("Error creating distributor\n");
 			return -1;
diff --git a/lib/librte_distributor/rte_distributor.c b/lib/librte_distributor/rte_distributor.c
index 585ff88..78c92bd 100644
--- a/lib/librte_distributor/rte_distributor.c
+++ b/lib/librte_distributor/rte_distributor.c
@@ -97,6 +97,7 @@  struct rte_distributor {
 	union rte_distributor_buffer bufs[RTE_MAX_LCORE];
 
 	struct rte_distributor_returned_pkts returns;
+	rte_distributor_tag_fn tag_cb;
 };
 
 TAILQ_HEAD(rte_distributor_list, rte_distributor);
@@ -267,6 +268,7 @@  rte_distributor_process(struct rte_distributor *d,
 	struct rte_mbuf *next_mb = NULL;
 	int64_t next_value = 0;
 	uint32_t new_tag = 0;
+	rte_distributor_tag_fn tag_cb = d->tag_cb;
 	unsigned ret_start = d->returns.start,
 			ret_count = d->returns.count;
 
@@ -282,7 +284,11 @@  rte_distributor_process(struct rte_distributor *d,
 			next_mb = mbufs[next_idx++];
 			next_value = (((int64_t)(uintptr_t)next_mb)
 					<< RTE_DISTRIB_FLAG_BITS);
-			new_tag = (next_mb->hash.rss | 1);
+			if (tag_cb) {
+				new_tag = tag_cb(next_mb);
+			} else {
+				new_tag = (next_mb->hash.rss | 1);
+			}
 
 			uint32_t match = 0;
 			unsigned i;
@@ -401,7 +407,8 @@  rte_distributor_clear_returns(struct rte_distributor *d)
 struct rte_distributor *
 rte_distributor_create(const char *name,
 		unsigned socket_id,
-		unsigned num_workers)
+		unsigned num_workers,
+		rte_distributor_tag_fn tag_cb)
 {
 	struct rte_distributor *d;
 	struct rte_distributor_list *distributor_list;
@@ -435,6 +442,7 @@  rte_distributor_create(const char *name,
 	d = mz->addr;
 	snprintf(d->name, sizeof(d->name), "%s", name);
 	d->num_workers = num_workers;
+	d->tag_cb = tag_cb;
 
 	rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK);
 	TAILQ_INSERT_TAIL(distributor_list, d, next);
diff --git a/lib/librte_distributor/rte_distributor.h b/lib/librte_distributor/rte_distributor.h
index ec0d74a..844d325 100644
--- a/lib/librte_distributor/rte_distributor.h
+++ b/lib/librte_distributor/rte_distributor.h
@@ -52,6 +52,9 @@  extern "C" {
 
 struct rte_distributor;
 
+typedef uint32_t (*rte_distributor_tag_fn)(struct rte_mbuf *);
+/**< User defined tag calculation function */
+
 /**
  * Function to create a new distributor instance
  *
@@ -65,12 +68,14 @@  struct rte_distributor;
  * @param num_workers
  *   The maximum number of workers that will request packets from this
  *   distributor
+ * @param tag_cb
+ *   The callback function for calculation of user defined tag.
  * @return
  *   The newly created distributor instance
  */
 struct rte_distributor *
 rte_distributor_create(const char *name, unsigned socket_id,
-		unsigned num_workers);
+		unsigned num_workers, rte_distributor_tag_fn tag_cb);
 
 /*  *** APIS to be called on the distributor lcore ***  */
 /*