[dpdk-dev,1/5] mempool: add external mempool manager support

Message ID 1453829155-1366-2-git-send-email-david.hunt@intel.com (mailing list archive)
State Superseded, archived
Delegated to: Thomas Monjalon

Commit Message

Hunt, David Jan. 26, 2016, 5:25 p.m. UTC
Adds the new rte_mempool_create_ext API and a callback mechanism for
external mempool handlers.

Modifies the existing rte_mempool_create to set handler_idx to
the relevant mempool handler based on the handler name:
	ring_sp_sc
	ring_mp_mc
	ring_sp_mc
	ring_mp_sc

Signed-off-by: David Hunt <david.hunt@intel.com>
---
 app/test/test_mempool_perf.c              |   1 -
 lib/librte_mempool/Makefile               |   1 +
 lib/librte_mempool/rte_mempool.c          | 210 +++++++++++++++++++--------
 lib/librte_mempool/rte_mempool.h          | 207 +++++++++++++++++++++++----
 lib/librte_mempool/rte_mempool_default.c  | 229 ++++++++++++++++++++++++++++++
 lib/librte_mempool/rte_mempool_internal.h |  74 ++++++++++
 6 files changed, 634 insertions(+), 88 deletions(-)
 create mode 100644 lib/librte_mempool/rte_mempool_default.c
 create mode 100644 lib/librte_mempool/rte_mempool_internal.h
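
As a sketch of the callback mechanism this patch introduces, registering
an out-of-tree handler could look like the following. The handler itself
is hypothetical; the typedefs, the rte_mempool_handler fields and the
REGISTER_MEMPOOL_HANDLER macro are the ones added by the patch below.

#include <stdio.h>
#include <rte_common.h>
#include <rte_ring.h>

#include "rte_mempool_internal.h"

/* Back the pool with an rte_ring, as the built-in handlers do. */
static void *
my_alloc(struct rte_mempool *mp, const char *name, unsigned n,
		int socket_id, unsigned flags __rte_unused)
{
	char rg_name[RTE_RING_NAMESIZE];

	snprintf(rg_name, sizeof(rg_name), "%s-myring", name);
	mp->rt_pool = rte_ring_create(rg_name, rte_align32pow2(n + 1),
			socket_id, 0);
	return mp->rt_pool;
}

static int
my_put(void *p, void * const *obj_table, unsigned n)
{
	return rte_ring_mp_enqueue_bulk((struct rte_ring *)p, obj_table, n);
}

static int
my_get(void *p, void **obj_table, unsigned n)
{
	return rte_ring_mc_dequeue_bulk((struct rte_ring *)p, obj_table, n);
}

static unsigned
my_get_count(void *p)
{
	return rte_ring_count((struct rte_ring *)p);
}

static struct rte_mempool_handler my_handler = {
	.name = "my_ring",
	.alloc = my_alloc,
	.put = my_put,
	.get = my_get,
	.get_count = my_get_count,
	.free = NULL
};

REGISTER_MEMPOOL_HANDLER(my_handler);

An application would then select this handler by passing "my_ring" as
the handler_name argument to rte_mempool_create_ext().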
  

Comments

Jerin Jacob Jan. 28, 2016, 5:52 p.m. UTC | #1
On Tue, Jan 26, 2016 at 05:25:51PM +0000, David Hunt wrote:
> Adds the new rte_mempool_create_ext api and callback mechanism for
> external mempool handlers
> 
> Modifies the existing rte_mempool_create to set up the handler_idx to
> the relevant mempool handler based on the handler name:
> 	ring_sp_sc
> 	ring_mp_mc
> 	ring_sp_mc
> 	ring_mp_sc
> 
> Signed-off-by: David Hunt <david.hunt@intel.com>
> ---
>  app/test/test_mempool_perf.c              |   1 -
>  lib/librte_mempool/Makefile               |   1 +
>  lib/librte_mempool/rte_mempool.c          | 210 +++++++++++++++++++--------
>  lib/librte_mempool/rte_mempool.h          | 207 +++++++++++++++++++++++----
>  lib/librte_mempool/rte_mempool_default.c  | 229 ++++++++++++++++++++++++++++++
>  lib/librte_mempool/rte_mempool_internal.h |  74 ++++++++++
>  6 files changed, 634 insertions(+), 88 deletions(-)
>  create mode 100644 lib/librte_mempool/rte_mempool_default.c
>  create mode 100644 lib/librte_mempool/rte_mempool_internal.h
> 
> diff --git a/app/test/test_mempool_perf.c b/app/test/test_mempool_perf.c
> index cdc02a0..091c1df 100644
> --- a/app/test/test_mempool_perf.c
> +++ b/app/test/test_mempool_perf.c
> @@ -161,7 +161,6 @@ per_lcore_mempool_test(__attribute__((unused)) void *arg)
>  							   n_get_bulk);
>  				if (unlikely(ret < 0)) {
>  					rte_mempool_dump(stdout, mp);
> -					rte_ring_dump(stdout, mp->ring);
>  					/* in this case, objects are lost... */
>  					return -1;
>  				}
> diff --git a/lib/librte_mempool/Makefile b/lib/librte_mempool/Makefile
> index a6898ef..7c81ef6 100644
> --- a/lib/librte_mempool/Makefile
> +++ b/lib/librte_mempool/Makefile
> @@ -42,6 +42,7 @@ LIBABIVER := 1
>  
>  # all source are stored in SRCS-y
>  SRCS-$(CONFIG_RTE_LIBRTE_MEMPOOL) +=  rte_mempool.c
> +SRCS-$(CONFIG_RTE_LIBRTE_MEMPOOL) +=  rte_mempool_default.c
>  ifeq ($(CONFIG_RTE_LIBRTE_XEN_DOM0),y)
>  SRCS-$(CONFIG_RTE_LIBRTE_MEMPOOL) +=  rte_dom0_mempool.c
>  endif
> diff --git a/lib/librte_mempool/rte_mempool.c b/lib/librte_mempool/rte_mempool.c
> index aff5f6d..8c01838 100644
> --- a/lib/librte_mempool/rte_mempool.c
> +++ b/lib/librte_mempool/rte_mempool.c
> @@ -59,10 +59,11 @@
>  #include <rte_spinlock.h>
>  
>  #include "rte_mempool.h"
> +#include "rte_mempool_internal.h"
>  
>  TAILQ_HEAD(rte_mempool_list, rte_tailq_entry);
>  
> -static struct rte_tailq_elem rte_mempool_tailq = {
> +struct rte_tailq_elem rte_mempool_tailq = {
>  	.name = "RTE_MEMPOOL",
>  };
>  EAL_REGISTER_TAILQ(rte_mempool_tailq)
> @@ -149,7 +150,7 @@ mempool_add_elem(struct rte_mempool *mp, void *obj, uint32_t obj_idx,
>  		obj_init(mp, obj_init_arg, obj, obj_idx);
>  
>  	/* enqueue in ring */
> -	rte_ring_sp_enqueue(mp->ring, obj);
> +	rte_mempool_ext_put_bulk(mp, &obj, 1);
>  }
>  
>  uint32_t
> @@ -375,48 +376,28 @@ rte_mempool_xmem_usage(void *vaddr, uint32_t elt_num, size_t elt_sz,
>  	return usz;
>  }
>  
> -#ifndef RTE_LIBRTE_XEN_DOM0
> -/* stub if DOM0 support not configured */
> -struct rte_mempool *
> -rte_dom0_mempool_create(const char *name __rte_unused,
> -			unsigned n __rte_unused,
> -			unsigned elt_size __rte_unused,
> -			unsigned cache_size __rte_unused,
> -			unsigned private_data_size __rte_unused,
> -			rte_mempool_ctor_t *mp_init __rte_unused,
> -			void *mp_init_arg __rte_unused,
> -			rte_mempool_obj_ctor_t *obj_init __rte_unused,
> -			void *obj_init_arg __rte_unused,
> -			int socket_id __rte_unused,
> -			unsigned flags __rte_unused)
> -{
> -	rte_errno = EINVAL;
> -	return NULL;
> -}
> -#endif
> -
>  /* create the mempool */
>  struct rte_mempool *
>  rte_mempool_create(const char *name, unsigned n, unsigned elt_size,
> -		   unsigned cache_size, unsigned private_data_size,
> -		   rte_mempool_ctor_t *mp_init, void *mp_init_arg,
> -		   rte_mempool_obj_ctor_t *obj_init, void *obj_init_arg,
> -		   int socket_id, unsigned flags)
> +			unsigned cache_size, unsigned private_data_size,
> +			rte_mempool_ctor_t *mp_init, void *mp_init_arg,
> +			rte_mempool_obj_ctor_t *obj_init, void *obj_init_arg,
> +			int socket_id, unsigned flags)
>  {
>  	if (rte_xen_dom0_supported())
>  		return rte_dom0_mempool_create(name, n, elt_size,
> -					       cache_size, private_data_size,
> -					       mp_init, mp_init_arg,
> -					       obj_init, obj_init_arg,
> -					       socket_id, flags);
> +			cache_size, private_data_size,
> +			mp_init, mp_init_arg,
> +			obj_init, obj_init_arg,
> +			socket_id, flags);
>  	else
>  		return rte_mempool_xmem_create(name, n, elt_size,
> -					       cache_size, private_data_size,
> -					       mp_init, mp_init_arg,
> -					       obj_init, obj_init_arg,
> -					       socket_id, flags,
> -					       NULL, NULL, MEMPOOL_PG_NUM_DEFAULT,
> -					       MEMPOOL_PG_SHIFT_MAX);
> +			cache_size, private_data_size,
> +			mp_init, mp_init_arg,
> +			obj_init, obj_init_arg,
> +			socket_id, flags,
> +			NULL, NULL,
> +			MEMPOOL_PG_NUM_DEFAULT, MEMPOOL_PG_SHIFT_MAX);
>  }
>  
>  /*
> @@ -435,11 +416,9 @@ rte_mempool_xmem_create(const char *name, unsigned n, unsigned elt_size,
>  		const phys_addr_t paddr[], uint32_t pg_num, uint32_t pg_shift)
>  {
>  	char mz_name[RTE_MEMZONE_NAMESIZE];
> -	char rg_name[RTE_RING_NAMESIZE];
>  	struct rte_mempool_list *mempool_list;
>  	struct rte_mempool *mp = NULL;
>  	struct rte_tailq_entry *te;
> -	struct rte_ring *r;
>  	const struct rte_memzone *mz;
>  	size_t mempool_size;
>  	int mz_flags = RTE_MEMZONE_1GB|RTE_MEMZONE_SIZE_HINT_ONLY;
> @@ -469,7 +448,7 @@ rte_mempool_xmem_create(const char *name, unsigned n, unsigned elt_size,
>  
>  	/* asked cache too big */
>  	if (cache_size > RTE_MEMPOOL_CACHE_MAX_SIZE ||
> -	    CALC_CACHE_FLUSHTHRESH(cache_size) > n) {
> +		CALC_CACHE_FLUSHTHRESH(cache_size) > n) {
>  		rte_errno = EINVAL;
>  		return NULL;
>  	}
> @@ -502,16 +481,8 @@ rte_mempool_xmem_create(const char *name, unsigned n, unsigned elt_size,
>  		return NULL;
>  	}
>  
> -	rte_rwlock_write_lock(RTE_EAL_MEMPOOL_RWLOCK);
>  
> -	/* allocate the ring that will be used to store objects */
> -	/* Ring functions will return appropriate errors if we are
> -	 * running as a secondary process etc., so no checks made
> -	 * in this function for that condition */
> -	snprintf(rg_name, sizeof(rg_name), RTE_MEMPOOL_MZ_FORMAT, name);
> -	r = rte_ring_create(rg_name, rte_align32pow2(n+1), socket_id, rg_flags);
> -	if (r == NULL)
> -		goto exit;
> +	rte_rwlock_write_lock(RTE_EAL_MEMPOOL_RWLOCK);
>  
>  	/*
>  	 * reserve a memory zone for this mempool: private data is
> @@ -588,7 +559,6 @@ rte_mempool_xmem_create(const char *name, unsigned n, unsigned elt_size,
>  	memset(mp, 0, sizeof(*mp));
>  	snprintf(mp->name, sizeof(mp->name), "%s", name);
>  	mp->phys_addr = mz->phys_addr;
> -	mp->ring = r;
>  	mp->size = n;
>  	mp->flags = flags;
>  	mp->elt_size = objsz.elt_size;
> @@ -598,6 +568,22 @@ rte_mempool_xmem_create(const char *name, unsigned n, unsigned elt_size,
>  	mp->cache_flushthresh = CALC_CACHE_FLUSHTHRESH(cache_size);
>  	mp->private_data_size = private_data_size;
>  
> +	/*
> +	 * Since we have 4 combinations of the SP/SC/MP/MC, and stack,
> +	 * examine the
> +	 * flags to set the correct index into the handler table.
> +	 */
> +	if (flags & MEMPOOL_F_USE_STACK)
> +		mp->handler_idx = rte_get_mempool_handler("stack");
> +	else if (flags & (MEMPOOL_F_SP_PUT | MEMPOOL_F_SC_GET))
> +		mp->handler_idx = rte_get_mempool_handler("ring_sp_sc");
> +	else if (flags & MEMPOOL_F_SP_PUT)
> +		mp->handler_idx = rte_get_mempool_handler("ring_sp_mc");
> +	else if (flags & MEMPOOL_F_SC_GET)
> +		mp->handler_idx = rte_get_mempool_handler("ring_mp_sc");
> +	else
> +		mp->handler_idx = rte_get_mempool_handler("ring_mp_mc");
> +

Why still use flag-based selection? Why not name-based? See below
for more description.


>  	/* calculate address of the first element for continuous mempool. */
>  	obj = (char *)mp + MEMPOOL_HEADER_SIZE(mp, pg_num) +
>  		private_data_size;
> @@ -613,7 +599,6 @@ rte_mempool_xmem_create(const char *name, unsigned n, unsigned elt_size,
>  		mp->elt_va_start = (uintptr_t)obj;
>  		mp->elt_pa[0] = mp->phys_addr +
>  			(mp->elt_va_start - (uintptr_t)mp);
> -
>  	/* mempool elements in a separate chunk of memory. */
>  	} else {
>  		mp->elt_va_start = (uintptr_t)vaddr;
> @@ -622,6 +607,10 @@ rte_mempool_xmem_create(const char *name, unsigned n, unsigned elt_size,
>  
>  	mp->elt_va_end = mp->elt_va_start;
>  
> +	/* Parameters are setup. Call the mempool handler alloc */
> +	if ((rte_mempool_ext_alloc(mp, name, n, socket_id, flags)) == NULL)
> +		goto exit;
> +
>  	/* call the initializer */
>  	if (mp_init)
>  		mp_init(mp, mp_init_arg);
> @@ -646,7 +635,7 @@ rte_mempool_count(const struct rte_mempool *mp)
>  {
>  	unsigned count;
>  
> -	count = rte_ring_count(mp->ring);
> +	count = rte_mempool_ext_get_count(mp);
>  
>  #if RTE_MEMPOOL_CACHE_MAX_SIZE > 0
>  	{
> @@ -681,7 +670,9 @@ rte_mempool_dump_cache(FILE *f, const struct rte_mempool *mp)
>  	fprintf(f, "    cache_size=%"PRIu32"\n", mp->cache_size);
>  	for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {
>  		cache_count = mp->local_cache[lcore_id].len;
> -		fprintf(f, "    cache_count[%u]=%u\n", lcore_id, cache_count);
> +		if (cache_count > 0)
> +			fprintf(f, "    cache_count[%u]=%u\n",
> +						lcore_id, cache_count);
>  		count += cache_count;
>  	}
>  	fprintf(f, "    total_cache_count=%u\n", count);
> @@ -802,14 +793,13 @@ rte_mempool_dump(FILE *f, const struct rte_mempool *mp)
>  
>  	fprintf(f, "mempool <%s>@%p\n", mp->name, mp);
>  	fprintf(f, "  flags=%x\n", mp->flags);
> -	fprintf(f, "  ring=<%s>@%p\n", mp->ring->name, mp->ring);
>  	fprintf(f, "  phys_addr=0x%" PRIx64 "\n", mp->phys_addr);
>  	fprintf(f, "  size=%"PRIu32"\n", mp->size);
>  	fprintf(f, "  header_size=%"PRIu32"\n", mp->header_size);
>  	fprintf(f, "  elt_size=%"PRIu32"\n", mp->elt_size);
>  	fprintf(f, "  trailer_size=%"PRIu32"\n", mp->trailer_size);
>  	fprintf(f, "  total_obj_size=%"PRIu32"\n",
> -	       mp->header_size + mp->elt_size + mp->trailer_size);
> +		   mp->header_size + mp->elt_size + mp->trailer_size);
>  
>  	fprintf(f, "  private_data_size=%"PRIu32"\n", mp->private_data_size);
>  	fprintf(f, "  pg_num=%"PRIu32"\n", mp->pg_num);
> @@ -825,7 +815,7 @@ rte_mempool_dump(FILE *f, const struct rte_mempool *mp)
>  			mp->size);
>  
>  	cache_count = rte_mempool_dump_cache(f, mp);
> -	common_count = rte_ring_count(mp->ring);
> +	common_count = /* rte_ring_count(mp->ring)*/0;
>  	if ((cache_count + common_count) > mp->size)
>  		common_count = mp->size - cache_count;
>  	fprintf(f, "  common_pool_count=%u\n", common_count);
> @@ -904,7 +894,7 @@ rte_mempool_lookup(const char *name)
>  }
>  
>  void rte_mempool_walk(void (*func)(const struct rte_mempool *, void *),
> -		      void *arg)
> +			  void *arg)
>  {
>  	struct rte_tailq_entry *te = NULL;
>  	struct rte_mempool_list *mempool_list;
> @@ -919,3 +909,111 @@ void rte_mempool_walk(void (*func)(const struct rte_mempool *, void *),
>  
>  	rte_rwlock_read_unlock(RTE_EAL_MEMPOOL_RWLOCK);
>  }
> +
> +
> +/* create the mempool using and external mempool manager */
> +struct rte_mempool *
> +rte_mempool_create_ext(const char *name, unsigned n, unsigned elt_size,
> +			unsigned cache_size, unsigned private_data_size,
> +			rte_mempool_ctor_t *mp_init, void *mp_init_arg,
> +			rte_mempool_obj_ctor_t *obj_init, void *obj_init_arg,
> +			int socket_id, unsigned flags,
> +			const char *handler_name)
> +{
> +	char mz_name[RTE_MEMZONE_NAMESIZE];
> +	struct rte_mempool_list *mempool_list;
> +	struct rte_mempool *mp = NULL;
> +	struct rte_tailq_entry *te;
> +	const struct rte_memzone *mz;
> +	size_t mempool_size;
> +	int mz_flags = RTE_MEMZONE_1GB|RTE_MEMZONE_SIZE_HINT_ONLY;
> +	int rg_flags = 0;
> +	int16_t handler_idx;
> +
> +	mempool_list = RTE_TAILQ_CAST(rte_mempool_tailq.head, rte_mempool_list);
> +
> +	/* asked cache too big */
> +	if (cache_size > RTE_MEMPOOL_CACHE_MAX_SIZE ||
> +		CALC_CACHE_FLUSHTHRESH(cache_size) > n) {
> +		rte_errno = EINVAL;
> +		return NULL;
> +	}
> +
> +	handler_idx = rte_get_mempool_handler(handler_name);
> +	if (handler_idx < 0) {
> +		RTE_LOG(ERR, MEMPOOL, "Cannot find mempool handler by name!\n");
> +		goto exit;
> +	}
> +
> +	/* ring flags */
> +	if (flags & MEMPOOL_F_SP_PUT)
> +		rg_flags |= RING_F_SP_ENQ;
> +	if (flags & MEMPOOL_F_SC_GET)
> +		rg_flags |= RING_F_SC_DEQ;
> +

rg_flags is not used anywhere below.

> +	rte_rwlock_write_lock(RTE_EAL_MEMPOOL_RWLOCK);
> +
> +	/*
> +	 * reserve a memory zone for this mempool: private data is
> +	 * cache-aligned
> +	 */
> +	private_data_size = RTE_ALIGN_CEIL(private_data_size,
> +							RTE_MEMPOOL_ALIGN);
> +
> +	/* try to allocate tailq entry */
> +	te = rte_zmalloc("MEMPOOL_TAILQ_ENTRY", sizeof(*te), 0);
> +	if (te == NULL) {
> +		RTE_LOG(ERR, MEMPOOL, "Cannot allocate tailq entry!\n");
> +		goto exit;
> +	}
> +
> +	/*
> +	 * If user provided an external memory buffer, then use it to
> +	 * store mempool objects. Otherwise reserve a memzone that is large
> +	 * enough to hold mempool header and metadata plus mempool objects.
> +	 */
> +	mempool_size = sizeof(*mp) + private_data_size;
> +	mempool_size = RTE_ALIGN_CEIL(mempool_size, RTE_MEMPOOL_ALIGN);
> +
> +	snprintf(mz_name, sizeof(mz_name), RTE_MEMPOOL_MZ_FORMAT, name);
> +
> +	mz = rte_memzone_reserve(mz_name, mempool_size, socket_id, mz_flags);
> +
> +	/* no more memory */
> +	if (mz == NULL) {
> +		rte_free(te);
> +		goto exit;
> +	}
> +
> +	/* init the mempool structure */
> +	mp = mz->addr;
> +	memset(mp, 0, sizeof(*mp));
> +	snprintf(mp->name, sizeof(mp->name), "%s", name);
> +	mp->phys_addr = mz->phys_addr;
> +	mp->size = n;
> +	mp->flags = flags;
> +	mp->cache_size = cache_size;
> +	mp->cache_flushthresh = CALC_CACHE_FLUSHTHRESH(cache_size);
> +	mp->private_data_size = private_data_size;
> +	mp->handler_idx = handler_idx;
> +	mp->elt_size = elt_size;
> +	mp->rt_pool = rte_mempool_ext_alloc(mp, name, n, socket_id, flags);


IMO, we can avoid the duplication of the above code with rte_mempool_create,
i.e. rte_mempool_create -> rte_mempool_create_ext(.., "ring_mp_mc")
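
Something like the following sketch; the flags-to-name helper is
hypothetical, and the Xen dom0 path is ignored for brevity:

static const char *
mempool_handler_name_from_flags(unsigned flags)
{
	if ((flags & (MEMPOOL_F_SP_PUT | MEMPOOL_F_SC_GET)) ==
			(MEMPOOL_F_SP_PUT | MEMPOOL_F_SC_GET))
		return "ring_sp_sc";
	if (flags & MEMPOOL_F_SP_PUT)
		return "ring_sp_mc";
	if (flags & MEMPOOL_F_SC_GET)
		return "ring_mp_sc";
	return "ring_mp_mc";
}

struct rte_mempool *
rte_mempool_create(const char *name, unsigned n, unsigned elt_size,
		unsigned cache_size, unsigned private_data_size,
		rte_mempool_ctor_t *mp_init, void *mp_init_arg,
		rte_mempool_obj_ctor_t *obj_init, void *obj_init_arg,
		int socket_id, unsigned flags)
{
	return rte_mempool_create_ext(name, n, elt_size,
			cache_size, private_data_size,
			mp_init, mp_init_arg,
			obj_init, obj_init_arg,
			socket_id, flags,
			mempool_handler_name_from_flags(flags));
}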

> +
> +	/* call the initializer */
> +	if (mp_init)
> +		mp_init(mp, mp_init_arg);
> +
> +	mempool_populate(mp, n, 1, obj_init, obj_init_arg);
> +
> +	te->data = (void *) mp;
> +
> +	rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK);
> +	TAILQ_INSERT_TAIL(mempool_list, te, next);
> +	rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
> +
> +exit:
> +	rte_rwlock_write_unlock(RTE_EAL_MEMPOOL_RWLOCK);
> +
> +	return mp;
> +
> +}
> diff --git a/lib/librte_mempool/rte_mempool.h b/lib/librte_mempool/rte_mempool.h
> index 6e2390a..620cfb7 100644
> --- a/lib/librte_mempool/rte_mempool.h
> +++ b/lib/librte_mempool/rte_mempool.h
> @@ -88,6 +88,8 @@ extern "C" {
>  struct rte_mempool_debug_stats {
>  	uint64_t put_bulk;         /**< Number of puts. */
>  	uint64_t put_objs;         /**< Number of objects successfully put. */
> +	uint64_t put_pool_bulk;    /**< Number of puts into pool. */
> +	uint64_t put_pool_objs;    /**< Number of objects into pool. */
>  	uint64_t get_success_bulk; /**< Successful allocation number. */
>  	uint64_t get_success_objs; /**< Objects successfully allocated. */
>  	uint64_t get_fail_bulk;    /**< Failed allocation number. */
> @@ -123,6 +125,7 @@ struct rte_mempool_objsz {
>  #define RTE_MEMPOOL_NAMESIZE 32 /**< Maximum length of a memory pool. */
>  #define RTE_MEMPOOL_MZ_PREFIX "MP_"
>  
> +
>  /* "MP_<name>" */
>  #define	RTE_MEMPOOL_MZ_FORMAT	RTE_MEMPOOL_MZ_PREFIX "%s"
>  
> @@ -175,12 +178,85 @@ struct rte_mempool_objtlr {
>  #endif
>  };
>  
> +/* Handler functions for external mempool support */
> +typedef void *(*rte_mempool_alloc_t)(struct rte_mempool *mp,
> +		const char *name, unsigned n, int socket_id, unsigned flags);
> +typedef int (*rte_mempool_put_t)(void *p,
> +		void * const *obj_table, unsigned n);
> +typedef int (*rte_mempool_get_t)(void *p, void **obj_table,
> +		unsigned n);
> +typedef unsigned (*rte_mempool_get_count)(void *p);
> +typedef int(*rte_mempool_free_t)(struct rte_mempool *mp);
> +
> +/**
> + * @internal wrapper for external mempool manager alloc callback.
> + *
> + * @param mp
> + *   Pointer to the memory pool.
> + * @param name
> + *   Name of the statistics field to increment in the memory pool.
> + * @param n
> + *   Number to add to the object-oriented statistics.
> + * @param socket_id
> + *   socket id on which to allocate.
> + * @param flags
> + *   general flags to allocate function
> + */
> +void *
> +rte_mempool_ext_alloc(struct rte_mempool *mp,
> +		const char *name, unsigned n, int socket_id, unsigned flags);
> +
> +/**
> + * @internal wrapper for external mempool manager get callback.
> + *
> + * @param mp
> + *   Pointer to the memory pool.
> + * @param obj_table
> + *   Pointer to a table of void * pointers (objects).
> + * @param n
> + *	 Number of objects to get
> + */
> +int
> +rte_mempool_ext_get_bulk(struct rte_mempool *mp, void **obj_table,
> +		unsigned n);
> +
> +/**
> + * @internal wrapper for external mempool manager put callback.
> + *
> + * @param mp
> + *   Pointer to the memory pool.
> + * @param obj_table
> + *   Pointer to a table of void * pointers (objects).
> + * @param n
> + *   Number of objects to put
> + */
> +int
> +rte_mempool_ext_put_bulk(struct rte_mempool *mp, void * const *obj_table,
> +		unsigned n);
> +
> +/**
> + * @internal wrapper for external mempool manager get_count callback.
> + *
> + * @param mp
> + *   Pointer to the memory pool.
> + */
> +int
> +rte_mempool_ext_get_count(const struct rte_mempool *mp);
> +
> +/**
> + * @internal wrapper for external mempool manager free callback.
> + *
> + * @param mp
> + *   Pointer to the memory pool.
> + */
> +int
> +rte_mempool_ext_free(struct rte_mempool *mp);
> +
>  /**
>   * The RTE mempool structure.
>   */
>  struct rte_mempool {
>  	char name[RTE_MEMPOOL_NAMESIZE]; /**< Name of mempool. */
> -	struct rte_ring *ring;           /**< Ring to store objects. */
>  	phys_addr_t phys_addr;           /**< Phys. addr. of mempool struct. */
>  	int flags;                       /**< Flags of the mempool. */
>  	uint32_t size;                   /**< Size of the mempool. */
> @@ -194,6 +270,11 @@ struct rte_mempool {
>  
>  	unsigned private_data_size;      /**< Size of private data. */
>  
> +	/* Common pool data structure pointer */
> +	void *rt_pool __rte_cache_aligned;

Do we need to split rt_pool onto the next cache line? Variables such as
"cache_size" are used in the fast path, and this change will occupy one
more cache line.
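
A toy, self-contained illustration of the cost (hypothetical struct
names; 64-byte cache lines assumed):

#include <stdint.h>
#include <stdio.h>

#define ALIGNED64 __attribute__((aligned(64)))

/* Splitting rt_pool onto its own line doubles the hot header size. */
struct pool_split {
	uint32_t cache_size;        /* fast-path field */
	uint32_t cache_flushthresh; /* fast-path field */
	void *rt_pool ALIGNED64;    /* starts a new 64B cache line */
	int16_t handler_idx;
};

/* Keeping rt_pool packed with the fast-path fields needs one line. */
struct pool_packed {
	uint32_t cache_size;
	uint32_t cache_flushthresh;
	void *rt_pool;
	int16_t handler_idx;
};

int main(void)
{
	/* prints "split=128 bytes, packed=24 bytes" on x86-64 */
	printf("split=%zu bytes, packed=%zu bytes\n",
			sizeof(struct pool_split), sizeof(struct pool_packed));
	return 0;
}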

> +
> +	int16_t handler_idx;
> +
>  #if RTE_MEMPOOL_CACHE_MAX_SIZE > 0
>  	/** Per-lcore local cache. */
>  	struct rte_mempool_cache local_cache[RTE_MAX_LCORE];
> @@ -223,6 +304,10 @@ struct rte_mempool {
>  #define MEMPOOL_F_NO_CACHE_ALIGN 0x0002 /**< Do not align objs on cache lines.*/
>  #define MEMPOOL_F_SP_PUT         0x0004 /**< Default put is "single-producer".*/
>  #define MEMPOOL_F_SC_GET         0x0008 /**< Default get is "single-consumer".*/
> +#define MEMPOOL_F_USE_STACK      0x0010 /**< Use a stack for the common pool. */
> +#define MEMPOOL_F_USE_TM         0x0020
> +#define MEMPOOL_F_NO_SECONDARY   0x0040
> +
>  
>  /**
>   * @internal When debug is enabled, store some statistics.
> @@ -728,7 +813,6 @@ rte_dom0_mempool_create(const char *name, unsigned n, unsigned elt_size,
>  		rte_mempool_obj_ctor_t *obj_init, void *obj_init_arg,
>  		int socket_id, unsigned flags);
>  
> -
>  /**
>   * Dump the status of the mempool to the console.
>   *
> @@ -753,7 +837,7 @@ void rte_mempool_dump(FILE *f, const struct rte_mempool *mp);
>   */
>  static inline void __attribute__((always_inline))
>  __mempool_put_bulk(struct rte_mempool *mp, void * const *obj_table,
> -		    unsigned n, int is_mp)
> +		    unsigned n, __attribute__((unused)) int is_mp)
>  {
>  #if RTE_MEMPOOL_CACHE_MAX_SIZE > 0
>  	struct rte_mempool_cache *cache;
> @@ -769,8 +853,7 @@ __mempool_put_bulk(struct rte_mempool *mp, void * const *obj_table,
>  
>  #if RTE_MEMPOOL_CACHE_MAX_SIZE > 0
>  	/* cache is not enabled or single producer or non-EAL thread */
> -	if (unlikely(cache_size == 0 || is_mp == 0 ||
> -		     lcore_id >= RTE_MAX_LCORE))
> +	if (unlikely(cache_size == 0 || lcore_id >= RTE_MAX_LCORE))
>  		goto ring_enqueue;
>  
>  	/* Go straight to ring if put would overflow mem allocated for cache */
> @@ -793,8 +876,8 @@ __mempool_put_bulk(struct rte_mempool *mp, void * const *obj_table,
>  
>  	cache->len += n;
>  
> -	if (cache->len >= flushthresh) {
> -		rte_ring_mp_enqueue_bulk(mp->ring, &cache->objs[cache_size],
> +	if (unlikely(cache->len >= flushthresh)) {
> +		rte_mempool_ext_put_bulk(mp, &cache->objs[cache_size],
>  				cache->len - cache_size);
>  		cache->len = cache_size;
>  	}
> @@ -804,22 +887,10 @@ __mempool_put_bulk(struct rte_mempool *mp, void * const *obj_table,
>  ring_enqueue:
>  #endif /* RTE_MEMPOOL_CACHE_MAX_SIZE > 0 */
>  
> -	/* push remaining objects in ring */
> -#ifdef RTE_LIBRTE_MEMPOOL_DEBUG
> -	if (is_mp) {
> -		if (rte_ring_mp_enqueue_bulk(mp->ring, obj_table, n) < 0)
> -			rte_panic("cannot put objects in mempool\n");
> -	}
> -	else {
> -		if (rte_ring_sp_enqueue_bulk(mp->ring, obj_table, n) < 0)
> -			rte_panic("cannot put objects in mempool\n");
> -	}
> -#else
> -	if (is_mp)
> -		rte_ring_mp_enqueue_bulk(mp->ring, obj_table, n);
> -	else
> -		rte_ring_sp_enqueue_bulk(mp->ring, obj_table, n);
> -#endif
> +	/* Increment stats counter to tell us how many pool puts happened */
> +	__MEMPOOL_STAT_ADD(mp, put_pool, n);
> +
> +	rte_mempool_ext_put_bulk(mp, obj_table, n);
>  }
>  
>  
> @@ -943,7 +1014,7 @@ rte_mempool_put(struct rte_mempool *mp, void *obj)
>   */
>  static inline int __attribute__((always_inline))
>  __mempool_get_bulk(struct rte_mempool *mp, void **obj_table,
> -		   unsigned n, int is_mc)
> +		   unsigned n, __attribute__((unused))int is_mc)
>  {
>  	int ret;
>  #if RTE_MEMPOOL_CACHE_MAX_SIZE > 0
> @@ -954,8 +1025,8 @@ __mempool_get_bulk(struct rte_mempool *mp, void **obj_table,
>  	uint32_t cache_size = mp->cache_size;
>  
>  	/* cache is not enabled or single consumer */
> -	if (unlikely(cache_size == 0 || is_mc == 0 ||
> -		     n >= cache_size || lcore_id >= RTE_MAX_LCORE))
> +	if (unlikely(cache_size == 0 || n >= cache_size ||
> +						lcore_id >= RTE_MAX_LCORE))
>  		goto ring_dequeue;
>  
>  	cache = &mp->local_cache[lcore_id];
> @@ -967,7 +1038,8 @@ __mempool_get_bulk(struct rte_mempool *mp, void **obj_table,
>  		uint32_t req = n + (cache_size - cache->len);
>  
>  		/* How many do we require i.e. number to fill the cache + the request */
> -		ret = rte_ring_mc_dequeue_bulk(mp->ring, &cache->objs[cache->len], req);
> +		ret = rte_mempool_ext_get_bulk(mp,
> +						&cache->objs[cache->len], req);
>  		if (unlikely(ret < 0)) {
>  			/*
>  			 * In the offchance that we are buffer constrained,
> @@ -995,10 +1067,7 @@ ring_dequeue:
>  #endif /* RTE_MEMPOOL_CACHE_MAX_SIZE > 0 */
>  
>  	/* get remaining objects from ring */
> -	if (is_mc)
> -		ret = rte_ring_mc_dequeue_bulk(mp->ring, obj_table, n);
> -	else
> -		ret = rte_ring_sc_dequeue_bulk(mp->ring, obj_table, n);
> +	ret = rte_mempool_ext_get_bulk(mp, obj_table, n);
>  
>  	if (ret < 0)
>  		__MEMPOOL_STAT_ADD(mp, get_fail, n);
> @@ -1401,6 +1470,82 @@ ssize_t rte_mempool_xmem_usage(void *vaddr, uint32_t elt_num, size_t elt_sz,
>  void rte_mempool_walk(void (*func)(const struct rte_mempool *, void *arg),
>  		      void *arg);
>  
> +/**
> + * Function to get an index to an external mempool manager
> + *
> + * @param name
> + *   The name of the mempool handler to search for in the list of handlers
> + * @return
> + *   The index of the mempool handler in the list of registered mempool
> + *   handlers
> + */
> +int16_t
> +rte_get_mempool_handler(const char *name);
> +
> +
> +/**
> + * Create a new mempool named *name* in memory.
> + *
> + * This function uses an externally defined alloc callback to allocate memory.
> + * Its size is set to n elements.
> + * All elements of the mempool are allocated separately to the mempool header.
> + *
> + * @param name
> + *   The name of the mempool.
> + * @param n
> + *   The number of elements in the mempool. The optimum size (in terms of
> + *   memory usage) for a mempool is when n is a power of two minus one:
> + *   n = (2^q - 1).
> + * @param cache_size
> + *   If cache_size is non-zero, the rte_mempool library will try to
> + *   limit the accesses to the common lockless pool, by maintaining a
> + *   per-lcore object cache. This argument must be lower or equal to
> + *   CONFIG_RTE_MEMPOOL_CACHE_MAX_SIZE and n / 1.5. It is advised to choose
> + *   cache_size to have "n modulo cache_size == 0": if this is
> + *   not the case, some elements will always stay in the pool and will
> + *   never be used. The access to the per-lcore table is of course
> + *   faster than the multi-producer/consumer pool. The cache can be
> + *   disabled if the cache_size argument is set to 0; it can be useful to
> + *   avoid losing objects in cache. Note that even if not used, the
> + *   memory space for cache is always reserved in a mempool structure,
> + *   except if CONFIG_RTE_MEMPOOL_CACHE_MAX_SIZE is set to 0.
> + * @param private_data_size
> + *   The size of the private data appended after the mempool
> + *   structure. This is useful for storing some private data after the
> + *   mempool structure, as is done for rte_mbuf_pool for example.
> + * @param mp_init
> + *   A function pointer that is called for initialization of the pool,
> + *   before object initialization. The user can initialize the private
> + *   data in this function if needed. This parameter can be NULL if
> + *   not needed.
> + * @param mp_init_arg
> + *   An opaque pointer to data that can be used in the mempool
> + *   constructor function.
> + * @param obj_init
> + *   A function pointer that is called for each object at
> + *   initialization of the pool. The user can set some meta data in
> + *   objects if needed. This parameter can be NULL if not needed.
> + *   The obj_init() function takes the mempool pointer, the init_arg,
> + *   the object pointer and the object number as parameters.
> + * @param obj_init_arg
> + *   An opaque pointer to data that can be used as an argument for
> + *   each call to the object constructor function.
> + * @param socket_id
> + *   The *socket_id* argument is the socket identifier in the case of
> + *   NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA
> + *   constraint for the reserved zone.
> + * @param flags
> + * @return
> + *   The pointer to the new allocated mempool, on success. NULL on error
> + */
> +struct rte_mempool *
> +rte_mempool_create_ext(const char *name, unsigned n, unsigned elt_size,
> +		unsigned cache_size, unsigned private_data_size,
> +		rte_mempool_ctor_t *mp_init, void *mp_init_arg,
> +		rte_mempool_obj_ctor_t *obj_init, void *obj_init_arg,
> +		int socket_id, unsigned flags,
> +		const char *handler_name);
> +
>  #ifdef __cplusplus
>  }
>  #endif
> diff --git a/lib/librte_mempool/rte_mempool_default.c b/lib/librte_mempool/rte_mempool_default.c
> new file mode 100644
> index 0000000..2493dc1
> --- /dev/null
> +++ b/lib/librte_mempool/rte_mempool_default.c
> @@ -0,0 +1,229 @@
> +/*-
> + *   BSD LICENSE
> + *
> + *   Copyright(c) 2010-2016 Intel Corporation. All rights reserved.
> + *   All rights reserved.
> + *
> + *   Redistribution and use in source and binary forms, with or without
> + *   modification, are permitted provided that the following conditions
> + *   are met:
> + *
> + *     * Redistributions of source code must retain the above copyright
> + *       notice, this list of conditions and the following disclaimer.
> + *     * Redistributions in binary form must reproduce the above copyright
> + *       notice, this list of conditions and the following disclaimer in
> + *       the documentation and/or other materials provided with the
> + *       distribution.
> + *     * Neither the name of Intel Corporation nor the names of its
> + *       contributors may be used to endorse or promote products derived
> + *       from this software without specific prior written permission.
> + *
> + *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
> + *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
> + *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
> + *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
> + *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
> + *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
> + *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
> + *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
> + *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
> + *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
> + *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
> + */
> +
> +#include <stdio.h>
> +#include <rte_mempool.h>
> +#include <rte_malloc.h>
> +#include <string.h>
> +
> +#include "rte_mempool_internal.h"
> +
> +/*
> + * Indirect jump table to support external memory pools
> + */
> +struct rte_mempool_handler_list mempool_handler_list = {
> +	.sl =  RTE_SPINLOCK_INITIALIZER ,
> +	.num_handlers = 0
> +};
> +
> +/* TODO Convert to older mechanism of an array of stucts */
> +int16_t
> +add_handler(struct rte_mempool_handler *h)
> +{
> +	int16_t handler_idx;
> +
> +	/*  */
> +	rte_spinlock_lock(&mempool_handler_list.sl);
> +
> +	/* Check whether jump table has space */
> +	if (mempool_handler_list.num_handlers >= RTE_MEMPOOL_MAX_HANDLER_IDX) {
> +		rte_spinlock_unlock(&mempool_handler_list.sl);
> +		RTE_LOG(ERR, MEMPOOL,
> +				"Maximum number of mempool handlers exceeded\n");
> +		return -1;
> +	}
> +
> +	if ((h->put == NULL) || (h->get == NULL) ||
> +		(h->get_count == NULL)) {
> +		rte_spinlock_unlock(&mempool_handler_list.sl);
> +		 RTE_LOG(ERR, MEMPOOL,
> +					"Missing callback while registering mempool handler\n");
> +		return -1;
> +	}
> +
> +	/* add new handler index */
> +	handler_idx = mempool_handler_list.num_handlers++;
> +
> +	snprintf(mempool_handler_list.handler[handler_idx].name,
> +				RTE_MEMPOOL_NAMESIZE, "%s", h->name);
> +	mempool_handler_list.handler[handler_idx].alloc = h->alloc;
> +	mempool_handler_list.handler[handler_idx].put = h->put;
> +	mempool_handler_list.handler[handler_idx].get = h->get;
> +	mempool_handler_list.handler[handler_idx].get_count = h->get_count;
> +
> +	rte_spinlock_unlock(&mempool_handler_list.sl);
> +
> +	return handler_idx;
> +}
> +
> +/* TODO Convert to older mechanism of an array of stucts */
> +int16_t
> +rte_get_mempool_handler(const char *name)
> +{
> +	int16_t i;
> +
> +	for (i = 0; i < mempool_handler_list.num_handlers; i++) {
> +		if (!strcmp(name, mempool_handler_list.handler[i].name))
> +			return i;
> +	}
> +	return -1;
> +}
> +
> +static int
> +common_ring_mp_put(void *p, void * const *obj_table, unsigned n)
> +{
> +	return rte_ring_mp_enqueue_bulk((struct rte_ring *)p, obj_table, n);
> +}
> +
> +static int
> +common_ring_sp_put(void *p, void * const *obj_table, unsigned n)
> +{
> +	return rte_ring_sp_enqueue_bulk((struct rte_ring *)p, obj_table, n);
> +}
> +
> +static int
> +common_ring_mc_get(void *p, void **obj_table, unsigned n)
> +{
> +	return rte_ring_mc_dequeue_bulk((struct rte_ring *)p, obj_table, n);
> +}
> +
> +static int
> +common_ring_sc_get(void *p, void **obj_table, unsigned n)
> +{
> +	return rte_ring_sc_dequeue_bulk((struct rte_ring *)p, obj_table, n);
> +}
> +
> +static unsigned
> +common_ring_get_count(void *p)
> +{
> +	return rte_ring_count((struct rte_ring *)p);
> +}
> +
> +
> +static void *
> +rte_mempool_common_ring_alloc(struct rte_mempool *mp,
> +		const char *name, unsigned n, int socket_id, unsigned flags)
> +{
> +	struct rte_ring *r;
> +	char rg_name[RTE_RING_NAMESIZE];
> +	int rg_flags = 0;
> +
> +	if (flags & MEMPOOL_F_SP_PUT)
> +		rg_flags |= RING_F_SP_ENQ;
> +	if (flags & MEMPOOL_F_SC_GET)
> +		rg_flags |= RING_F_SC_DEQ;
> +
> +	/* allocate the ring that will be used to store objects */
> +	/* Ring functions will return appropriate errors if we are
> +	 * running as a secondary process etc., so no checks made
> +	 * in this function for that condition */
> +	snprintf(rg_name, sizeof(rg_name), "%s-ring", name);
> +	r = rte_ring_create(rg_name, rte_align32pow2(n+1), socket_id, rg_flags);
> +	if (r == NULL)
> +		return NULL;
> +
> +	mp->rt_pool = (void *)r;
> +
> +	return (void *) r;
> +}
> +
> +void *
> +rte_mempool_ext_alloc(struct rte_mempool *mp,
> +		const char *name, unsigned n, int socket_id, unsigned flags)
> +{
> +	if (mempool_handler_list.handler[mp->handler_idx].alloc) {
> +		return (mempool_handler_list.handler[mp->handler_idx].alloc)
> +						(mp, name, n, socket_id, flags);
> +	}
> +	return NULL;
> +}
> +
> +inline int __attribute__((always_inline))
> +rte_mempool_ext_get_bulk(struct rte_mempool *mp, void **obj_table, unsigned n)
> +{
> +	return (mempool_handler_list.handler[mp->handler_idx].get)
> +						(mp->rt_pool, obj_table, n);
> +}
> +
> +inline int __attribute__((always_inline))
> +rte_mempool_ext_put_bulk(struct rte_mempool *mp, void * const *obj_table,
> +		unsigned n)
> +{
> +	return (mempool_handler_list.handler[mp->handler_idx].put)
> +						(mp->rt_pool, obj_table, n);
> +}
> +
> +int
> +rte_mempool_ext_get_count(const struct rte_mempool *mp)
> +{
> +	return (mempool_handler_list.handler[mp->handler_idx].get_count)
> +						(mp->rt_pool);
> +}
> +
> +static struct rte_mempool_handler handler_mp_mc = {
> +	.name = "ring_mp_mc",
> +	.alloc = rte_mempool_common_ring_alloc,
> +	.put = common_ring_mp_put,
> +	.get = common_ring_mc_get,
> +	.get_count = common_ring_get_count,
> +	.free = NULL
> +};
> +static struct rte_mempool_handler handler_sp_sc = {
> +	.name = "ring_sp_sc",
> +	.alloc = rte_mempool_common_ring_alloc,
> +	.put = common_ring_sp_put,
> +	.get = common_ring_sc_get,
> +	.get_count = common_ring_get_count,
> +	.free = NULL
> +};
> +static struct rte_mempool_handler handler_mp_sc = {
> +	.name = "ring_mp_sc",
> +	.alloc = rte_mempool_common_ring_alloc,
> +	.put = common_ring_mp_put,
> +	.get = common_ring_sc_get,
> +	.get_count = common_ring_get_count,
> +	.free = NULL
> +};
> +static struct rte_mempool_handler handler_sp_mc = {
> +	.name = "ring_sp_mc",
> +	.alloc = rte_mempool_common_ring_alloc,
> +	.put = common_ring_sp_put,
> +	.get = common_ring_mc_get,
> +	.get_count = common_ring_get_count,
> +	.free = NULL
> +};
> +
> +REGISTER_MEMPOOL_HANDLER(handler_mp_mc);
> +REGISTER_MEMPOOL_HANDLER(handler_sp_sc);
> +REGISTER_MEMPOOL_HANDLER(handler_mp_sc);
> +REGISTER_MEMPOOL_HANDLER(handler_sp_mc);
> diff --git a/lib/librte_mempool/rte_mempool_internal.h b/lib/librte_mempool/rte_mempool_internal.h
> new file mode 100644
> index 0000000..92b7bde
> --- /dev/null
> +++ b/lib/librte_mempool/rte_mempool_internal.h
> @@ -0,0 +1,74 @@
> +/*-
> + *   BSD LICENSE
> + *
> + *   Copyright(c) 2010-2016 Intel Corporation. All rights reserved.
> + *   All rights reserved.
> + *
> + *   Redistribution and use in source and binary forms, with or without
> + *   modification, are permitted provided that the following conditions
> + *   are met:
> + *
> + *     * Redistributions of source code must retain the above copyright
> + *       notice, this list of conditions and the following disclaimer.
> + *     * Redistributions in binary form must reproduce the above copyright
> + *       notice, this list of conditions and the following disclaimer in
> + *       the documentation and/or other materials provided with the
> + *       distribution.
> + *     * Neither the name of Intel Corporation nor the names of its
> + *       contributors may be used to endorse or promote products derived
> + *       from this software without specific prior written permission.
> + *
> + *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
> + *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
> + *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
> + *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
> + *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
> + *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
> + *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
> + *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
> + *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
> + *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
> + *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
> + */
> +
> +#ifndef _RTE_MEMPOOL_INTERNAL_H_
> +#define _RTE_MEMPOOL_INTERNAL_H_
> +
> +#include <rte_spinlock.h>
> +#include <rte_mempool.h>
> +
> +#define RTE_MEMPOOL_MAX_HANDLER_IDX 16
> +
> +struct rte_mempool_handler {
> +	char name[RTE_MEMPOOL_NAMESIZE]; /**< Name of mempool handler */
> +
> +	rte_mempool_alloc_t alloc;
> +
> +	rte_mempool_put_t put __rte_cache_aligned;
> +
> +	rte_mempool_get_t get __rte_cache_aligned;
> +
> +	rte_mempool_get_count get_count __rte_cache_aligned;
> +
> +	rte_mempool_free_t free __rte_cache_aligned;
> +};

IMO, the structure should be cache-aligned, not the individual
elements, as the elements are likely read-only in the fast path.
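
i.e. something like this, keeping the fields from the patch and moving
the alignment to the whole structure:

/* Callbacks are written once at registration and then only read in
 * the fast path, so one alignment on the struct is enough. */
struct rte_mempool_handler {
	char name[RTE_MEMPOOL_NAMESIZE]; /**< Name of mempool handler */
	rte_mempool_alloc_t alloc;
	rte_mempool_put_t put;
	rte_mempool_get_t get;
	rte_mempool_get_count get_count;
	rte_mempool_free_t free;
} __rte_cache_aligned;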

> +
> +struct rte_mempool_handler_list {
> +	rte_spinlock_t sl;		  /**< Spinlock for add/delete. */
> +
> +	int32_t num_handlers;	  /**< Number of handlers that are valid. */
> +
> +	/* storage for all possible handlers */
> +	struct rte_mempool_handler handler[RTE_MEMPOOL_MAX_HANDLER_IDX];
> +};
> +
> +int16_t add_handler(struct rte_mempool_handler *h);
> +
> +#define REGISTER_MEMPOOL_HANDLER(h) \
> +static int16_t __attribute__((used)) testfn_##h(void);\
> +int16_t __attribute__((constructor, used)) testfn_##h(void)\
> +{\
> +	return add_handler(&h);\
> +}
> +
> +#endif
> -- 
> 1.9.3
>
  
Hunt, David Feb. 3, 2016, 2:16 p.m. UTC | #2
On 28/01/2016 17:52, Jerin Jacob wrote:
> On Tue, Jan 26, 2016 at 05:25:51PM +0000, David Hunt wrote:
>> [...]
>> @@ -598,6 +568,22 @@ rte_mempool_xmem_create(const char *name, unsigned n, unsigned elt_size,
>>   	mp->cache_flushthresh = CALC_CACHE_FLUSHTHRESH(cache_size);
>>   	mp->private_data_size = private_data_size;
>>
>> +	/*
>> +	 * Since we have 4 combinations of the SP/SC/MP/MC, and stack,
>> +	 * examine the
>> +	 * flags to set the correct index into the handler table.
>> +	 */
>> +	if (flags & MEMPOOL_F_USE_STACK)
>> +		mp->handler_idx = rte_get_mempool_handler("stack");
>> +	else if (flags & (MEMPOOL_F_SP_PUT | MEMPOOL_F_SC_GET))
>> +		mp->handler_idx = rte_get_mempool_handler("ring_sp_sc");
>> +	else if (flags & MEMPOOL_F_SP_PUT)
>> +		mp->handler_idx = rte_get_mempool_handler("ring_sp_mc");
>> +	else if (flags & MEMPOOL_F_SC_GET)
>> +		mp->handler_idx = rte_get_mempool_handler("ring_mp_sc");
>> +	else
>> +		mp->handler_idx = rte_get_mempool_handler("ring_mp_mc");
>> +
>
> Why still use flag-based selection? Why not name-based? See below
> for more description.


The old API does not have a 'name' parameter, so it needs to work out
which handler to use based on the flags. This is not necessary in the
new API call, which uses the name-based index directly.
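
For illustration (values are arbitrary), both calls below end up on the
"ring_sp_sc" handler with this patch:

static void
pool_init_example(void)
{
	/* Legacy API: handler derived from the SP/SC flags. */
	struct rte_mempool *legacy = rte_mempool_create("legacy_pool",
			1023, 2048, 32, 0,
			NULL, NULL, NULL, NULL,
			SOCKET_ID_ANY,
			MEMPOOL_F_SP_PUT | MEMPOOL_F_SC_GET);

	/* New API: handler named explicitly. */
	struct rte_mempool *ext = rte_mempool_create_ext("ext_pool",
			1023, 2048, 32, 0,
			NULL, NULL, NULL, NULL,
			SOCKET_ID_ANY, 0,
			"ring_sp_sc");

	(void)legacy;
	(void)ext;
}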


>> [...]
>> +	/* ring flags */
>> +	if (flags & MEMPOOL_F_SP_PUT)
>> +		rg_flags |= RING_F_SP_ENQ;
>> +	if (flags & MEMPOOL_F_SC_GET)
>> +		rg_flags |= RING_F_SC_DEQ;
>> +
>
> rg_flags is not used anywhere below.


Thanks. I've removed them.


>> [...]
>> +	/* init the mempool structure */
>> +	mp = mz->addr;
>> +	memset(mp, 0, sizeof(*mp));
>> +	snprintf(mp->name, sizeof(mp->name), "%s", name);
>> +	mp->phys_addr = mz->phys_addr;
>> +	mp->size = n;
>> +	mp->flags = flags;
>> +	mp->cache_size = cache_size;
>> +	mp->cache_flushthresh = CALC_CACHE_FLUSHTHRESH(cache_size);
>> +	mp->private_data_size = private_data_size;
>> +	mp->handler_idx = handler_idx;
>> +	mp->elt_size = elt_size;
>> +	mp->rt_pool = rte_mempool_ext_alloc(mp, name, n, socket_id, flags);
>
>
> IMO, we can avoid the duplication of the above code with rte_mempool_create,
> i.e. rte_mempool_create -> rte_mempool_create_ext(.., "ring_mp_mc")


rte_mempool_create is not really a subset of rte_mempool_create_ext, so
doing this would not be possible. I did have a look at this before
pushing the patch, but the code was so different in each case that I
decided to leave them as is. Maybe break out the section that sets up
the mempool structure into a separate function?
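
Something like this hypothetical helper, sitting in rte_mempool.c next
to CALC_CACHE_FLUSHTHRESH, which both create paths could call once
their memzone is reserved:

static void
mempool_init_struct(struct rte_mempool *mp, const char *name,
		unsigned n, unsigned elt_size,
		unsigned cache_size, unsigned private_data_size,
		unsigned flags, phys_addr_t phys_addr,
		int16_t handler_idx)
{
	memset(mp, 0, sizeof(*mp));
	snprintf(mp->name, sizeof(mp->name), "%s", name);
	mp->phys_addr = phys_addr;
	mp->size = n;
	mp->flags = flags;
	mp->elt_size = elt_size;
	mp->cache_size = cache_size;
	mp->cache_flushthresh = CALC_CACHE_FLUSHTHRESH(cache_size);
	mp->private_data_size = private_data_size;
	mp->handler_idx = handler_idx;
}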


>> [...]
>> @@ -194,6 +270,11 @@ struct rte_mempool {
>>
>>   	unsigned private_data_size;      /**< Size of private data. */
>>
>> +	/* Common pool data structure pointer */
>> +	void *rt_pool __rte_cache_aligned;
>
> Do we need to split rt_pool onto the next cache line? "cache_size" and
> the other variables used in the fast path would then occupy one more
> cache line because of this change

OK, I'll take out the split.

>> +
>> +	int16_t handler_idx;
>> +
>>   #if RTE_MEMPOOL_CACHE_MAX_SIZE > 0
>>   	/** Per-lcore local cache. */
>>   	struct rte_mempool_cache local_cache[RTE_MAX_LCORE];
>> @@ -223,6 +304,10 @@ struct rte_mempool {
>>   #define MEMPOOL_F_NO_CACHE_ALIGN 0x0002 /**< Do not align objs on cache lines.*/
>>   #define MEMPOOL_F_SP_PUT         0x0004 /**< Default put is "single-producer".*/
>>   #define MEMPOOL_F_SC_GET         0x0008 /**< Default get is "single-consumer".*/
>> +#define MEMPOOL_F_USE_STACK      0x0010 /**< Use a stack for the common pool. */
>> +#define MEMPOOL_F_USE_TM         0x0020
>> +#define MEMPOOL_F_NO_SECONDARY   0x0040
>> +
>>
>>   /**
>>    * @internal When debug is enabled, store some statistics.
>> @@ -728,7 +813,6 @@ rte_dom0_mempool_create(const char *name, unsigned n, unsigned elt_size,
>>   		rte_mempool_obj_ctor_t *obj_init, void *obj_init_arg,
>>   		int socket_id, unsigned flags);
>>
>> -
>>   /**
>>    * Dump the status of the mempool to the console.
>>    *
>> @@ -753,7 +837,7 @@ void rte_mempool_dump(FILE *f, const struct rte_mempool *mp);
>>    */
>>   static inline void __attribute__((always_inline))
>>   __mempool_put_bulk(struct rte_mempool *mp, void * const *obj_table,
>> -		    unsigned n, int is_mp)
>> +		    unsigned n, __attribute__((unused)) int is_mp)
>>   {
>>   #if RTE_MEMPOOL_CACHE_MAX_SIZE > 0
>>   	struct rte_mempool_cache *cache;
>> @@ -769,8 +853,7 @@ __mempool_put_bulk(struct rte_mempool *mp, void * const *obj_table,
>>
>>   #if RTE_MEMPOOL_CACHE_MAX_SIZE > 0
>>   	/* cache is not enabled or single producer or non-EAL thread */
>> -	if (unlikely(cache_size == 0 || is_mp == 0 ||
>> -		     lcore_id >= RTE_MAX_LCORE))
>> +	if (unlikely(cache_size == 0 || lcore_id >= RTE_MAX_LCORE))
>>   		goto ring_enqueue;
>>
>>   	/* Go straight to ring if put would overflow mem allocated for cache */
>> @@ -793,8 +876,8 @@ __mempool_put_bulk(struct rte_mempool *mp, void * const *obj_table,
>>
>>   	cache->len += n;
>>
>> -	if (cache->len >= flushthresh) {
>> -		rte_ring_mp_enqueue_bulk(mp->ring, &cache->objs[cache_size],
>> +	if (unlikely(cache->len >= flushthresh)) {
>> +		rte_mempool_ext_put_bulk(mp, &cache->objs[cache_size],
>>   				cache->len - cache_size);
>>   		cache->len = cache_size;
>>   	}
>> @@ -804,22 +887,10 @@ __mempool_put_bulk(struct rte_mempool *mp, void * const *obj_table,
>>   ring_enqueue:
>>   #endif /* RTE_MEMPOOL_CACHE_MAX_SIZE > 0 */
>>
>> -	/* push remaining objects in ring */
>> -#ifdef RTE_LIBRTE_MEMPOOL_DEBUG
>> -	if (is_mp) {
>> -		if (rte_ring_mp_enqueue_bulk(mp->ring, obj_table, n) < 0)
>> -			rte_panic("cannot put objects in mempool\n");
>> -	}
>> -	else {
>> -		if (rte_ring_sp_enqueue_bulk(mp->ring, obj_table, n) < 0)
>> -			rte_panic("cannot put objects in mempool\n");
>> -	}
>> -#else
>> -	if (is_mp)
>> -		rte_ring_mp_enqueue_bulk(mp->ring, obj_table, n);
>> -	else
>> -		rte_ring_sp_enqueue_bulk(mp->ring, obj_table, n);
>> -#endif
>> +	/* Increment stats counter to tell us how many pool puts happened */
>> +	__MEMPOOL_STAT_ADD(mp, put_pool, n);
>> +
>> +	rte_mempool_ext_put_bulk(mp, obj_table, n);
>>   }
>>
>>
>> @@ -943,7 +1014,7 @@ rte_mempool_put(struct rte_mempool *mp, void *obj)
>>    */
>>   static inline int __attribute__((always_inline))
>>   __mempool_get_bulk(struct rte_mempool *mp, void **obj_table,
>> -		   unsigned n, int is_mc)
>> +		   unsigned n, __attribute__((unused))int is_mc)
>>   {
>>   	int ret;
>>   #if RTE_MEMPOOL_CACHE_MAX_SIZE > 0
>> @@ -954,8 +1025,8 @@ __mempool_get_bulk(struct rte_mempool *mp, void **obj_table,
>>   	uint32_t cache_size = mp->cache_size;
>>
>>   	/* cache is not enabled or single consumer */
>> -	if (unlikely(cache_size == 0 || is_mc == 0 ||
>> -		     n >= cache_size || lcore_id >= RTE_MAX_LCORE))
>> +	if (unlikely(cache_size == 0 || n >= cache_size ||
>> +						lcore_id >= RTE_MAX_LCORE))
>>   		goto ring_dequeue;
>>
>>   	cache = &mp->local_cache[lcore_id];
>> @@ -967,7 +1038,8 @@ __mempool_get_bulk(struct rte_mempool *mp, void **obj_table,
>>   		uint32_t req = n + (cache_size - cache->len);
>>
>>   		/* How many do we require i.e. number to fill the cache + the request */
>> -		ret = rte_ring_mc_dequeue_bulk(mp->ring, &cache->objs[cache->len], req);
>> +		ret = rte_mempool_ext_get_bulk(mp,
>> +						&cache->objs[cache->len], req);
>>   		if (unlikely(ret < 0)) {
>>   			/*
>>   			 * In the offchance that we are buffer constrained,
>> @@ -995,10 +1067,7 @@ ring_dequeue:
>>   #endif /* RTE_MEMPOOL_CACHE_MAX_SIZE > 0 */
>>
>>   	/* get remaining objects from ring */
>> -	if (is_mc)
>> -		ret = rte_ring_mc_dequeue_bulk(mp->ring, obj_table, n);
>> -	else
>> -		ret = rte_ring_sc_dequeue_bulk(mp->ring, obj_table, n);
>> +	ret = rte_mempool_ext_get_bulk(mp, obj_table, n);
>>
>>   	if (ret < 0)
>>   		__MEMPOOL_STAT_ADD(mp, get_fail, n);
>> @@ -1401,6 +1470,82 @@ ssize_t rte_mempool_xmem_usage(void *vaddr, uint32_t elt_num, size_t elt_sz,
>>   void rte_mempool_walk(void (*func)(const struct rte_mempool *, void *arg),
>>   		      void *arg);
>>
>> +/**
>> + * Function to get an index to an external mempool manager
>> + *
>> + * @param name
>> + *   The name of the mempool handler to search for in the list of handlers
>> + * @return
>> + *   The index of the mempool handler in the list of registered mempool
>> + *   handlers
>> + */
>> +int16_t
>> +rte_get_mempool_handler(const char *name);
>> +
>> +
>> +/**
>> + * Create a new mempool named *name* in memory.
>> + *
>> + * This function uses an externally defined alloc callback to allocate memory.
>> + * Its size is set to n elements.
>> + * All elements of the mempool are allocated separately to the mempool header.
>> + *
>> + * @param name
>> + *   The name of the mempool.
>> + * @param n
>> + *   The number of elements in the mempool. The optimum size (in terms of
>> + *   memory usage) for a mempool is when n is a power of two minus one:
>> + *   n = (2^q - 1).
>> + * @param cache_size
>> + *   If cache_size is non-zero, the rte_mempool library will try to
>> + *   limit the accesses to the common lockless pool, by maintaining a
>> + *   per-lcore object cache. This argument must be lower or equal to
>> + *   CONFIG_RTE_MEMPOOL_CACHE_MAX_SIZE and n / 1.5. It is advised to choose
>> + *   cache_size to have "n modulo cache_size == 0": if this is
>> + *   not the case, some elements will always stay in the pool and will
>> + *   never be used. The access to the per-lcore table is of course
>> + *   faster than the multi-producer/consumer pool. The cache can be
>> + *   disabled if the cache_size argument is set to 0; it can be useful to
>> + *   avoid losing objects in cache. Note that even if not used, the
>> + *   memory space for cache is always reserved in a mempool structure,
>> + *   except if CONFIG_RTE_MEMPOOL_CACHE_MAX_SIZE is set to 0.
>> + * @param private_data_size
>> + *   The size of the private data appended after the mempool
>> + *   structure. This is useful for storing some private data after the
>> + *   mempool structure, as is done for rte_mbuf_pool for example.
>> + * @param mp_init
>> + *   A function pointer that is called for initialization of the pool,
>> + *   before object initialization. The user can initialize the private
>> + *   data in this function if needed. This parameter can be NULL if
>> + *   not needed.
>> + * @param mp_init_arg
>> + *   An opaque pointer to data that can be used in the mempool
>> + *   constructor function.
>> + * @param obj_init
>> + *   A function pointer that is called for each object at
>> + *   initialization of the pool. The user can set some meta data in
>> + *   objects if needed. This parameter can be NULL if not needed.
>> + *   The obj_init() function takes the mempool pointer, the init_arg,
>> + *   the object pointer and the object number as parameters.
>> + * @param obj_init_arg
>> + *   An opaque pointer to data that can be used as an argument for
>> + *   each call to the object constructor function.
>> + * @param socket_id
>> + *   The *socket_id* argument is the socket identifier in the case of
>> + *   NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA
>> + *   constraint for the reserved zone.
>> + * @param flags
>> + * @return
>> + *   The pointer to the new allocated mempool, on success. NULL on error
>> + */
>> +struct rte_mempool *
>> +rte_mempool_create_ext(const char *name, unsigned n, unsigned elt_size,
>> +		unsigned cache_size, unsigned private_data_size,
>> +		rte_mempool_ctor_t *mp_init, void *mp_init_arg,
>> +		rte_mempool_obj_ctor_t *obj_init, void *obj_init_arg,
>> +		int socket_id, unsigned flags,
>> +		const char *handler_name);
>> +
>>   #ifdef __cplusplus
>>   }
>>   #endif
>> diff --git a/lib/librte_mempool/rte_mempool_default.c b/lib/librte_mempool/rte_mempool_default.c
>> new file mode 100644
>> index 0000000..2493dc1
>> --- /dev/null
>> +++ b/lib/librte_mempool/rte_mempool_default.c
>> @@ -0,0 +1,229 @@
>> +/*-
>> + *   BSD LICENSE
>> + *
>> + *   Copyright(c) 2010-2016 Intel Corporation. All rights reserved.
>> + *   All rights reserved.
>> + *
>> + *   Redistribution and use in source and binary forms, with or without
>> + *   modification, are permitted provided that the following conditions
>> + *   are met:
>> + *
>> + *     * Redistributions of source code must retain the above copyright
>> + *       notice, this list of conditions and the following disclaimer.
>> + *     * Redistributions in binary form must reproduce the above copyright
>> + *       notice, this list of conditions and the following disclaimer in
>> + *       the documentation and/or other materials provided with the
>> + *       distribution.
>> + *     * Neither the name of Intel Corporation nor the names of its
>> + *       contributors may be used to endorse or promote products derived
>> + *       from this software without specific prior written permission.
>> + *
>> + *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
>> + *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
>> + *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
>> + *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
>> + *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
>> + *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
>> + *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
>> + *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
>> + *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
>> + *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
>> + *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
>> + */
>> +
>> +#include <stdio.h>
>> +#include <rte_mempool.h>
>> +#include <rte_malloc.h>
>> +#include <string.h>
>> +
>> +#include "rte_mempool_internal.h"
>> +
>> +/*
>> + * Indirect jump table to support external memory pools
>> + */
>> +struct rte_mempool_handler_list mempool_handler_list = {
>> +	.sl =  RTE_SPINLOCK_INITIALIZER ,
>> +	.num_handlers = 0
>> +};
>> +
>> +/* TODO Convert to older mechanism of an array of structs */
>> +int16_t
>> +add_handler(struct rte_mempool_handler *h)
>> +{
>> +	int16_t handler_idx;
>> +
>> +	/*  */
>> +	rte_spinlock_lock(&mempool_handler_list.sl);
>> +
>> +	/* Check whether jump table has space */
>> +	if (mempool_handler_list.num_handlers >= RTE_MEMPOOL_MAX_HANDLER_IDX) {
>> +		rte_spinlock_unlock(&mempool_handler_list.sl);
>> +		RTE_LOG(ERR, MEMPOOL,
>> +				"Maximum number of mempool handlers exceeded\n");
>> +		return -1;
>> +	}
>> +
>> +	if ((h->put == NULL) || (h->get == NULL) ||
>> +		(h->get_count == NULL)) {
>> +		rte_spinlock_unlock(&mempool_handler_list.sl);
>> +		 RTE_LOG(ERR, MEMPOOL,
>> +					"Missing callback while registering mempool handler\n");
>> +		return -1;
>> +	}
>> +
>> +	/* add new handler index */
>> +	handler_idx = mempool_handler_list.num_handlers++;
>> +
>> +	snprintf(mempool_handler_list.handler[handler_idx].name,
>> +				RTE_MEMPOOL_NAMESIZE, "%s", h->name);
>> +	mempool_handler_list.handler[handler_idx].alloc = h->alloc;
>> +	mempool_handler_list.handler[handler_idx].put = h->put;
>> +	mempool_handler_list.handler[handler_idx].get = h->get;
>> +	mempool_handler_list.handler[handler_idx].get_count = h->get_count;
>> +
>> +	rte_spinlock_unlock(&mempool_handler_list.sl);
>> +
>> +	return handler_idx;
>> +}
>> +
>> +/* TODO Convert to older mechanism of an array of structs */
>> +int16_t
>> +rte_get_mempool_handler(const char *name)
>> +{
>> +	int16_t i;
>> +
>> +	for (i = 0; i < mempool_handler_list.num_handlers; i++) {
>> +		if (!strcmp(name, mempool_handler_list.handler[i].name))
>> +			return i;
>> +	}
>> +	return -1;
>> +}
>> +
>> +static int
>> +common_ring_mp_put(void *p, void * const *obj_table, unsigned n)
>> +{
>> +	return rte_ring_mp_enqueue_bulk((struct rte_ring *)p, obj_table, n);
>> +}
>> +
>> +static int
>> +common_ring_sp_put(void *p, void * const *obj_table, unsigned n)
>> +{
>> +	return rte_ring_sp_enqueue_bulk((struct rte_ring *)p, obj_table, n);
>> +}
>> +
>> +static int
>> +common_ring_mc_get(void *p, void **obj_table, unsigned n)
>> +{
>> +	return rte_ring_mc_dequeue_bulk((struct rte_ring *)p, obj_table, n);
>> +}
>> +
>> +static int
>> +common_ring_sc_get(void *p, void **obj_table, unsigned n)
>> +{
>> +	return rte_ring_sc_dequeue_bulk((struct rte_ring *)p, obj_table, n);
>> +}
>> +
>> +static unsigned
>> +common_ring_get_count(void *p)
>> +{
>> +	return rte_ring_count((struct rte_ring *)p);
>> +}
>> +
>> +
>> +static void *
>> +rte_mempool_common_ring_alloc(struct rte_mempool *mp,
>> +		const char *name, unsigned n, int socket_id, unsigned flags)
>> +{
>> +	struct rte_ring *r;
>> +	char rg_name[RTE_RING_NAMESIZE];
>> +	int rg_flags = 0;
>> +
>> +	if (flags & MEMPOOL_F_SP_PUT)
>> +		rg_flags |= RING_F_SP_ENQ;
>> +	if (flags & MEMPOOL_F_SC_GET)
>> +		rg_flags |= RING_F_SC_DEQ;
>> +
>> +	/* allocate the ring that will be used to store objects */
>> +	/* Ring functions will return appropriate errors if we are
>> +	 * running as a secondary process etc., so no checks made
>> +	 * in this function for that condition */
>> +	snprintf(rg_name, sizeof(rg_name), "%s-ring", name);
>> +	r = rte_ring_create(rg_name, rte_align32pow2(n+1), socket_id, rg_flags);
>> +	if (r == NULL)
>> +		return NULL;
>> +
>> +	mp->rt_pool = (void *)r;
>> +
>> +	return (void *) r;
>> +}
>> +
>> +void *
>> +rte_mempool_ext_alloc(struct rte_mempool *mp,
>> +		const char *name, unsigned n, int socket_id, unsigned flags)
>> +{
>> +	if (mempool_handler_list.handler[mp->handler_idx].alloc) {
>> +		return (mempool_handler_list.handler[mp->handler_idx].alloc)
>> +						(mp, name, n, socket_id, flags);
>> +	}
>> +	return NULL;
>> +}
>> +
>> +inline int __attribute__((always_inline))
>> +rte_mempool_ext_get_bulk(struct rte_mempool *mp, void **obj_table, unsigned n)
>> +{
>> +	return (mempool_handler_list.handler[mp->handler_idx].get)
>> +						(mp->rt_pool, obj_table, n);
>> +}
>> +
>> +inline int __attribute__((always_inline))
>> +rte_mempool_ext_put_bulk(struct rte_mempool *mp, void * const *obj_table,
>> +		unsigned n)
>> +{
>> +	return (mempool_handler_list.handler[mp->handler_idx].put)
>> +						(mp->rt_pool, obj_table, n);
>> +}
>> +
>> +int
>> +rte_mempool_ext_get_count(const struct rte_mempool *mp)
>> +{
>> +	return (mempool_handler_list.handler[mp->handler_idx].get_count)
>> +						(mp->rt_pool);
>> +}
>> +
>> +static struct rte_mempool_handler handler_mp_mc = {
>> +	.name = "ring_mp_mc",
>> +	.alloc = rte_mempool_common_ring_alloc,
>> +	.put = common_ring_mp_put,
>> +	.get = common_ring_mc_get,
>> +	.get_count = common_ring_get_count,
>> +	.free = NULL
>> +};
>> +static struct rte_mempool_handler handler_sp_sc = {
>> +	.name = "ring_sp_sc",
>> +	.alloc = rte_mempool_common_ring_alloc,
>> +	.put = common_ring_sp_put,
>> +	.get = common_ring_sc_get,
>> +	.get_count = common_ring_get_count,
>> +	.free = NULL
>> +};
>> +static struct rte_mempool_handler handler_mp_sc = {
>> +	.name = "ring_mp_sc",
>> +	.alloc = rte_mempool_common_ring_alloc,
>> +	.put = common_ring_mp_put,
>> +	.get = common_ring_sc_get,
>> +	.get_count = common_ring_get_count,
>> +	.free = NULL
>> +};
>> +static struct rte_mempool_handler handler_sp_mc = {
>> +	.name = "ring_sp_mc",
>> +	.alloc = rte_mempool_common_ring_alloc,
>> +	.put = common_ring_sp_put,
>> +	.get = common_ring_mc_get,
>> +	.get_count = common_ring_get_count,
>> +	.free = NULL
>> +};
>> +
>> +REGISTER_MEMPOOL_HANDLER(handler_mp_mc);
>> +REGISTER_MEMPOOL_HANDLER(handler_sp_sc);
>> +REGISTER_MEMPOOL_HANDLER(handler_mp_sc);
>> +REGISTER_MEMPOOL_HANDLER(handler_sp_mc);
>> diff --git a/lib/librte_mempool/rte_mempool_internal.h b/lib/librte_mempool/rte_mempool_internal.h
>> new file mode 100644
>> index 0000000..92b7bde
>> --- /dev/null
>> +++ b/lib/librte_mempool/rte_mempool_internal.h
>> @@ -0,0 +1,74 @@
>> +/*-
>> + *   BSD LICENSE
>> + *
>> + *   Copyright(c) 2010-2016 Intel Corporation. All rights reserved.
>> + *   All rights reserved.
>> + *
>> + *   Redistribution and use in source and binary forms, with or without
>> + *   modification, are permitted provided that the following conditions
>> + *   are met:
>> + *
>> + *     * Redistributions of source code must retain the above copyright
>> + *       notice, this list of conditions and the following disclaimer.
>> + *     * Redistributions in binary form must reproduce the above copyright
>> + *       notice, this list of conditions and the following disclaimer in
>> + *       the documentation and/or other materials provided with the
>> + *       distribution.
>> + *     * Neither the name of Intel Corporation nor the names of its
>> + *       contributors may be used to endorse or promote products derived
>> + *       from this software without specific prior written permission.
>> + *
>> + *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
>> + *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
>> + *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
>> + *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
>> + *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
>> + *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
>> + *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
>> + *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
>> + *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
>> + *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
>> + *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
>> + */
>> +
>> +#ifndef _RTE_MEMPOOL_INTERNAL_H_
>> +#define _RTE_MEMPOOL_INTERNAL_H_
>> +
>> +#include <rte_spinlock.h>
>> +#include <rte_mempool.h>
>> +
>> +#define RTE_MEMPOOL_MAX_HANDLER_IDX 16
>> +
>> +struct rte_mempool_handler {
>> +	char name[RTE_MEMPOOL_NAMESIZE]; /**< Name of mempool handler */
>> +
>> +	rte_mempool_alloc_t alloc;
>> +
>> +	rte_mempool_put_t put __rte_cache_aligned;
>> +
>> +	rte_mempool_get_t get __rte_cache_aligned;
>> +
>> +	rte_mempool_get_count get_count __rte_cache_aligned;
>> +
>> +	rte_mempool_free_t free __rte_cache_aligned;
>> +};
>
> IMO, the structure should be cache-aligned, not the individual
> elements, as the elements are likely read-only in the fast path.


OK, I'll try out some variations and measure the performance impact.
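
Something like this, presumably (just a sketch of the idea, not
benchmarked yet):

	/* align the whole jump-table entry instead of each member, so
	 * the read-mostly callbacks share cache lines */
	struct rte_mempool_handler {
		char name[RTE_MEMPOOL_NAMESIZE]; /**< Name of mempool handler */
		rte_mempool_alloc_t alloc;
		rte_mempool_put_t put;
		rte_mempool_get_t get;
		rte_mempool_get_count get_count;
		rte_mempool_free_t free;
	} __rte_cache_aligned;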


>> +
>> +struct rte_mempool_handler_list {
>> +	rte_spinlock_t sl;		  /**< Spinlock for add/delete. */
>> +
>> +	int32_t num_handlers;	  /**< Number of handlers that are valid. */
>> +
>> +	/* storage for all possible handlers */
>> +	struct rte_mempool_handler handler[RTE_MEMPOOL_MAX_HANDLER_IDX];
>> +};
>> +
>> +int16_t add_handler(struct rte_mempool_handler *h);
>> +
>> +#define REGISTER_MEMPOOL_HANDLER(h) \
>> +static int16_t __attribute__((used)) testfn_##h(void);\
>> +int16_t __attribute__((constructor, used)) testfn_##h(void)\
>> +{\
>> +	return add_handler(&h);\
>> +}
>> +
>> +#endif
>> --
>> 1.9.3
>>

Thanks, comments appreciated.
Regards,
David.
  
Jerin Jacob Feb. 4, 2016, 1:23 p.m. UTC | #3
On Wed, Feb 03, 2016 at 02:16:06PM +0000, Hunt, David wrote:
> On 28/01/2016 17:52, Jerin Jacob wrote:
> >On Tue, Jan 26, 2016 at 05:25:51PM +0000, David Hunt wrote:
> >>Adds the new rte_mempool_create_ext api and callback mechanism for
> >>external mempool handlers
> >>
> >>Modifies the existing rte_mempool_create to set up the handler_idx to
> >>the relevant mempool handler based on the handler name:
> >>	ring_sp_sc
> >>	ring_mp_mc
> >>	ring_sp_mc
> >>	ring_mp_sc
> >>
> >>Signed-off-by: David Hunt <david.hunt@intel.com>
> >>---

[snip]

> >>+	if (flags & MEMPOOL_F_USE_STACK)
> >>+		mp->handler_idx = rte_get_mempool_handler("stack");
> >>+	else if (flags & (MEMPOOL_F_SP_PUT | MEMPOOL_F_SC_GET))
> >>+		mp->handler_idx = rte_get_mempool_handler("ring_sp_sc");
> >>+	else if (flags & MEMPOOL_F_SP_PUT)
> >>+		mp->handler_idx = rte_get_mempool_handler("ring_sp_mc");
> >>+	else if (flags & MEMPOOL_F_SC_GET)
> >>+		mp->handler_idx = rte_get_mempool_handler("ring_mp_sc");
> >>+	else
> >>+		mp->handler_idx = rte_get_mempool_handler("ring_mp_mc");
> >>+
> >
> >Why still use flag based selection? Why not "name" based? See below
> >for more description
> 
> 
> The old API does not have a 'name' parameter, so it needs to work out
> which handler to use based on the flags. This is not necessary in the
> new API call, and so it uses the "name" based index.
> 
I agree. But the mapping from the old API to the new API can still be
done as below:
rte_mempool_create -> rte_mempool_create_ext(..,"ring_mp_mc")
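
For example (a rough sketch only, ignoring the Xen dom0 path and not
tested):

	struct rte_mempool *
	rte_mempool_create(const char *name, unsigned n, unsigned elt_size,
		unsigned cache_size, unsigned private_data_size,
		rte_mempool_ctor_t *mp_init, void *mp_init_arg,
		rte_mempool_obj_ctor_t *obj_init, void *obj_init_arg,
		int socket_id, unsigned flags)
	{
		/* derive the handler name from the legacy flags */
		const char *handler_name = "ring_mp_mc";

		if ((flags & MEMPOOL_F_SP_PUT) && (flags & MEMPOOL_F_SC_GET))
			handler_name = "ring_sp_sc";
		else if (flags & MEMPOOL_F_SP_PUT)
			handler_name = "ring_sp_mc";
		else if (flags & MEMPOOL_F_SC_GET)
			handler_name = "ring_mp_sc";

		return rte_mempool_create_ext(name, n, elt_size,
			cache_size, private_data_size,
			mp_init, mp_init_arg, obj_init, obj_init_arg,
			socket_id, flags, handler_name);
	}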

[snip]

> 
> >>+	/* init the mempool structure */
> >>+	mp = mz->addr;
> >>+	memset(mp, 0, sizeof(*mp));
> >>+	snprintf(mp->name, sizeof(mp->name), "%s", name);
> >>+	mp->phys_addr = mz->phys_addr;
> >>+	mp->size = n;
> >>+	mp->flags = flags;
> >>+	mp->cache_size = cache_size;
> >>+	mp->cache_flushthresh = CALC_CACHE_FLUSHTHRESH(cache_size);
> >>+	mp->private_data_size = private_data_size;
> >>+	mp->handler_idx = handler_idx;
> >>+	mp->elt_size = elt_size;
> >>+	mp->rt_pool = rte_mempool_ext_alloc(mp, name, n, socket_id, flags);
> >
> >
> >IMO, We can avoid the duplicaition of above code with rte_mempool_create.
> >i.e  rte_mempool_create -> rte_mempool_create_ext(..,"ring_mp_mc")
> 
> 
> rte_mempool_create is not really a subset of rte_mempool_create_ext, so
> doing this would not be possible. I did have a look at this before pushing
> the patch, but the code was so different in each case that I decided to
> leave them as is. Maybe break out the section that sets up the mempool
> structure into a separate function?

Yes, there is a lot of common code between 
rte_mempool_create/rte_mempool_xmem_create and rte_mempool_create_ext.

IMO, we need to converge these functions. Otherwise, a new mempool
feature would require changes in both places, which is difficult to
maintain.
In my view, both the external and internal pool managers should share
the same creation code, with only the backend put/get/alloc function
pointers differing, to maintain functional consistency.

Thanks, comments appreciated.
Jerin
  
Olivier Matz Feb. 4, 2016, 2:52 p.m. UTC | #4
Hi David,

Nice work, thanks !
Please see some comments below.


On 01/26/2016 06:25 PM, David Hunt wrote:

> diff --git a/lib/librte_mempool/rte_mempool.c b/lib/librte_mempool/rte_mempool.c
> index aff5f6d..8c01838 100644
> --- a/lib/librte_mempool/rte_mempool.c
> +++ b/lib/librte_mempool/rte_mempool.c
> @@ -375,48 +376,28 @@ rte_mempool_xmem_usage(void *vaddr, uint32_t elt_num, size_t elt_sz,
>   	return usz;
>   }
>
> -#ifndef RTE_LIBRTE_XEN_DOM0
> -/* stub if DOM0 support not configured */
> -struct rte_mempool *
> -rte_dom0_mempool_create(const char *name __rte_unused,
> -			unsigned n __rte_unused,
> -			unsigned elt_size __rte_unused,
> -			unsigned cache_size __rte_unused,
> -			unsigned private_data_size __rte_unused,
> -			rte_mempool_ctor_t *mp_init __rte_unused,
> -			void *mp_init_arg __rte_unused,
> -			rte_mempool_obj_ctor_t *obj_init __rte_unused,
> -			void *obj_init_arg __rte_unused,
> -			int socket_id __rte_unused,
> -			unsigned flags __rte_unused)
> -{
> -	rte_errno = EINVAL;
> -	return NULL;
> -}
> -#endif
> -

Could we move this to a separate commit?
"mempool: remove unused rte_dom0_mempool_create stub"


>   /* create the mempool */
>   struct rte_mempool *
>   rte_mempool_create(const char *name, unsigned n, unsigned elt_size,
> -		   unsigned cache_size, unsigned private_data_size,
> -		   rte_mempool_ctor_t *mp_init, void *mp_init_arg,
> -		   rte_mempool_obj_ctor_t *obj_init, void *obj_init_arg,
> -		   int socket_id, unsigned flags)
> +			unsigned cache_size, unsigned private_data_size,
> +			rte_mempool_ctor_t *mp_init, void *mp_init_arg,
> +			rte_mempool_obj_ctor_t *obj_init, void *obj_init_arg,
> +			int socket_id, unsigned flags)
>   {
>   	if (rte_xen_dom0_supported())
>   		return rte_dom0_mempool_create(name, n, elt_size,
> -					       cache_size, private_data_size,
> -					       mp_init, mp_init_arg,
> -					       obj_init, obj_init_arg,
> -					       socket_id, flags);
> +			cache_size, private_data_size,
> +			mp_init, mp_init_arg,
> +			obj_init, obj_init_arg,
> +			socket_id, flags);
>   	else
>   		return rte_mempool_xmem_create(name, n, elt_size,
> -					       cache_size, private_data_size,
> -					       mp_init, mp_init_arg,
> -					       obj_init, obj_init_arg,
> -					       socket_id, flags,
> -					       NULL, NULL, MEMPOOL_PG_NUM_DEFAULT,
> -					       MEMPOOL_PG_SHIFT_MAX);
> +			cache_size, private_data_size,
> +			mp_init, mp_init_arg,
> +			obj_init, obj_init_arg,
> +			socket_id, flags,
> +			NULL, NULL,
> +			MEMPOOL_PG_NUM_DEFAULT, MEMPOOL_PG_SHIFT_MAX);
>   }

As far as I can see, you are not modifying the code here, only the
style. For better readability, it should go in another commit that
only fixes indent or style issues.

Also, I think the proper indentation is to use only one tab for the
subsequent lines.
The coding style guide (doc/guides/contributing/coding_style.rst) is
not very clear about this however.

> @@ -469,7 +448,7 @@ rte_mempool_xmem_create(const char *name, unsigned n, unsigned elt_size,
>
>   	/* asked cache too big */
>   	if (cache_size > RTE_MEMPOOL_CACHE_MAX_SIZE ||
> -	    CALC_CACHE_FLUSHTHRESH(cache_size) > n) {
> +		CALC_CACHE_FLUSHTHRESH(cache_size) > n) {
>   		rte_errno = EINVAL;
>   		return NULL;
>   	}

same here.


> @@ -598,6 +568,22 @@ rte_mempool_xmem_create(const char *name, unsigned n, unsigned elt_size,
>   	mp->cache_flushthresh = CALC_CACHE_FLUSHTHRESH(cache_size);
>   	mp->private_data_size = private_data_size;
>
> +	/*
> +	 * Since we have 4 combinations of the SP/SC/MP/MC, and stack,
> +	 * examine the
> +	 * flags to set the correct index into the handler table.
> +	 */

nit: comment style is not correct


> +	if (flags & MEMPOOL_F_USE_STACK)
> +		mp->handler_idx = rte_get_mempool_handler("stack");

The stack handler does not exist yet, it is introduced in the next
commit. I think this code should be moved in the next commit too.


> @@ -622,6 +607,10 @@ rte_mempool_xmem_create(const char *name, unsigned n, unsigned elt_size,
>
>   	mp->elt_va_end = mp->elt_va_start;
>
> +	/* Parameters are setup. Call the mempool handler alloc */
> +	if ((rte_mempool_ext_alloc(mp, name, n, socket_id, flags)) == NULL)
> +		goto exit;
> +

I think some memory needs to be freed here. At least 'te'.
The memzone 'mz' is never freed in this function, even before your
patch, but since Sergio's patch (commit ff909fe21f), we could fix
that issue too.
I can submit a patch for it, or if you prefer, you can fix it in
a separate patch of your series, just let me know.

> @@ -681,7 +670,9 @@ rte_mempool_dump_cache(FILE *f, const struct rte_mempool *mp)
>   	fprintf(f, "    cache_size=%"PRIu32"\n", mp->cache_size);
>   	for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {
>   		cache_count = mp->local_cache[lcore_id].len;
> -		fprintf(f, "    cache_count[%u]=%u\n", lcore_id, cache_count);
> +		if (cache_count > 0)
> +			fprintf(f, "    cache_count[%u]=%u\n",
> +						lcore_id, cache_count);
>   		count += cache_count;
>   	}
>   	fprintf(f, "    total_cache_count=%u\n", count);

This could also be moved in a separate commit.


> @@ -802,14 +793,13 @@ rte_mempool_dump(FILE *f, const struct rte_mempool *mp)
>   	fprintf(f, "  elt_size=%"PRIu32"\n", mp->elt_size);
>   	fprintf(f, "  trailer_size=%"PRIu32"\n", mp->trailer_size);
>   	fprintf(f, "  total_obj_size=%"PRIu32"\n",
> -	       mp->header_size + mp->elt_size + mp->trailer_size);
> +		   mp->header_size + mp->elt_size + mp->trailer_size);
>
>   	fprintf(f, "  private_data_size=%"PRIu32"\n", mp->private_data_size);
>   	fprintf(f, "  pg_num=%"PRIu32"\n", mp->pg_num);

to be moved in the "style" commit.

> @@ -825,7 +815,7 @@ rte_mempool_dump(FILE *f, const struct rte_mempool *mp)
>   			mp->size);
>
>   	cache_count = rte_mempool_dump_cache(f, mp);
> -	common_count = rte_ring_count(mp->ring);
> +	common_count = /* rte_ring_count(mp->ring)*/0;
>   	if ((cache_count + common_count) > mp->size)
>   		common_count = mp->size - cache_count;
>   	fprintf(f, "  common_pool_count=%u\n", common_count);

should it be rte_mempool_ext_get_count(mp) instead?

> @@ -904,7 +894,7 @@ rte_mempool_lookup(const char *name)
>   }
>
>   void rte_mempool_walk(void (*func)(const struct rte_mempool *, void *),
> -		      void *arg)
> +			  void *arg)
>   {
>   	struct rte_tailq_entry *te = NULL;
>   	struct rte_mempool_list *mempool_list;

to be moved in the "style" commit.


> @@ -919,3 +909,111 @@ void rte_mempool_walk(void (*func)(const struct rte_mempool *, void *),
>
>   	rte_rwlock_read_unlock(RTE_EAL_MEMPOOL_RWLOCK);
>   }
> +
> +
> +/* create the mempool using an external mempool manager */
> +struct rte_mempool *
> +rte_mempool_create_ext(const char *name, unsigned n, unsigned elt_size,
> +			unsigned cache_size, unsigned private_data_size,
> +			rte_mempool_ctor_t *mp_init, void *mp_init_arg,
> +			rte_mempool_obj_ctor_t *obj_init, void *obj_init_arg,
> +			int socket_id, unsigned flags,
> +			const char *handler_name)
> +{

I would have used one tab here for subsequent lines.


> +	char mz_name[RTE_MEMZONE_NAMESIZE];
> +	struct rte_mempool_list *mempool_list;
> +	struct rte_mempool *mp = NULL;
> +	struct rte_tailq_entry *te;
> +	const struct rte_memzone *mz;
> +	size_t mempool_size;
> +	int mz_flags = RTE_MEMZONE_1GB|RTE_MEMZONE_SIZE_HINT_ONLY;
> +	int rg_flags = 0;
> +	int16_t handler_idx;
> +
> +	mempool_list = RTE_TAILQ_CAST(rte_mempool_tailq.head, rte_mempool_list);
> +
> +	/* asked cache too big */
> +	if (cache_size > RTE_MEMPOOL_CACHE_MAX_SIZE ||
> +		CALC_CACHE_FLUSHTHRESH(cache_size) > n) {
> +		rte_errno = EINVAL;
> +		return NULL;
> +	}
> +
> +	handler_idx = rte_get_mempool_handler(handler_name);
> +	if (handler_idx < 0) {
> +		RTE_LOG(ERR, MEMPOOL, "Cannot find mempool handler by name!\n");
> +		goto exit;
> +	}
> +
> +	/* ring flags */
> +	if (flags & MEMPOOL_F_SP_PUT)
> +		rg_flags |= RING_F_SP_ENQ;
> +	if (flags & MEMPOOL_F_SC_GET)
> +		rg_flags |= RING_F_SC_DEQ;
> +
> ...

I have the same comment as Jerin here. I think it should be
factorized with rte_mempool_xmem_create() if possible. Maybe at
least a function rte_mempool_init() could be introduced, in the
same model as rte_ring_init().
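
For instance (prototype hypothetical, field list abridged):

	/* initialize an already-reserved mempool header; both
	 * rte_mempool_xmem_create() and rte_mempool_create_ext() could
	 * then share it, the same way rte_ring_create() uses
	 * rte_ring_init() */
	static void
	rte_mempool_init(struct rte_mempool *mp, const char *name,
		unsigned n, unsigned elt_size, unsigned cache_size,
		unsigned private_data_size, unsigned flags,
		int16_t handler_idx)
	{
		memset(mp, 0, sizeof(*mp));
		snprintf(mp->name, sizeof(mp->name), "%s", name);
		mp->size = n;
		mp->elt_size = elt_size;
		mp->flags = flags;
		mp->cache_size = cache_size;
		mp->cache_flushthresh = CALC_CACHE_FLUSHTHRESH(cache_size);
		mp->private_data_size = private_data_size;
		mp->handler_idx = handler_idx;
	}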


> diff --git a/lib/librte_mempool/rte_mempool.h b/lib/librte_mempool/rte_mempool.h
> index 6e2390a..620cfb7 100644
> --- a/lib/librte_mempool/rte_mempool.h
> +++ b/lib/librte_mempool/rte_mempool.h
> @@ -88,6 +88,8 @@ extern "C" {
>   struct rte_mempool_debug_stats {
>   	uint64_t put_bulk;         /**< Number of puts. */
>   	uint64_t put_objs;         /**< Number of objects successfully put. */
> +	uint64_t put_pool_bulk;    /**< Number of puts into pool. */
> +	uint64_t put_pool_objs;    /**< Number of objects into pool. */
>   	uint64_t get_success_bulk; /**< Successful allocation number. */
>   	uint64_t get_success_objs; /**< Objects successfully allocated. */
>   	uint64_t get_fail_bulk;    /**< Failed allocation number. */

I think the comment of put_pool_objs is not very clear.
Shouldn't we have the same stats for get?


> @@ -123,6 +125,7 @@ struct rte_mempool_objsz {
>   #define RTE_MEMPOOL_NAMESIZE 32 /**< Maximum length of a memory pool. */
>   #define RTE_MEMPOOL_MZ_PREFIX "MP_"
>
> +
>   /* "MP_<name>" */
>   #define	RTE_MEMPOOL_MZ_FORMAT	RTE_MEMPOOL_MZ_PREFIX "%s"
>

to be removed

> @@ -175,12 +178,85 @@ struct rte_mempool_objtlr {
>   #endif
>   };
>
> +/* Handler functions for external mempool support */
> +typedef void *(*rte_mempool_alloc_t)(struct rte_mempool *mp,
> +		const char *name, unsigned n, int socket_id, unsigned flags);
> +typedef int (*rte_mempool_put_t)(void *p,
> +		void * const *obj_table, unsigned n);
> +typedef int (*rte_mempool_get_t)(void *p, void **obj_table,
> +		unsigned n);
> +typedef unsigned (*rte_mempool_get_count)(void *p);
> +typedef int(*rte_mempool_free_t)(struct rte_mempool *mp);

a space is missing after 'int'.


> +
> +/**
> + * @internal wrapper for external mempool manager alloc callback.
> + *
> + * @param mp
> + *   Pointer to the memory pool.
> + * @param name
> + *   Name of the statistics field to increment in the memory pool.
> + * @param n
> + *   Number to add to the object-oriented statistics.

Are these comments correct?


> + * @param socket_id
> + *   socket id on which to allocate.
> + * @param flags
> + *   general flags to allocate function

We could add that we are talking about MEMPOOL_F_* flags.

By the way, the '@return' is missing in all declarations.


> +/**
> + * @internal wrapper for external mempool manager get_count callback.
> + *
> + * @param mp
> + *   Pointer to the memory pool.
> + */
> +int
> +rte_mempool_ext_get_count(const struct rte_mempool *mp);

should it be unsigned instead of int?


> +
> +/**
> + * @internal wrapper for external mempool manager free callback.
> + *
> + * @param mp
> + *   Pointer to the memory pool.
> + */
> +int
> +rte_mempool_ext_free(struct rte_mempool *mp);
> +
>   /**
>    * The RTE mempool structure.
>    */
>   struct rte_mempool {
>   	char name[RTE_MEMPOOL_NAMESIZE]; /**< Name of mempool. */
> -	struct rte_ring *ring;           /**< Ring to store objects. */
>   	phys_addr_t phys_addr;           /**< Phys. addr. of mempool struct. */
>   	int flags;                       /**< Flags of the mempool. */
>   	uint32_t size;                   /**< Size of the mempool. */
> @@ -194,6 +270,11 @@ struct rte_mempool {
>
>   	unsigned private_data_size;      /**< Size of private data. */
>
> +	/* Common pool data structure pointer */
> +	void *rt_pool __rte_cache_aligned;

What is the meaning of rt_pool?


> +
> +	int16_t handler_idx;
> +

I don't think I'm getting why an index is better than a pointer to
the struct rte_mempool_handler. A pointer would simplify the
add_handler() function. See below for a detailed explanation.


> @@ -223,6 +304,10 @@ struct rte_mempool {
>   #define MEMPOOL_F_NO_CACHE_ALIGN 0x0002 /**< Do not align objs on cache lines.*/
>   #define MEMPOOL_F_SP_PUT         0x0004 /**< Default put is "single-producer".*/
>   #define MEMPOOL_F_SC_GET         0x0008 /**< Default get is "single-consumer".*/
> +#define MEMPOOL_F_USE_STACK      0x0010 /**< Use a stack for the common pool. */

Stack is not implemented in this commit. It should be moved in next
commit.


> +#define MEMPOOL_F_USE_TM         0x0020
> +#define MEMPOOL_F_NO_SECONDARY   0x0040
> +

What are these flags?


> @@ -728,7 +813,6 @@ rte_dom0_mempool_create(const char *name, unsigned n, unsigned elt_size,
>   		rte_mempool_obj_ctor_t *obj_init, void *obj_init_arg,
>   		int socket_id, unsigned flags);
>
> -
>   /**
>    * Dump the status of the mempool to the console.
>    *

style


> @@ -753,7 +837,7 @@ void rte_mempool_dump(FILE *f, const struct rte_mempool *mp);
>    */
>   static inline void __attribute__((always_inline))
>   __mempool_put_bulk(struct rte_mempool *mp, void * const *obj_table,
> -		    unsigned n, int is_mp)
> +		    unsigned n, __attribute__((unused)) int is_mp)

You could use __rte_unused instead of __attribute__((unused))


> @@ -769,8 +853,7 @@ __mempool_put_bulk(struct rte_mempool *mp, void * const *obj_table,
>
>   #if RTE_MEMPOOL_CACHE_MAX_SIZE > 0
>   	/* cache is not enabled or single producer or non-EAL thread */
> -	if (unlikely(cache_size == 0 || is_mp == 0 ||
> -		     lcore_id >= RTE_MAX_LCORE))
> +	if (unlikely(cache_size == 0 || lcore_id >= RTE_MAX_LCORE))
>   		goto ring_enqueue;
>
>   	/* Go straight to ring if put would overflow mem allocated for cache */

If I understand correctly, we now always use the cache, even if the mempool
is single-producer. I was wondering if it would have a performance
impact... I suppose that using the cache is more efficient than the ring
in single-producer mode, so it may increase performance. Do you have an
idea of the impact here?

I think we could remove the parameter as the function is marked as
internal. The comment above should also be fixed. The same comments
apply to the get() functions.


> @@ -793,8 +876,8 @@ __mempool_put_bulk(struct rte_mempool *mp, void * const *obj_table,
>
>   	cache->len += n;
>
> -	if (cache->len >= flushthresh) {
> -		rte_ring_mp_enqueue_bulk(mp->ring, &cache->objs[cache_size],
> +	if (unlikely(cache->len >= flushthresh)) {
> +		rte_mempool_ext_put_bulk(mp, &cache->objs[cache_size],
>   				cache->len - cache_size);

Shouldn't we add a __MEMPOOL_STAT_ADD(mp, put_pool,
   cache->len - cache_size) here?

> @@ -954,8 +1025,8 @@ __mempool_get_bulk(struct rte_mempool *mp, void **obj_table,
>   	uint32_t cache_size = mp->cache_size;
>
>   	/* cache is not enabled or single consumer */
> -	if (unlikely(cache_size == 0 || is_mc == 0 ||
> -		     n >= cache_size || lcore_id >= RTE_MAX_LCORE))
> +	if (unlikely(cache_size == 0 || n >= cache_size ||
> +						lcore_id >= RTE_MAX_LCORE))

incorrect indent


> @@ -967,7 +1038,8 @@ __mempool_get_bulk(struct rte_mempool *mp, void **obj_table,
>   		uint32_t req = n + (cache_size - cache->len);
>
>   		/* How many do we require i.e. number to fill the cache + the request */
> -		ret = rte_ring_mc_dequeue_bulk(mp->ring, &cache->objs[cache->len], req);
> +		ret = rte_mempool_ext_get_bulk(mp,
> +						&cache->objs[cache->len], req);

indent


> +/**
> + * Function to get an index to an external mempool manager
> + *
> + * @param name
> + *   The name of the mempool handler to search for in the list of handlers
> + * @return
> + *   The index of the mempool handler in the list of registered mempool
> + *   handlers
> + */
> +int16_t
> +rte_get_mempool_handler(const char *name);

I would prefer a function like this:

const struct rte_mempool_handler *
rte_get_mempool_handler(const char *name);

(detailed explanation below)


> diff --git a/lib/librte_mempool/rte_mempool_default.c b/lib/librte_mempool/rte_mempool_default.c
> new file mode 100644
> index 0000000..2493dc1
> --- /dev/null
> +++ b/lib/librte_mempool/rte_mempool_default.c
> +#include "rte_mempool_internal.h"
> +
> +/*
> + * Indirect jump table to support external memory pools
> + */
> +struct rte_mempool_handler_list mempool_handler_list = {
> +	.sl =  RTE_SPINLOCK_INITIALIZER ,
> +	.num_handlers = 0
> +};
> +
> +/* TODO Convert to older mechanism of an array of stucts */
> +int16_t
> +add_handler(struct rte_mempool_handler *h)
> +{
> +	int16_t handler_idx;
> +
> +	/*  */
> +	rte_spinlock_lock(&mempool_handler_list.sl);
> +
> +	/* Check whether jump table has space */
> +	if (mempool_handler_list.num_handlers >= RTE_MEMPOOL_MAX_HANDLER_IDX) {
> +		rte_spinlock_unlock(&mempool_handler_list.sl);
> +		RTE_LOG(ERR, MEMPOOL,
> +				"Maximum number of mempool handlers exceeded\n");
> +		return -1;
> +	}
> +
> +	if ((h->put == NULL) || (h->get == NULL) ||
> +		(h->get_count == NULL)) {
> +		rte_spinlock_unlock(&mempool_handler_list.sl);
> +		 RTE_LOG(ERR, MEMPOOL,
> +					"Missing callback while registering mempool handler\n");
> +		return -1;
> +	}
> +
> +	/* add new handler index */
> +	handler_idx = mempool_handler_list.num_handlers++;
> +
> +	snprintf(mempool_handler_list.handler[handler_idx].name,
> +				RTE_MEMPOOL_NAMESIZE, "%s", h->name);
> +	mempool_handler_list.handler[handler_idx].alloc = h->alloc;
> +	mempool_handler_list.handler[handler_idx].put = h->put;
> +	mempool_handler_list.handler[handler_idx].get = h->get;
> +	mempool_handler_list.handler[handler_idx].get_count = h->get_count;
> +
> +	rte_spinlock_unlock(&mempool_handler_list.sl);
> +
> +	return handler_idx;
> +}

Why not use a mechanism similar to what we have for PMDs?

	void rte_eal_driver_register(struct rte_driver *driver)
	{
		TAILQ_INSERT_TAIL(&dev_driver_list, driver, next);
	}

To do that, you just need to add a TAILQ_ENTRY() in your
rte_mempool_handler structure. This would avoid to duplicate the
structure into a static array whose size is limited.

Accessing to the callbacks would be easier:

	return mp->mp_handler->put(mp->rt_pool, obj_table, n);

instead of:

	return (mempool_handler_list.handler[mp->handler_idx].put)
					(mp->rt_pool, obj_table, n);

If we really want to copy the handlers somewhere, it could be in
the mempool structure. It would avoid an extra dereference
(note the '.' instead of a second '->'):

	return mp->mp_handler.put(mp->rt_pool, obj_table, n);

After doing that, we could ask ourselves whether the wrappers are still
useful. I would say that they could be removed.
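
Putting it together, it could look like this (just a sketch, not
compiled):

	struct rte_mempool_handler {
		TAILQ_ENTRY(rte_mempool_handler) next; /**< list linkage */
		const char *name;                /**< name of the handler */
		rte_mempool_alloc_t alloc;
		rte_mempool_put_t put;
		rte_mempool_get_t get;
		rte_mempool_get_count get_count;
		rte_mempool_free_t free;
	};

	TAILQ_HEAD(rte_mempool_handler_head, rte_mempool_handler);
	static struct rte_mempool_handler_head handler_list =
		TAILQ_HEAD_INITIALIZER(handler_list);

	void
	rte_mempool_register_handler(struct rte_mempool_handler *h)
	{
		TAILQ_INSERT_TAIL(&handler_list, h, next);
	}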


The spinlock could be kept, although it may look a bit overkill:
- I don't expect several registrations to happen at the same time
- There is no unregister() function, so the list can be browsed
   without atomicity concerns

Last thing, I think this code should go in rte_mempool.c, not in
rte_mempool_default.c.


> +
> +/* TODO Convert to older mechanism of an array of stucts */
> +int16_t
> +rte_get_mempool_handler(const char *name)
> +{
> +	int16_t i;
> +
> +	for (i = 0; i < mempool_handler_list.num_handlers; i++) {
> +		if (!strcmp(name, mempool_handler_list.handler[i].name))
> +			return i;
> +	}
> +	return -1;
> +}

This would be replaced by a TAILQ_FOREACH().
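
i.e. with the sketch above, something like:

	const struct rte_mempool_handler *
	rte_get_mempool_handler(const char *name)
	{
		struct rte_mempool_handler *h;

		TAILQ_FOREACH(h, &handler_list, next) {
			if (!strcmp(name, h->name))
				return h;
		}
		return NULL;
	}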


> +static void *
> +rte_mempool_common_ring_alloc(struct rte_mempool *mp,
> +		const char *name, unsigned n, int socket_id, unsigned flags)
> +{
> +	struct rte_ring *r;
> +	char rg_name[RTE_RING_NAMESIZE];
> +	int rg_flags = 0;
> +
> +	if (flags & MEMPOOL_F_SP_PUT)
> +		rg_flags |= RING_F_SP_ENQ;
> +	if (flags & MEMPOOL_F_SC_GET)
> +		rg_flags |= RING_F_SC_DEQ;
> +
> +	/* allocate the ring that will be used to store objects */
> +	/* Ring functions will return appropriate errors if we are
> +	 * running as a secondary process etc., so no checks made
> +	 * in this function for that condition */
> +	snprintf(rg_name, sizeof(rg_name), "%s-ring", name);
> +	r = rte_ring_create(rg_name, rte_align32pow2(n+1), socket_id, rg_flags);
> +	if (r == NULL)
> +		return NULL;
> +
> +	mp->rt_pool = (void *)r;
> +
> +	return (void *) r;

I don't think the explicit casts are required.

> --- /dev/null
> +++ b/lib/librte_mempool/rte_mempool_internal.h

Is it the proper name?
We could imagine a mempool handler provided by a plugin, and
in this case this code should go in rte_mempool.h.

> +
> +struct rte_mempool_handler {
> +	char name[RTE_MEMPOOL_NAMESIZE]; /**< Name of mempool handler */

I would use a const char * here instead.


> +
> +	rte_mempool_alloc_t alloc;
> +
> +	rte_mempool_put_t put __rte_cache_aligned;
> +
> +	rte_mempool_get_t get __rte_cache_aligned;
> +
> +	rte_mempool_get_count get_count __rte_cache_aligned;
> +
> +	rte_mempool_free_t free __rte_cache_aligned;
> +};

I agree with Jerin's comments. I don't think we should cache
align each field. Maybe the whole structure.

> +
> +struct rte_mempool_handler_list {
> +	rte_spinlock_t sl;		  /**< Spinlock for add/delete. */
> +
> +	int32_t num_handlers;	  /**< Number of handlers that are valid. */
> +
> +	/* storage for all possible handlers */
> +	struct rte_mempool_handler handler[RTE_MEMPOOL_MAX_HANDLER_IDX];
> +};
> +
> +int16_t add_handler(struct rte_mempool_handler *h);

I think it should be called rte_mempool_register_handler().

> +
> +#define REGISTER_MEMPOOL_HANDLER(h) \
> +static int16_t __attribute__((used)) testfn_##h(void);\
> +int16_t __attribute__((constructor, used)) testfn_##h(void)\
> +{\
> +	return add_handler(&h);\
> +}
> +
> +#endif
>



Regards,
Olivier
  
Hunt, David Feb. 4, 2016, 4:47 p.m. UTC | #5
On 04/02/2016 14:52, Olivier MATZ wrote:
> Hi David,
>
> Nice work, thanks !
> Please see some comments below.
>
>

[snip]


Olivier,
     Thanks for your comprehensive comments. I'm working on a v2 patch 
based on feedback already received from Jerin, and I'll be sure to 
include your feedback also.
Many thanks,
David.
  
Hunt, David Feb. 4, 2016, 5:34 p.m. UTC | #6
On 04/02/2016 14:52, Olivier MATZ wrote:
> Hi David,

[snip]

Just a comment on one of your comments:

> Why not use a mechanism similar to what we have for PMDs?
>
>      void rte_eal_driver_register(struct rte_driver *driver)
>      {
>          TAILQ_INSERT_TAIL(&dev_driver_list, driver, next);
>      }
>
> To do that, you just need to add a TAILQ_ENTRY() in your
> rte_mempool_handler structure. This would avoid to duplicate the
> structure into a static array whose size is limited.
>
> Accessing to the callbacks would be easier:
>
>      return mp->mp_handler->put(mp->rt_pool, obj_table, n);

One of the iterations of the code did indeed use this mechanism; however, 
I ran into problems with multiple processes using the same mempool. In 
that case, the 'mp_handler' element of the mempool in your return 
statement is only valid for one of the processes. Hence the need for 
an index that's valid for all processes rather than a pointer that's 
valid for only one. And it's not easy to quickly index into an element 
in a queue, hence the array of 16 mempool_handler structs.
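
To illustrate with the existing put wrapper (same code as in the patch,
just annotated):

	inline int __attribute__((always_inline))
	rte_mempool_ext_put_bulk(struct rte_mempool *mp, void * const *obj_table,
			unsigned n)
	{
		/* mp->handler_idx lives in the shared memzone, while
		 * mempool_handler_list is a per-process static, so the
		 * same small integer resolves to the right callbacks in
		 * primary and secondary processes alike */
		return (mempool_handler_list.handler[mp->handler_idx].put)
						(mp->rt_pool, obj_table, n);
	}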

[snip]

Rgds,
Dave.
  
Olivier Matz Feb. 5, 2016, 9:26 a.m. UTC | #7
Hi David,

On 02/04/2016 06:34 PM, Hunt, David wrote:
> On 04/02/2016 14:52, Olivier MATZ wrote:
>> Hi David,
> 
> [snip]
> 
> Just a comment on one of your comments:
> 
>> Why not use a mechanism similar to what we have for PMDs?
>>
>>      void rte_eal_driver_register(struct rte_driver *driver)
>>      {
>>          TAILQ_INSERT_TAIL(&dev_driver_list, driver, next);
>>      }
>>
>> To do that, you just need to add a TAILQ_ENTRY() in your
>> rte_mempool_handler structure. This would avoid to duplicate the
>> structure into a static array whose size is limited.
>>
>> Accessing to the callbacks would be easier:
>>
>>      return mp->mp_handler->put(mp->rt_pool, obj_table, n);
> 
> One of the iterations of the code did indeed use this mechanism;
> however, I ran into problems with multiple processes using the same
> mempool. In that case, the 'mp_handler' element of the mempool in your
> return statement is only valid for one of the processes. Hence the need
> for an index that's valid for all processes rather than a pointer
> that's valid for only one. And it's not easy to quickly index into an
> element in a queue, hence the array of 16 mempool_handler structs.

Oh, you mean with secondary processes. I got it now.

Are we sure we can expect that the registered handlers are the same
between multiple processes? For instance, if a handler is registered
with a plugin, the same plugins must be passed to all processes.

I don't see any better solution than yours (except removing secondary
processes of course ;) ).


Thanks for clarifying,
Olivier
  
Olivier Matz Feb. 8, 2016, 11:02 a.m. UTC | #8
Hi David,

On 02/04/2016 05:47 PM, Hunt, David wrote:
> Olivier,
>     Thanks for your comprehensive comments. I'm working on a v2 patch
> based on feedback already received from Jerin, and I'll be sure to
> include your feedback also.
> Many thanks,
> David.


While answering Keith, I realized there are the same kind of ABI
changes in your patchset too. This means it should also follow the
ABI deprecation process: dpdk/doc/guides/contributing/versioning.rst.

Regards,
Olivier
  
Hunt, David March 1, 2016, 1:32 p.m. UTC | #9
Olivier,
     Here are my comments on your feedback. Hopefully I've covered all of 
it this time, and I've summarised the outstanding questions at the bottom.

On 2/4/2016 2:52 PM, Olivier MATZ wrote:
>
>> -#ifndef RTE_LIBRTE_XEN_DOM0
>> -/* stub if DOM0 support not configured */
>> -struct rte_mempool *
>> -rte_dom0_mempool_create(const char *name __rte_unused,
>> -            unsigned n __rte_unused,
>> -            unsigned elt_size __rte_unused,
>> -            unsigned cache_size __rte_unused,
>> -            unsigned private_data_size __rte_unused,
>> -            rte_mempool_ctor_t *mp_init __rte_unused,
>> -            void *mp_init_arg __rte_unused,
>> -            rte_mempool_obj_ctor_t *obj_init __rte_unused,
>> -            void *obj_init_arg __rte_unused,
>> -            int socket_id __rte_unused,
>> -            unsigned flags __rte_unused)
>> -{
>> -    rte_errno = EINVAL;
>> -    return NULL;
>> -}
>> -#endif
>> -
>
> Could we move this is a separated commit?
> "mempool: remove unused rte_dom0_mempool_create stub"

Will do for v3.


--snip--
> return rte_mempool_xmem_create(name, n, elt_size,
>> -                           cache_size, private_data_size,
>> -                           mp_init, mp_init_arg,
>> -                           obj_init, obj_init_arg,
>> -                           socket_id, flags,
>> -                           NULL, NULL, MEMPOOL_PG_NUM_DEFAULT,
>> -                           MEMPOOL_PG_SHIFT_MAX);
>> +            cache_size, private_data_size,
>> +            mp_init, mp_init_arg,
>> +            obj_init, obj_init_arg,
>> +            socket_id, flags,
>> +            NULL, NULL,
>> +            MEMPOOL_PG_NUM_DEFAULT, MEMPOOL_PG_SHIFT_MAX);
>>   }
>
> As far as I can see, you are not modifying the code here, only the
> style. For better readability, it should go in another commit that
> only fixes indent or style issues.
>

I've removed any changes to style in v2; they only made things more
difficult to read.

> Also, I think the proper indentation is to use only one tab for the
> subsequent lines.

I've done this in v2.

>
>> @@ -598,6 +568,22 @@ rte_mempool_xmem_create(const char *name, 
>> unsigned n, unsigned elt_size,
>>       mp->cache_flushthresh = CALC_CACHE_FLUSHTHRESH(cache_size);
>>       mp->private_data_size = private_data_size;
>>
>> +    /*
>> +     * Since we have 4 combinations of the SP/SC/MP/MC, and stack,
>> +     * examine the
>> +     * flags to set the correct index into the handler table.
>> +     */
>
> nit: comment style is not correct
>

Will fix.

>> +    if (flags & MEMPOOL_F_USE_STACK)
>> +        mp->handler_idx = rte_get_mempool_handler("stack");
>
> The stack handler does not exist yet, it is introduced in the next
> commit. I think this code should be moved in the next commit too.

Done in v2

>
>> @@ -622,6 +607,10 @@ rte_mempool_xmem_create(const char *name, 
>> unsigned n, unsigned elt_size,
>>
>>       mp->elt_va_end = mp->elt_va_start;
>>
>> +    /* Parameters are setup. Call the mempool handler alloc */
>> +    if ((rte_mempool_ext_alloc(mp, name, n, socket_id, flags)) == NULL)
>> +        goto exit;
>> +
>
> I think some memory needs to be freed here. At least 'te'.

Done in v2

>> @@ -681,7 +670,9 @@ rte_mempool_dump_cache(FILE *f, const struct 
>> rte_mempool *mp)
>>       fprintf(f, "    cache_size=%"PRIu32"\n", mp->cache_size);
>>       for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {
>>           cache_count = mp->local_cache[lcore_id].len;
>> -        fprintf(f, "    cache_count[%u]=%u\n", lcore_id, cache_count);
>> +        if (cache_count > 0)
>> +            fprintf(f, "    cache_count[%u]=%u\n",
>> +                        lcore_id, cache_count);
>>           count += cache_count;
>>       }
>>       fprintf(f, "    total_cache_count=%u\n", count);
>
> This could also be moved in a separate commit.

Removed this change, as it's not really relevant to the mempool manager.

>> @@ -825,7 +815,7 @@ rte_mempool_dump(FILE *f, const struct 
>> rte_mempool *mp)
>>               mp->size);
>>
>>       cache_count = rte_mempool_dump_cache(f, mp);
>> -    common_count = rte_ring_count(mp->ring);
>> +    common_count = /* rte_ring_count(mp->ring)*/0;
>>       if ((cache_count + common_count) > mp->size)
>>           common_count = mp->size - cache_count;
>>       fprintf(f, "  common_pool_count=%u\n", common_count);
>
> should it be rte_mempool_ext_get_count(mp) instead?
>

Done.

>
>
>> @@ -919,3 +909,111 @@ void rte_mempool_walk(void (*func)(const struct 
>> rte_mempool *, void *),
>>
>>       rte_rwlock_read_unlock(RTE_EAL_MEMPOOL_RWLOCK);
>>   }
>> +
>> +
>> +/* create the mempool using an external mempool manager */
>> +struct rte_mempool *
>> +rte_mempool_create_ext(const char *name, unsigned n, unsigned elt_size,
>> +            unsigned cache_size, unsigned private_data_size,
>> +            rte_mempool_ctor_t *mp_init, void *mp_init_arg,
>> +            rte_mempool_obj_ctor_t *obj_init, void *obj_init_arg,
>> +            int socket_id, unsigned flags,
>> +            const char *handler_name)
>> +{
>
> I would have used one tab here for subsequent lines.

Done in v2

>
>> +    char mz_name[RTE_MEMZONE_NAMESIZE];
>> +    struct rte_mempool_list *mempool_list;
>> +    struct rte_mempool *mp = NULL;
>> +    struct rte_tailq_entry *te;
>> +    const struct rte_memzone *mz;
>> +    size_t mempool_size;
>> +    int mz_flags = RTE_MEMZONE_1GB|RTE_MEMZONE_SIZE_HINT_ONLY;
>> +    int rg_flags = 0;
>> +    int16_t handler_idx;
>> +
>> +    mempool_list = RTE_TAILQ_CAST(rte_mempool_tailq.head, 
>> rte_mempool_list);
>> +
>> +    /* asked cache too big */
>> +    if (cache_size > RTE_MEMPOOL_CACHE_MAX_SIZE ||
>> +        CALC_CACHE_FLUSHTHRESH(cache_size) > n) {
>> +        rte_errno = EINVAL;
>> +        return NULL;
>> +    }
>> +
>> +    handler_idx = rte_get_mempool_handler(handler_name);
>> +    if (handler_idx < 0) {
>> +        RTE_LOG(ERR, MEMPOOL, "Cannot find mempool handler by 
>> name!\n");
>> +        goto exit;
>> +    }
>> +
>> +    /* ring flags */
>> +    if (flags & MEMPOOL_F_SP_PUT)
>> +        rg_flags |= RING_F_SP_ENQ;
>> +    if (flags & MEMPOOL_F_SC_GET)
>> +        rg_flags |= RING_F_SC_DEQ;
>> +
>> ...
>
> I have the same comment as Jerin here. I think it should be
> factorized with rte_mempool_xmem_create() if possible. Maybe at
> least a function rte_mempool_init() could be introduced, in the
> same model as rte_ring_init().

factorization done in v2.
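
For illustration, the shared part that got factored out looks roughly 
like this (a sketch only; mempool_init_common() is a name I'm assuming 
here, and the actual v2 code may differ):

	/* hypothetical common initializer, on the rte_ring_init() model */
	static void
	mempool_init_common(struct rte_mempool *mp, const char *name,
			unsigned n, unsigned elt_size, unsigned cache_size,
			unsigned private_data_size, int16_t handler_idx,
			unsigned flags)
	{
		memset(mp, 0, sizeof(*mp));
		snprintf(mp->name, sizeof(mp->name), "%s", name);
		mp->size = n;
		mp->flags = flags;
		mp->elt_size = elt_size;
		mp->cache_size = cache_size;
		mp->cache_flushthresh = CALC_CACHE_FLUSHTHRESH(cache_size);
		mp->private_data_size = private_data_size;
		mp->handler_idx = handler_idx;
	}

Both rte_mempool_xmem_create() and rte_mempool_create_ext() would then 
reserve their memzone, call this, and invoke the handler's alloc callback.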

>
>> diff --git a/lib/librte_mempool/rte_mempool.h 
>> b/lib/librte_mempool/rte_mempool.h
>> index 6e2390a..620cfb7 100644
>> --- a/lib/librte_mempool/rte_mempool.h
>> +++ b/lib/librte_mempool/rte_mempool.h
>> @@ -88,6 +88,8 @@ extern "C" {
>>   struct rte_mempool_debug_stats {
>>       uint64_t put_bulk;         /**< Number of puts. */
>>       uint64_t put_objs;         /**< Number of objects successfully 
>> put. */
>> +    uint64_t put_pool_bulk;    /**< Number of puts into pool. */
>> +    uint64_t put_pool_objs;    /**< Number of objects into pool. */
>>       uint64_t get_success_bulk; /**< Successful allocation number. */
>>       uint64_t get_success_objs; /**< Objects successfully allocated. */
>>       uint64_t get_fail_bulk;    /**< Failed allocation number. */
>
> I think the comment of put_pool_objs is not very clear.
> Shouldn't we have the same stats for get?
>

Not used, removed. Covered by put_bulk.

>
>> @@ -123,6 +125,7 @@ struct rte_mempool_objsz {
>>   #define RTE_MEMPOOL_NAMESIZE 32 /**< Maximum length of a memory 
>> pool. */
>>   #define RTE_MEMPOOL_MZ_PREFIX "MP_"
>>
>> +
>>   /* "MP_<name>" */
>>   #define    RTE_MEMPOOL_MZ_FORMAT    RTE_MEMPOOL_MZ_PREFIX "%s"
>>
>
> to be removed

Done in v2.

>
>> @@ -175,12 +178,85 @@ struct rte_mempool_objtlr {
>>   #endif
>>   };
>>
>> +/* Handler functions for external mempool support */
>> +typedef void *(*rte_mempool_alloc_t)(struct rte_mempool *mp,
>> +        const char *name, unsigned n, int socket_id, unsigned flags);
>> +typedef int (*rte_mempool_put_t)(void *p,
>> +        void * const *obj_table, unsigned n);
>> +typedef int (*rte_mempool_get_t)(void *p, void **obj_table,
>> +        unsigned n);
>> +typedef unsigned (*rte_mempool_get_count)(void *p);
>> +typedef int(*rte_mempool_free_t)(struct rte_mempool *mp);
>
> a space is missing after 'int'.

Done in v2

>
>
>> +
>> +/**
>> + * @internal wrapper for external mempool manager alloc callback.
>> + *
>> + * @param mp
>> + *   Pointer to the memory pool.
>> + * @param name
>> + *   Name of the statistics field to increment in the memory pool.
>> + * @param n
>> + *   Number to add to the object-oriented statistics.
>
> Are these comments correct?

Fixed in v2

>
>
>> + * @param socket_id
>> + *   socket id on which to allocate.
>> + * @param flags
>> + *   general flags to allocate function
>
> We could add that we are talking about MEMPOOL_F_* flags.
>
> By the way, the '@return' is missing in all declarations.
>

Will fix in v3

>
>> +/**
>> + * @internal wrapper for external mempool manager get_count callback.
>> + *
>> + * @param mp
>> + *   Pointer to the memory pool.
>> + */
>> +int
>> +rte_mempool_ext_get_count(const struct rte_mempool *mp);
>
> should it be unsigned instead of int?
>

Yes. Will change.
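
i.e. the wrapper prototype then becomes:

	unsigned
	rte_mempool_ext_get_count(const struct rte_mempool *mp);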

>
>> +
>> +/**
>> + * @internal wrapper for external mempool manager free callback.
>> + *
>> + * @param mp
>> + *   Pointer to the memory pool.
>> + */
>> +int
>> +rte_mempool_ext_free(struct rte_mempool *mp);
>> +
>>   /**
>>    * The RTE mempool structure.
>>    */
>>   struct rte_mempool {
>>       char name[RTE_MEMPOOL_NAMESIZE]; /**< Name of mempool. */
>> -    struct rte_ring *ring;           /**< Ring to store objects. */
>>       phys_addr_t phys_addr;           /**< Phys. addr. of mempool 
>> struct. */
>>       int flags;                       /**< Flags of the mempool. */
>>       uint32_t size;                   /**< Size of the mempool. */
>> @@ -194,6 +270,11 @@ struct rte_mempool {
>>
>>       unsigned private_data_size;      /**< Size of private data. */
>>
>> +    /* Common pool data structure pointer */
>> +    void *rt_pool __rte_cache_aligned;
>
> What is the meaning of rt_pool?

I agree that it's probably not a very good name. Since it's basically 
the pointer which is used by the handler callbacks, maybe we should 
call it mempool_storage? That leaves it generic enough that it can point 
at a ring, an array, or whatever else is needed for a particular handler.

>> +
>> +    int16_t handler_idx;
>> +
>
> I don't think I'm getting why an index is better than a pointer to
> the struct rte_mempool_handler. It would simplify the add_handler()
> function. See below for a detailed explanation.
>

As discussed in previous mails. It's to facilitate secondary processes.
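
To spell the constraint out: the rte_mempool structure lives in a shared 
memzone and is mapped by secondary processes, but each process loads the 
library at its own addresses, so a function (or handler-struct) pointer 
written by the primary is not valid in a secondary. An int16_t index into 
a table that every process rebuilds identically at startup, via the 
constructor-based registration, stays valid everywhere. The dispatch in 
the wrappers then has this shape:

	/* handler_idx lives in shared memory; the table it indexes is
	 * per-process, so the callback pointers are always local */
	return (mempool_handler_list.handler[mp->handler_idx].put)
			(mp->rt_pool, obj_table, n);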

>> @@ -223,6 +304,10 @@ struct rte_mempool {
>>   #define MEMPOOL_F_NO_CACHE_ALIGN 0x0002 /**< Do not align objs on 
>> cache lines.*/
>>   #define MEMPOOL_F_SP_PUT         0x0004 /**< Default put is 
>> "single-producer".*/
>>   #define MEMPOOL_F_SC_GET         0x0008 /**< Default get is 
>> "single-consumer".*/
>> +#define MEMPOOL_F_USE_STACK      0x0010 /**< Use a stack for the 
>> common pool. */
>
> Stack is not implemented in this commit. It should be moved to the
> next commit.

Done in v2

>> +#define MEMPOOL_F_USE_TM         0x0020
>> +#define MEMPOOL_F_NO_SECONDARY   0x0040
>> +
>
> What are these flags?

Not needed. Part of temporary change. Removed.

>> @@ -728,7 +813,6 @@ rte_dom0_mempool_create(const char *name, 
>> unsigned n, unsigned elt_size,
>>           rte_mempool_obj_ctor_t *obj_init, void *obj_init_arg,
>>           int socket_id, unsigned flags);
>>
>> -
>>   /**
>>    * Dump the status of the mempool to the console.
>>    *
>
> style

will fix in v3.

>
>
>> @@ -753,7 +837,7 @@ void rte_mempool_dump(FILE *f, const struct 
>> rte_mempool *mp);
>>    */
>>   static inline void __attribute__((always_inline))
>>   __mempool_put_bulk(struct rte_mempool *mp, void * const *obj_table,
>> -            unsigned n, int is_mp)
>> +            unsigned n, __attribute__((unused)) int is_mp)
>
> You could use __rte_unused instead of __attribute__((unused))

will change in v3

>
>> @@ -769,8 +853,7 @@ __mempool_put_bulk(struct rte_mempool *mp, void * 
>> const *obj_table,
>>
>>   #if RTE_MEMPOOL_CACHE_MAX_SIZE > 0
>>       /* cache is not enabled or single producer or non-EAL thread */
>> -    if (unlikely(cache_size == 0 || is_mp == 0 ||
>> -             lcore_id >= RTE_MAX_LCORE))
>> +    if (unlikely(cache_size == 0 || lcore_id >= RTE_MAX_LCORE))
>>           goto ring_enqueue;
>>
>>       /* Go straight to ring if put would overflow mem allocated for 
>> cache */
>
> If I understand correctly, we now always use the cache, even if the mempool
> is single-producer. I was wondering if it would have a performance
> impact... I suppose that using the cache is more efficient than the ring
> in single-producer mode, so it may increase performance. Do you have an
> idea of the impact here?

I've seen very little performance gain, maybe a couple of percent for 
some tests, and up to a 10% drop for some single-core tests. I'll do some 
more specific testing based on SP versus MP.

>
> I think we could remove the parameter as the function is marked as
> internal. The comment above should also be fixed. The same comments
> apply to the get() functions.
>

will fix comments in v3, and see if we should remove is_mp based on more 
performance testing.

>
>> @@ -793,8 +876,8 @@ __mempool_put_bulk(struct rte_mempool *mp, void * 
>> const *obj_table,
>>
>>       cache->len += n;
>>
>> -    if (cache->len >= flushthresh) {
>> -        rte_ring_mp_enqueue_bulk(mp->ring, &cache->objs[cache_size],
>> +    if (unlikely(cache->len >= flushthresh)) {
>> +        rte_mempool_ext_put_bulk(mp, &cache->objs[cache_size],
>>                   cache->len - cache_size);
>
> Shouldn't we add a __MEMPOOL_STAT_ADD(mp, put_pool,
>   cache->len - cache_size) here?
>

Correct. Added in v3.
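
i.e. the flush path picks up the counter, roughly:

	if (unlikely(cache->len >= flushthresh)) {
		rte_mempool_ext_put_bulk(mp, &cache->objs[cache_size],
				cache->len - cache_size);
		__MEMPOOL_STAT_ADD(mp, put_pool, cache->len - cache_size);
		cache->len = cache_size;
	}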

>> @@ -954,8 +1025,8 @@ __mempool_get_bulk(struct rte_mempool *mp, void 
>> **obj_table,
>>       uint32_t cache_size = mp->cache_size;
>>
>>       /* cache is not enabled or single consumer */
>> -    if (unlikely(cache_size == 0 || is_mc == 0 ||
>> -             n >= cache_size || lcore_id >= RTE_MAX_LCORE))
>> +    if (unlikely(cache_size == 0 || n >= cache_size ||
>> +                        lcore_id >= RTE_MAX_LCORE))
>
> incorrect indent

will fix in v3

>
>> @@ -967,7 +1038,8 @@ __mempool_get_bulk(struct rte_mempool *mp, void 
>> **obj_table,
>>           uint32_t req = n + (cache_size - cache->len);
>>
>>           /* How many do we require i.e. number to fill the cache + 
>> the request */
>> -        ret = rte_ring_mc_dequeue_bulk(mp->ring, 
>> &cache->objs[cache->len], req);
>> +        ret = rte_mempool_ext_get_bulk(mp,
>> +                        &cache->objs[cache->len], req);
>
> indent

will fix in v3


>> +/**
>> + * Function to get an index to an external mempool manager
>> + *
>> + * @param name
>> + *   The name of the mempool handler to search for in the list of 
>> handlers
>> + * @return
>> + *   The index of the mempool handler in the list of registered mempool
>> + *   handlers
>> + */
>> +int16_t
>> +rte_get_mempool_handler(const char *name);
>
> I would prefer a function like this:
>
> const struct rte_mempool_handler *
> rte_get_mempool_handler(const char *name);
>
> (detailed explanation below)

Already discussed previously, index needed over pointer because of 
secondary processes.


>> diff --git a/lib/librte_mempool/rte_mempool_default.c 
>> b/lib/librte_mempool/rte_mempool_default.c
>> new file mode 100644
>> index 0000000..2493dc1
>> --- /dev/null
>> +++ b/lib/librte_mempool/rte_mempool_default.c
>> +#include "rte_mempool_internal.h"
>> +
>> +/*
>> + * Indirect jump table to support external memory pools
>> + */
>> +struct rte_mempool_handler_list mempool_handler_list = {
>> +    .sl =  RTE_SPINLOCK_INITIALIZER ,
>> +    .num_handlers = 0
>> +};
>> +
>> +/* TODO Convert to older mechanism of an array of structs */
>> +int16_t
>> +add_handler(struct rte_mempool_handler *h)
>> +{
>> +    int16_t handler_idx;
>> +
>> +    /*  */
>> +    rte_spinlock_lock(&mempool_handler_list.sl);
>> +
>> +    /* Check whether jump table has space */
>> +    if (mempool_handler_list.num_handlers >= 
>> RTE_MEMPOOL_MAX_HANDLER_IDX) {
>> +        rte_spinlock_unlock(&mempool_handler_list.sl);
>> +        RTE_LOG(ERR, MEMPOOL,
>> +                "Maximum number of mempool handlers exceeded\n");
>> +        return -1;
>> +    }
>> +
>> +    if ((h->put == NULL) || (h->get == NULL) ||
>> +        (h->get_count == NULL)) {
>> +        rte_spinlock_unlock(&mempool_handler_list.sl);
>> +         RTE_LOG(ERR, MEMPOOL,
>> +                    "Missing callback while registering mempool 
>> handler\n");
>> +        return -1;
>> +    }
>> +
>> +    /* add new handler index */
>> +    handler_idx = mempool_handler_list.num_handlers++;
>> +
>> +    snprintf(mempool_handler_list.handler[handler_idx].name,
>> +                RTE_MEMPOOL_NAMESIZE, "%s", h->name);
>> +    mempool_handler_list.handler[handler_idx].alloc = h->alloc;
>> +    mempool_handler_list.handler[handler_idx].put = h->put;
>> +    mempool_handler_list.handler[handler_idx].get = h->get;
>> +    mempool_handler_list.handler[handler_idx].get_count = h->get_count;
>> +
>> +    rte_spinlock_unlock(&mempool_handler_list.sl);
>> +
>> +    return handler_idx;
>> +}
>
> Why not use a mechanism similar to what we have for PMDs?
>
>     void rte_eal_driver_register(struct rte_driver *driver)
>     {
>         TAILQ_INSERT_TAIL(&dev_driver_list, driver, next);
>     }
>
> To do that, you just need to add a TAILQ_ENTRY() in your
> rte_mempool_handler structure. This would avoid duplicating the
> structure into a static array whose size is limited.
>
> Accessing the callbacks would be easier:
>
>     return mp->mp_handler->put(mp->rt_pool, obj_table, n);
>
> instead of:
>
>     return (mempool_handler_list.handler[mp->handler_idx].put)
>                     (mp->rt_pool, obj_table, n);
>
> If we really want to copy the handlers somewhere, it could be in
> the mempool structure. It would avoid an extra dereference
> (note the first '.' instead of '->'):
>
>     return mp.mp_handler->put(mp->rt_pool, obj_table, n);
>
> After doing that, we could ask ourselves if the wrappers are still
> useful or not. I would say that they could be removed.
>
>
> The spinlock could be kept, although it may look a bit overkill:
> - I don't expect several registrations to happen at the same time
> - There is no unregister() function, so there is no risk in
>   browsing the list without atomicity
>

Already discussed previously, index needed over pointer because of 
secondary processes.

> Last thing, I think this code should go in rte_mempool.c, not in
> rte_mempool_default.c.

I was trying to keep the default handlers together in their own file, 
rather than having them in with the mempool framework. I think it's 
better having them separate, and new handlers can go in their own files 
too, no?
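
For example, a new handler in its own file would be fully self-contained. 
A sketch, using ring-backed callbacks purely as a template ("my_handler" 
and the callback names are placeholders, not code from this patch):

	/* my_handler.c -- hypothetical standalone handler */
	#include <rte_common.h>
	#include <rte_ring.h>
	#include <rte_mempool.h>
	#include "rte_mempool_internal.h"

	static void *
	my_alloc(struct rte_mempool *mp, const char *name, unsigned n,
			int socket_id, unsigned flags __rte_unused)
	{
		/* back the pool with a ring, as the defaults do */
		mp->rt_pool = rte_ring_create(name, rte_align32pow2(n + 1),
				socket_id, 0);
		return mp->rt_pool;
	}

	static int
	my_put(void *p, void * const *obj_table, unsigned n)
	{
		return rte_ring_mp_enqueue_bulk(p, obj_table, n);
	}

	static int
	my_get(void *p, void **obj_table, unsigned n)
	{
		return rte_ring_mc_dequeue_bulk(p, obj_table, n);
	}

	static unsigned
	my_get_count(void *p)
	{
		return rte_ring_count(p);
	}

	static struct rte_mempool_handler my_handler = {
		.name = "my_handler",
		.alloc = my_alloc,
		.put = my_put,
		.get = my_get,
		.get_count = my_get_count,
	};

	REGISTER_MEMPOOL_HANDLER(my_handler);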


>> +
>> +/* TODO Convert to older mechanism of an array of structs */
>> +int16_t
>> +rte_get_mempool_handler(const char *name)
>> +{
>> +    int16_t i;
>> +
>> +    for (i = 0; i < mempool_handler_list.num_handlers; i++) {
>> +        if (!strcmp(name, mempool_handler_list.handler[i].name))
>> +            return i;
>> +    }
>> +    return -1;
>> +}
>
> This would be replaced by a TAILQ_FOREACH().

Already discussed previously, index needed over pointer because of 
secondary processes.

>
>> +static void *
>> +rte_mempool_common_ring_alloc(struct rte_mempool *mp,
>> +        const char *name, unsigned n, int socket_id, unsigned flags)
>> +{
>> +    struct rte_ring *r;
>> +    char rg_name[RTE_RING_NAMESIZE];
>> +    int rg_flags = 0;
>> +
>> +    if (flags & MEMPOOL_F_SP_PUT)
>> +        rg_flags |= RING_F_SP_ENQ;
>> +    if (flags & MEMPOOL_F_SC_GET)
>> +        rg_flags |= RING_F_SC_DEQ;
>> +
>> +    /* allocate the ring that will be used to store objects */
>> +    /* Ring functions will return appropriate errors if we are
>> +     * running as a secondary process etc., so no checks made
>> +     * in this function for that condition */
>> +    snprintf(rg_name, sizeof(rg_name), "%s-ring", name);
>> +    r = rte_ring_create(rg_name, rte_align32pow2(n+1), socket_id, 
>> rg_flags);
>> +    if (r == NULL)
>> +        return NULL;
>> +
>> +    mp->rt_pool = (void *)r;
>> +
>> +    return (void *) r;
>
> I don't think the explicit casts are required.

will change in v3

>
>> --- /dev/null
>> +++ b/lib/librte_mempool/rte_mempool_internal.h
>
> Is it the proper name?
> We could imagine a mempool handler provided by a plugin, and
> in this case this code should go in rte_mempool.h.

I was trying to keep the public APIs in rte_mempool.h, and all the 
private stuff in rte_mempool_internal.h. Maybe a better name would be 
rte_mempool_private.h?

>> +
>> +struct rte_mempool_handler {
>> +    char name[RTE_MEMPOOL_NAMESIZE]; /**< Name of mempool handler */
>
> I would use a const char * here instead.
>

Would we then have to allocate the memory for the string elsewhere? I 
would have thought this is the more straightforward method.

>> +
>> +    rte_mempool_alloc_t alloc;
>> +
>> +    rte_mempool_put_t put __rte_cache_aligned;
>> +
>> +    rte_mempool_get_t get __rte_cache_aligned;
>> +
>> +    rte_mempool_get_count get_count __rte_cache_aligned;
>> +
>> +    rte_mempool_free_t free __rte_cache_aligned;
>> +};
>
> I agree with Jerin's comments. I don't think we should cache
> align each field. Maybe the whole structure.

Changed in v2.

>> +
>> +struct rte_mempool_handler_list {
>> +    rte_spinlock_t sl;          /**< Spinlock for add/delete. */
>> +
>> +    int32_t num_handlers;      /**< Number of handlers that are 
>> valid. */
>> +
>> +    /* storage for all possible handlers */
>> +    struct rte_mempool_handler handler[RTE_MEMPOOL_MAX_HANDLER_IDX];
>> +};
>> +
>> +int16_t add_handler(struct rte_mempool_handler *h);
>
> I think it should be called rte_mempool_register_handler().

Agreed, changed in v2.

>> +
>> +#define REGISTER_MEMPOOL_HANDLER(h) \
>> +static int16_t __attribute__((used)) testfn_##h(void);\
>> +int16_t __attribute__((constructor, used)) testfn_##h(void)\
>> +{\
>> +    return add_handler(&h);\
>> +}
>> +
>> +#endif
>>
>
>
>
> Regards,
> Olivier

Apologies for not addressing all of your comments for v2. I'll await 
your comments on the couple of outstanding questions above, then push up v3.
Mainly:
* change "rt_pool" to "mempool_storage"?
* change to const char * for mempool name, or leave as is.
* move all contents of rte_mempool_internal.h to rte_mempool.h, or leave 
as is.
* alternatively change name of rte_mempool_internal.h to 
rte_mempool_private.h
* I need to look into the performance of always using the cache for single 
producer/consumer.

Thanks,
David.
  
Olivier Matz March 4, 2016, 9:05 a.m. UTC | #10
Hi David,

>>
>>> @@ -622,6 +607,10 @@ rte_mempool_xmem_create(const char *name,
>>> unsigned n, unsigned elt_size,
>>>
>>>       mp->elt_va_end = mp->elt_va_start;
>>>
>>> +    /* Parameters are setup. Call the mempool handler alloc */
>>> +    if ((rte_mempool_ext_alloc(mp, name, n, socket_id, flags)) == NULL)
>>> +        goto exit;
>>> +
>>
>> I think some memory needs to be freed here. At least 'te'.
> 
> Done in v2

Please note that in the meantime, this fix has been pushed (as we need
it for next release):
http://dpdk.org/browse/dpdk/commit/lib/librte_mempool/rte_mempool.c?id=86f36ff9578b5f3d697c8fcf6072dcb70e2b246f


>>> diff --git a/lib/librte_mempool/rte_mempool.h
>>> b/lib/librte_mempool/rte_mempool.h
>>> index 6e2390a..620cfb7 100644
>>> --- a/lib/librte_mempool/rte_mempool.h
>>> +++ b/lib/librte_mempool/rte_mempool.h
>>> @@ -88,6 +88,8 @@ extern "C" {
>>>   struct rte_mempool_debug_stats {
>>>       uint64_t put_bulk;         /**< Number of puts. */
>>>       uint64_t put_objs;         /**< Number of objects successfully
>>> put. */
>>> +    uint64_t put_pool_bulk;    /**< Number of puts into pool. */
>>> +    uint64_t put_pool_objs;    /**< Number of objects into pool. */
>>>       uint64_t get_success_bulk; /**< Successful allocation number. */
>>>       uint64_t get_success_objs; /**< Objects successfully allocated. */
>>>       uint64_t get_fail_bulk;    /**< Failed allocation number. */
>>
>> I think the comment of put_pool_objs is not very clear.
>> Shouldn't we have the same stats for get?
>>
> 
> Not used, removed. Covered by put_bulk.

I guess you mean it will be removed in v3? It is still there in the v2
(the field, not the comment that has been fixed).

Shouldn't we have the same stats for get?


>>>   /**
>>>    * The RTE mempool structure.
>>>    */
>>>   struct rte_mempool {
>>>       char name[RTE_MEMPOOL_NAMESIZE]; /**< Name of mempool. */
>>> -    struct rte_ring *ring;           /**< Ring to store objects. */
>>>       phys_addr_t phys_addr;           /**< Phys. addr. of mempool
>>> struct. */
>>>       int flags;                       /**< Flags of the mempool. */
>>>       uint32_t size;                   /**< Size of the mempool. */
>>> @@ -194,6 +270,11 @@ struct rte_mempool {
>>>
>>>       unsigned private_data_size;      /**< Size of private data. */
>>>
>>> +    /* Common pool data structure pointer */
>>> +    void *rt_pool __rte_cache_aligned;
>>
>> What is the meaning of rt_pool?
> 
> I agree that it's probably not a very good name. Since it's basically
> the pointer which is used by the handler callbacks, maybe we should
> call it mempool_storage? That leaves it generic enough that it can point
> at a ring, an array, or whatever else is needed for a particular handler.

My question was more about the "rt_" prefix. Maybe I missed something
obvious? I think "pool" or "pool_handler" is ok.


>>> @@ -769,8 +853,7 @@ __mempool_put_bulk(struct rte_mempool *mp, void *
>>> const *obj_table,
>>>
>>>   #if RTE_MEMPOOL_CACHE_MAX_SIZE > 0
>>>       /* cache is not enabled or single producer or non-EAL thread */
>>> -    if (unlikely(cache_size == 0 || is_mp == 0 ||
>>> -             lcore_id >= RTE_MAX_LCORE))
>>> +    if (unlikely(cache_size == 0 || lcore_id >= RTE_MAX_LCORE))
>>>           goto ring_enqueue;
>>>
>>>       /* Go straight to ring if put would overflow mem allocated for
>>> cache */
>>
>> If I understand correctly, we now always use the cache, even if the mempool
>> is single-producer. I was wondering if it would have a performance
>> impact... I suppose that using the cache is more efficient than the ring
>> in single-producer mode, so it may increase performance. Do you have an
>> idea of the impact here?
> 
> I've seen very little performance gain, maybe a couple of percent for
> some tests, and up to a 10% drop for some single-core tests. I'll do some
> more specific testing based on SP versus MP.

OK thanks!


>>> diff --git a/lib/librte_mempool/rte_mempool_default.c
>>> b/lib/librte_mempool/rte_mempool_default.c
>>> new file mode 100644
>>> index 0000000..2493dc1
>>> --- /dev/null
>>> +++ b/lib/librte_mempool/rte_mempool_default.c
>>> +#include "rte_mempool_internal.h"
>>> +
>>> +/*
>>> + * Indirect jump table to support external memory pools
>>> + */
>>> +struct rte_mempool_handler_list mempool_handler_list = {
>>> +    .sl =  RTE_SPINLOCK_INITIALIZER ,
>>> +    .num_handlers = 0
>>> +};
>>> +
>>> +/* TODO Convert to older mechanism of an array of structs */
>>> +int16_t
>>> +add_handler(struct rte_mempool_handler *h)
>>> +{
>>> +    int16_t handler_idx;
>>> +
>>> +    /*  */
>>> +    rte_spinlock_lock(&mempool_handler_list.sl);
>>> +
>>> +    /* Check whether jump table has space */
>>> +    if (mempool_handler_list.num_handlers >=
>>> RTE_MEMPOOL_MAX_HANDLER_IDX) {
>>> +        rte_spinlock_unlock(&mempool_handler_list.sl);
>>> +        RTE_LOG(ERR, MEMPOOL,
>>> +                "Maximum number of mempool handlers exceeded\n");
>>> +        return -1;
>>> +    }
>>> +
>>> +    if ((h->put == NULL) || (h->get == NULL) ||
>>> +        (h->get_count == NULL)) {
>>> +        rte_spinlock_unlock(&mempool_handler_list.sl);
>>> +         RTE_LOG(ERR, MEMPOOL,
>>> +                    "Missing callback while registering mempool
>>> handler\n");
>>> +        return -1;
>>> +    }
>>> +
>>> +    /* add new handler index */
>>> +    handler_idx = mempool_handler_list.num_handlers++;
>>> +
>>> +    snprintf(mempool_handler_list.handler[handler_idx].name,
>>> +                RTE_MEMPOOL_NAMESIZE, "%s", h->name);
>>> +    mempool_handler_list.handler[handler_idx].alloc = h->alloc;
>>> +    mempool_handler_list.handler[handler_idx].put = h->put;
>>> +    mempool_handler_list.handler[handler_idx].get = h->get;
>>> +    mempool_handler_list.handler[handler_idx].get_count = h->get_count;
>>> +
>>> +    rte_spinlock_unlock(&mempool_handler_list.sl);
>>> +
>>> +    return handler_idx;
>>> +}
>>
>> Why not use a mechanism similar to what we have for PMDs?
>>
>>     void rte_eal_driver_register(struct rte_driver *driver)
>>     {
>>         TAILQ_INSERT_TAIL(&dev_driver_list, driver, next);
>>     }
>>
>> To do that, you just need to add a TAILQ_ENTRY() in your
>> rte_mempool_handler structure. This would avoid duplicating the
>> structure into a static array whose size is limited.
>>
>> Accessing the callbacks would be easier:
>>
>>     return mp->mp_handler->put(mp->rt_pool, obj_table, n);
>>
>> instead of:
>>
>>     return (mempool_handler_list.handler[mp->handler_idx].put)
>>                     (mp->rt_pool, obj_table, n);
>>
>> If we really want to copy the handlers somewhere, it could be in
>> the mempool structure. It would avoid an extra dereference
>> (note the first '.' instead of '->'):
>>
>>     return mp.mp_handler->put(mp->rt_pool, obj_table, n);
>>
>> After doing that, we could ask ourselves if the wrappers are still
>> useful or not. I would say that they could be removed.
>>
>>
>> The spinlock could be kept, although it may look a bit overkill:
>> - I don't expect several registrations to happen at the same time
>> - There is no unregister() function, so there is no risk in
>>   browsing the list without atomicity
>>
> 
> Already discussed previously, index needed over pointer because of
> secondary processes.

Could you add a comment stating this? It may help future readers
to have this info.

>> Last thing, I think this code should go in rte_mempool.c, not in
>> rte_mempool_default.c.
> 
> I was trying to keep the default handlers together in their own file,
> rather than having them in with the mempool framework. I think it's
> better having them separate, and new handlers can go in their own files
> too, no?

OK for the following functions:
 common_ring_mp_put()
 common_ring_sp_put()
 common_ring_mc_get()
 common_ring_sc_get()
 common_ring_get_count()
 rte_mempool_common_ring_alloc()  (note: only this one has
       a rte_mempool prefix, maybe it should be fixed)

The other functions are part of the framework to add an external
handler, I don't think they should go in rte_mempool_default.c
They could either go in rte_mempool.c or in another file
rte_mempool_handler.c.


>>> --- /dev/null
>>> +++ b/lib/librte_mempool/rte_mempool_internal.h
>>
>> Is it the proper name?
>> We could imagine a mempool handler provided by a plugin, and
>> in this case this code should go in rte_mempool.h.
> 
> I was trying to keep the public APIs in rte_mempool.h, and all the
> private stuff in rte_mempool_internal.h. Maybe a better name would be
> rte_mempool_private.h?

Are these functions internal? I mean, is it possible for an application
or an external PMD (.so) to provide its own handler? I think it would
be really interesting to have this capability.

Then, I would prefer to have this either in rte_mempool.h or in
rte_mempool_handler.h (that would be coherent with the .c)

>>> +
>>> +struct rte_mempool_handler {
>>> +    char name[RTE_MEMPOOL_NAMESIZE]; /**< Name of mempool handler */
>>
>> I would use a const char * here instead.
>>
> 
> Would we then have to allocate the memory for the string elsewhere? I
> would have thought this is the more straightforward method.

My initial question was, can we just have something like this:

	// my_handler.c
	static struct rte_mempool_handler handler_stack = {
		.name = "my_handler",
		.alloc = my_alloc,
		...
	};

	// rte_mempool_handler.c
	int16_t
	rte_mempool_register_handler(struct rte_mempool_handler *h)
	{
		...
		handler->name = h->name; /* instead of snprintf */
		...
	}

But it won't be possible as the structures will be shared with
a secondary process. So the name[RTE_MEMPOOL_NAMESIZE] is fine.



Regards,
Olivier
  
Hunt, David March 8, 2016, 10:04 a.m. UTC | #11
Hi Olivier,

On 3/4/2016 9:05 AM, Olivier MATZ wrote:
> Hi David,
>
>>>> @@ -622,6 +607,10 @@ rte_mempool_xmem_create(const char *name,
>>>> unsigned n, unsigned elt_size,
>>>>
>>>>        mp->elt_va_end = mp->elt_va_start;
>>>>
>>>> +    /* Parameters are setup. Call the mempool handler alloc */
>>>> +    if ((rte_mempool_ext_alloc(mp, name, n, socket_id, flags)) == NULL)
>>>> +        goto exit;
>>>> +
>>> I think some memory needs to be freed here. At least 'te'.
>> Done in v2
> Please note that in the meantime, this fix has been pushed (as we need
> it for next release):
> http://dpdk.org/browse/dpdk/commit/lib/librte_mempool/rte_mempool.c?id=86f36ff9578b5f3d697c8fcf6072dcb70e2b246f

v3 will be rebased on top of the latest head of the repo.


>>>> diff --git a/lib/librte_mempool/rte_mempool.h
>>>> b/lib/librte_mempool/rte_mempool.h
>>>> index 6e2390a..620cfb7 100644
>>>> --- a/lib/librte_mempool/rte_mempool.h
>>>> +++ b/lib/librte_mempool/rte_mempool.h
>>>> @@ -88,6 +88,8 @@ extern "C" {
>>>>    struct rte_mempool_debug_stats {
>>>>        uint64_t put_bulk;         /**< Number of puts. */
>>>>        uint64_t put_objs;         /**< Number of objects successfully
>>>> put. */
>>>> +    uint64_t put_pool_bulk;    /**< Number of puts into pool. */
>>>> +    uint64_t put_pool_objs;    /**< Number of objects into pool. */
>>>>        uint64_t get_success_bulk; /**< Successful allocation number. */
>>>>        uint64_t get_success_objs; /**< Objects successfully allocated. */
>>>>        uint64_t get_fail_bulk;    /**< Failed allocation number. */
>>> I think the comment of put_pool_objs is not very clear.
>>> Shouldn't we have the same stats for get?
>>>
>> Not used, removed. Covered by put_bulk.
> I guess you mean it will be removed in v3? It is still there in the v2
> (the field, not the comment that has been fixed).
>
> Shouldn't we have the same stats for get?

I believe gets are covered by the get_success_bulk and get_fail_bulk stats.

>>>>    /**
>>>>     * The RTE mempool structure.
>>>>     */
>>>>    struct rte_mempool {
>>>>        char name[RTE_MEMPOOL_NAMESIZE]; /**< Name of mempool. */
>>>> -    struct rte_ring *ring;           /**< Ring to store objects. */
>>>>        phys_addr_t phys_addr;           /**< Phys. addr. of mempool
>>>> struct. */
>>>>        int flags;                       /**< Flags of the mempool. */
>>>>        uint32_t size;                   /**< Size of the mempool. */
>>>> @@ -194,6 +270,11 @@ struct rte_mempool {
>>>>
>>>>        unsigned private_data_size;      /**< Size of private data. */
>>>>
>>>> +    /* Common pool data structure pointer */
>>>> +    void *rt_pool __rte_cache_aligned;
>>> What is the meaning of rt_pool?
>> I agree that it's probably not a very good name. Since it's basically
>> the pointer which is used by the handler callbacks, maybe we should
>> call it mempool_storage? That leaves it generic enough that it can point
>> at a ring, an array, or whatever else is needed for a particular handler.
> My question was more about the "rt_" prefix. Maybe I missed something
> obvious? I think "pool" or "pool_handler" is ok.

v3 will use "pool"

>>>> -
>>>> +
>>>> +    /* add new handler index */
>>>> +    handler_idx = mempool_handler_list.num_handlers++;
>>>> +
>>>> +    snprintf(mempool_handler_list.handler[handler_idx].name,
>>>> +                RTE_MEMPOOL_NAMESIZE, "%s", h->name);
>>>> +    mempool_handler_list.handler[handler_idx].alloc = h->alloc;
>>>> +    mempool_handler_list.handler[handler_idx].put = h->put;
>>>> +    mempool_handler_list.handler[handler_idx].get = h->get;
>>>> +    mempool_handler_list.handler[handler_idx].get_count = h->get_count;
>>>> +
>>>> +    rte_spinlock_unlock(&mempool_handler_list.sl);
>>>> +
>>>> +    return handler_idx;
>>>> +}
>>> Why not use a mechanism similar to what we have for PMDs?
>>>
>>>      void rte_eal_driver_register(struct rte_driver *driver)
>>>      {
>>>          TAILQ_INSERT_TAIL(&dev_driver_list, driver, next);
>>>      }
>>>
>> Already discussed previously, index needed over pointer because of
>> secondary processes.
> Could you add a comment stating this? It may help future readers
> to have this info.

v3: Comment added to the header file where we define handler_idx, 
explaining the use of an index versus pointers.
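
Something along these lines (exact wording is mine):

	/*
	 * Index into the per-process array of registered handlers,
	 * rather than a pointer to the handler ops: the mempool
	 * structure sits in shared memory and is used by secondary
	 * processes, where pointers into the primary's address space
	 * are invalid. Each process resolves the index against its
	 * own copy of the handler table, built at startup by the
	 * registration constructors.
	 */
	int16_t handler_idx;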

>>> Last thing, I think this code should go in rte_mempool.c, not in
>>> rte_mempool_default.c.
>> I was trying to keep the default handlers together in their own file,
>> rather than having them in with the mempool framework. I think it's
>> better having them separate, and new handlers can go in their own files
>> too, no?
> OK for the following functions:
>   common_ring_mp_put()
>   common_ring_sp_put()
>   common_ring_mc_get()
>   common_ring_sc_get()
>   common_ring_get_count()
>   rte_mempool_common_ring_alloc()  (note: only this one has
>         a rte_mempool prefix, maybe it should be fixed)
>
> The other functions are part of the framework to add an external
> handler, I don't think they should go in rte_mempool_default.c
> They could either go in rte_mempool.c or in another file
> rte_mempool_handler.c.

v3: Agreed. The bulk of the v3 is simplification of the files.
All of the "common" callbacks are now in rte_mempool_handler.c and 
rte_mempool_handler.h.
I've renamed the 'alloc' function above to be in line with the naming of 
the others.
The 'custom' handler has been banished to the autotest code, so as to 
keep the library as clean as possible.
What's interesting is that the autotest can have all the code defining 
the custom mempool (including its registration), keeping the library 
free of user code.

>>>> --- /dev/null
>>>> +++ b/lib/librte_mempool/rte_mempool_internal.h
>>> Is it the proper name?
>>> We could imagine a mempool handler provided by a plugin, and
>>> in this case this code should go in rte_mempool.h.
>> I was trying to keep the public APIs in rte_mempool.h, and all the
>> private stuff in rte_mempool_internal.h. Maybe a better name would be
>> rte_mempool_private.h?
> Are these functions internal? I mean, is it possible for an application
> or an external PMD (.so) to provide its own handler? I think it would
> be really interesting to have this capability.
>
> Then, I would prefer to have this either in rte_mempool.h or in
> rte_mempool_handler.h (that would be coherent with the .c)
>   

Now rte_mempool_handler.h


So to synopsise:

rte_mempool.[ch] - mempool create, populate, audit, dump, etc.
rte_mempool_handler.[ch] - handler registration, and fns to call callbacks
rte_mempool_default.c - default internal handlers sp/sc, mp/mc, etc.

custom handler has been moved out to app/test/test_ext_mempool.c

Thanks,
Dave.
  

Patch

diff --git a/app/test/test_mempool_perf.c b/app/test/test_mempool_perf.c
index cdc02a0..091c1df 100644
--- a/app/test/test_mempool_perf.c
+++ b/app/test/test_mempool_perf.c
@@ -161,7 +161,6 @@  per_lcore_mempool_test(__attribute__((unused)) void *arg)
 							   n_get_bulk);
 				if (unlikely(ret < 0)) {
 					rte_mempool_dump(stdout, mp);
-					rte_ring_dump(stdout, mp->ring);
 					/* in this case, objects are lost... */
 					return -1;
 				}
diff --git a/lib/librte_mempool/Makefile b/lib/librte_mempool/Makefile
index a6898ef..7c81ef6 100644
--- a/lib/librte_mempool/Makefile
+++ b/lib/librte_mempool/Makefile
@@ -42,6 +42,7 @@  LIBABIVER := 1
 
 # all source are stored in SRCS-y
 SRCS-$(CONFIG_RTE_LIBRTE_MEMPOOL) +=  rte_mempool.c
+SRCS-$(CONFIG_RTE_LIBRTE_MEMPOOL) +=  rte_mempool_default.c
 ifeq ($(CONFIG_RTE_LIBRTE_XEN_DOM0),y)
 SRCS-$(CONFIG_RTE_LIBRTE_MEMPOOL) +=  rte_dom0_mempool.c
 endif
diff --git a/lib/librte_mempool/rte_mempool.c b/lib/librte_mempool/rte_mempool.c
index aff5f6d..8c01838 100644
--- a/lib/librte_mempool/rte_mempool.c
+++ b/lib/librte_mempool/rte_mempool.c
@@ -59,10 +59,11 @@ 
 #include <rte_spinlock.h>
 
 #include "rte_mempool.h"
+#include "rte_mempool_internal.h"
 
 TAILQ_HEAD(rte_mempool_list, rte_tailq_entry);
 
-static struct rte_tailq_elem rte_mempool_tailq = {
+struct rte_tailq_elem rte_mempool_tailq = {
 	.name = "RTE_MEMPOOL",
 };
 EAL_REGISTER_TAILQ(rte_mempool_tailq)
@@ -149,7 +150,7 @@  mempool_add_elem(struct rte_mempool *mp, void *obj, uint32_t obj_idx,
 		obj_init(mp, obj_init_arg, obj, obj_idx);
 
 	/* enqueue in ring */
-	rte_ring_sp_enqueue(mp->ring, obj);
+	rte_mempool_ext_put_bulk(mp, &obj, 1);
 }
 
 uint32_t
@@ -375,48 +376,28 @@  rte_mempool_xmem_usage(void *vaddr, uint32_t elt_num, size_t elt_sz,
 	return usz;
 }
 
-#ifndef RTE_LIBRTE_XEN_DOM0
-/* stub if DOM0 support not configured */
-struct rte_mempool *
-rte_dom0_mempool_create(const char *name __rte_unused,
-			unsigned n __rte_unused,
-			unsigned elt_size __rte_unused,
-			unsigned cache_size __rte_unused,
-			unsigned private_data_size __rte_unused,
-			rte_mempool_ctor_t *mp_init __rte_unused,
-			void *mp_init_arg __rte_unused,
-			rte_mempool_obj_ctor_t *obj_init __rte_unused,
-			void *obj_init_arg __rte_unused,
-			int socket_id __rte_unused,
-			unsigned flags __rte_unused)
-{
-	rte_errno = EINVAL;
-	return NULL;
-}
-#endif
-
 /* create the mempool */
 struct rte_mempool *
 rte_mempool_create(const char *name, unsigned n, unsigned elt_size,
-		   unsigned cache_size, unsigned private_data_size,
-		   rte_mempool_ctor_t *mp_init, void *mp_init_arg,
-		   rte_mempool_obj_ctor_t *obj_init, void *obj_init_arg,
-		   int socket_id, unsigned flags)
+			unsigned cache_size, unsigned private_data_size,
+			rte_mempool_ctor_t *mp_init, void *mp_init_arg,
+			rte_mempool_obj_ctor_t *obj_init, void *obj_init_arg,
+			int socket_id, unsigned flags)
 {
 	if (rte_xen_dom0_supported())
 		return rte_dom0_mempool_create(name, n, elt_size,
-					       cache_size, private_data_size,
-					       mp_init, mp_init_arg,
-					       obj_init, obj_init_arg,
-					       socket_id, flags);
+			cache_size, private_data_size,
+			mp_init, mp_init_arg,
+			obj_init, obj_init_arg,
+			socket_id, flags);
 	else
 		return rte_mempool_xmem_create(name, n, elt_size,
-					       cache_size, private_data_size,
-					       mp_init, mp_init_arg,
-					       obj_init, obj_init_arg,
-					       socket_id, flags,
-					       NULL, NULL, MEMPOOL_PG_NUM_DEFAULT,
-					       MEMPOOL_PG_SHIFT_MAX);
+			cache_size, private_data_size,
+			mp_init, mp_init_arg,
+			obj_init, obj_init_arg,
+			socket_id, flags,
+			NULL, NULL,
+			MEMPOOL_PG_NUM_DEFAULT, MEMPOOL_PG_SHIFT_MAX);
 }
 
 /*
@@ -435,11 +416,9 @@  rte_mempool_xmem_create(const char *name, unsigned n, unsigned elt_size,
 		const phys_addr_t paddr[], uint32_t pg_num, uint32_t pg_shift)
 {
 	char mz_name[RTE_MEMZONE_NAMESIZE];
-	char rg_name[RTE_RING_NAMESIZE];
 	struct rte_mempool_list *mempool_list;
 	struct rte_mempool *mp = NULL;
 	struct rte_tailq_entry *te;
-	struct rte_ring *r;
 	const struct rte_memzone *mz;
 	size_t mempool_size;
 	int mz_flags = RTE_MEMZONE_1GB|RTE_MEMZONE_SIZE_HINT_ONLY;
@@ -469,7 +448,7 @@  rte_mempool_xmem_create(const char *name, unsigned n, unsigned elt_size,
 
 	/* asked cache too big */
 	if (cache_size > RTE_MEMPOOL_CACHE_MAX_SIZE ||
-	    CALC_CACHE_FLUSHTHRESH(cache_size) > n) {
+		CALC_CACHE_FLUSHTHRESH(cache_size) > n) {
 		rte_errno = EINVAL;
 		return NULL;
 	}
@@ -502,16 +481,8 @@  rte_mempool_xmem_create(const char *name, unsigned n, unsigned elt_size,
 		return NULL;
 	}
 
-	rte_rwlock_write_lock(RTE_EAL_MEMPOOL_RWLOCK);
 
-	/* allocate the ring that will be used to store objects */
-	/* Ring functions will return appropriate errors if we are
-	 * running as a secondary process etc., so no checks made
-	 * in this function for that condition */
-	snprintf(rg_name, sizeof(rg_name), RTE_MEMPOOL_MZ_FORMAT, name);
-	r = rte_ring_create(rg_name, rte_align32pow2(n+1), socket_id, rg_flags);
-	if (r == NULL)
-		goto exit;
+	rte_rwlock_write_lock(RTE_EAL_MEMPOOL_RWLOCK);
 
 	/*
 	 * reserve a memory zone for this mempool: private data is
@@ -588,7 +559,6 @@  rte_mempool_xmem_create(const char *name, unsigned n, unsigned elt_size,
 	memset(mp, 0, sizeof(*mp));
 	snprintf(mp->name, sizeof(mp->name), "%s", name);
 	mp->phys_addr = mz->phys_addr;
-	mp->ring = r;
 	mp->size = n;
 	mp->flags = flags;
 	mp->elt_size = objsz.elt_size;
@@ -598,6 +568,22 @@  rte_mempool_xmem_create(const char *name, unsigned n, unsigned elt_size,
 	mp->cache_flushthresh = CALC_CACHE_FLUSHTHRESH(cache_size);
 	mp->private_data_size = private_data_size;
 
+	/*
+	 * Since we have 4 combinations of the SP/SC/MP/MC, and stack,
+	 * examine the
+	 * flags to set the correct index into the handler table.
+	 */
+	if (flags & MEMPOOL_F_USE_STACK)
+		mp->handler_idx = rte_get_mempool_handler("stack");
+	else if (flags & (MEMPOOL_F_SP_PUT | MEMPOOL_F_SC_GET))
+		mp->handler_idx = rte_get_mempool_handler("ring_sp_sc");
+	else if (flags & MEMPOOL_F_SP_PUT)
+		mp->handler_idx = rte_get_mempool_handler("ring_sp_mc");
+	else if (flags & MEMPOOL_F_SC_GET)
+		mp->handler_idx = rte_get_mempool_handler("ring_mp_sc");
+	else
+		mp->handler_idx = rte_get_mempool_handler("ring_mp_mc");
+
 	/* calculate address of the first element for continuous mempool. */
 	obj = (char *)mp + MEMPOOL_HEADER_SIZE(mp, pg_num) +
 		private_data_size;
@@ -613,7 +599,6 @@  rte_mempool_xmem_create(const char *name, unsigned n, unsigned elt_size,
 		mp->elt_va_start = (uintptr_t)obj;
 		mp->elt_pa[0] = mp->phys_addr +
 			(mp->elt_va_start - (uintptr_t)mp);
-
 	/* mempool elements in a separate chunk of memory. */
 	} else {
 		mp->elt_va_start = (uintptr_t)vaddr;
@@ -622,6 +607,10 @@  rte_mempool_xmem_create(const char *name, unsigned n, unsigned elt_size,
 
 	mp->elt_va_end = mp->elt_va_start;
 
+	/* Parameters are setup. Call the mempool handler alloc */
+	if ((rte_mempool_ext_alloc(mp, name, n, socket_id, flags)) == NULL)
+		goto exit;
+
 	/* call the initializer */
 	if (mp_init)
 		mp_init(mp, mp_init_arg);
@@ -646,7 +635,7 @@  rte_mempool_count(const struct rte_mempool *mp)
 {
 	unsigned count;
 
-	count = rte_ring_count(mp->ring);
+	count = rte_mempool_ext_get_count(mp);
 
 #if RTE_MEMPOOL_CACHE_MAX_SIZE > 0
 	{
@@ -681,7 +670,9 @@  rte_mempool_dump_cache(FILE *f, const struct rte_mempool *mp)
 	fprintf(f, "    cache_size=%"PRIu32"\n", mp->cache_size);
 	for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {
 		cache_count = mp->local_cache[lcore_id].len;
-		fprintf(f, "    cache_count[%u]=%u\n", lcore_id, cache_count);
+		if (cache_count > 0)
+			fprintf(f, "    cache_count[%u]=%u\n",
+						lcore_id, cache_count);
 		count += cache_count;
 	}
 	fprintf(f, "    total_cache_count=%u\n", count);
@@ -802,14 +793,13 @@  rte_mempool_dump(FILE *f, const struct rte_mempool *mp)
 
 	fprintf(f, "mempool <%s>@%p\n", mp->name, mp);
 	fprintf(f, "  flags=%x\n", mp->flags);
-	fprintf(f, "  ring=<%s>@%p\n", mp->ring->name, mp->ring);
 	fprintf(f, "  phys_addr=0x%" PRIx64 "\n", mp->phys_addr);
 	fprintf(f, "  size=%"PRIu32"\n", mp->size);
 	fprintf(f, "  header_size=%"PRIu32"\n", mp->header_size);
 	fprintf(f, "  elt_size=%"PRIu32"\n", mp->elt_size);
 	fprintf(f, "  trailer_size=%"PRIu32"\n", mp->trailer_size);
 	fprintf(f, "  total_obj_size=%"PRIu32"\n",
-	       mp->header_size + mp->elt_size + mp->trailer_size);
+		   mp->header_size + mp->elt_size + mp->trailer_size);
 
 	fprintf(f, "  private_data_size=%"PRIu32"\n", mp->private_data_size);
 	fprintf(f, "  pg_num=%"PRIu32"\n", mp->pg_num);
@@ -825,7 +815,7 @@  rte_mempool_dump(FILE *f, const struct rte_mempool *mp)
 			mp->size);
 
 	cache_count = rte_mempool_dump_cache(f, mp);
-	common_count = rte_ring_count(mp->ring);
+	common_count = /* rte_ring_count(mp->ring)*/0;
 	if ((cache_count + common_count) > mp->size)
 		common_count = mp->size - cache_count;
 	fprintf(f, "  common_pool_count=%u\n", common_count);
@@ -904,7 +894,7 @@  rte_mempool_lookup(const char *name)
 }
 
 void rte_mempool_walk(void (*func)(const struct rte_mempool *, void *),
-		      void *arg)
+			  void *arg)
 {
 	struct rte_tailq_entry *te = NULL;
 	struct rte_mempool_list *mempool_list;
@@ -919,3 +909,111 @@  void rte_mempool_walk(void (*func)(const struct rte_mempool *, void *),
 
 	rte_rwlock_read_unlock(RTE_EAL_MEMPOOL_RWLOCK);
 }
+
+
+/* create the mempool using an external mempool manager */
+struct rte_mempool *
+rte_mempool_create_ext(const char *name, unsigned n, unsigned elt_size,
+			unsigned cache_size, unsigned private_data_size,
+			rte_mempool_ctor_t *mp_init, void *mp_init_arg,
+			rte_mempool_obj_ctor_t *obj_init, void *obj_init_arg,
+			int socket_id, unsigned flags,
+			const char *handler_name)
+{
+	char mz_name[RTE_MEMZONE_NAMESIZE];
+	struct rte_mempool_list *mempool_list;
+	struct rte_mempool *mp = NULL;
+	struct rte_tailq_entry *te;
+	const struct rte_memzone *mz;
+	size_t mempool_size;
+	int mz_flags = RTE_MEMZONE_1GB|RTE_MEMZONE_SIZE_HINT_ONLY;
+	int rg_flags = 0;
+	int16_t handler_idx;
+
+	mempool_list = RTE_TAILQ_CAST(rte_mempool_tailq.head, rte_mempool_list);
+
+	/* asked cache too big */
+	if (cache_size > RTE_MEMPOOL_CACHE_MAX_SIZE ||
+		CALC_CACHE_FLUSHTHRESH(cache_size) > n) {
+		rte_errno = EINVAL;
+		return NULL;
+	}
+
+	handler_idx = rte_get_mempool_handler(handler_name);
+	if (handler_idx < 0) {
+		RTE_LOG(ERR, MEMPOOL, "Cannot find mempool handler by name!\n");
+		goto exit;
+	}
+
+	/* ring flags */
+	if (flags & MEMPOOL_F_SP_PUT)
+		rg_flags |= RING_F_SP_ENQ;
+	if (flags & MEMPOOL_F_SC_GET)
+		rg_flags |= RING_F_SC_DEQ;
+
+	rte_rwlock_write_lock(RTE_EAL_MEMPOOL_RWLOCK);
+
+	/*
+	 * reserve a memory zone for this mempool: private data is
+	 * cache-aligned
+	 */
+	private_data_size = RTE_ALIGN_CEIL(private_data_size,
+							RTE_MEMPOOL_ALIGN);
+
+	/* try to allocate tailq entry */
+	te = rte_zmalloc("MEMPOOL_TAILQ_ENTRY", sizeof(*te), 0);
+	if (te == NULL) {
+		RTE_LOG(ERR, MEMPOOL, "Cannot allocate tailq entry!\n");
+		goto exit;
+	}
+
+	/*
+	 * If user provided an external memory buffer, then use it to
+	 * store mempool objects. Otherwise reserve a memzone that is large
+	 * enough to hold mempool header and metadata plus mempool objects.
+	 */
+	mempool_size = sizeof(*mp) + private_data_size;
+	mempool_size = RTE_ALIGN_CEIL(mempool_size, RTE_MEMPOOL_ALIGN);
+
+	snprintf(mz_name, sizeof(mz_name), RTE_MEMPOOL_MZ_FORMAT, name);
+
+	mz = rte_memzone_reserve(mz_name, mempool_size, socket_id, mz_flags);
+
+	/* no more memory */
+	if (mz == NULL) {
+		rte_free(te);
+		goto exit;
+	}
+
+	/* init the mempool structure */
+	mp = mz->addr;
+	memset(mp, 0, sizeof(*mp));
+	snprintf(mp->name, sizeof(mp->name), "%s", name);
+	mp->phys_addr = mz->phys_addr;
+	mp->size = n;
+	mp->flags = flags;
+	mp->cache_size = cache_size;
+	mp->cache_flushthresh = CALC_CACHE_FLUSHTHRESH(cache_size);
+	mp->private_data_size = private_data_size;
+	mp->handler_idx = handler_idx;
+	mp->elt_size = elt_size;
+	mp->rt_pool = rte_mempool_ext_alloc(mp, name, n, socket_id, flags);
+
+	/* call the initializer */
+	if (mp_init)
+		mp_init(mp, mp_init_arg);
+
+	mempool_populate(mp, n, 1, obj_init, obj_init_arg);
+
+	te->data = (void *) mp;
+
+	rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK);
+	TAILQ_INSERT_TAIL(mempool_list, te, next);
+	rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
+
+exit:
+	rte_rwlock_write_unlock(RTE_EAL_MEMPOOL_RWLOCK);
+
+	return mp;
+
+}
diff --git a/lib/librte_mempool/rte_mempool.h b/lib/librte_mempool/rte_mempool.h
index 6e2390a..620cfb7 100644
--- a/lib/librte_mempool/rte_mempool.h
+++ b/lib/librte_mempool/rte_mempool.h
@@ -88,6 +88,8 @@  extern "C" {
 struct rte_mempool_debug_stats {
 	uint64_t put_bulk;         /**< Number of puts. */
 	uint64_t put_objs;         /**< Number of objects successfully put. */
+	uint64_t put_pool_bulk;    /**< Number of puts into pool. */
+	uint64_t put_pool_objs;    /**< Number of objects into pool. */
 	uint64_t get_success_bulk; /**< Successful allocation number. */
 	uint64_t get_success_objs; /**< Objects successfully allocated. */
 	uint64_t get_fail_bulk;    /**< Failed allocation number. */
@@ -123,6 +125,7 @@  struct rte_mempool_objsz {
 #define RTE_MEMPOOL_NAMESIZE 32 /**< Maximum length of a memory pool. */
 #define RTE_MEMPOOL_MZ_PREFIX "MP_"
 
+
 /* "MP_<name>" */
 #define	RTE_MEMPOOL_MZ_FORMAT	RTE_MEMPOOL_MZ_PREFIX "%s"
 
@@ -175,12 +178,85 @@  struct rte_mempool_objtlr {
 #endif
 };
 
+/* Handler functions for external mempool support */
+typedef void *(*rte_mempool_alloc_t)(struct rte_mempool *mp,
+		const char *name, unsigned n, int socket_id, unsigned flags);
+typedef int (*rte_mempool_put_t)(void *p,
+		void * const *obj_table, unsigned n);
+typedef int (*rte_mempool_get_t)(void *p, void **obj_table,
+		unsigned n);
+typedef unsigned (*rte_mempool_get_count)(void *p);
+typedef int(*rte_mempool_free_t)(struct rte_mempool *mp);
+
+/**
+ * @internal wrapper for external mempool manager alloc callback.
+ *
+ * @param mp
+ *   Pointer to the memory pool.
+ * @param name
+ *   Name of the statistics field to increment in the memory pool.
+ * @param n
+ *   Number to add to the object-oriented statistics.
+ * @param socket_id
+ *   socket id on which to allocate.
+ * @param flags
+ *   general flags to allocate function
+ */
+void *
+rte_mempool_ext_alloc(struct rte_mempool *mp,
+		const char *name, unsigned n, int socket_id, unsigned flags);
+
+/**
+ * @internal wrapper for external mempool manager get callback.
+ *
+ * @param mp
+ *   Pointer to the memory pool.
+ * @param obj_table
+ *   Pointer to a table of void * pointers (objects).
+ * @param n
+ *	 Number of objects to get
+ */
+int
+rte_mempool_ext_get_bulk(struct rte_mempool *mp, void **obj_table,
+		unsigned n);
+
+/**
+ * @internal wrapper for external mempool manager put callback.
+ *
+ * @param mp
+ *   Pointer to the memory pool.
+ * @param obj_table
+ *   Pointer to a table of void * pointers (objects).
+ * @param n
+ *   Number of objects to put
+ */
+int
+rte_mempool_ext_put_bulk(struct rte_mempool *mp, void * const *obj_table,
+		unsigned n);
+
+/**
+ * @internal wrapper for external mempool manager get_count callback.
+ *
+ * @param mp
+ *   Pointer to the memory pool.
+ */
+int
+rte_mempool_ext_get_count(const struct rte_mempool *mp);
+
+/**
+ * @internal wrapper for external mempool manager free callback.
+ *
+ * @param mp
+ *   Pointer to the memory pool.
+ */
+int
+rte_mempool_ext_free(struct rte_mempool *mp);
+
 /**
  * The RTE mempool structure.
  */
 struct rte_mempool {
 	char name[RTE_MEMPOOL_NAMESIZE]; /**< Name of mempool. */
-	struct rte_ring *ring;           /**< Ring to store objects. */
 	phys_addr_t phys_addr;           /**< Phys. addr. of mempool struct. */
 	int flags;                       /**< Flags of the mempool. */
 	uint32_t size;                   /**< Size of the mempool. */
@@ -194,6 +270,11 @@  struct rte_mempool {
 
 	unsigned private_data_size;      /**< Size of private data. */
 
+	/* Common pool data structure pointer */
+	void *rt_pool __rte_cache_aligned;
+
+	int16_t handler_idx;
+
 #if RTE_MEMPOOL_CACHE_MAX_SIZE > 0
 	/** Per-lcore local cache. */
 	struct rte_mempool_cache local_cache[RTE_MAX_LCORE];
@@ -223,6 +304,10 @@  struct rte_mempool {
 #define MEMPOOL_F_NO_CACHE_ALIGN 0x0002 /**< Do not align objs on cache lines.*/
 #define MEMPOOL_F_SP_PUT         0x0004 /**< Default put is "single-producer".*/
 #define MEMPOOL_F_SC_GET         0x0008 /**< Default get is "single-consumer".*/
+#define MEMPOOL_F_USE_STACK      0x0010 /**< Use a stack for the common pool. */
+#define MEMPOOL_F_USE_TM         0x0020
+#define MEMPOOL_F_NO_SECONDARY   0x0040
+
 
 /**
  * @internal When debug is enabled, store some statistics.
@@ -728,7 +813,6 @@  rte_dom0_mempool_create(const char *name, unsigned n, unsigned elt_size,
 		rte_mempool_obj_ctor_t *obj_init, void *obj_init_arg,
 		int socket_id, unsigned flags);
 
-
 /**
  * Dump the status of the mempool to the console.
  *
@@ -753,7 +837,7 @@  void rte_mempool_dump(FILE *f, const struct rte_mempool *mp);
  */
 static inline void __attribute__((always_inline))
 __mempool_put_bulk(struct rte_mempool *mp, void * const *obj_table,
-		    unsigned n, int is_mp)
+		    unsigned n, __attribute__((unused)) int is_mp)
 {
 #if RTE_MEMPOOL_CACHE_MAX_SIZE > 0
 	struct rte_mempool_cache *cache;
@@ -769,8 +853,7 @@  __mempool_put_bulk(struct rte_mempool *mp, void * const *obj_table,
 
 #if RTE_MEMPOOL_CACHE_MAX_SIZE > 0
 	/* cache is not enabled or single producer or non-EAL thread */
-	if (unlikely(cache_size == 0 || is_mp == 0 ||
-		     lcore_id >= RTE_MAX_LCORE))
+	if (unlikely(cache_size == 0 || lcore_id >= RTE_MAX_LCORE))
 		goto ring_enqueue;
 
 	/* Go straight to ring if put would overflow mem allocated for cache */
@@ -793,8 +876,8 @@  __mempool_put_bulk(struct rte_mempool *mp, void * const *obj_table,
 
 	cache->len += n;
 
-	if (cache->len >= flushthresh) {
-		rte_ring_mp_enqueue_bulk(mp->ring, &cache->objs[cache_size],
+	if (unlikely(cache->len >= flushthresh)) {
+		rte_mempool_ext_put_bulk(mp, &cache->objs[cache_size],
 				cache->len - cache_size);
 		cache->len = cache_size;
 	}
@@ -804,22 +887,10 @@  __mempool_put_bulk(struct rte_mempool *mp, void * const *obj_table,
 ring_enqueue:
 #endif /* RTE_MEMPOOL_CACHE_MAX_SIZE > 0 */
 
-	/* push remaining objects in ring */
-#ifdef RTE_LIBRTE_MEMPOOL_DEBUG
-	if (is_mp) {
-		if (rte_ring_mp_enqueue_bulk(mp->ring, obj_table, n) < 0)
-			rte_panic("cannot put objects in mempool\n");
-	}
-	else {
-		if (rte_ring_sp_enqueue_bulk(mp->ring, obj_table, n) < 0)
-			rte_panic("cannot put objects in mempool\n");
-	}
-#else
-	if (is_mp)
-		rte_ring_mp_enqueue_bulk(mp->ring, obj_table, n);
-	else
-		rte_ring_sp_enqueue_bulk(mp->ring, obj_table, n);
-#endif
+	/* Increment stats counter to tell us how many pool puts happened */
+	__MEMPOOL_STAT_ADD(mp, put_pool, n);
+
+	rte_mempool_ext_put_bulk(mp, obj_table, n);
 }
 
 
@@ -943,7 +1014,7 @@  rte_mempool_put(struct rte_mempool *mp, void *obj)
  */
 static inline int __attribute__((always_inline))
 __mempool_get_bulk(struct rte_mempool *mp, void **obj_table,
-		   unsigned n, int is_mc)
+		   unsigned n, __attribute__((unused))int is_mc)
 {
 	int ret;
 #if RTE_MEMPOOL_CACHE_MAX_SIZE > 0
@@ -954,8 +1025,8 @@  __mempool_get_bulk(struct rte_mempool *mp, void **obj_table,
 	uint32_t cache_size = mp->cache_size;
 
 	/* cache is not enabled or single consumer */
-	if (unlikely(cache_size == 0 || is_mc == 0 ||
-		     n >= cache_size || lcore_id >= RTE_MAX_LCORE))
+	if (unlikely(cache_size == 0 || n >= cache_size ||
+						lcore_id >= RTE_MAX_LCORE))
 		goto ring_dequeue;
 
 	cache = &mp->local_cache[lcore_id];
@@ -967,7 +1038,8 @@  __mempool_get_bulk(struct rte_mempool *mp, void **obj_table,
 		uint32_t req = n + (cache_size - cache->len);
 
 		/* How many do we require i.e. number to fill the cache + the request */
-		ret = rte_ring_mc_dequeue_bulk(mp->ring, &cache->objs[cache->len], req);
+		ret = rte_mempool_ext_get_bulk(mp,
+						&cache->objs[cache->len], req);
 		if (unlikely(ret < 0)) {
 			/*
 			 * In the offchance that we are buffer constrained,
@@ -995,10 +1067,7 @@  ring_dequeue:
 #endif /* RTE_MEMPOOL_CACHE_MAX_SIZE > 0 */
 
 	/* get remaining objects from ring */
-	if (is_mc)
-		ret = rte_ring_mc_dequeue_bulk(mp->ring, obj_table, n);
-	else
-		ret = rte_ring_sc_dequeue_bulk(mp->ring, obj_table, n);
+	ret = rte_mempool_ext_get_bulk(mp, obj_table, n);
 
 	if (ret < 0)
 		__MEMPOOL_STAT_ADD(mp, get_fail, n);
@@ -1401,6 +1470,82 @@  ssize_t rte_mempool_xmem_usage(void *vaddr, uint32_t elt_num, size_t elt_sz,
 void rte_mempool_walk(void (*func)(const struct rte_mempool *, void *arg),
 		      void *arg);
 
+/**
+ * Function to get an index to an external mempool manager
+ *
+ * @param name
+ *   The name of the mempool handler to search for in the list of handlers
+ * @return
+ *   The index of the mempool handler in the list of registered mempool
+ *   handlers
+ */
+int16_t
+rte_get_mempool_handler(const char *name);
+
+
+/**
+ * Create a new mempool named *name* in memory.
+ *
+ * This function uses the externally defined alloc callback of the
+ * selected handler to allocate the pool's backing store. Its size is
+ * set to n elements.
+ * All elements of the mempool are allocated separately from the
+ * mempool header.
+ *
+ * @param name
+ *   The name of the mempool.
+ * @param n
+ *   The number of elements in the mempool. The optimum size (in terms of
+ *   memory usage) for a mempool is when n is a power of two minus one:
+ *   n = (2^q - 1).
+ * @param elt_size
+ *   The size of each element, in bytes.
+ * @param cache_size
+ *   If cache_size is non-zero, the rte_mempool library will try to
+ *   limit the accesses to the common lockless pool, by maintaining a
+ *   per-lcore object cache. This argument must be lower than or equal to
+ *   CONFIG_RTE_MEMPOOL_CACHE_MAX_SIZE and n / 1.5. It is advised to choose
+ *   cache_size so that "n modulo cache_size == 0": if this is
+ *   not the case, some elements will always stay in the pool and will
+ *   never be used. The access to the per-lcore table is of course
+ *   faster than the multi-producer/consumer pool. The cache can be
+ *   disabled if the cache_size argument is set to 0; this can be useful to
+ *   avoid losing objects in the cache. Note that even if not used, the
+ *   memory space for the cache is always reserved in a mempool structure,
+ *   except if CONFIG_RTE_MEMPOOL_CACHE_MAX_SIZE is set to 0.
+ * @param private_data_size
+ *   The size of the private data appended after the mempool
+ *   structure. This is useful for storing some private data after the
+ *   mempool structure, as is done for rte_mbuf_pool for example.
+ * @param mp_init
+ *   A function pointer that is called for initialization of the pool,
+ *   before object initialization. The user can initialize the private
+ *   data in this function if needed. This parameter can be NULL if
+ *   not needed.
+ * @param mp_init_arg
+ *   An opaque pointer to data that can be used in the mempool
+ *   constructor function.
+ * @param obj_init
+ *   A function pointer that is called for each object at
+ *   initialization of the pool. The user can set some meta data in
+ *   objects if needed. This parameter can be NULL if not needed.
+ *   The obj_init() function takes the mempool pointer, the init_arg,
+ *   the object pointer and the object number as parameters.
+ * @param obj_init_arg
+ *   An opaque pointer to data that can be used as an argument for
+ *   each call to the object constructor function.
+ * @param socket_id
+ *   The *socket_id* argument is the socket identifier in the case of
+ *   NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA
+ *   constraint for the reserved zone.
+ * @param flags
+ *   Flags controlling the behaviour of the mempool. In particular,
+ *   MEMPOOL_F_SP_PUT and MEMPOOL_F_SC_GET select the single-producer
+ *   and single-consumer variants of the default ring-based handlers.
+ * @param handler_name
+ *   The name of the registered mempool handler to use for this pool,
+ *   e.g. "ring_mp_mc".
+ * @return
+ *   A pointer to the newly allocated mempool on success; NULL on error.
+ */
+struct rte_mempool *
+rte_mempool_create_ext(const char *name, unsigned n, unsigned elt_size,
+		unsigned cache_size, unsigned private_data_size,
+		rte_mempool_ctor_t *mp_init, void *mp_init_arg,
+		rte_mempool_obj_ctor_t *obj_init, void *obj_init_arg,
+		int socket_id, unsigned flags,
+		const char *handler_name);
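+
+/*
+ * Usage sketch (illustrative only; the pool name, sizes and cache
+ * size below are placeholders, not recommendations). Creates a pool
+ * of 8191 elements of 2048 bytes each, with a 256-entry per-lcore
+ * cache, no private data and no constructors, backed by the
+ * "ring_mp_mc" handler:
+ *
+ *	struct rte_mempool *mp;
+ *
+ *	mp = rte_mempool_create_ext("example_pool", 8191, 2048,
+ *			256, 0, NULL, NULL, NULL, NULL,
+ *			SOCKET_ID_ANY, 0, "ring_mp_mc");
+ *	if (mp == NULL)
+ *		rte_panic("cannot create example pool\n");
+ */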
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/lib/librte_mempool/rte_mempool_default.c b/lib/librte_mempool/rte_mempool_default.c
new file mode 100644
index 0000000..2493dc1
--- /dev/null
+++ b/lib/librte_mempool/rte_mempool_default.c
@@ -0,0 +1,229 @@ 
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright(c) 2010-2016 Intel Corporation. All rights reserved.
+ *   All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of Intel Corporation nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <stdio.h>
+#include <rte_mempool.h>
+#include <rte_malloc.h>
+#include <string.h>
+
+#include "rte_mempool_internal.h"
+
+/*
+ * Indirect jump table to support external memory pools
+ */
+struct rte_mempool_handler_list mempool_handler_list = {
+	.sl = RTE_SPINLOCK_INITIALIZER,
+	.num_handlers = 0
+};
+
+/* TODO Convert to older mechanism of an array of structs */
+int16_t
+add_handler(struct rte_mempool_handler *h)
+{
+	int16_t handler_idx;
+
+	/* serialise additions to the handler list */
+	rte_spinlock_lock(&mempool_handler_list.sl);
+
+	/* Check whether jump table has space */
+	if (mempool_handler_list.num_handlers >= RTE_MEMPOOL_MAX_HANDLER_IDX) {
+		rte_spinlock_unlock(&mempool_handler_list.sl);
+		RTE_LOG(ERR, MEMPOOL,
+				"Maximum number of mempool handlers exceeded\n");
+		return -1;
+	}
+
+	if ((h->put == NULL) || (h->get == NULL) ||
+		(h->get_count == NULL)) {
+		rte_spinlock_unlock(&mempool_handler_list.sl);
+		RTE_LOG(ERR, MEMPOOL,
+			"Missing callback while registering mempool handler\n");
+		return -1;
+	}
+
+	/* add new handler index */
+	handler_idx = mempool_handler_list.num_handlers++;
+
+	snprintf(mempool_handler_list.handler[handler_idx].name,
+				RTE_MEMPOOL_NAMESIZE, "%s", h->name);
+	mempool_handler_list.handler[handler_idx].alloc = h->alloc;
+	mempool_handler_list.handler[handler_idx].put = h->put;
+	mempool_handler_list.handler[handler_idx].get = h->get;
+	mempool_handler_list.handler[handler_idx].get_count = h->get_count;
+
+	rte_spinlock_unlock(&mempool_handler_list.sl);
+
+	return handler_idx;
+}
+
+/* TODO Convert to older mechanism of an array of structs */
+int16_t
+rte_get_mempool_handler(const char *name)
+{
+	int16_t i;
+
+	for (i = 0; i < mempool_handler_list.num_handlers; i++) {
+		if (!strcmp(name, mempool_handler_list.handler[i].name))
+			return i;
+	}
+	return -1;
+}
+
+static int
+common_ring_mp_put(void *p, void * const *obj_table, unsigned n)
+{
+	return rte_ring_mp_enqueue_bulk((struct rte_ring *)p, obj_table, n);
+}
+
+static int
+common_ring_sp_put(void *p, void * const *obj_table, unsigned n)
+{
+	return rte_ring_sp_enqueue_bulk((struct rte_ring *)p, obj_table, n);
+}
+
+static int
+common_ring_mc_get(void *p, void **obj_table, unsigned n)
+{
+	return rte_ring_mc_dequeue_bulk((struct rte_ring *)p, obj_table, n);
+}
+
+static int
+common_ring_sc_get(void *p, void **obj_table, unsigned n)
+{
+	return rte_ring_sc_dequeue_bulk((struct rte_ring *)p, obj_table, n);
+}
+
+static unsigned
+common_ring_get_count(void *p)
+{
+	return rte_ring_count((struct rte_ring *)p);
+}
+
+
+static void *
+rte_mempool_common_ring_alloc(struct rte_mempool *mp,
+		const char *name, unsigned n, int socket_id, unsigned flags)
+{
+	struct rte_ring *r;
+	char rg_name[RTE_RING_NAMESIZE];
+	int rg_flags = 0;
+
+	if (flags & MEMPOOL_F_SP_PUT)
+		rg_flags |= RING_F_SP_ENQ;
+	if (flags & MEMPOOL_F_SC_GET)
+		rg_flags |= RING_F_SC_DEQ;
+
+	/*
+	 * Allocate the ring that will be used to store objects.
+	 * Ring functions will return appropriate errors if we are
+	 * running as a secondary process etc., so no checks are made
+	 * in this function for that condition.
+	 */
+	snprintf(rg_name, sizeof(rg_name), "%s-ring", name);
+	r = rte_ring_create(rg_name, rte_align32pow2(n+1), socket_id, rg_flags);
+	if (r == NULL)
+		return NULL;
+
+	mp->rt_pool = (void *)r;
+
+	return (void *)r;
+}
+
+void *
+rte_mempool_ext_alloc(struct rte_mempool *mp,
+		const char *name, unsigned n, int socket_id, unsigned flags)
+{
+	if (mempool_handler_list.handler[mp->handler_idx].alloc) {
+		return (mempool_handler_list.handler[mp->handler_idx].alloc)
+						(mp, name, n, socket_id, flags);
+	}
+	return NULL;
+}
+
+inline int __attribute__((always_inline))
+rte_mempool_ext_get_bulk(struct rte_mempool *mp, void **obj_table, unsigned n)
+{
+	return (mempool_handler_list.handler[mp->handler_idx].get)
+						(mp->rt_pool, obj_table, n);
+}
+
+inline int __attribute__((always_inline))
+rte_mempool_ext_put_bulk(struct rte_mempool *mp, void * const *obj_table,
+		unsigned n)
+{
+	return (mempool_handler_list.handler[mp->handler_idx].put)
+						(mp->rt_pool, obj_table, n);
+}
+
+int
+rte_mempool_ext_get_count(const struct rte_mempool *mp)
+{
+	return (mempool_handler_list.handler[mp->handler_idx].get_count)
+						(mp->rt_pool);
+}
+
+static struct rte_mempool_handler handler_mp_mc = {
+	.name = "ring_mp_mc",
+	.alloc = rte_mempool_common_ring_alloc,
+	.put = common_ring_mp_put,
+	.get = common_ring_mc_get,
+	.get_count = common_ring_get_count,
+	.free = NULL
+};
+static struct rte_mempool_handler handler_sp_sc = {
+	.name = "ring_sp_sc",
+	.alloc = rte_mempool_common_ring_alloc,
+	.put = common_ring_sp_put,
+	.get = common_ring_sc_get,
+	.get_count = common_ring_get_count,
+	.free = NULL
+};
+static struct rte_mempool_handler handler_mp_sc = {
+	.name = "ring_mp_sc",
+	.alloc = rte_mempool_common_ring_alloc,
+	.put = common_ring_mp_put,
+	.get = common_ring_sc_get,
+	.get_count = common_ring_get_count,
+	.free = NULL
+};
+static struct rte_mempool_handler handler_sp_mc = {
+	.name = "ring_sp_mc",
+	.alloc = rte_mempool_common_ring_alloc,
+	.put = common_ring_sp_put,
+	.get = common_ring_mc_get,
+	.get_count = common_ring_get_count,
+	.free = NULL
+};
+
+REGISTER_MEMPOOL_HANDLER(handler_mp_mc);
+REGISTER_MEMPOOL_HANDLER(handler_sp_sc);
+REGISTER_MEMPOOL_HANDLER(handler_mp_sc);
+REGISTER_MEMPOOL_HANDLER(handler_sp_mc);
diff --git a/lib/librte_mempool/rte_mempool_internal.h b/lib/librte_mempool/rte_mempool_internal.h
new file mode 100644
index 0000000..92b7bde
--- /dev/null
+++ b/lib/librte_mempool/rte_mempool_internal.h
@@ -0,0 +1,74 @@ 
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright(c) 2010-2016 Intel Corporation. All rights reserved.
+ *   All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of Intel Corporation nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _RTE_MEMPOOL_INTERNAL_H_
+#define _RTE_MEMPOOL_INTERNAL_H_
+
+#include <rte_spinlock.h>
+#include <rte_mempool.h>
+
+#define RTE_MEMPOOL_MAX_HANDLER_IDX 16
+
+struct rte_mempool_handler {
+	char name[RTE_MEMPOOL_NAMESIZE]; /**< Name of mempool handler */
+
+	rte_mempool_alloc_t alloc;
+
+	rte_mempool_put_t put __rte_cache_aligned;
+
+	rte_mempool_get_t get __rte_cache_aligned;
+
+	rte_mempool_get_count get_count __rte_cache_aligned;
+
+	rte_mempool_free_t free __rte_cache_aligned;
+};
+
+struct rte_mempool_handler_list {
+	rte_spinlock_t sl;		  /**< Spinlock for add/delete. */
+
+	int32_t num_handlers;	  /**< Number of handlers that are valid. */
+
+	/* storage for all possible handlers */
+	struct rte_mempool_handler handler[RTE_MEMPOOL_MAX_HANDLER_IDX];
+};
+
+int16_t add_handler(struct rte_mempool_handler *h);
+
+#define REGISTER_MEMPOOL_HANDLER(h) \
+static int16_t __attribute__((used)) testfn_##h(void);\
+int16_t __attribute__((constructor, used)) testfn_##h(void)\
+{\
+	return add_handler(&h);\
+}
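+
+/*
+ * Usage sketch (illustrative only; the callback names are
+ * placeholders for user-supplied implementations). A handler is
+ * registered at load time via a constructor; put, get and get_count
+ * must be non-NULL or add_handler() will reject it:
+ *
+ *	static struct rte_mempool_handler my_handler = {
+ *		.name = "my_handler",
+ *		.alloc = my_alloc,
+ *		.put = my_put,
+ *		.get = my_get,
+ *		.get_count = my_get_count,
+ *		.free = NULL
+ *	};
+ *	REGISTER_MEMPOOL_HANDLER(my_handler);
+ */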
+
+#endif /* _RTE_MEMPOOL_INTERNAL_H_ */