[dpdk-dev,v6,1/5] mempool: support external handler

Message ID 1464797998-76690-2-git-send-email-david.hunt@intel.com (mailing list archive)
State Superseded, archived
Headers

Commit Message

Hunt, David June 1, 2016, 4:19 p.m. UTC
  Until now, the objects stored in a mempool were internally stored in a
ring. This patch introduces the possibility to register external handlers
replacing the ring.

The default behavior remains unchanged, but calling the new function
rte_mempool_set_handler() right after rte_mempool_create_empty() allows
the user to change the handler that will be used when populating
the mempool.

v7 changes:
  * Moved the flags handling from rte_mempool_create_empty to
    rte_mempool_create, as it's only there for backward compatibility
  * Various comment additions and cleanup
  * Renamed rte_mempool_handler to rte_mempool_ops
  * Added a union for *pool and u64 pool_id in struct rte_mempool

v6 changes:
  * split the original patch into a few parts for easier review.
  * rename functions with _ext_ to _ops_.
  * addressed some review comments
  * renamed put and get functions to enqueue and dequeue
  * renamed rte_mempool_handler struct to rte_mempool_handler_ops
  * changed occurences of rte_mempool_handler_ops to const, as they
    contain function pointers (security)
  * added some extra comments

v5 changes: rebasing on top of 35 patch set mempool work.

Signed-off-by: Olivier Matz <olivier.matz@6wind.com>
Signed-off-by: David Hunt <david.hunt@intel.com>
---
 lib/librte_mempool/Makefile              |   1 +
 lib/librte_mempool/rte_mempool.c         |  71 ++++------
 lib/librte_mempool/rte_mempool.h         | 235 ++++++++++++++++++++++++++++---
 lib/librte_mempool/rte_mempool_handler.c | 141 +++++++++++++++++++
 4 files changed, 384 insertions(+), 64 deletions(-)
 create mode 100644 lib/librte_mempool/rte_mempool_handler.c
  

Comments

Hunt, David June 1, 2016, 4:29 p.m. UTC | #1
On 6/1/2016 5:19 PM, David Hunt wrote:
> Until now, the objects stored in a mempool were internally stored in a
> ring. This patch introduces the possibility to register external handlers
> replacing the ring.
>
> The default behavior remains unchanged, but calling the new function
> rte_mempool_set_handler() right after rte_mempool_create_empty() allows
> the user to change the handler that will be used when populating
> the mempool.
>
> v7 changes:
>    * Moved the flags handling from rte_mempool_create_empty to
>      rte_mempool_create, as it's only there for backward compatibility
>    * Various comment additions and cleanup
>    * Renamed rte_mempool_handler to rte_mempool_ops
>    * Added a union for *pool and u64 pool_id in struct rte_mempool

These v7 changes should me merged with the v6 changes below as this is a 
v6 patch.
Or removed altogether, as they are in the cover letter.

> v6 changes:
>    * split the original patch into a few parts for easier review.
>    * rename functions with _ext_ to _ops_.
>    * addressed some review comments
>    * renamed put and get functions to enqueue and dequeue
>    * renamed rte_mempool_handler struct to rte_mempool_handler_ops
>    * changed occurences of rte_mempool_handler_ops to const, as they
>      contain function pointers (security)
>    * added some extra comments
>
>

[...]
  
Jan Viktorin June 1, 2016, 5:54 p.m. UTC | #2
Hello David,

the rename s/handler/ops/ has a lot of residues. Sorry for that :). I tried to
mark most of them. Otherwise, I couldn't see many more serious issues for now.

Just, note the s/pool/priv/ rename suggestion.

On Wed,  1 Jun 2016 17:19:54 +0100
David Hunt <david.hunt@intel.com> wrote:

> Until now, the objects stored in a mempool were internally stored in a
> ring. This patch introduces the possibility to register external handlers
> replacing the ring.
> 
> The default behavior remains unchanged, but calling the new function
> rte_mempool_set_handler() right after rte_mempool_create_empty() allows
> the user to change the handler that will be used when populating
> the mempool.
> 
> v7 changes:
>   * Moved the flags handling from rte_mempool_create_empty to
>     rte_mempool_create, as it's only there for backward compatibility
>   * Various comment additions and cleanup
>   * Renamed rte_mempool_handler to rte_mempool_ops
>   * Added a union for *pool and u64 pool_id in struct rte_mempool
> 
> v6 changes:
>   * split the original patch into a few parts for easier review.
>   * rename functions with _ext_ to _ops_.
>   * addressed some review comments
>   * renamed put and get functions to enqueue and dequeue
>   * renamed rte_mempool_handler struct to rte_mempool_handler_ops
>   * changed occurences of rte_mempool_handler_ops to const, as they
>     contain function pointers (security)
>   * added some extra comments
> 
> v5 changes: rebasing on top of 35 patch set mempool work.
> 
> Signed-off-by: Olivier Matz <olivier.matz@6wind.com>
> Signed-off-by: David Hunt <david.hunt@intel.com>
> ---
>  lib/librte_mempool/Makefile              |   1 +
>  lib/librte_mempool/rte_mempool.c         |  71 ++++------
>  lib/librte_mempool/rte_mempool.h         | 235 ++++++++++++++++++++++++++++---
>  lib/librte_mempool/rte_mempool_handler.c | 141 +++++++++++++++++++
>  4 files changed, 384 insertions(+), 64 deletions(-)
>  create mode 100644 lib/librte_mempool/rte_mempool_handler.c
> 
> diff --git a/lib/librte_mempool/Makefile b/lib/librte_mempool/Makefile
> index 43423e0..793745f 100644
> --- a/lib/librte_mempool/Makefile
> +++ b/lib/librte_mempool/Makefile
> @@ -42,6 +42,7 @@ LIBABIVER := 2
>  
>  # all source are stored in SRCS-y
>  SRCS-$(CONFIG_RTE_LIBRTE_MEMPOOL) +=  rte_mempool.c
> +SRCS-$(CONFIG_RTE_LIBRTE_MEMPOOL) +=  rte_mempool_handler.c
>  # install includes
>  SYMLINK-$(CONFIG_RTE_LIBRTE_MEMPOOL)-include := rte_mempool.h
>  
> diff --git a/lib/librte_mempool/rte_mempool.c b/lib/librte_mempool/rte_mempool.c
> index b54de43..9f34d30 100644
> --- a/lib/librte_mempool/rte_mempool.c
> +++ b/lib/librte_mempool/rte_mempool.c
> @@ -148,7 +148,7 @@ mempool_add_elem(struct rte_mempool *mp, void *obj, phys_addr_t physaddr)
>  #endif
>  
>  	/* enqueue in ring */
> -	rte_ring_sp_enqueue(mp->ring, obj);
> +	rte_mempool_ops_enqueue_bulk(mp, &obj, 1);
>  }
>  
>  /* call obj_cb() for each mempool element */
> @@ -303,40 +303,6 @@ rte_mempool_xmem_usage(__rte_unused void *vaddr, uint32_t elt_num,
>  	return (size_t)paddr_idx << pg_shift;
>  }
>  
> -/* create the internal ring */
> -static int
> -rte_mempool_ring_create(struct rte_mempool *mp)
> -{
> -	int rg_flags = 0, ret;
> -	char rg_name[RTE_RING_NAMESIZE];
> -	struct rte_ring *r;
> -
> -	ret = snprintf(rg_name, sizeof(rg_name),
> -		RTE_MEMPOOL_MZ_FORMAT, mp->name);
> -	if (ret < 0 || ret >= (int)sizeof(rg_name))
> -		return -ENAMETOOLONG;
> -
> -	/* ring flags */
> -	if (mp->flags & MEMPOOL_F_SP_PUT)
> -		rg_flags |= RING_F_SP_ENQ;
> -	if (mp->flags & MEMPOOL_F_SC_GET)
> -		rg_flags |= RING_F_SC_DEQ;
> -
> -	/* Allocate the ring that will be used to store objects.
> -	 * Ring functions will return appropriate errors if we are
> -	 * running as a secondary process etc., so no checks made
> -	 * in this function for that condition.
> -	 */
> -	r = rte_ring_create(rg_name, rte_align32pow2(mp->size + 1),
> -		mp->socket_id, rg_flags);
> -	if (r == NULL)
> -		return -rte_errno;
> -
> -	mp->ring = r;
> -	mp->flags |= MEMPOOL_F_RING_CREATED;
> -	return 0;
> -}
> -
>  /* free a memchunk allocated with rte_memzone_reserve() */
>  static void
>  rte_mempool_memchunk_mz_free(__rte_unused struct rte_mempool_memhdr *memhdr,
> @@ -354,7 +320,7 @@ rte_mempool_free_memchunks(struct rte_mempool *mp)
>  	void *elt;
>  
>  	while (!STAILQ_EMPTY(&mp->elt_list)) {
> -		rte_ring_sc_dequeue(mp->ring, &elt);
> +		rte_mempool_ops_dequeue_bulk(mp, &elt, 1);
>  		(void)elt;
>  		STAILQ_REMOVE_HEAD(&mp->elt_list, next);
>  		mp->populated_size--;
> @@ -383,13 +349,16 @@ rte_mempool_populate_phys(struct rte_mempool *mp, char *vaddr,
>  	unsigned i = 0;
>  	size_t off;
>  	struct rte_mempool_memhdr *memhdr;
> -	int ret;
>  
>  	/* create the internal ring if not already done */
>  	if ((mp->flags & MEMPOOL_F_RING_CREATED) == 0) {
> -		ret = rte_mempool_ring_create(mp);
> -		if (ret < 0)
> -			return ret;
> +		rte_errno = 0;
> +		mp->pool = rte_mempool_ops_alloc(mp);
> +		if (mp->pool == NULL) {
> +			if (rte_errno == 0)
> +				return -EINVAL;
> +			return -rte_errno;

Are you sure the rte_errno is a positive value?

> +		}
>  	}
>  
>  	/* mempool is already populated */
> @@ -703,7 +672,7 @@ rte_mempool_free(struct rte_mempool *mp)
>  	rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
>  
>  	rte_mempool_free_memchunks(mp);
> -	rte_ring_free(mp->ring);
> +	rte_mempool_ops_free(mp);
>  	rte_memzone_free(mp->mz);
>  }
>  
> @@ -815,6 +784,20 @@ rte_mempool_create_empty(const char *name, unsigned n, unsigned elt_size,
>  		RTE_PTR_ADD(mp, MEMPOOL_HEADER_SIZE(mp, 0));
>  
>  	te->data = mp;
> +
> +	/*
> +	 * Since we have 4 combinations of the SP/SC/MP/MC examine the flags to
> +	 * set the correct index into the handler table.
> +	 */
> +	if (flags & (MEMPOOL_F_SP_PUT | MEMPOOL_F_SC_GET))
> +		rte_mempool_set_handler(mp, "ring_sp_sc");
> +	else if (flags & MEMPOOL_F_SP_PUT)
> +		rte_mempool_set_handler(mp, "ring_sp_mc");
> +	else if (flags & MEMPOOL_F_SC_GET)
> +		rte_mempool_set_handler(mp, "ring_mp_sc");
> +	else
> +		rte_mempool_set_handler(mp, "ring_mp_mc");
> +
>  	rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK);
>  	TAILQ_INSERT_TAIL(mempool_list, te, next);
>  	rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
> @@ -930,7 +913,7 @@ rte_mempool_count(const struct rte_mempool *mp)
>  	unsigned count;
>  	unsigned lcore_id;
>  
> -	count = rte_ring_count(mp->ring);
> +	count = rte_mempool_ops_get_count(mp);
>  
>  	if (mp->cache_size == 0)
>  		return count;
> @@ -1123,7 +1106,7 @@ rte_mempool_dump(FILE *f, struct rte_mempool *mp)
>  
>  	fprintf(f, "mempool <%s>@%p\n", mp->name, mp);
>  	fprintf(f, "  flags=%x\n", mp->flags);
> -	fprintf(f, "  ring=<%s>@%p\n", mp->ring->name, mp->ring);
> +	fprintf(f, "  pool=%p\n", mp->pool);
>  	fprintf(f, "  phys_addr=0x%" PRIx64 "\n", mp->mz->phys_addr);
>  	fprintf(f, "  nb_mem_chunks=%u\n", mp->nb_mem_chunks);
>  	fprintf(f, "  size=%"PRIu32"\n", mp->size);
> @@ -1144,7 +1127,7 @@ rte_mempool_dump(FILE *f, struct rte_mempool *mp)
>  	}
>  
>  	cache_count = rte_mempool_dump_cache(f, mp);
> -	common_count = rte_ring_count(mp->ring);
> +	common_count = rte_mempool_ops_get_count(mp);
>  	if ((cache_count + common_count) > mp->size)
>  		common_count = mp->size - cache_count;
>  	fprintf(f, "  common_pool_count=%u\n", common_count);
> diff --git a/lib/librte_mempool/rte_mempool.h b/lib/librte_mempool/rte_mempool.h
> index 60339bd..b659565 100644
> --- a/lib/librte_mempool/rte_mempool.h
> +++ b/lib/librte_mempool/rte_mempool.h
> @@ -67,6 +67,7 @@
>  #include <inttypes.h>
>  #include <sys/queue.h>
>  
> +#include <rte_spinlock.h>
>  #include <rte_log.h>
>  #include <rte_debug.h>
>  #include <rte_lcore.h>
> @@ -204,9 +205,13 @@ struct rte_mempool_memhdr {
>  struct rte_mempool {
>  	char name[RTE_MEMPOOL_NAMESIZE]; /**< Name of mempool. */
>  	struct rte_ring *ring;           /**< Ring to store objects. */
> +	union {
> +		void *pool;              /**< Ring or pool to store objects */

What about calling this pdata or priv? I think, it can improve some doc comments.
Describing a "pool" may refer to both the rte_mempool itself or to the mp->pool
pointer. The "priv" would help to understand the code a bit.

Then the rte_mempool_alloc_t can be called rte_mempool_priv_alloc_t. Or something
similar. It's than clear enough, what the function should do...

> +		uint64_t *pool_id;       /**< External mempool identifier */

Why is pool_id a pointer? Is it a typo? I've understood it should be 64b wide
from the discussion (Olivier, Jerin, David):

| >>> something like below,
| >>> union {
| >>> 	void *pool;
| >>> 	uint64_t val;
| >>> 	uint8_t extra_mem[16] // available free bytes in fast path cache line
| >>>
| >>> }  
| >>
| >> Something for the future, perhaps? Will the 8-bits in the flags suffice for
| >> now?  
| > 
| > OK. But simple anonymous union for same type should be OK add now? Not
| > much change I believe, If its difficult then postpone it
| > 
| > union {
| > 	void *pool;
| > 	uint64_t val;
| > }  
|
| I'm ok with the simple union with (void *) and (uint64_t).
| Maybe "val" should be replaced by something more appropriate.
| Is "pool_id" a better name?

> +	};
>  	const struct rte_memzone *mz;    /**< Memzone where pool is allocated */
>  	int flags;                       /**< Flags of the mempool. */
> -	int socket_id;                   /**< Socket id passed at mempool creation. */
> +	int socket_id;                   /**< Socket id passed at create */
>  	uint32_t size;                   /**< Max size of the mempool. */
>  	uint32_t cache_size;             /**< Size of per-lcore local cache. */
>  	uint32_t cache_flushthresh;
> @@ -217,6 +222,14 @@ struct rte_mempool {
>  	uint32_t trailer_size;           /**< Size of trailer (after elt). */
>  
>  	unsigned private_data_size;      /**< Size of private data. */
> +	/**
> +	 * Index into rte_mempool_handler_table array of mempool handler ops

s/rte_mempool_handler_table/rte_mempool_ops_table/

> +	 * structs, which contain callback function pointers.
> +	 * We're using an index here rather than pointers to the callbacks
> +	 * to facilitate any secondary processes that may want to use
> +	 * this mempool.
> +	 */
> +	int32_t handler_idx;

s/handler_idx/ops_idx/

What about ops_index? Not a big deal...

>  
>  	struct rte_mempool_cache *local_cache; /**< Per-lcore local cache */
>  
> @@ -325,6 +338,199 @@ void rte_mempool_check_cookies(const struct rte_mempool *mp,
>  #define __mempool_check_cookies(mp, obj_table_const, n, free) do {} while(0)
>  #endif /* RTE_LIBRTE_MEMPOOL_DEBUG */
>  
> +#define RTE_MEMPOOL_HANDLER_NAMESIZE 32 /**< Max length of handler name. */
> +
> +/**
> + * typedef for allocating the external pool.

What about:

function prototype to provide an implementation specific data

> + * The handler's alloc function does whatever it needs to do to grab memory
> + * for this handler, and sets the *pool opaque pointer in the rte_mempool
> + * struct. In the default handler, *pool points to a ring,in other handlers,

What about:

The function should provide a memory heap representation or another private data
used for allocation by the rte_mempool_ops. E.g. the default ops provides an
instance of the rte_ring for this purpose.

> + * it will mostlikely point to a different type of data structure.
> + * It will be transparent to the application programmer.

I'd add something like this:

The function should not touch the given *mp instance.

...because it's goal is NOT to set the mp->pool, this is done by the
rte_mempool_populate_phys - the caller of this rte_mempool_ops_alloc.

This is why I've suggested to pass the rte_mempool as const in the v5.
Is there any reason to modify the rte_mempool contents by the implementation?
I think, we just need to read the flags, socket_id, etc.

> + */
> +typedef void *(*rte_mempool_alloc_t)(struct rte_mempool *mp);
> +
> +/** Free the external pool opaque data (that pointed to by *pool) */

What about:

/** Free the opaque private data stored in the mp->pool pointer. */

> +typedef void (*rte_mempool_free_t)(void *p);
> +
> +/**
> + * Put an object in the external pool.
> + * The *p pointer is the opaque data for a given mempool handler (ring,
> + * array, linked list, etc)

The obj_table is not documented. Is it really a table? I'd called an array instead.

> + */
> +typedef int (*rte_mempool_put_t)(void *p, void * const *obj_table, unsigned n);

unsigned int

> +
> +/** Get an object from the external pool. */
> +typedef int (*rte_mempool_get_t)(void *p, void **obj_table, unsigned n);

unsigned int

> +
> +/** Return the number of available objects in the external pool. */

Is the number of available objects or the total number of all objects
(so probably a constant value)?

> +typedef unsigned (*rte_mempool_get_count)(void *p);

Should it be const void *p?

> +
> +/** Structure defining a mempool handler. */

What about:

/** Structure defining mempool operations. */

> +struct rte_mempool_ops {
> +	char name[RTE_MEMPOOL_HANDLER_NAMESIZE]; /**< Name of mempool handler */

s/RTE_MEMPOOL_HANDLER_NAMESIZE/RTE_MEMPOOL_OPS_NAMESIZE/

> +	rte_mempool_alloc_t alloc;       /**< Allocate the external pool. */

What about:

Allocate the private data for this rte_mempool_ops.

> +	rte_mempool_free_t free;         /**< Free the external pool. */
> +	rte_mempool_put_t put;           /**< Put an object. */
> +	rte_mempool_get_t get;           /**< Get an object. */
> +	rte_mempool_get_count get_count; /**< Get the number of available objs. */
> +} __rte_cache_aligned;
> +
> +#define RTE_MEMPOOL_MAX_HANDLER_IDX 16  /**< Max registered handlers */

s/RTE_MEMPOOL_MAX_HANDLER_IDX/RTE_MEMPOOL_MAX_OPS_IDX/

> +
> +/**
> + * Structure storing the table of registered handlers, each of which contain
> + * the function pointers for the mempool handler functions.
> + * Each process has it's own storage for this handler struct aray so that
> + * the mempools can be shared across primary and secondary processes.
> + * The indices used to access the array are valid across processes, whereas
> + * any function pointers stored directly in the mempool struct would not be.
> + * This results in us simply having "handler_idx" in the mempool struct.
> + */
> +struct rte_mempool_handler_table {

s/rte_mempool_handler_table/rte_mempool_ops_table/

> +	rte_spinlock_t sl;     /**< Spinlock for add/delete. */
> +	uint32_t num_handlers; /**< Number of handlers in the table. */

s/num_handlers/num_ops/

> +	/**
> +	 * Storage for all possible handlers.
> +	 */
> +	struct rte_mempool_ops handler_ops[RTE_MEMPOOL_MAX_HANDLER_IDX];

s/handler_ops/ops/

> +} __rte_cache_aligned;
> +
> +/** Array of registered handlers */
> +extern struct rte_mempool_handler_table rte_mempool_handler_table;

s/rte_mempool_handler_table/rte_mempool_ops_table/

> +
> +/**
> + * @internal Get the mempool handler from its index.
> + *
> + * @param handler_idx
> + *   The index of the handler in the handler table. It must be a valid
> + *   index: (0 <= idx < num_handlers).
> + * @return
> + *   The pointer to the handler in the table.
> + */
> +static inline struct rte_mempool_ops *
> +rte_mempool_handler_get(int handler_idx)

rte_mempool_ops_get(int ops_idx)

> +{
> +	return &rte_mempool_handler_table.handler_ops[handler_idx];
> +}
> +
> +/**
> + * @internal wrapper for external mempool manager alloc callback.
> + *
> + * @param mp
> + *   Pointer to the memory pool.
> + * @return
> + *   The opaque pointer to the external pool.
> + */
> +void *
> +rte_mempool_ops_alloc(struct rte_mempool *mp);
> +
> +/**
> + * @internal wrapper for external mempool manager get callback.
> + *
> + * @param mp
> + *   Pointer to the memory pool.
> + * @param obj_table
> + *   Pointer to a table of void * pointers (objects).
> + * @param n
> + *   Number of objects to get.
> + * @return
> + *   - 0: Success; got n objects.
> + *   - <0: Error; code of handler get function.
> + */
> +static inline int
> +rte_mempool_ops_dequeue_bulk(struct rte_mempool *mp,
> +		void **obj_table, unsigned n)
> +{
> +	struct rte_mempool_ops *handler;

*ops

> +
> +	handler = rte_mempool_handler_get(mp->handler_idx);
> +	return handler->get(mp->pool, obj_table, n);
> +}
> +
> +/**
> + * @internal wrapper for external mempool manager put callback.
> + *
> + * @param mp
> + *   Pointer to the memory pool.
> + * @param obj_table
> + *   Pointer to a table of void * pointers (objects).
> + * @param n
> + *   Number of objects to put.
> + * @return
> + *   - 0: Success; n objects supplied.
> + *   - <0: Error; code of handler put function.
> + */
> +static inline int
> +rte_mempool_ops_enqueue_bulk(struct rte_mempool *mp, void * const *obj_table,
> +		unsigned n)
> +{
> +	struct rte_mempool_ops *handler;

*ops

> +
> +	handler = rte_mempool_handler_get(mp->handler_idx);
> +	return handler->put(mp->pool, obj_table, n);
> +}
> +
> +/**
> + * @internal wrapper for external mempool manager get_count callback.
> + *
> + * @param mp
> + *   Pointer to the memory pool.
> + * @return
> + *   The number of available objects in the external pool.
> + */
> +unsigned
> +rte_mempool_ops_get_count(const struct rte_mempool *mp);
> +
> +/**
> + * @internal wrapper for external mempool manager free callback.
> + *
> + * @param mp
> + *   Pointer to the memory pool.
> + */
> +void
> +rte_mempool_ops_free(struct rte_mempool *mp);
> +
> +/**
> + * Set the handler of a mempool
> + *
> + * This can only be done on a mempool that is not populated, i.e. just after
> + * a call to rte_mempool_create_empty().
> + *
> + * @param mp
> + *   Pointer to the memory pool.
> + * @param name
> + *   Name of the handler.
> + * @return
> + *   - 0: Sucess; the new handler is configured.
> + *   - EINVAL - Invalid handler name provided
> + *   - EEXIST - mempool already has a handler assigned

They are returned as -EINVAL and -EEXIST.

IMHO, using "-" here is counter-intuitive:

 - EINVAL

does it mean a positive with "-" or negative value?

> + */
> +int
> +rte_mempool_set_handler(struct rte_mempool *mp, const char *name);

rte_mempool_set_ops

What about rte_mempool_set_ops_byname? Not a big deal...

> +
> +/**
> + * Register an external pool handler.

Register mempool operations

> + *
> + * @param h
> + *   Pointer to the external pool handler
> + * @return
> + *   - >=0: Sucess; return the index of the handler in the table.
> + *   - EINVAL - missing callbacks while registering handler
> + *   - ENOSPC - the maximum number of handlers has been reached

- -EINVAL
- -ENOSPC

> + */
> +int rte_mempool_handler_register(const struct rte_mempool_ops *h);

rte_mempool_ops_register

> +
> +/**
> + * Macro to statically register an external pool handler.

What about adding:

Note that the rte_mempool_ops_register fails silently here when
more then RTE_MEMPOOL_MAX_OPS_IDX is registered.

> + */
> +#define MEMPOOL_REGISTER_HANDLER(h)	

MEMPOOL_REGISTER_OPS
				\
> +	void mp_hdlr_init_##h(void);					\
> +	void __attribute__((constructor, used)) mp_hdlr_init_##h(void)	\
> +	{								\
> +		rte_mempool_handler_register(&h);			\
> +	}
> +
>  /**
>   * An object callback function for mempool.
>   *
> @@ -774,7 +980,7 @@ __mempool_put_bulk(struct rte_mempool *mp, void * const *obj_table,
>  	cache->len += n;
>  
>  	if (cache->len >= flushthresh) {
> -		rte_ring_mp_enqueue_bulk(mp->ring, &cache->objs[cache_size],
> +		rte_mempool_ops_enqueue_bulk(mp, &cache->objs[cache_size],
>  				cache->len - cache_size);
>  		cache->len = cache_size;
>  	}
> @@ -785,19 +991,10 @@ ring_enqueue:
>  
>  	/* push remaining objects in ring */
>  #ifdef RTE_LIBRTE_MEMPOOL_DEBUG
> -	if (is_mp) {
> -		if (rte_ring_mp_enqueue_bulk(mp->ring, obj_table, n) < 0)
> -			rte_panic("cannot put objects in mempool\n");
> -	}
> -	else {
> -		if (rte_ring_sp_enqueue_bulk(mp->ring, obj_table, n) < 0)
> -			rte_panic("cannot put objects in mempool\n");
> -	}
> +	if (rte_mempool_ops_enqueue_bulk(mp, obj_table, n) < 0)
> +		rte_panic("cannot put objects in mempool\n");
>  #else
> -	if (is_mp)
> -		rte_ring_mp_enqueue_bulk(mp->ring, obj_table, n);
> -	else
> -		rte_ring_sp_enqueue_bulk(mp->ring, obj_table, n);
> +	rte_mempool_ops_enqueue_bulk(mp, obj_table, n);
>  #endif
>  }
>  
> @@ -922,7 +1119,7 @@ rte_mempool_put(struct rte_mempool *mp, void *obj)
>   */
>  static inline int __attribute__((always_inline))
>  __mempool_get_bulk(struct rte_mempool *mp, void **obj_table,
> -		   unsigned n, int is_mc)
> +		   unsigned n, __rte_unused int is_mc)

unsigned int

>  {
>  	int ret;
>  	struct rte_mempool_cache *cache;
> @@ -945,7 +1142,8 @@ __mempool_get_bulk(struct rte_mempool *mp, void **obj_table,
>  		uint32_t req = n + (cache_size - cache->len);
>  
>  		/* How many do we require i.e. number to fill the cache + the request */
> -		ret = rte_ring_mc_dequeue_bulk(mp->ring, &cache->objs[cache->len], req);
> +		ret = rte_mempool_ops_dequeue_bulk(mp,
> +			&cache->objs[cache->len], req);
>  		if (unlikely(ret < 0)) {
>  			/*
>  			 * In the offchance that we are buffer constrained,
> @@ -972,10 +1170,7 @@ __mempool_get_bulk(struct rte_mempool *mp, void **obj_table,
>  ring_dequeue:
>  
>  	/* get remaining objects from ring */
> -	if (is_mc)
> -		ret = rte_ring_mc_dequeue_bulk(mp->ring, obj_table, n);
> -	else
> -		ret = rte_ring_sc_dequeue_bulk(mp->ring, obj_table, n);
> +	ret = rte_mempool_ops_dequeue_bulk(mp, obj_table, n);
>  
>  	if (ret < 0)
>  		__MEMPOOL_STAT_ADD(mp, get_fail, n);
> diff --git a/lib/librte_mempool/rte_mempool_handler.c b/lib/librte_mempool/rte_mempool_handler.c
> new file mode 100644
> index 0000000..ed85d65
> --- /dev/null
> +++ b/lib/librte_mempool/rte_mempool_handler.c

rte_mempool_ops.c

> @@ -0,0 +1,141 @@
> +/*-
> + *   BSD LICENSE
> + *
> + *   Copyright(c) 2016 Intel Corporation. All rights reserved.
> + *   Copyright(c) 2016 6WIND S.A.
> + *   All rights reserved.
> + *
> + *   Redistribution and use in source and binary forms, with or without
> + *   modification, are permitted provided that the following conditions
> + *   are met:
> + *
> + *     * Redistributions of source code must retain the above copyright
> + *       notice, this list of conditions and the following disclaimer.
> + *     * Redistributions in binary form must reproduce the above copyright
> + *       notice, this list of conditions and the following disclaimer in
> + *       the documentation and/or other materials provided with the
> + *       distribution.
> + *     * Neither the name of Intel Corporation nor the names of its
> + *       contributors may be used to endorse or promote products derived
> + *       from this software without specific prior written permission.
> + *
> + *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
> + *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
> + *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
> + *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
> + *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
> + *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
> + *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
> + *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
> + *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
> + *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
> + *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
> + */
> +
> +#include <stdio.h>
> +#include <string.h>
> +
> +#include <rte_mempool.h>
> +
> +/* indirect jump table to support external memory pools */
> +struct rte_mempool_handler_table rte_mempool_handler_table = {

rte_mempool_ops_table

> +	.sl =  RTE_SPINLOCK_INITIALIZER ,
> +	.num_handlers = 0

num_ops

> +};
> +
> +/* add a new handler in rte_mempool_handler_table, return its index */
> +int
> +rte_mempool_handler_register(const struct rte_mempool_ops *h)
> +{
> +	struct rte_mempool_ops *handler;

*ops

> +	int16_t handler_idx;

ops_idx

> +
> +	rte_spinlock_lock(&rte_mempool_handler_table.sl);
> +
> +	if (rte_mempool_handler_table.num_handlers >=
> +			RTE_MEMPOOL_MAX_HANDLER_IDX) {
> +		rte_spinlock_unlock(&rte_mempool_handler_table.sl);
> +		RTE_LOG(ERR, MEMPOOL,
> +			"Maximum number of mempool handlers exceeded\n");
> +		return -ENOSPC;
> +	}
> +
> +	if (h->put == NULL || h->get == NULL || h->get_count == NULL) {
> +		rte_spinlock_unlock(&rte_mempool_handler_table.sl);
> +		RTE_LOG(ERR, MEMPOOL,
> +			"Missing callback while registering mempool handler\n");
> +		return -EINVAL;
> +	}
> +
> +	handler_idx = rte_mempool_handler_table.num_handlers++;
> +	handler = &rte_mempool_handler_table.handler_ops[handler_idx];
> +	snprintf(handler->name, sizeof(handler->name), "%s", h->name);
> +	handler->alloc = h->alloc;
> +	handler->put = h->put;
> +	handler->get = h->get;
> +	handler->get_count = h->get_count;
> +
> +	rte_spinlock_unlock(&rte_mempool_handler_table.sl);
> +
> +	return handler_idx;
> +}
> +
> +/* wrapper to allocate an external pool handler */
> +void *
> +rte_mempool_ops_alloc(struct rte_mempool *mp)
> +{
> +	struct rte_mempool_ops *handler;

*ops

> +
> +	handler = rte_mempool_handler_get(mp->handler_idx);
> +	if (handler->alloc == NULL)
> +		return NULL;
> +	return handler->alloc(mp);
> +}
> +
> +/* wrapper to free an external pool handler */
> +void
> +rte_mempool_ops_free(struct rte_mempool *mp)
> +{
> +	struct rte_mempool_ops *handler;

*ops

> +
> +	handler = rte_mempool_handler_get(mp->handler_idx);
> +	if (handler->free == NULL)
> +		return;
> +	return handler->free(mp);
> +}
> +
> +/* wrapper to get available objects in an external pool handler */
> +unsigned int
> +rte_mempool_ops_get_count(const struct rte_mempool *mp)
> +{
> +	struct rte_mempool_ops *handler;

*ops

> +
> +	handler = rte_mempool_handler_get(mp->handler_idx);
> +	return handler->get_count(mp->pool);
> +}
> +
> +/* sets a handler previously registered by rte_mempool_handler_register */
> +int
> +rte_mempool_set_handler(struct rte_mempool *mp, const char *name)
> +{
> +	struct rte_mempool_ops *handler = NULL;

*ops

> +	unsigned i;
> +
> +	/* too late, the mempool is already populated */
> +	if (mp->flags & MEMPOOL_F_RING_CREATED)
> +		return -EEXIST;
> +
> +	for (i = 0; i < rte_mempool_handler_table.num_handlers; i++) {
> +		if (!strcmp(name,
> +				rte_mempool_handler_table.handler_ops[i].name)) {
> +			handler = &rte_mempool_handler_table.handler_ops[i];
> +			break;
> +		}
> +	}
> +
> +	if (handler == NULL)
> +		return -EINVAL;
> +
> +	mp->handler_idx = i;
> +	return 0;
> +}

Regards
Jan
  
Hunt, David June 2, 2016, 9:11 a.m. UTC | #3
On 6/1/2016 6:54 PM, Jan Viktorin wrote:
> Hello David,
>
> the rename s/handler/ops/ has a lot of residues. Sorry for that :). I tried to
> mark most of them. Otherwise, I couldn't see many more serious issues for now.

Ah, I had assumed that we were just talking about the 
rte_mempool_handler_ops structure,
not a global replace of 'handler' with 'ops'.  It does make sense to 
change it to ops, so we don't have
two words describing the same entity. I'll change to ops.

Just, note the s/pool/priv/ rename suggestion.


I prefer your suggestion of pdata rather than priv, how about "pool_data"?


Again, thanks for the comprehensive review.

Regards,
Dave.

[...]
  
Hunt, David June 2, 2016, 11:23 a.m. UTC | #4
On 6/1/2016 6:54 PM, Jan Viktorin wrote:
>
>   		mp->populated_size--;
> @@ -383,13 +349,16 @@ rte_mempool_populate_phys(struct rte_mempool *mp, char *vaddr,
>   	unsigned i = 0;
>   	size_t off;
>   	struct rte_mempool_memhdr *memhdr;
> -	int ret;
>   
>   	/* create the internal ring if not already done */
>   	if ((mp->flags & MEMPOOL_F_RING_CREATED) == 0) {
> -		ret = rte_mempool_ring_create(mp);
> -		if (ret < 0)
> -			return ret;
> +		rte_errno = 0;
> +		mp->pool = rte_mempool_ops_alloc(mp);
> +		if (mp->pool == NULL) {
> +			if (rte_errno == 0)
> +				return -EINVAL;
> +			return -rte_errno;
> Are you sure the rte_errno is a positive value?

If the user returns EINVAL, or similar, we want to return negative, as 
per the rest of DPDK.

>> @@ -204,9 +205,13 @@ struct rte_mempool_memhdr {
>>   struct rte_mempool {
>>   	char name[RTE_MEMPOOL_NAMESIZE]; /**< Name of mempool. */
>>   	struct rte_ring *ring;           /**< Ring to store objects. */
>> +	union {
>> +		void *pool;              /**< Ring or pool to store objects */
> What about calling this pdata or priv? I think, it can improve some doc comments.
> Describing a "pool" may refer to both the rte_mempool itself or to the mp->pool
> pointer. The "priv" would help to understand the code a bit.
>
> Then the rte_mempool_alloc_t can be called rte_mempool_priv_alloc_t. Or something
> similar. It's than clear enough, what the function should do...

I'd lean towards pdata or maybe pool_data. Not sure about the function 
name change though,
the function does return a pool_data pointer, which we put into 
mp->pool_data.

>> +		uint64_t *pool_id;       /**< External mempool identifier */
> Why is pool_id a pointer? Is it a typo? I've understood it should be 64b wide
> from the discussion (Olivier, Jerin, David):

Yes, typo.

>   	uint32_t trailer_size;           /**< Size of trailer (after elt). */
>   
>   	unsigned private_data_size;      /**< Size of private data. */
> +	/**
> +	 * Index into rte_mempool_handler_table array of mempool handler ops
> s/rte_mempool_handler_table/rte_mempool_ops_table/

Done.

>> +	 * structs, which contain callback function pointers.
>> +	 * We're using an index here rather than pointers to the callbacks
>> +	 * to facilitate any secondary processes that may want to use
>> +	 * this mempool.
>> +	 */
>> +	int32_t handler_idx;
> s/handler_idx/ops_idx/
>
> What about ops_index? Not a big deal...

I agree ops_index is better. Changed.

>>   
>>   	struct rte_mempool_cache *local_cache; /**< Per-lcore local cache */
>>   
>> @@ -325,6 +338,199 @@ void rte_mempool_check_cookies(const struct rte_mempool *mp,
>>   #define __mempool_check_cookies(mp, obj_table_const, n, free) do {} while(0)
>>   #endif /* RTE_LIBRTE_MEMPOOL_DEBUG */
>>   
>> +#define RTE_MEMPOOL_HANDLER_NAMESIZE 32 /**< Max length of handler name. */
>> +
>> +/**
>> + * typedef for allocating the external pool.
> What about:
>
> function prototype to provide an implementation specific data
>
>> + * The handler's alloc function does whatever it needs to do to grab memory
>> + * for this handler, and sets the *pool opaque pointer in the rte_mempool
>> + * struct. In the default handler, *pool points to a ring,in other handlers,
> What about:
>
> The function should provide a memory heap representation or another private data
> used for allocation by the rte_mempool_ops. E.g. the default ops provides an
> instance of the rte_ring for this purpose.
>
>> + * it will mostlikely point to a different type of data structure.
>> + * It will be transparent to the application programmer.
> I'd add something like this:
>
> The function should not touch the given *mp instance.

Agreed. Reworked somewhat.


> ...because it's goal is NOT to set the mp->pool, this is done by the
> rte_mempool_populate_phys - the caller of this rte_mempool_ops_alloc.
>
> This is why I've suggested to pass the rte_mempool as const in the v5.
> Is there any reason to modify the rte_mempool contents by the implementation?
> I think, we just need to read the flags, socket_id, etc.

Yes, I agree it should be const. Changed.

>> + */
>> +typedef void *(*rte_mempool_alloc_t)(struct rte_mempool *mp);
>> +
>> +/** Free the external pool opaque data (that pointed to by *pool) */
> What about:
>
> /** Free the opaque private data stored in the mp->pool pointer. */

I've merged the two versions of the comment:
/** Free the opaque private data pointed to by mp->pool_data pointer */


>> +typedef void (*rte_mempool_free_t)(void *p);
>> +
>> +/**
>> + * Put an object in the external pool.
>> + * The *p pointer is the opaque data for a given mempool handler (ring,
>> + * array, linked list, etc)
> The obj_table is not documented. Is it really a table? I'd called an array instead.

You're probably right, but it's always been called obj_table, and I'm 
not sure
this patchset is the place to change it.  Maybe a follow up patch?

>> + */
>> +typedef int (*rte_mempool_put_t)(void *p, void * const *obj_table, unsigned n);
> unsigned int
Done
>
>> +
>> +/** Get an object from the external pool. */
>> +typedef int (*rte_mempool_get_t)(void *p, void **obj_table, unsigned n);
> unsigned int
Done
>
>> +
>> +/** Return the number of available objects in the external pool. */
> Is the number of available objects or the total number of all objects
> (so probably a constant value)?

It's intended to be the umber of available objects

>
>> +typedef unsigned (*rte_mempool_get_count)(void *p);
> Should it be const void *p?

Yes, it could be. Changed.

>
>> +
>> +/** Structure defining a mempool handler. */
> What about:
>
> /** Structure defining mempool operations. */

Changed.

>> +struct rte_mempool_ops {
>> +	char name[RTE_MEMPOOL_HANDLER_NAMESIZE]; /**< Name of mempool handler */
> s/RTE_MEMPOOL_HANDLER_NAMESIZE/RTE_MEMPOOL_OPS_NAMESIZE/

Done

>> +	rte_mempool_alloc_t alloc;       /**< Allocate the external pool. */
> What about:
>
> Allocate the private data for this rte_mempool_ops.

Changed to /**< Allocate private data */


>> +	rte_mempool_free_t free;         /**< Free the external pool. */
>> +	rte_mempool_put_t put;           /**< Put an object. */
>> +	rte_mempool_get_t get;           /**< Get an object. */
>> +	rte_mempool_get_count get_count; /**< Get the number of available objs. */
>> +} __rte_cache_aligned;
>> +
>> +#define RTE_MEMPOOL_MAX_HANDLER_IDX 16  /**< Max registered handlers */
> s/RTE_MEMPOOL_MAX_HANDLER_IDX/RTE_MEMPOOL_MAX_OPS_IDX/

Changed

>> +
>> +/**
>> + * Structure storing the table of registered handlers, each of which contain
>> + * the function pointers for the mempool handler functions.
>> + * Each process has it's own storage for this handler struct aray so that
>> + * the mempools can be shared across primary and secondary processes.
>> + * The indices used to access the array are valid across processes, whereas
>> + * any function pointers stored directly in the mempool struct would not be.
>> + * This results in us simply having "handler_idx" in the mempool struct.
>> + */
>> +struct rte_mempool_handler_table {
> s/rte_mempool_handler_table/rte_mempool_ops_table/

Done

>> +	rte_spinlock_t sl;     /**< Spinlock for add/delete. */
>> +	uint32_t num_handlers; /**< Number of handlers in the table. */
> s/num_handlers/num_ops/

Done

>> +	/**
>> +	 * Storage for all possible handlers.
>> +	 */
>> +	struct rte_mempool_ops handler_ops[RTE_MEMPOOL_MAX_HANDLER_IDX];
> s/handler_ops/ops/

Done
>> +} __rte_cache_aligned;
>> +
>> +/** Array of registered handlers */
>> +extern struct rte_mempool_handler_table rte_mempool_handler_table;
> s/rte_mempool_handler_table/rte_mempool_ops_table/

Done

>> +
>> +/**
>> + * @internal Get the mempool handler from its index.
>> + *
>> + * @param handler_idx
>> + *   The index of the handler in the handler table. It must be a valid
>> + *   index: (0 <= idx < num_handlers).
>> + * @return
>> + *   The pointer to the handler in the table.
>> + */
>> +static inline struct rte_mempool_ops *
>> +rte_mempool_handler_get(int handler_idx)
> rte_mempool_ops_get(int ops_idx)

Done


>> +{
>> +	return &rte_mempool_handler_table.handler_ops[handler_idx];
>> +}
>> +
>> +/**
>> + * @internal wrapper for external mempool manager alloc callback.
>> + *
>> + * @param mp
>> + *   Pointer to the memory pool.
>> + * @return
>> + *   The opaque pointer to the external pool.
>> + */
>> +void *
>> +rte_mempool_ops_alloc(struct rte_mempool *mp);
>> +
>> +/**
>> + * @internal wrapper for external mempool manager get callback.
>> + *
>> + * @param mp
>> + *   Pointer to the memory pool.
>> + * @param obj_table
>> + *   Pointer to a table of void * pointers (objects).
>> + * @param n
>> + *   Number of objects to get.
>> + * @return
>> + *   - 0: Success; got n objects.
>> + *   - <0: Error; code of handler get function.
>> + */
>> +static inline int
>> +rte_mempool_ops_dequeue_bulk(struct rte_mempool *mp,
>> +		void **obj_table, unsigned n)
>> +{
>> +	struct rte_mempool_ops *handler;
> *ops
Done

>> +
>> +	handler = rte_mempool_handler_get(mp->handler_idx);
>> +	return handler->get(mp->pool, obj_table, n);
>> +}
>> +
>> +/**
>> + * @internal wrapper for external mempool manager put callback.
>> + *
>> + * @param mp
>> + *   Pointer to the memory pool.
>> + * @param obj_table
>> + *   Pointer to a table of void * pointers (objects).
>> + * @param n
>> + *   Number of objects to put.
>> + * @return
>> + *   - 0: Success; n objects supplied.
>> + *   - <0: Error; code of handler put function.
>> + */
>> +static inline int
>> +rte_mempool_ops_enqueue_bulk(struct rte_mempool *mp, void * const *obj_table,
>> +		unsigned n)
>> +{
>> +	struct rte_mempool_ops *handler;
> *ops
Done

>> + * @return
>> + *   - 0: Sucess; the new handler is configured.
>> + *   - EINVAL - Invalid handler name provided
>> + *   - EEXIST - mempool already has a handler assigned
> They are returned as -EINVAL and -EEXIST.
>
> IMHO, using "-" here is counter-intuitive:
>
>   - EINVAL
>
> does it mean a positive with "-" or negative value?

EINVAL is positive, so it's returning negative. Common usage in DPDK, 
afaics.


>> + */
>> +int
>> +rte_mempool_set_handler(struct rte_mempool *mp, const char *name);
> rte_mempool_set_ops
>
> What about rte_mempool_set_ops_byname? Not a big deal...

I agree.  rte_mempool_set_ops_byname

>> +
>> +/**
>> + * Register an external pool handler.
> Register mempool operations

Done

>> + *
>> + * @param h
>> + *   Pointer to the external pool handler
>> + * @return
>> + *   - >=0: Sucess; return the index of the handler in the table.
>> + *   - EINVAL - missing callbacks while registering handler
>> + *   - ENOSPC - the maximum number of handlers has been reached
> - -EINVAL
> - -ENOSPC

:)

>> + */
>> +int rte_mempool_handler_register(const struct rte_mempool_ops *h);
> rte_mempool_ops_register

Done

>> +
>> +/**
>> + * Macro to statically register an external pool handler.
> What about adding:
>
> Note that the rte_mempool_ops_register fails silently here when
> more then RTE_MEMPOOL_MAX_OPS_IDX is registered.

Done


>> + */
>> +#define MEMPOOL_REGISTER_HANDLER(h)	
> MEMPOOL_REGISTER_OPS

Done

> 				\
>> +	void mp_hdlr_init_##h(void);					\
>> +	void __attribute__((constructor, used)) mp_hdlr_init_##h(void)	\
>> +	{								\
>> +		rte_mempool_handler_register(&h);			\
>> +	}
>> +
>>   /**
>>    * An object callback function for mempool.
>>    *
>> @@ -774,7 +980,7 @@ __mempool_put_bulk(struct rte_mempool *mp, void * const *obj_table,
>>   	cache->len += n;
>>   
>>   	if (cache->len >= flushthresh) {
>> -		rte_ring_mp_enqueue_bulk(mp->ring, &cache->objs[cache_size],
>> +		rte_mempool_ops_enqueue_bulk(mp, &cache->objs[cache_size],
>>   				cache->len - cache_size);
>>   		cache->len = cache_size;
>>   	}
>> @@ -785,19 +991,10 @@ ring_enqueue:
>>   
>>   	/* push remaining objects in ring */
>>   #ifdef RTE_LIBRTE_MEMPOOL_DEBUG
>> -	if (is_mp) {
>> -		if (rte_ring_mp_enqueue_bulk(mp->ring, obj_table, n) < 0)
>> -			rte_panic("cannot put objects in mempool\n");
>> -	}
>> -	else {
>> -		if (rte_ring_sp_enqueue_bulk(mp->ring, obj_table, n) < 0)
>> -			rte_panic("cannot put objects in mempool\n");
>> -	}
>> +	if (rte_mempool_ops_enqueue_bulk(mp, obj_table, n) < 0)
>> +		rte_panic("cannot put objects in mempool\n");
>>   #else
>> -	if (is_mp)
>> -		rte_ring_mp_enqueue_bulk(mp->ring, obj_table, n);
>> -	else
>> -		rte_ring_sp_enqueue_bulk(mp->ring, obj_table, n);
>> +	rte_mempool_ops_enqueue_bulk(mp, obj_table, n);
>>   #endif
>>   }
>>   
>> @@ -922,7 +1119,7 @@ rte_mempool_put(struct rte_mempool *mp, void *obj)
>>    */
>>   static inline int __attribute__((always_inline))
>>   __mempool_get_bulk(struct rte_mempool *mp, void **obj_table,
>> -		   unsigned n, int is_mc)
>> +		   unsigned n, __rte_unused int is_mc)
> unsigned int

Done

>>   {
>>   	int ret;
>>   	struct rte_mempool_cache *cache;
>> @@ -945,7 +1142,8 @@ __mempool_get_bulk(struct rte_mempool *mp, void **obj_table,
>>   		uint32_t req = n + (cache_size - cache->len);
>>   
>>   		/* How many do we require i.e. number to fill the cache + the request */
>> -		ret = rte_ring_mc_dequeue_bulk(mp->ring, &cache->objs[cache->len], req);
>> +		ret = rte_mempool_ops_dequeue_bulk(mp,
>> +			&cache->objs[cache->len], req);
>>   		if (unlikely(ret < 0)) {
>>   			/*
>>   			 * In the offchance that we are buffer constrained,
>> @@ -972,10 +1170,7 @@ __mempool_get_bulk(struct rte_mempool *mp, void **obj_table,
>>   ring_dequeue:
>>   
>>   	/* get remaining objects from ring */
>> -	if (is_mc)
>> -		ret = rte_ring_mc_dequeue_bulk(mp->ring, obj_table, n);
>> -	else
>> -		ret = rte_ring_sc_dequeue_bulk(mp->ring, obj_table, n);
>> +	ret = rte_mempool_ops_dequeue_bulk(mp, obj_table, n);
>>   
>>   	if (ret < 0)
>>   		__MEMPOOL_STAT_ADD(mp, get_fail, n);
>> diff --git a/lib/librte_mempool/rte_mempool_handler.c b/lib/librte_mempool/rte_mempool_handler.c
>> new file mode 100644
>> index 0000000..ed85d65
>> --- /dev/null
>> +++ b/lib/librte_mempool/rte_mempool_handler.c
> rte_mempool_ops.c

Done.

>
>> @@ -0,0 +1,141 @@
>> +/*-
>> + *   BSD LICENSE
>> + *
>> + *   Copyright(c) 2016 Intel Corporation. All rights reserved.
>> + *   Copyright(c) 2016 6WIND S.A.
>> + *   All rights reserved.
>> + *
>> + *   Redistribution and use in source and binary forms, with or without
>> + *   modification, are permitted provided that the following conditions
>> + *   are met:
>> + *
>> + *     * Redistributions of source code must retain the above copyright
>> + *       notice, this list of conditions and the following disclaimer.
>> + *     * Redistributions in binary form must reproduce the above copyright
>> + *       notice, this list of conditions and the following disclaimer in
>> + *       the documentation and/or other materials provided with the
>> + *       distribution.
>> + *     * Neither the name of Intel Corporation nor the names of its
>> + *       contributors may be used to endorse or promote products derived
>> + *       from this software without specific prior written permission.
>> + *
>> + *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
>> + *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
>> + *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
>> + *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
>> + *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
>> + *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
>> + *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
>> + *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
>> + *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
>> + *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
>> + *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
>> + */
>> +
>> +#include <stdio.h>
>> +#include <string.h>
>> +
>> +#include <rte_mempool.h>
>> +
>> +/* indirect jump table to support external memory pools */
>> +struct rte_mempool_handler_table rte_mempool_handler_table = {
> rte_mempool_ops_table

Done

>> +	.sl =  RTE_SPINLOCK_INITIALIZER ,
>> +	.num_handlers = 0
> num_ops

Done

>> +};
>> +
>> +/* add a new handler in rte_mempool_handler_table, return its index */
>> +int
>> +rte_mempool_handler_register(const struct rte_mempool_ops *h)
>> +{
>> +	struct rte_mempool_ops *handler;
> *ops

Done

>> +	int16_t handler_idx;
> ops_idx

Done

>> +
>> +	rte_spinlock_lock(&rte_mempool_handler_table.sl);
>> +
>> +	if (rte_mempool_handler_table.num_handlers >=
>> +			RTE_MEMPOOL_MAX_HANDLER_IDX) {
>> +		rte_spinlock_unlock(&rte_mempool_handler_table.sl);
>> +		RTE_LOG(ERR, MEMPOOL,
>> +			"Maximum number of mempool handlers exceeded\n");
>> +		return -ENOSPC;
>> +	}
>> +
>> +	if (h->put == NULL || h->get == NULL || h->get_count == NULL) {
>> +		rte_spinlock_unlock(&rte_mempool_handler_table.sl);
>> +		RTE_LOG(ERR, MEMPOOL,
>> +			"Missing callback while registering mempool handler\n");
>> +		return -EINVAL;
>> +	}
>> +
>> +	handler_idx = rte_mempool_handler_table.num_handlers++;
>> +	handler = &rte_mempool_handler_table.handler_ops[handler_idx];
>> +	snprintf(handler->name, sizeof(handler->name), "%s", h->name);
>> +	handler->alloc = h->alloc;
>> +	handler->put = h->put;
>> +	handler->get = h->get;
>> +	handler->get_count = h->get_count;
>> +
>> +	rte_spinlock_unlock(&rte_mempool_handler_table.sl);
>> +
>> +	return handler_idx;
>> +}
>> +
>> +/* wrapper to allocate an external pool handler */
>> +void *
>> +rte_mempool_ops_alloc(struct rte_mempool *mp)
>> +{
>> +	struct rte_mempool_ops *handler;
> *ops


Done

>> +
>> +	handler = rte_mempool_handler_get(mp->handler_idx);
>> +	if (handler->alloc == NULL)
>> +		return NULL;
>> +	return handler->alloc(mp);
>> +}
>> +
>> +/* wrapper to free an external pool handler */
>> +void
>> +rte_mempool_ops_free(struct rte_mempool *mp)
>> +{
>> +	struct rte_mempool_ops *handler;
> *ops

Done

>> +
>> +	handler = rte_mempool_handler_get(mp->handler_idx);
>> +	if (handler->free == NULL)
>> +		return;
>> +	return handler->free(mp);
>> +}
>> +
>> +/* wrapper to get available objects in an external pool handler */
>> +unsigned int
>> +rte_mempool_ops_get_count(const struct rte_mempool *mp)
>> +{
>> +	struct rte_mempool_ops *handler;
> *ops

Done

>> +
>> +	handler = rte_mempool_handler_get(mp->handler_idx);
>> +	return handler->get_count(mp->pool);
>> +}
>> +
>> +/* sets a handler previously registered by rte_mempool_handler_register */
>> +int
>> +rte_mempool_set_handler(struct rte_mempool *mp, const char *name)
>> +{
>> +	struct rte_mempool_ops *handler = NULL;
> *ops

Done

>> +	unsigned i;
>> +
>> +	/* too late, the mempool is already populated */
>> +	if (mp->flags & MEMPOOL_F_RING_CREATED)
>> +		return -EEXIST;
>> +
>> +	for (i = 0; i < rte_mempool_handler_table.num_handlers; i++) {
>> +		if (!strcmp(name,
>> +				rte_mempool_handler_table.handler_ops[i].name)) {
>> +			handler = &rte_mempool_handler_table.handler_ops[i];
>> +			break;
>> +		}
>> +	}
>> +
>> +	if (handler == NULL)
>> +		return -EINVAL;
>> +
>> +	mp->handler_idx = i;
>> +	return 0;
>> +}
> Regards
> Jan


Thanks,
Dave.
  
Jan Viktorin June 2, 2016, 1:43 p.m. UTC | #5
On Thu, 2 Jun 2016 12:23:41 +0100
"Hunt, David" <david.hunt@intel.com> wrote:

> On 6/1/2016 6:54 PM, Jan Viktorin wrote:
> >
> >   		mp->populated_size--;
> > @@ -383,13 +349,16 @@ rte_mempool_populate_phys(struct rte_mempool *mp, char *vaddr,
> >   	unsigned i = 0;
> >   	size_t off;
> >   	struct rte_mempool_memhdr *memhdr;
> > -	int ret;
> >   
> >   	/* create the internal ring if not already done */
> >   	if ((mp->flags & MEMPOOL_F_RING_CREATED) == 0) {
> > -		ret = rte_mempool_ring_create(mp);
> > -		if (ret < 0)
> > -			return ret;
> > +		rte_errno = 0;
> > +		mp->pool = rte_mempool_ops_alloc(mp);
> > +		if (mp->pool == NULL) {
> > +			if (rte_errno == 0)
> > +				return -EINVAL;
> > +			return -rte_errno;
> > Are you sure the rte_errno is a positive value?  
> 
> If the user returns EINVAL, or similar, we want to return negative, as 
> per the rest of DPDK.

Oh, yes... you're right.

> 
> >> @@ -204,9 +205,13 @@ struct rte_mempool_memhdr {
> >>   struct rte_mempool {
> >>   	char name[RTE_MEMPOOL_NAMESIZE]; /**< Name of mempool. */
> >>   	struct rte_ring *ring;           /**< Ring to store objects. */
> >> +	union {
> >> +		void *pool;              /**< Ring or pool to store objects */  
> > What about calling this pdata or priv? I think, it can improve some doc comments.
> > Describing a "pool" may refer to both the rte_mempool itself or to the mp->pool
> > pointer. The "priv" would help to understand the code a bit.
> >
> > Then the rte_mempool_alloc_t can be called rte_mempool_priv_alloc_t. Or something
> > similar. It's than clear enough, what the function should do...  
> 
> I'd lean towards pdata or maybe pool_data. Not sure about the function 
> name change though,
> the function does return a pool_data pointer, which we put into 
> mp->pool_data.

Yes. But from the name of the function, it is difficult to understand what is its
purpose.

> 
> >> +		uint64_t *pool_id;       /**< External mempool identifier */  
> > Why is pool_id a pointer? Is it a typo? I've understood it should be 64b wide
> > from the discussion (Olivier, Jerin, David):  
> 

[...]

> 
> >> +typedef void (*rte_mempool_free_t)(void *p);
> >> +
> >> +/**
> >> + * Put an object in the external pool.
> >> + * The *p pointer is the opaque data for a given mempool handler (ring,
> >> + * array, linked list, etc)  
> > The obj_table is not documented. Is it really a table? I'd called an array instead.  
> 
> You're probably right, but it's always been called obj_table, and I'm 
> not sure
> this patchset is the place to change it.  Maybe a follow up patch?

In that case, it's OK.

> 
> >> + */
> >> +typedef int (*rte_mempool_put_t)(void *p, void * const *obj_table, unsigned n);  
> > unsigned int  
> Done
> >  
> >> +
> >> +/** Get an object from the external pool. */
> >> +typedef int (*rte_mempool_get_t)(void *p, void **obj_table, unsigned n);  
> > unsigned int  
> Done
> >  
> >> +
> >> +/** Return the number of available objects in the external pool. */  
> > Is the number of available objects or the total number of all objects
> > (so probably a constant value)?  
> 
> It's intended to be the umber of available objects

OK, forget it.

> 
> >  
> >> +typedef unsigned (*rte_mempool_get_count)(void *p);  
> > Should it be const void *p?  
> 
> Yes, it could be. Changed.
> 
> >  

[...]

> 
> >> + * @return
> >> + *   - 0: Sucess; the new handler is configured.
> >> + *   - EINVAL - Invalid handler name provided
> >> + *   - EEXIST - mempool already has a handler assigned  
> > They are returned as -EINVAL and -EEXIST.
> >
> > IMHO, using "-" here is counter-intuitive:
> >
> >   - EINVAL
> >
> > does it mean a positive with "-" or negative value?  
> 
> EINVAL is positive, so it's returning negative. Common usage in DPDK, 
> afaics.

Yes, of course. But it is not so clear from the doc. I've already wrote
a code checking the positive error codes. That was because of no "minus
sign" in the doc. So, my code was wrong and it took me a while to find
out the reason... I supposed, the positive value was intentional. Finally,
I had to lookup the source code (the calling path) to verify...

> 
> 
> >> + */
> >> +int
> >> +rte_mempool_set_handler(struct rte_mempool *mp, const char *name);  
> > rte_mempool_set_ops
> >
> > What about rte_mempool_set_ops_byname? Not a big deal...  
> 
> I agree.  rte_mempool_set_ops_byname
> 
> >> +
> >> +/**
> >> + * Register an external pool handler.  
> > Register mempool operations  
> 
> Done
> 
> >> + *
> >> + * @param h
> >> + *   Pointer to the external pool handler
> >> + * @return
> >> + *   - >=0: Sucess; return the index of the handler in the table.
> >> + *   - EINVAL - missing callbacks while registering handler
> >> + *   - ENOSPC - the maximum number of handlers has been reached  
> > - -EINVAL
> > - -ENOSPC  
> 
> :)

Similar as above... If it's a DPDK standard to write it like this, then I am
OK with that.

> 

[...]
  

Patch

diff --git a/lib/librte_mempool/Makefile b/lib/librte_mempool/Makefile
index 43423e0..793745f 100644
--- a/lib/librte_mempool/Makefile
+++ b/lib/librte_mempool/Makefile
@@ -42,6 +42,7 @@  LIBABIVER := 2
 
 # all source are stored in SRCS-y
 SRCS-$(CONFIG_RTE_LIBRTE_MEMPOOL) +=  rte_mempool.c
+SRCS-$(CONFIG_RTE_LIBRTE_MEMPOOL) +=  rte_mempool_handler.c
 # install includes
 SYMLINK-$(CONFIG_RTE_LIBRTE_MEMPOOL)-include := rte_mempool.h
 
diff --git a/lib/librte_mempool/rte_mempool.c b/lib/librte_mempool/rte_mempool.c
index b54de43..9f34d30 100644
--- a/lib/librte_mempool/rte_mempool.c
+++ b/lib/librte_mempool/rte_mempool.c
@@ -148,7 +148,7 @@  mempool_add_elem(struct rte_mempool *mp, void *obj, phys_addr_t physaddr)
 #endif
 
 	/* enqueue in ring */
-	rte_ring_sp_enqueue(mp->ring, obj);
+	rte_mempool_ops_enqueue_bulk(mp, &obj, 1);
 }
 
 /* call obj_cb() for each mempool element */
@@ -303,40 +303,6 @@  rte_mempool_xmem_usage(__rte_unused void *vaddr, uint32_t elt_num,
 	return (size_t)paddr_idx << pg_shift;
 }
 
-/* create the internal ring */
-static int
-rte_mempool_ring_create(struct rte_mempool *mp)
-{
-	int rg_flags = 0, ret;
-	char rg_name[RTE_RING_NAMESIZE];
-	struct rte_ring *r;
-
-	ret = snprintf(rg_name, sizeof(rg_name),
-		RTE_MEMPOOL_MZ_FORMAT, mp->name);
-	if (ret < 0 || ret >= (int)sizeof(rg_name))
-		return -ENAMETOOLONG;
-
-	/* ring flags */
-	if (mp->flags & MEMPOOL_F_SP_PUT)
-		rg_flags |= RING_F_SP_ENQ;
-	if (mp->flags & MEMPOOL_F_SC_GET)
-		rg_flags |= RING_F_SC_DEQ;
-
-	/* Allocate the ring that will be used to store objects.
-	 * Ring functions will return appropriate errors if we are
-	 * running as a secondary process etc., so no checks made
-	 * in this function for that condition.
-	 */
-	r = rte_ring_create(rg_name, rte_align32pow2(mp->size + 1),
-		mp->socket_id, rg_flags);
-	if (r == NULL)
-		return -rte_errno;
-
-	mp->ring = r;
-	mp->flags |= MEMPOOL_F_RING_CREATED;
-	return 0;
-}
-
 /* free a memchunk allocated with rte_memzone_reserve() */
 static void
 rte_mempool_memchunk_mz_free(__rte_unused struct rte_mempool_memhdr *memhdr,
@@ -354,7 +320,7 @@  rte_mempool_free_memchunks(struct rte_mempool *mp)
 	void *elt;
 
 	while (!STAILQ_EMPTY(&mp->elt_list)) {
-		rte_ring_sc_dequeue(mp->ring, &elt);
+		rte_mempool_ops_dequeue_bulk(mp, &elt, 1);
 		(void)elt;
 		STAILQ_REMOVE_HEAD(&mp->elt_list, next);
 		mp->populated_size--;
@@ -383,13 +349,16 @@  rte_mempool_populate_phys(struct rte_mempool *mp, char *vaddr,
 	unsigned i = 0;
 	size_t off;
 	struct rte_mempool_memhdr *memhdr;
-	int ret;
 
 	/* create the internal ring if not already done */
 	if ((mp->flags & MEMPOOL_F_RING_CREATED) == 0) {
-		ret = rte_mempool_ring_create(mp);
-		if (ret < 0)
-			return ret;
+		rte_errno = 0;
+		mp->pool = rte_mempool_ops_alloc(mp);
+		if (mp->pool == NULL) {
+			if (rte_errno == 0)
+				return -EINVAL;
+			return -rte_errno;
+		}
 	}
 
 	/* mempool is already populated */
@@ -703,7 +672,7 @@  rte_mempool_free(struct rte_mempool *mp)
 	rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
 
 	rte_mempool_free_memchunks(mp);
-	rte_ring_free(mp->ring);
+	rte_mempool_ops_free(mp);
 	rte_memzone_free(mp->mz);
 }
 
@@ -815,6 +784,20 @@  rte_mempool_create_empty(const char *name, unsigned n, unsigned elt_size,
 		RTE_PTR_ADD(mp, MEMPOOL_HEADER_SIZE(mp, 0));
 
 	te->data = mp;
+
+	/*
+	 * Since we have 4 combinations of the SP/SC/MP/MC examine the flags to
+	 * set the correct index into the handler table.
+	 */
+	if (flags & (MEMPOOL_F_SP_PUT | MEMPOOL_F_SC_GET))
+		rte_mempool_set_handler(mp, "ring_sp_sc");
+	else if (flags & MEMPOOL_F_SP_PUT)
+		rte_mempool_set_handler(mp, "ring_sp_mc");
+	else if (flags & MEMPOOL_F_SC_GET)
+		rte_mempool_set_handler(mp, "ring_mp_sc");
+	else
+		rte_mempool_set_handler(mp, "ring_mp_mc");
+
 	rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK);
 	TAILQ_INSERT_TAIL(mempool_list, te, next);
 	rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
@@ -930,7 +913,7 @@  rte_mempool_count(const struct rte_mempool *mp)
 	unsigned count;
 	unsigned lcore_id;
 
-	count = rte_ring_count(mp->ring);
+	count = rte_mempool_ops_get_count(mp);
 
 	if (mp->cache_size == 0)
 		return count;
@@ -1123,7 +1106,7 @@  rte_mempool_dump(FILE *f, struct rte_mempool *mp)
 
 	fprintf(f, "mempool <%s>@%p\n", mp->name, mp);
 	fprintf(f, "  flags=%x\n", mp->flags);
-	fprintf(f, "  ring=<%s>@%p\n", mp->ring->name, mp->ring);
+	fprintf(f, "  pool=%p\n", mp->pool);
 	fprintf(f, "  phys_addr=0x%" PRIx64 "\n", mp->mz->phys_addr);
 	fprintf(f, "  nb_mem_chunks=%u\n", mp->nb_mem_chunks);
 	fprintf(f, "  size=%"PRIu32"\n", mp->size);
@@ -1144,7 +1127,7 @@  rte_mempool_dump(FILE *f, struct rte_mempool *mp)
 	}
 
 	cache_count = rte_mempool_dump_cache(f, mp);
-	common_count = rte_ring_count(mp->ring);
+	common_count = rte_mempool_ops_get_count(mp);
 	if ((cache_count + common_count) > mp->size)
 		common_count = mp->size - cache_count;
 	fprintf(f, "  common_pool_count=%u\n", common_count);
diff --git a/lib/librte_mempool/rte_mempool.h b/lib/librte_mempool/rte_mempool.h
index 60339bd..b659565 100644
--- a/lib/librte_mempool/rte_mempool.h
+++ b/lib/librte_mempool/rte_mempool.h
@@ -67,6 +67,7 @@ 
 #include <inttypes.h>
 #include <sys/queue.h>
 
+#include <rte_spinlock.h>
 #include <rte_log.h>
 #include <rte_debug.h>
 #include <rte_lcore.h>
@@ -204,9 +205,13 @@  struct rte_mempool_memhdr {
 struct rte_mempool {
 	char name[RTE_MEMPOOL_NAMESIZE]; /**< Name of mempool. */
 	struct rte_ring *ring;           /**< Ring to store objects. */
+	union {
+		void *pool;              /**< Ring or pool to store objects */
+		uint64_t *pool_id;       /**< External mempool identifier */
+	};
 	const struct rte_memzone *mz;    /**< Memzone where pool is allocated */
 	int flags;                       /**< Flags of the mempool. */
-	int socket_id;                   /**< Socket id passed at mempool creation. */
+	int socket_id;                   /**< Socket id passed at create */
 	uint32_t size;                   /**< Max size of the mempool. */
 	uint32_t cache_size;             /**< Size of per-lcore local cache. */
 	uint32_t cache_flushthresh;
@@ -217,6 +222,14 @@  struct rte_mempool {
 	uint32_t trailer_size;           /**< Size of trailer (after elt). */
 
 	unsigned private_data_size;      /**< Size of private data. */
+	/**
+	 * Index into rte_mempool_handler_table array of mempool handler ops
+	 * structs, which contain callback function pointers.
+	 * We're using an index here rather than pointers to the callbacks
+	 * to facilitate any secondary processes that may want to use
+	 * this mempool.
+	 */
+	int32_t handler_idx;
 
 	struct rte_mempool_cache *local_cache; /**< Per-lcore local cache */
 
@@ -325,6 +338,199 @@  void rte_mempool_check_cookies(const struct rte_mempool *mp,
 #define __mempool_check_cookies(mp, obj_table_const, n, free) do {} while(0)
 #endif /* RTE_LIBRTE_MEMPOOL_DEBUG */
 
+#define RTE_MEMPOOL_HANDLER_NAMESIZE 32 /**< Max length of handler name. */
+
+/**
+ * typedef for allocating the external pool.
+ * The handler's alloc function does whatever it needs to do to grab memory
+ * for this handler, and sets the *pool opaque pointer in the rte_mempool
+ * struct. In the default handler, *pool points to a ring,in other handlers,
+ * it will mostlikely point to a different type of data structure.
+ * It will be transparent to the application programmer.
+ */
+typedef void *(*rte_mempool_alloc_t)(struct rte_mempool *mp);
+
+/** Free the external pool opaque data (that pointed to by *pool) */
+typedef void (*rte_mempool_free_t)(void *p);
+
+/**
+ * Put an object in the external pool.
+ * The *p pointer is the opaque data for a given mempool handler (ring,
+ * array, linked list, etc)
+ */
+typedef int (*rte_mempool_put_t)(void *p, void * const *obj_table, unsigned n);
+
+/** Get an object from the external pool. */
+typedef int (*rte_mempool_get_t)(void *p, void **obj_table, unsigned n);
+
+/** Return the number of available objects in the external pool. */
+typedef unsigned (*rte_mempool_get_count)(void *p);
+
+/** Structure defining a mempool handler. */
+struct rte_mempool_ops {
+	char name[RTE_MEMPOOL_HANDLER_NAMESIZE]; /**< Name of mempool handler */
+	rte_mempool_alloc_t alloc;       /**< Allocate the external pool. */
+	rte_mempool_free_t free;         /**< Free the external pool. */
+	rte_mempool_put_t put;           /**< Put an object. */
+	rte_mempool_get_t get;           /**< Get an object. */
+	rte_mempool_get_count get_count; /**< Get the number of available objs. */
+} __rte_cache_aligned;
+
+#define RTE_MEMPOOL_MAX_HANDLER_IDX 16  /**< Max registered handlers */
+
+/**
+ * Structure storing the table of registered handlers, each of which contain
+ * the function pointers for the mempool handler functions.
+ * Each process has it's own storage for this handler struct aray so that
+ * the mempools can be shared across primary and secondary processes.
+ * The indices used to access the array are valid across processes, whereas
+ * any function pointers stored directly in the mempool struct would not be.
+ * This results in us simply having "handler_idx" in the mempool struct.
+ */
+struct rte_mempool_handler_table {
+	rte_spinlock_t sl;     /**< Spinlock for add/delete. */
+	uint32_t num_handlers; /**< Number of handlers in the table. */
+	/**
+	 * Storage for all possible handlers.
+	 */
+	struct rte_mempool_ops handler_ops[RTE_MEMPOOL_MAX_HANDLER_IDX];
+} __rte_cache_aligned;
+
+/** Array of registered handlers */
+extern struct rte_mempool_handler_table rte_mempool_handler_table;
+
+/**
+ * @internal Get the mempool handler from its index.
+ *
+ * @param handler_idx
+ *   The index of the handler in the handler table. It must be a valid
+ *   index: (0 <= idx < num_handlers).
+ * @return
+ *   The pointer to the handler in the table.
+ */
+static inline struct rte_mempool_ops *
+rte_mempool_handler_get(int handler_idx)
+{
+	return &rte_mempool_handler_table.handler_ops[handler_idx];
+}
+
+/**
+ * @internal wrapper for external mempool manager alloc callback.
+ *
+ * @param mp
+ *   Pointer to the memory pool.
+ * @return
+ *   The opaque pointer to the external pool.
+ */
+void *
+rte_mempool_ops_alloc(struct rte_mempool *mp);
+
+/**
+ * @internal wrapper for external mempool manager get callback.
+ *
+ * @param mp
+ *   Pointer to the memory pool.
+ * @param obj_table
+ *   Pointer to a table of void * pointers (objects).
+ * @param n
+ *   Number of objects to get.
+ * @return
+ *   - 0: Success; got n objects.
+ *   - <0: Error; code of handler get function.
+ */
+static inline int
+rte_mempool_ops_dequeue_bulk(struct rte_mempool *mp,
+		void **obj_table, unsigned n)
+{
+	struct rte_mempool_ops *handler;
+
+	handler = rte_mempool_handler_get(mp->handler_idx);
+	return handler->get(mp->pool, obj_table, n);
+}
+
+/**
+ * @internal wrapper for external mempool manager put callback.
+ *
+ * @param mp
+ *   Pointer to the memory pool.
+ * @param obj_table
+ *   Pointer to a table of void * pointers (objects).
+ * @param n
+ *   Number of objects to put.
+ * @return
+ *   - 0: Success; n objects supplied.
+ *   - <0: Error; code of handler put function.
+ */
+static inline int
+rte_mempool_ops_enqueue_bulk(struct rte_mempool *mp, void * const *obj_table,
+		unsigned n)
+{
+	struct rte_mempool_ops *handler;
+
+	handler = rte_mempool_handler_get(mp->handler_idx);
+	return handler->put(mp->pool, obj_table, n);
+}
+
+/**
+ * @internal wrapper for external mempool manager get_count callback.
+ *
+ * @param mp
+ *   Pointer to the memory pool.
+ * @return
+ *   The number of available objects in the external pool.
+ */
+unsigned
+rte_mempool_ops_get_count(const struct rte_mempool *mp);
+
+/**
+ * @internal wrapper for external mempool manager free callback.
+ *
+ * @param mp
+ *   Pointer to the memory pool.
+ */
+void
+rte_mempool_ops_free(struct rte_mempool *mp);
+
+/**
+ * Set the handler of a mempool
+ *
+ * This can only be done on a mempool that is not populated, i.e. just after
+ * a call to rte_mempool_create_empty().
+ *
+ * @param mp
+ *   Pointer to the memory pool.
+ * @param name
+ *   Name of the handler.
+ * @return
+ *   - 0: Sucess; the new handler is configured.
+ *   - EINVAL - Invalid handler name provided
+ *   - EEXIST - mempool already has a handler assigned
+ */
+int
+rte_mempool_set_handler(struct rte_mempool *mp, const char *name);
+
+/**
+ * Register an external pool handler.
+ *
+ * @param h
+ *   Pointer to the external pool handler
+ * @return
+ *   - >=0: Sucess; return the index of the handler in the table.
+ *   - EINVAL - missing callbacks while registering handler
+ *   - ENOSPC - the maximum number of handlers has been reached
+ */
+int rte_mempool_handler_register(const struct rte_mempool_ops *h);
+
+/**
+ * Macro to statically register an external pool handler.
+ */
+#define MEMPOOL_REGISTER_HANDLER(h)					\
+	void mp_hdlr_init_##h(void);					\
+	void __attribute__((constructor, used)) mp_hdlr_init_##h(void)	\
+	{								\
+		rte_mempool_handler_register(&h);			\
+	}
+
 /**
  * An object callback function for mempool.
  *
@@ -774,7 +980,7 @@  __mempool_put_bulk(struct rte_mempool *mp, void * const *obj_table,
 	cache->len += n;
 
 	if (cache->len >= flushthresh) {
-		rte_ring_mp_enqueue_bulk(mp->ring, &cache->objs[cache_size],
+		rte_mempool_ops_enqueue_bulk(mp, &cache->objs[cache_size],
 				cache->len - cache_size);
 		cache->len = cache_size;
 	}
@@ -785,19 +991,10 @@  ring_enqueue:
 
 	/* push remaining objects in ring */
 #ifdef RTE_LIBRTE_MEMPOOL_DEBUG
-	if (is_mp) {
-		if (rte_ring_mp_enqueue_bulk(mp->ring, obj_table, n) < 0)
-			rte_panic("cannot put objects in mempool\n");
-	}
-	else {
-		if (rte_ring_sp_enqueue_bulk(mp->ring, obj_table, n) < 0)
-			rte_panic("cannot put objects in mempool\n");
-	}
+	if (rte_mempool_ops_enqueue_bulk(mp, obj_table, n) < 0)
+		rte_panic("cannot put objects in mempool\n");
 #else
-	if (is_mp)
-		rte_ring_mp_enqueue_bulk(mp->ring, obj_table, n);
-	else
-		rte_ring_sp_enqueue_bulk(mp->ring, obj_table, n);
+	rte_mempool_ops_enqueue_bulk(mp, obj_table, n);
 #endif
 }
 
@@ -922,7 +1119,7 @@  rte_mempool_put(struct rte_mempool *mp, void *obj)
  */
 static inline int __attribute__((always_inline))
 __mempool_get_bulk(struct rte_mempool *mp, void **obj_table,
-		   unsigned n, int is_mc)
+		   unsigned n, __rte_unused int is_mc)
 {
 	int ret;
 	struct rte_mempool_cache *cache;
@@ -945,7 +1142,8 @@  __mempool_get_bulk(struct rte_mempool *mp, void **obj_table,
 		uint32_t req = n + (cache_size - cache->len);
 
 		/* How many do we require i.e. number to fill the cache + the request */
-		ret = rte_ring_mc_dequeue_bulk(mp->ring, &cache->objs[cache->len], req);
+		ret = rte_mempool_ops_dequeue_bulk(mp,
+			&cache->objs[cache->len], req);
 		if (unlikely(ret < 0)) {
 			/*
 			 * In the offchance that we are buffer constrained,
@@ -972,10 +1170,7 @@  __mempool_get_bulk(struct rte_mempool *mp, void **obj_table,
 ring_dequeue:
 
 	/* get remaining objects from ring */
-	if (is_mc)
-		ret = rte_ring_mc_dequeue_bulk(mp->ring, obj_table, n);
-	else
-		ret = rte_ring_sc_dequeue_bulk(mp->ring, obj_table, n);
+	ret = rte_mempool_ops_dequeue_bulk(mp, obj_table, n);
 
 	if (ret < 0)
 		__MEMPOOL_STAT_ADD(mp, get_fail, n);
diff --git a/lib/librte_mempool/rte_mempool_handler.c b/lib/librte_mempool/rte_mempool_handler.c
new file mode 100644
index 0000000..ed85d65
--- /dev/null
+++ b/lib/librte_mempool/rte_mempool_handler.c
@@ -0,0 +1,141 @@ 
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright(c) 2016 Intel Corporation. All rights reserved.
+ *   Copyright(c) 2016 6WIND S.A.
+ *   All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of Intel Corporation nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <stdio.h>
+#include <string.h>
+
+#include <rte_mempool.h>
+
+/* indirect jump table to support external memory pools */
+struct rte_mempool_handler_table rte_mempool_handler_table = {
+	.sl =  RTE_SPINLOCK_INITIALIZER ,
+	.num_handlers = 0
+};
+
+/* add a new handler in rte_mempool_handler_table, return its index */
+int
+rte_mempool_handler_register(const struct rte_mempool_ops *h)
+{
+	struct rte_mempool_ops *handler;
+	int16_t handler_idx;
+
+	rte_spinlock_lock(&rte_mempool_handler_table.sl);
+
+	if (rte_mempool_handler_table.num_handlers >=
+			RTE_MEMPOOL_MAX_HANDLER_IDX) {
+		rte_spinlock_unlock(&rte_mempool_handler_table.sl);
+		RTE_LOG(ERR, MEMPOOL,
+			"Maximum number of mempool handlers exceeded\n");
+		return -ENOSPC;
+	}
+
+	if (h->put == NULL || h->get == NULL || h->get_count == NULL) {
+		rte_spinlock_unlock(&rte_mempool_handler_table.sl);
+		RTE_LOG(ERR, MEMPOOL,
+			"Missing callback while registering mempool handler\n");
+		return -EINVAL;
+	}
+
+	handler_idx = rte_mempool_handler_table.num_handlers++;
+	handler = &rte_mempool_handler_table.handler_ops[handler_idx];
+	snprintf(handler->name, sizeof(handler->name), "%s", h->name);
+	handler->alloc = h->alloc;
+	handler->put = h->put;
+	handler->get = h->get;
+	handler->get_count = h->get_count;
+
+	rte_spinlock_unlock(&rte_mempool_handler_table.sl);
+
+	return handler_idx;
+}
+
+/* wrapper to allocate an external pool handler */
+void *
+rte_mempool_ops_alloc(struct rte_mempool *mp)
+{
+	struct rte_mempool_ops *handler;
+
+	handler = rte_mempool_handler_get(mp->handler_idx);
+	if (handler->alloc == NULL)
+		return NULL;
+	return handler->alloc(mp);
+}
+
+/* wrapper to free an external pool handler */
+void
+rte_mempool_ops_free(struct rte_mempool *mp)
+{
+	struct rte_mempool_ops *handler;
+
+	handler = rte_mempool_handler_get(mp->handler_idx);
+	if (handler->free == NULL)
+		return;
+	return handler->free(mp);
+}
+
+/* wrapper to get available objects in an external pool handler */
+unsigned int
+rte_mempool_ops_get_count(const struct rte_mempool *mp)
+{
+	struct rte_mempool_ops *handler;
+
+	handler = rte_mempool_handler_get(mp->handler_idx);
+	return handler->get_count(mp->pool);
+}
+
+/* sets a handler previously registered by rte_mempool_handler_register */
+int
+rte_mempool_set_handler(struct rte_mempool *mp, const char *name)
+{
+	struct rte_mempool_ops *handler = NULL;
+	unsigned i;
+
+	/* too late, the mempool is already populated */
+	if (mp->flags & MEMPOOL_F_RING_CREATED)
+		return -EEXIST;
+
+	for (i = 0; i < rte_mempool_handler_table.num_handlers; i++) {
+		if (!strcmp(name,
+				rte_mempool_handler_table.handler_ops[i].name)) {
+			handler = &rte_mempool_handler_table.handler_ops[i];
+			break;
+		}
+	}
+
+	if (handler == NULL)
+		return -EINVAL;
+
+	mp->handler_idx = i;
+	return 0;
+}