[dpdk-dev,6/6] mempool: add in the RTE_NEXT_ABI protection for ABI breakages

Message ID 1455634095-4183-7-git-send-email-david.hunt@intel.com (mailing list archive)
State Superseded, archived
Headers

Commit Message

Hunt, David Feb. 16, 2016, 2:48 p.m. UTC
v2: Kept all the NEXT_ABI defs to this patch so as to make the
previous patches easier to read, and also to imake it clear what
code is necessary to keep ABI compatibility when NEXT_ABI is
disabled.

Signed-off-by: David Hunt <david.hunt@intel.com>
---
 app/test/Makefile                |   2 +
 app/test/test_mempool_perf.c     |   3 +
 lib/librte_mbuf/rte_mbuf.c       |   7 ++
 lib/librte_mempool/Makefile      |   2 +
 lib/librte_mempool/rte_mempool.c | 240 ++++++++++++++++++++++++++++++++++++++-
 lib/librte_mempool/rte_mempool.h |  68 ++++++++++-
 6 files changed, 320 insertions(+), 2 deletions(-)
  

Comments

Olivier Matz Feb. 19, 2016, 1:33 p.m. UTC | #1
Hi David,

On 02/16/2016 03:48 PM, David Hunt wrote:
> v2: Kept all the NEXT_ABI defs to this patch so as to make the
> previous patches easier to read, and also to imake it clear what
> code is necessary to keep ABI compatibility when NEXT_ABI is
> disabled.
> 
> Signed-off-by: David Hunt <david.hunt@intel.com>
> ---
>  app/test/Makefile                |   2 +
>  app/test/test_mempool_perf.c     |   3 +
>  lib/librte_mbuf/rte_mbuf.c       |   7 ++
>  lib/librte_mempool/Makefile      |   2 +
>  lib/librte_mempool/rte_mempool.c | 240 ++++++++++++++++++++++++++++++++++++++-
>  lib/librte_mempool/rte_mempool.h |  68 ++++++++++-
>  6 files changed, 320 insertions(+), 2 deletions(-)

Given the size of this patch, I don't think it's worth adding the
NEXT ABI in that case.

Regards,
Olivier
  

Patch

diff --git a/app/test/Makefile b/app/test/Makefile
index 9a2f75f..8fcf0c2 100644
--- a/app/test/Makefile
+++ b/app/test/Makefile
@@ -74,7 +74,9 @@  SRCS-$(CONFIG_RTE_LIBRTE_TIMER) += test_timer_perf.c
 SRCS-$(CONFIG_RTE_LIBRTE_TIMER) += test_timer_racecond.c
 
 SRCS-y += test_mempool.c
+ifeq ($(CONFIG_RTE_NEXT_ABI),y)
 SRCS-y += test_ext_mempool.c
+endif
 SRCS-y += test_mempool_perf.c
 
 SRCS-y += test_mbuf.c
diff --git a/app/test/test_mempool_perf.c b/app/test/test_mempool_perf.c
index 091c1df..ca69e49 100644
--- a/app/test/test_mempool_perf.c
+++ b/app/test/test_mempool_perf.c
@@ -161,6 +161,9 @@  per_lcore_mempool_test(__attribute__((unused)) void *arg)
 							   n_get_bulk);
 				if (unlikely(ret < 0)) {
 					rte_mempool_dump(stdout, mp);
+#ifndef RTE_NEXT_ABI
+					rte_ring_dump(stdout, mp->ring);
+#endif
 					/* in this case, objects are lost... */
 					return -1;
 				}
diff --git a/lib/librte_mbuf/rte_mbuf.c b/lib/librte_mbuf/rte_mbuf.c
index 42b0cd1..967d987 100644
--- a/lib/librte_mbuf/rte_mbuf.c
+++ b/lib/librte_mbuf/rte_mbuf.c
@@ -167,6 +167,7 @@  rte_pktmbuf_pool_create(const char *name, unsigned n,
 	mbp_priv.mbuf_data_room_size = data_room_size;
 	mbp_priv.mbuf_priv_size = priv_size;
 
+#ifdef RTE_NEXT_ABI
 #ifdef RTE_MEMPOOL_HANDLER_EXT
 	return rte_mempool_create_ext(name, n, elt_size,
 		cache_size, sizeof(struct rte_pktmbuf_pool_private),
@@ -179,6 +180,12 @@  rte_pktmbuf_pool_create(const char *name, unsigned n,
 		rte_pktmbuf_pool_init, &mbp_priv, rte_pktmbuf_init, NULL,
 		socket_id, 0);
 #endif
+#else
+	return rte_mempool_create(name, n, elt_size,
+		cache_size, sizeof(struct rte_pktmbuf_pool_private),
+		rte_pktmbuf_pool_init, &mbp_priv, rte_pktmbuf_init, NULL,
+		socket_id, 0);
+#endif
 }
 
 /* do some sanity checks on a mbuf: panic if it fails */
diff --git a/lib/librte_mempool/Makefile b/lib/librte_mempool/Makefile
index 4f72546..8038785 100644
--- a/lib/librte_mempool/Makefile
+++ b/lib/librte_mempool/Makefile
@@ -42,9 +42,11 @@  LIBABIVER := 1
 
 # all source are stored in SRCS-y
 SRCS-$(CONFIG_RTE_LIBRTE_MEMPOOL) +=  rte_mempool.c
+ifeq ($(CONFIG_RTE_NEXT_ABI),y)
 SRCS-$(CONFIG_RTE_LIBRTE_MEMPOOL) +=  rte_mempool_default.c
 SRCS-$(CONFIG_RTE_LIBRTE_MEMPOOL) +=  rte_mempool_stack.c
 SRCS-$(CONFIG_RTE_LIBRTE_MEMPOOL) +=  custom_mempool.c
+endif
 ifeq ($(CONFIG_RTE_LIBRTE_XEN_DOM0),y)
 SRCS-$(CONFIG_RTE_LIBRTE_MEMPOOL) +=  rte_dom0_mempool.c
 endif
diff --git a/lib/librte_mempool/rte_mempool.c b/lib/librte_mempool/rte_mempool.c
index 44bc92f..53e44ff 100644
--- a/lib/librte_mempool/rte_mempool.c
+++ b/lib/librte_mempool/rte_mempool.c
@@ -59,7 +59,9 @@ 
 #include <rte_spinlock.h>
 
 #include "rte_mempool.h"
+#ifdef RTE_NEXT_ABI
 #include "rte_mempool_internal.h"
+#endif
 
 TAILQ_HEAD(rte_mempool_list, rte_tailq_entry);
 
@@ -400,6 +402,7 @@  rte_mempool_create(const char *name, unsigned n, unsigned elt_size,
 					       MEMPOOL_PG_SHIFT_MAX);
 }
 
+#ifdef RTE_NEXT_ABI
 /*
  * Common mempool create function.
  * Create the mempool over already allocated chunk of memory.
@@ -698,6 +701,229 @@  rte_mempool_xmem_create(const char *name, unsigned n, unsigned elt_size,
 
 	return mp;
 }
+#else
+/*
+ * Create the mempool over already allocated chunk of memory.
+ * That external memory buffer can consists of physically disjoint pages.
+ * Setting vaddr to NULL, makes mempool to fallback to original behaviour
+ * and allocate space for mempool and it's elements as one big chunk of
+ * physically continuos memory.
+ * */
+struct rte_mempool *
+rte_mempool_xmem_create(const char *name, unsigned n, unsigned elt_size,
+		unsigned cache_size, unsigned private_data_size,
+		rte_mempool_ctor_t *mp_init, void *mp_init_arg,
+		rte_mempool_obj_ctor_t *obj_init, void *obj_init_arg,
+		int socket_id, unsigned flags, void *vaddr,
+		const phys_addr_t paddr[], uint32_t pg_num, uint32_t pg_shift)
+{
+	char mz_name[RTE_MEMZONE_NAMESIZE];
+	char rg_name[RTE_RING_NAMESIZE];
+	struct rte_mempool_list *mempool_list;
+	struct rte_mempool *mp = NULL;
+	struct rte_tailq_entry *te;
+	struct rte_ring *r;
+	const struct rte_memzone *mz;
+	size_t mempool_size;
+	int mz_flags = RTE_MEMZONE_1GB|RTE_MEMZONE_SIZE_HINT_ONLY;
+	int rg_flags = 0;
+	void *obj;
+	struct rte_mempool_objsz objsz;
+	void *startaddr;
+	int page_size = getpagesize();
+
+	/* compilation-time checks */
+	RTE_BUILD_BUG_ON((sizeof(struct rte_mempool) &
+			  RTE_CACHE_LINE_MASK) != 0);
+#if RTE_MEMPOOL_CACHE_MAX_SIZE > 0
+	RTE_BUILD_BUG_ON((sizeof(struct rte_mempool_cache) &
+			  RTE_CACHE_LINE_MASK) != 0);
+	RTE_BUILD_BUG_ON((offsetof(struct rte_mempool, local_cache) &
+			  RTE_CACHE_LINE_MASK) != 0);
+#endif
+#ifdef RTE_LIBRTE_MEMPOOL_DEBUG
+	RTE_BUILD_BUG_ON((sizeof(struct rte_mempool_debug_stats) &
+			  RTE_CACHE_LINE_MASK) != 0);
+	RTE_BUILD_BUG_ON((offsetof(struct rte_mempool, stats) &
+			  RTE_CACHE_LINE_MASK) != 0);
+#endif
+
+	mempool_list = RTE_TAILQ_CAST(rte_mempool_tailq.head, rte_mempool_list);
+
+	/* asked cache too big */
+	if (cache_size > RTE_MEMPOOL_CACHE_MAX_SIZE ||
+	    CALC_CACHE_FLUSHTHRESH(cache_size) > n) {
+		rte_errno = EINVAL;
+		return NULL;
+	}
+
+	/* check that we have both VA and PA */
+	if (vaddr != NULL && paddr == NULL) {
+		rte_errno = EINVAL;
+		return NULL;
+	}
+
+	/* Check that pg_num and pg_shift parameters are valid. */
+	if (pg_num < RTE_DIM(mp->elt_pa) || pg_shift > MEMPOOL_PG_SHIFT_MAX) {
+		rte_errno = EINVAL;
+		return NULL;
+	}
+
+	/* "no cache align" imply "no spread" */
+	if (flags & MEMPOOL_F_NO_CACHE_ALIGN)
+		flags |= MEMPOOL_F_NO_SPREAD;
+
+	/* ring flags */
+	if (flags & MEMPOOL_F_SP_PUT)
+		rg_flags |= RING_F_SP_ENQ;
+	if (flags & MEMPOOL_F_SC_GET)
+		rg_flags |= RING_F_SC_DEQ;
+
+	/* calculate mempool object sizes. */
+	if (!rte_mempool_calc_obj_size(elt_size, flags, &objsz)) {
+		rte_errno = EINVAL;
+		return NULL;
+	}
+
+	rte_rwlock_write_lock(RTE_EAL_MEMPOOL_RWLOCK);
+
+	/* allocate the ring that will be used to store objects */
+	/* Ring functions will return appropriate errors if we are
+	 * running as a secondary process etc., so no checks made
+	 * in this function for that condition */
+	snprintf(rg_name, sizeof(rg_name), RTE_MEMPOOL_MZ_FORMAT, name);
+	r = rte_ring_create(rg_name, rte_align32pow2(n+1), socket_id, rg_flags);
+	if (r == NULL)
+		goto exit;
+
+	/*
+	 * reserve a memory zone for this mempool: private data is
+	 * cache-aligned
+	 */
+	private_data_size = (private_data_size +
+		RTE_MEMPOOL_ALIGN_MASK) & (~RTE_MEMPOOL_ALIGN_MASK);
+
+	if (!rte_eal_has_hugepages()) {
+		/*
+		 * expand private data size to a whole page, so that the
+		 * first pool element will start on a new standard page
+		 */
+		int head = sizeof(struct rte_mempool);
+		int new_size = (private_data_size + head) % page_size;
+
+		if (new_size)
+			private_data_size += page_size - new_size;
+	}
+
+	/* try to allocate tailq entry */
+	te = rte_zmalloc("MEMPOOL_TAILQ_ENTRY", sizeof(*te), 0);
+	if (te == NULL) {
+		RTE_LOG(ERR, MEMPOOL, "Cannot allocate tailq entry!\n");
+		goto exit;
+	}
+
+	/*
+	 * If user provided an external memory buffer, then use it to
+	 * store mempool objects. Otherwise reserve a memzone that is large
+	 * enough to hold mempool header and metadata plus mempool objects.
+	 */
+	mempool_size = MEMPOOL_HEADER_SIZE(mp, pg_num) + private_data_size;
+	mempool_size = RTE_ALIGN_CEIL(mempool_size, RTE_MEMPOOL_ALIGN);
+	if (vaddr == NULL)
+		mempool_size += (size_t)objsz.total_size * n;
+
+	if (!rte_eal_has_hugepages()) {
+		/*
+		 * we want the memory pool to start on a page boundary,
+		 * because pool elements crossing page boundaries would
+		 * result in discontiguous physical addresses
+		 */
+		mempool_size += page_size;
+	}
+
+	snprintf(mz_name, sizeof(mz_name), RTE_MEMPOOL_MZ_FORMAT, name);
+
+	mz = rte_memzone_reserve(mz_name, mempool_size, socket_id, mz_flags);
+
+	/*
+	 * no more memory: in this case we loose previously reserved
+	 * space for the ring as we cannot free it
+	 */
+	if (mz == NULL) {
+		rte_free(te);
+		goto exit;
+	}
+
+	if (rte_eal_has_hugepages()) {
+		startaddr = (void *)mz->addr;
+	} else {
+		/* align memory pool start address on a page boundary */
+		unsigned long addr = (unsigned long)mz->addr;
+
+		if (addr & (page_size - 1)) {
+			addr += page_size;
+			addr &= ~(page_size - 1);
+		}
+		startaddr = (void *)addr;
+	}
+
+	/* init the mempool structure */
+	mp = startaddr;
+	memset(mp, 0, sizeof(*mp));
+	snprintf(mp->name, sizeof(mp->name), "%s", name);
+	mp->phys_addr = mz->phys_addr;
+	mp->ring = r;
+	mp->size = n;
+	mp->flags = flags;
+	mp->elt_size = objsz.elt_size;
+	mp->header_size = objsz.header_size;
+	mp->trailer_size = objsz.trailer_size;
+	mp->cache_size = cache_size;
+	mp->cache_flushthresh = CALC_CACHE_FLUSHTHRESH(cache_size);
+	mp->private_data_size = private_data_size;
+
+	/* calculate address of the first element for continuous mempool. */
+	obj = (char *)mp + MEMPOOL_HEADER_SIZE(mp, pg_num) +
+		private_data_size;
+	obj = RTE_PTR_ALIGN_CEIL(obj, RTE_MEMPOOL_ALIGN);
+
+	/* populate address translation fields. */
+	mp->pg_num = pg_num;
+	mp->pg_shift = pg_shift;
+	mp->pg_mask = RTE_LEN2MASK(mp->pg_shift, typeof(mp->pg_mask));
+
+	/* mempool elements allocated together with mempool */
+	if (vaddr == NULL) {
+		mp->elt_va_start = (uintptr_t)obj;
+		mp->elt_pa[0] = mp->phys_addr +
+			(mp->elt_va_start - (uintptr_t)mp);
+
+	/* mempool elements in a separate chunk of memory. */
+	} else {
+		mp->elt_va_start = (uintptr_t)vaddr;
+		memcpy(mp->elt_pa, paddr, sizeof(mp->elt_pa[0]) * pg_num);
+	}
+
+	mp->elt_va_end = mp->elt_va_start;
+
+	/* call the initializer */
+	if (mp_init)
+		mp_init(mp, mp_init_arg);
+
+	mempool_populate(mp, n, 1, obj_init, obj_init_arg);
+
+	te->data = (void *) mp;
+
+	rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK);
+	TAILQ_INSERT_TAIL(mempool_list, te, next);
+	rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
+
+exit:
+	rte_rwlock_write_unlock(RTE_EAL_MEMPOOL_RWLOCK);
+
+	return mp;
+}
+#endif
 
 /* Return the number of entries in the mempool */
 unsigned
@@ -705,7 +931,11 @@  rte_mempool_count(const struct rte_mempool *mp)
 {
 	unsigned count;
 
+#ifdef RTE_NEXT_ABI
 	count = rte_mempool_ext_get_count(mp);
+#else
+	count = rte_ring_count(mp->ring);
+#endif
 
 #if RTE_MEMPOOL_CACHE_MAX_SIZE > 0
 	{
@@ -861,6 +1091,9 @@  rte_mempool_dump(FILE *f, const struct rte_mempool *mp)
 
 	fprintf(f, "mempool <%s>@%p\n", mp->name, mp);
 	fprintf(f, "  flags=%x\n", mp->flags);
+#ifndef RTE_NEXT_ABI
+	fprintf(f, "  ring=<%s>@%p\n", mp->ring->name, mp->ring);
+#endif
 	fprintf(f, "  phys_addr=0x%" PRIx64 "\n", mp->phys_addr);
 	fprintf(f, "  size=%"PRIu32"\n", mp->size);
 	fprintf(f, "  header_size=%"PRIu32"\n", mp->header_size);
@@ -883,7 +1116,11 @@  rte_mempool_dump(FILE *f, const struct rte_mempool *mp)
 			mp->size);
 
 	cache_count = rte_mempool_dump_cache(f, mp);
+#ifdef RTE_NEXT_ABI
 	common_count = rte_mempool_ext_get_count(mp);
+#else
+	common_count = rte_ring_count(mp->ring);
+#endif
 	if ((cache_count + common_count) > mp->size)
 		common_count = mp->size - cache_count;
 	fprintf(f, "  common_pool_count=%u\n", common_count);
@@ -978,7 +1215,7 @@  void rte_mempool_walk(void (*func)(const struct rte_mempool *, void *),
 	rte_rwlock_read_unlock(RTE_EAL_MEMPOOL_RWLOCK);
 }
 
-
+#ifdef RTE_NEXT_ABI
 /* create the mempool using an external mempool manager */
 struct rte_mempool *
 rte_mempool_create_ext(const char *name, unsigned n, unsigned elt_size,
@@ -1004,3 +1241,4 @@  rte_mempool_create_ext(const char *name, unsigned n, unsigned elt_size,
 
 
 }
+#endif
diff --git a/lib/librte_mempool/rte_mempool.h b/lib/librte_mempool/rte_mempool.h
index 8d8201f..e676296 100644
--- a/lib/librte_mempool/rte_mempool.h
+++ b/lib/librte_mempool/rte_mempool.h
@@ -88,8 +88,10 @@  extern "C" {
 struct rte_mempool_debug_stats {
 	uint64_t put_bulk;         /**< Number of puts. */
 	uint64_t put_objs;         /**< Number of objects successfully put. */
+#ifdef RTE_NEXT_ABI
 	uint64_t put_pool_bulk;    /**< Number of puts into pool. */
 	uint64_t put_pool_objs;    /**< Number of objects into pool. */
+#endif
 	uint64_t get_success_bulk; /**< Successful allocation number. */
 	uint64_t get_success_objs; /**< Objects successfully allocated. */
 	uint64_t get_fail_bulk;    /**< Failed allocation number. */
@@ -177,6 +179,7 @@  struct rte_mempool_objtlr {
 #endif
 };
 
+#ifdef RTE_NEXT_ABI
 /* Handler functions for external mempool support */
 typedef void *(*rte_mempool_alloc_t)(struct rte_mempool *mp,
 		const char *name, unsigned n, int socket_id, unsigned flags);
@@ -250,12 +253,16 @@  rte_mempool_ext_get_count(const struct rte_mempool *mp);
  */
 int
 rte_mempool_ext_free(struct rte_mempool *mp);
+#endif
 
 /**
  * The RTE mempool structure.
  */
 struct rte_mempool {
 	char name[RTE_MEMPOOL_NAMESIZE]; /**< Name of mempool. */
+#ifndef RTE_NEXT_ABI
+	struct rte_ring *ring;           /**< Ring to store objects. */
+#endif
 	phys_addr_t phys_addr;           /**< Phys. addr. of mempool struct. */
 	int flags;                       /**< Flags of the mempool. */
 	uint32_t size;                   /**< Size of the mempool. */
@@ -269,10 +276,12 @@  struct rte_mempool {
 
 	unsigned private_data_size;      /**< Size of private data. */
 
+#ifdef RTE_NEXT_ABI
 	/* Common pool data structure pointer */
 	void *rt_pool;
 
 	int16_t handler_idx;
+#endif
 
 #if RTE_MEMPOOL_CACHE_MAX_SIZE > 0
 	/** Per-lcore local cache. */
@@ -303,9 +312,10 @@  struct rte_mempool {
 #define MEMPOOL_F_NO_CACHE_ALIGN 0x0002 /**< Do not align objs on cache lines.*/
 #define MEMPOOL_F_SP_PUT         0x0004 /**< Default put is "single-producer".*/
 #define MEMPOOL_F_SC_GET         0x0008 /**< Default get is "single-consumer".*/
+#ifdef RTE_NEXT_ABI
 #define MEMPOOL_F_USE_STACK      0x0010 /**< Use a stack for the common pool. */
 #define MEMPOOL_F_INT_HANDLER    0x0020 /**< Using internal mempool handler */
-
+#endif
 
 /**
  * @internal When debug is enabled, store some statistics.
@@ -836,7 +846,11 @@  void rte_mempool_dump(FILE *f, const struct rte_mempool *mp);
  */
 static inline void __attribute__((always_inline))
 __mempool_put_bulk(struct rte_mempool *mp, void * const *obj_table,
+#ifdef RTE_NEXT_ABI
 		    unsigned n, __attribute__((unused)) int is_mp)
+#else
+		    unsigned n, int is_mp)
+#endif
 {
 #if RTE_MEMPOOL_CACHE_MAX_SIZE > 0
 	struct rte_mempool_cache *cache;
@@ -852,7 +866,12 @@  __mempool_put_bulk(struct rte_mempool *mp, void * const *obj_table,
 
 #if RTE_MEMPOOL_CACHE_MAX_SIZE > 0
 	/* cache is not enabled or single producer or non-EAL thread */
+#ifdef RTE_NEXT_ABI
 	if (unlikely(cache_size == 0 || lcore_id >= RTE_MAX_LCORE))
+#else
+	if (unlikely(cache_size == 0 || is_mp == 0 ||
+		     lcore_id >= RTE_MAX_LCORE))
+#endif
 		goto ring_enqueue;
 
 	/* Go straight to ring if put would overflow mem allocated for cache */
@@ -875,9 +894,15 @@  __mempool_put_bulk(struct rte_mempool *mp, void * const *obj_table,
 
 	cache->len += n;
 
+#ifdef RTE_NEXT_ABI
 	if (unlikely(cache->len >= flushthresh)) {
 		rte_mempool_ext_put_bulk(mp, &cache->objs[cache_size],
 				cache->len - cache_size);
+#else
+	if (cache->len >= flushthresh) {
+		rte_ring_mp_enqueue_bulk(mp->ring, &cache->objs[cache_size],
+				cache->len - cache_size);
+#endif
 		cache->len = cache_size;
 	}
 
@@ -886,10 +911,28 @@  __mempool_put_bulk(struct rte_mempool *mp, void * const *obj_table,
 ring_enqueue:
 #endif /* RTE_MEMPOOL_CACHE_MAX_SIZE > 0 */
 
+#ifdef RTE_NEXT_ABI
 	/* Increment stats counter to tell us how many pool puts happened */
 	__MEMPOOL_STAT_ADD(mp, put_pool, n);
 
 	rte_mempool_ext_put_bulk(mp, obj_table, n);
+#else
+	/* push remaining objects in ring */
+#ifdef RTE_LIBRTE_MEMPOOL_DEBUG
+	if (is_mp) {
+		if (rte_ring_mp_enqueue_bulk(mp->ring, obj_table, n) < 0)
+			rte_panic("cannot put objects in mempool\n");
+	} else {
+		if (rte_ring_sp_enqueue_bulk(mp->ring, obj_table, n) < 0)
+			rte_panic("cannot put objects in mempool\n");
+	}
+#else
+	if (is_mp)
+		rte_ring_mp_enqueue_bulk(mp->ring, obj_table, n);
+	else
+		rte_ring_sp_enqueue_bulk(mp->ring, obj_table, n);
+#endif
+#endif
 }
 
 
@@ -1013,7 +1056,11 @@  rte_mempool_put(struct rte_mempool *mp, void *obj)
  */
 static inline int __attribute__((always_inline))
 __mempool_get_bulk(struct rte_mempool *mp, void **obj_table,
+#ifdef RTE_NEXT_ABI
 		   unsigned n, __attribute__((unused))int is_mc)
+#else
+		   unsigned n, int is_mc)
+#endif
 {
 	int ret;
 #if RTE_MEMPOOL_CACHE_MAX_SIZE > 0
@@ -1024,8 +1071,13 @@  __mempool_get_bulk(struct rte_mempool *mp, void **obj_table,
 	uint32_t cache_size = mp->cache_size;
 
 	/* cache is not enabled or single consumer */
+#ifdef RTE_NEXT_ABI
 	if (unlikely(cache_size == 0 || n >= cache_size ||
 						lcore_id >= RTE_MAX_LCORE))
+#else
+	if (unlikely(cache_size == 0 || is_mc == 0 ||
+		     n >= cache_size || lcore_id >= RTE_MAX_LCORE))
+#endif
 		goto ring_dequeue;
 
 	cache = &mp->local_cache[lcore_id];
@@ -1037,8 +1089,13 @@  __mempool_get_bulk(struct rte_mempool *mp, void **obj_table,
 		uint32_t req = n + (cache_size - cache->len);
 
 		/* How many do we require i.e. number to fill the cache + the request */
+#ifdef RTE_NEXT_ABI
 		ret = rte_mempool_ext_get_bulk(mp,
 						&cache->objs[cache->len], req);
+#else
+		ret = rte_ring_mc_dequeue_bulk(mp->ring,
+						&cache->objs[cache->len], req);
+#endif
 		if (unlikely(ret < 0)) {
 			/*
 			 * In the offchance that we are buffer constrained,
@@ -1066,7 +1123,14 @@  ring_dequeue:
 #endif /* RTE_MEMPOOL_CACHE_MAX_SIZE > 0 */
 
 	/* get remaining objects from ring */
+#ifdef RTE_NEXT_ABI
 	ret = rte_mempool_ext_get_bulk(mp, obj_table, n);
+#else
+	if (is_mc)
+		ret = rte_ring_mc_dequeue_bulk(mp->ring, obj_table, n);
+	else
+		ret = rte_ring_sc_dequeue_bulk(mp->ring, obj_table, n);
+#endif
 
 	if (ret < 0)
 		__MEMPOOL_STAT_ADD(mp, get_fail, n);
@@ -1468,6 +1532,7 @@  ssize_t rte_mempool_xmem_usage(void *vaddr, uint32_t elt_num, size_t elt_sz,
  */
 void rte_mempool_walk(void (*func)(const struct rte_mempool *, void *arg),
 		      void *arg);
+#ifdef RTE_NEXT_ABI
 
 /**
  * Function to get the name of a mempool handler
@@ -1541,6 +1606,7 @@  rte_mempool_create_ext(const char *name, unsigned n, unsigned elt_size,
 		rte_mempool_obj_ctor_t *obj_init, void *obj_init_arg,
 		int socket_id, unsigned flags,
 		const char *handler_name);
+#endif
 
 #ifdef __cplusplus
 }