[v6,4/5] net/af_xdp: use mbuf mempool for buffer management

Message ID 20190326122029.59359-5-xiaolong.ye@intel.com
State Superseded, archived
Delegated to: Ferruh Yigit
Series
  • Introduce AF_XDP PMD

Checks

Context Check Description
ci/Intel-compilation success Compilation OK
ci/checkpatch success coding style OK

Commit Message

Ye Xiaolong March 26, 2019, 12:20 p.m.
Now, af_xdp registered memory buffer is managed by rte_mempool. mbuf
allocated from rte_mempool can be converted to xdp_desc's address and
vice versa.

Signed-off-by: Xiaolong Ye <xiaolong.ye@intel.com>
---
 drivers/net/af_xdp/rte_eth_af_xdp.c | 117 +++++++++++++++++-----------
 1 file changed, 72 insertions(+), 45 deletions(-)

Comments

Olivier Matz March 29, 2019, 5:42 p.m. | #1
Hi Xiaolong,

On Tue, Mar 26, 2019 at 08:20:28PM +0800, Xiaolong Ye wrote:
> Now, af_xdp registered memory buffer is managed by rte_mempool. mbuf
> allocated from rte_mempool can be converted to xdp_desc's address and
> vice versa.
> 
> Signed-off-by: Xiaolong Ye <xiaolong.ye@intel.com>
> ---
>  drivers/net/af_xdp/rte_eth_af_xdp.c | 117 +++++++++++++++++-----------
>  1 file changed, 72 insertions(+), 45 deletions(-)
> 
> diff --git a/drivers/net/af_xdp/rte_eth_af_xdp.c b/drivers/net/af_xdp/rte_eth_af_xdp.c
> index 47a496ed7..a1fda9212 100644
> --- a/drivers/net/af_xdp/rte_eth_af_xdp.c
> +++ b/drivers/net/af_xdp/rte_eth_af_xdp.c
> @@ -48,7 +48,11 @@ static int af_xdp_logtype;
>  
>  #define ETH_AF_XDP_FRAME_SIZE		XSK_UMEM__DEFAULT_FRAME_SIZE
>  #define ETH_AF_XDP_NUM_BUFFERS		4096
> -#define ETH_AF_XDP_DATA_HEADROOM	0
> +/* mempool hdrobj size (64 bytes) + sizeof(struct rte_mbuf) (128 bytes) */
> +#define ETH_AF_XDP_MBUF_OVERHEAD	192
> +/* data start from offset 320 (192 + 128) bytes */
> +#define ETH_AF_XDP_DATA_HEADROOM				\
> +	(ETH_AF_XDP_MBUF_OVERHEAD + RTE_PKTMBUF_HEADROOM)

Having these constants looks quite dangerous to me. They hard-code the size
of the mbuf and the mempool header size. It would at least require
compilation checks.
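
A minimal sketch of such a compile-time check, using C11 _Static_assert.
The types demo_objhdr and demo_mbuf and the macro DEMO_MBUF_OVERHEAD are
hypothetical stand-ins (not from the patch) so that the sketch builds
without DPDK headers. In the driver, RTE_BUILD_BUG_ON() on
sizeof(struct rte_mbuf) could play the same role; the mempool header size
may depend on pool flags and build options, so that part would probably
need a run-time check instead.

```c
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-ins so the sketch builds without DPDK headers. */
struct demo_objhdr { uint8_t bytes[64]; };   /* models the mempool object header */
struct demo_mbuf   { uint8_t bytes[128]; };  /* models struct rte_mbuf */

/* Same value as ETH_AF_XDP_MBUF_OVERHEAD in the patch. */
#define DEMO_MBUF_OVERHEAD 192

/* Break the build, instead of corrupting memory at run time, if a size drifts. */
_Static_assert(sizeof(struct demo_objhdr) + sizeof(struct demo_mbuf) ==
	       DEMO_MBUF_OVERHEAD,
	       "mbuf overhead constant no longer matches the structure sizes");

/* Overhead computed from the types rather than from a magic number. */
static inline size_t
demo_overhead(void)
{
	return sizeof(struct demo_objhdr) + sizeof(struct demo_mbuf);
}
```

Deriving the value from sizeof() as in demo_overhead() avoids the magic
number altogether wherever a run-time value is acceptable.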

[...]

> +	umem->mb_pool = rte_pktmbuf_pool_create_with_flags("af_xdp_mempool",
> +			ETH_AF_XDP_NUM_BUFFERS,
> +			250, 0,
> +			ETH_AF_XDP_FRAME_SIZE -
> +			ETH_AF_XDP_MBUF_OVERHEAD,
> +			MEMPOOL_F_NO_SPREAD | MEMPOOL_CHUNK_F_PAGE_ALIGN,
> +			SOCKET_ID_ANY);
> +	if (umem->mb_pool == NULL || umem->mb_pool->nb_mem_chunks != 1) {
> +		AF_XDP_LOG(ERR, "Failed to create mempool\n");
>  		goto err;
>  	}
> +	base_addr = (void *)get_base_addr(umem->mb_pool);

Creating a mempool in the pmd like this does not look good to me for
several reasons:
- the user application creates its mempool with a specific private
  area in its mbufs. Here there is no private area, so it will break
  applications doing this.
- in patch 3 (mempool), you ensure that the chunk starts at a
  page-aligned address, and you expect that given the other flags and
  the constants at the top of the file, the data will be aligned. In
  my opinion it is not ideal.
- the user application may create a large number of mbufs, for instance
  if the application manages large reassembly queues, or tcp sockets.
  Here the driver limits the number of mbufs to 4k per rx queue.
- the socket_id is any, so it won't be efficient on numa architectures.

May I suggest another idea?

Allocate the xsk_umem almost like in patch 1, but using rte_memzone
allocation instead of posix_memalign() (and it will be faster, because
it will use hugepages). And do not allocate any mempool in the driver.

When you receive a packet in the xsk_umem, allocate a new mbuf from
the standard pool. Then, use rte_pktmbuf_attach_extbuf() to attach the
xsk memory to the mbuf. You will have to register a callback to return
the xsk memory when the mbuf is transmitted or freed.
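
A minimal sketch of that callback pattern, written as a DPDK-free model so
it can be compiled on its own. All names here (demo_frame_pool, demo_pkt,
demo_return_frame, ...) are hypothetical; the callback signature
(addr, opaque) only mirrors the style of an external-buffer free callback,
and the free-frame stack stands in for whatever structure feeds the fill
ring.

```c
#include <stddef.h>
#include <stdint.h>

#define DEMO_NUM_FRAMES 4

/* Stack of free umem frame addresses (umem-relative offsets). */
struct demo_frame_pool {
	uint64_t addrs[DEMO_NUM_FRAMES];
	size_t count;
};

/* Free callback in the (addr, opaque) style. */
typedef void (*demo_free_cb)(void *addr, void *opaque);

/* Minimal packet handle: points at external memory and knows how to return it. */
struct demo_pkt {
	void *ext_addr;
	demo_free_cb free_cb;
	void *opaque;
};

static inline void
demo_pool_init(struct demo_frame_pool *p, uint64_t frame_size)
{
	size_t i;

	for (i = 0; i < DEMO_NUM_FRAMES; i++)
		p->addrs[i] = i * frame_size;
	p->count = DEMO_NUM_FRAMES;
}

/* Take one free frame; returns 0 on success, -1 if the pool is empty. */
static inline int
demo_pool_get(struct demo_frame_pool *p, uint64_t *addr)
{
	if (p->count == 0)
		return -1;
	*addr = p->addrs[--p->count];
	return 0;
}

/* Callback: hand the frame back so it can be resubmitted to the fill ring. */
static inline void
demo_return_frame(void *addr, void *opaque)
{
	struct demo_frame_pool *p = opaque;

	p->addrs[p->count++] = (uint64_t)(uintptr_t)addr;
}

/* Attach external memory to the packet handle together with its callback. */
static inline void
demo_pkt_attach(struct demo_pkt *pkt, void *addr, demo_free_cb cb, void *opaque)
{
	pkt->ext_addr = addr;
	pkt->free_cb = cb;
	pkt->opaque = opaque;
}

/* Freeing the packet triggers the callback exactly once. */
static inline void
demo_pkt_free(struct demo_pkt *pkt)
{
	if (pkt->free_cb != NULL)
		pkt->free_cb(pkt->ext_addr, pkt->opaque);
	pkt->ext_addr = NULL;
	pkt->free_cb = NULL;
}
```

In this model the frame returns to the pool whether the packet is freed by
the application or after transmission, which is the property the callback
is meant to guarantee.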

This is, by the way, something I don't understand in your current
implementation: what happens if a mbuf is received in the af_xdp driver,
and freed by the application? How is the xsk buffer returned?

Using rte_pktmbuf_attach_extbuf() would remove changes in mbuf and
mempool, at the price of (maybe) decreasing the performance. But I think
there are some places where it can be optimized.

I understand my feedback comes late -- as usual :( -- but if you are in
a hurry for RC1, maybe we can consider merging only the 1st patch, and
adding the zero-copy mode in a second patch later. What do you think?

Regards,
Olivier
Ye Xiaolong March 31, 2019, 12:38 p.m. | #2
Hi, Olivier

Thanks for the comments.

On 03/29, Olivier Matz wrote:
>Hi Xiaolong,
>
>On Tue, Mar 26, 2019 at 08:20:28PM +0800, Xiaolong Ye wrote:
>> Now, af_xdp registered memory buffer is managed by rte_mempool. mbuf
>> allocated from rte_mempool can be converted to xdp_desc's address and
>> vice versa.
>> 
>> Signed-off-by: Xiaolong Ye <xiaolong.ye@intel.com>
>> ---
>>  drivers/net/af_xdp/rte_eth_af_xdp.c | 117 +++++++++++++++++-----------
>>  1 file changed, 72 insertions(+), 45 deletions(-)
>> 
>> diff --git a/drivers/net/af_xdp/rte_eth_af_xdp.c b/drivers/net/af_xdp/rte_eth_af_xdp.c
>> index 47a496ed7..a1fda9212 100644
>> --- a/drivers/net/af_xdp/rte_eth_af_xdp.c
>> +++ b/drivers/net/af_xdp/rte_eth_af_xdp.c
>> @@ -48,7 +48,11 @@ static int af_xdp_logtype;
>>  
>>  #define ETH_AF_XDP_FRAME_SIZE		XSK_UMEM__DEFAULT_FRAME_SIZE
>>  #define ETH_AF_XDP_NUM_BUFFERS		4096
>> -#define ETH_AF_XDP_DATA_HEADROOM	0
>> +/* mempool hdrobj size (64 bytes) + sizeof(struct rte_mbuf) (128 bytes) */
>> +#define ETH_AF_XDP_MBUF_OVERHEAD	192
>> +/* data start from offset 320 (192 + 128) bytes */
>> +#define ETH_AF_XDP_DATA_HEADROOM				\
>> +	(ETH_AF_XDP_MBUF_OVERHEAD + RTE_PKTMBUF_HEADROOM)
>
>Having these constants looks quite dangerous too me. It imposes the size
>of the mbuf, and the mempool header size. It would at least require
>compilation checks.
>
>[...]
>
>> +	umem->mb_pool = rte_pktmbuf_pool_create_with_flags("af_xdp_mempool",
>> +			ETH_AF_XDP_NUM_BUFFERS,
>> +			250, 0,
>> +			ETH_AF_XDP_FRAME_SIZE -
>> +			ETH_AF_XDP_MBUF_OVERHEAD,
>> +			MEMPOOL_F_NO_SPREAD | MEMPOOL_CHUNK_F_PAGE_ALIGN,
>> +			SOCKET_ID_ANY);
>> +	if (umem->mb_pool == NULL || umem->mb_pool->nb_mem_chunks != 1) {
>> +		AF_XDP_LOG(ERR, "Failed to create mempool\n");
>>  		goto err;
>>  	}
>> +	base_addr = (void *)get_base_addr(umem->mb_pool);
>
>Creating a mempool in the pmd like this does not look good to me for
>several reasons:
>- the user application creates its mempool with a specific private
>  area in its mbufs. Here there is no private area, so it will break
>  applications doing this.
>- in patch 3 (mempool), you ensure that the chunk starts at a
>  page-aligned address, and you expect that given the other flags and
>  the constants at the top of the file, the data will be aligned. In
>  my opinion it is not ideal.
>- the user application may create a large number of mbufs, for instance
>  if the application manages large reassembly queues, or tcp sockets.
>  Here the driver limits the number of mbufs to 4k per rx queue.
>- the socket_id is any, so it won't be efficient on numa architectures.

Our mbuf/mempool change for zero copy does have limitations.

>
>May I suggest another idea?
>
>Allocate the xsk_umem almost like in patch 1, but using rte_memzone
>allocation instead of posix_memalign() (and it will be faster, because
>it will use hugepages). And do not allocate any mempool in the driver.
>

rte_memzone_reserve_aligned() is better than posix_memalign(); I'll use it in
my first patch.

>When you receive a packet in the xsk_umem, allocate a new mbuf from
>the standard pool. Then, use rte_pktmbuf_attach_extbuf() to attach the
>xsk memory to the mbuf. You will have to register a callback to return
>the xsk memory when the mbuf is transmitted or freed.

I'll try to investigate how to implement it.

>
>This is, by the way, something I don't understand in your current
>implementation: what happens if a mbuf is received in the af_xdp driver,
>and freed by the application? How does the xsk buffer is returned?

It is coordinated by the fill ring. The fill ring is used by the application
(user space) to pass addresses down to the kernel, which fills them with Rx
packet data. On the free side, we just return the mbuf to the mempool; each
time in rx_pkt_burst, we allocate new mbufs and submit the corresponding
addresses to the fill ring. That is how the xsk buffer is returned to the
kernel.
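
The recycling above relies on mapping a umem-relative descriptor address to
its mbuf and back. A minimal sketch of that arithmetic with plain integers,
mirroring addr_to_mbuf() and mbuf_to_addr() in the patch. The demo_* names
are hypothetical, and the frame size of 2048 is an assumption about
XSK_UMEM__DEFAULT_FRAME_SIZE; the 64-byte object header, 128-byte mbuf and
zero private area are the values the patch hard-codes.

```c
#include <stdint.h>

#define DEMO_FRAME_SIZE    2048u  /* assumed XSK_UMEM__DEFAULT_FRAME_SIZE */
#define DEMO_MBUF_SIZE     128u   /* sizeof(struct rte_mbuf) per the patch */
#define DEMO_MBUF_OVERHEAD 192u   /* object header (64) + mbuf (128) */

/* Start of the frame that contains a umem-relative address. */
static inline uint64_t
demo_frame_base(uint64_t addr)
{
	return addr / DEMO_FRAME_SIZE * DEMO_FRAME_SIZE;
}

/* Offset of the mbuf structure inside the umem for a descriptor address. */
static inline uint64_t
demo_addr_to_mbuf_off(uint64_t addr)
{
	return demo_frame_base(addr) + DEMO_MBUF_OVERHEAD - DEMO_MBUF_SIZE;
}

/* data_off recovered from a descriptor address. */
static inline uint64_t
demo_addr_to_data_off(uint64_t addr)
{
	return addr - demo_frame_base(addr) - DEMO_MBUF_OVERHEAD;
}

/* Reverse mapping: the data buffer starts right after the mbuf structure. */
static inline uint64_t
demo_mbuf_to_addr(uint64_t mbuf_off, uint64_t data_off)
{
	return mbuf_off + DEMO_MBUF_SIZE + data_off;
}
```

For example, with these assumptions frame 3 with a data_off of 128 gives the
address 3 * 2048 + 192 + 128 = 6464, and converting 6464 back yields an mbuf
offset of 6208 and a data_off of 128.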

>
>Using rte_pktmbuf_attach_extbuf() would remove changes in mbuf and
>mempool, at the price of (maybe) decreasing the performance. But I think
>there are some places where it can be optimized.
>
>I understand my feedback comes late -- as usual :( -- but if you are in

Sorry for not Cc'ing you on my patch set.

>a hurry for RC1, maybe we can consider to put the 1st patch only, and
>add the zero-copy mode in a second patch later. What do you think?

Sounds like a sensible plan; I'll try the external mbuf buffer scheme first.


Thanks,
Xiaolong

>
>Regards,
>Olivier
>
>
Zhang, Qi Z April 1, 2019, 5:47 a.m. | #3
> -----Original Message-----
> From: Ye, Xiaolong
> Sent: Sunday, March 31, 2019 8:38 PM
> To: Olivier Matz <olivier.matz@6wind.com>
> Cc: dev@dpdk.org; David Marchand <david.marchand@redhat.com>; Andrew
> Rybchenko <arybchenko@solarflare.com>; Zhang, Qi Z <qi.z.zhang@intel.com>;
> Karlsson, Magnus <magnus.karlsson@intel.com>; Topel, Bjorn
> <bjorn.topel@intel.com>; Maxime Coquelin <maxime.coquelin@redhat.com>;
> Stephen Hemminger <stephen@networkplumber.org>; Yigit, Ferruh
> <ferruh.yigit@intel.com>; Luca Boccassi <bluca@debian.org>; Richardson, Bruce
> <bruce.richardson@intel.com>; Ananyev, Konstantin
> <konstantin.ananyev@intel.com>
> Subject: Re: [dpdk-dev] [PATCH v6 4/5] net/af_xdp: use mbuf mempool for
> buffer management
> 
> Hi, Olivier
> 
> Thanks for the comments.
> 
> On 03/29, Olivier Matz wrote:
> >Hi Xiaolong,
> >
> >On Tue, Mar 26, 2019 at 08:20:28PM +0800, Xiaolong Ye wrote:
> >> Now, af_xdp registered memory buffer is managed by rte_mempool. mbuf
> >> allocated from rte_mempool can be converted to xdp_desc's address and
> >> vice versa.
> >>
> >> Signed-off-by: Xiaolong Ye <xiaolong.ye@intel.com>
> >> ---
> >>  drivers/net/af_xdp/rte_eth_af_xdp.c | 117
> >> +++++++++++++++++-----------
> >>  1 file changed, 72 insertions(+), 45 deletions(-)
> >>
> >> diff --git a/drivers/net/af_xdp/rte_eth_af_xdp.c
> >> b/drivers/net/af_xdp/rte_eth_af_xdp.c
> >> index 47a496ed7..a1fda9212 100644
> >> --- a/drivers/net/af_xdp/rte_eth_af_xdp.c
> >> +++ b/drivers/net/af_xdp/rte_eth_af_xdp.c
> >> @@ -48,7 +48,11 @@ static int af_xdp_logtype;
> >>
> >>  #define ETH_AF_XDP_FRAME_SIZE
> 	XSK_UMEM__DEFAULT_FRAME_SIZE
> >>  #define ETH_AF_XDP_NUM_BUFFERS		4096
> >> -#define ETH_AF_XDP_DATA_HEADROOM	0
> >> +/* mempool hdrobj size (64 bytes) + sizeof(struct rte_mbuf) (128 bytes) */
> >> +#define ETH_AF_XDP_MBUF_OVERHEAD	192
> >> +/* data start from offset 320 (192 + 128) bytes */
> >> +#define ETH_AF_XDP_DATA_HEADROOM				\
> >> +	(ETH_AF_XDP_MBUF_OVERHEAD + RTE_PKTMBUF_HEADROOM)
> >
> >Having these constants looks quite dangerous too me. It imposes the
> >size of the mbuf, and the mempool header size. It would at least
> >require compilation checks.
> >
> >[...]
> >
> >> +	umem->mb_pool =
> rte_pktmbuf_pool_create_with_flags("af_xdp_mempool",
> >> +			ETH_AF_XDP_NUM_BUFFERS,
> >> +			250, 0,
> >> +			ETH_AF_XDP_FRAME_SIZE -
> >> +			ETH_AF_XDP_MBUF_OVERHEAD,
> >> +			MEMPOOL_F_NO_SPREAD |
> MEMPOOL_CHUNK_F_PAGE_ALIGN,
> >> +			SOCKET_ID_ANY);
> >> +	if (umem->mb_pool == NULL || umem->mb_pool->nb_mem_chunks != 1)
> {
> >> +		AF_XDP_LOG(ERR, "Failed to create mempool\n");
> >>  		goto err;
> >>  	}
> >> +	base_addr = (void *)get_base_addr(umem->mb_pool);
> >
> >Creating a mempool in the pmd like this does not look good to me for
> >several reasons:
> >- the user application creates its mempool with a specific private
> >  area in its mbufs. Here there is no private area, so it will break
> >  applications doing this.
> >- in patch 3 (mempool), you ensure that the chunk starts at a
> >  page-aligned address, and you expect that given the other flags and
> >  the constants at the top of the file, the data will be aligned. In
> >  my opinion it is not ideal.
> >- the user application may create a large number of mbufs, for instance
> >  if the application manages large reassembly queues, or tcp sockets.
> >  Here the driver limits the number of mbufs to 4k per rx queue.
> >- the socket_id is any, so it won't be efficient on numa architectures.
> 
> Our mbuf/mempool change regarding to zero copy does have limitations.

Just to clarify: the above code is only reached in the non-zero-copy case.
Here we create a private memory pool to manage the AF_XDP umem; it is not the Rx queue's memory pool itself.
So I don't think the private area or the 4k-per-Rx-queue limit is a concern for the above code.
 
> 
> >
> >May I suggest another idea?
> >
> >Allocate the xsk_umem almost like in patch 1, but using rte_memzone
> >allocation instead of posix_memalign() (and it will be faster, because
> >it will use hugepages). And do not allocate any mempool in the driver.
> >
> 
> rte_memzone_reserve_aligned is better than posix_memalign, I'll use it in my
> first patch.
> 
> >When you receive a packet in the xsk_umem, allocate a new mbuf from the
> >standard pool. Then, use rte_pktmbuf_attach_extbuf() to attach the xsk
> >memory to the mbuf. You will have to register a callback to return the
> >xsk memory when the mbuf is transmitted or freed.
> 
> I'll try to investigate how to implement it.
> 
> >
> >This is, by the way, something I don't understand in your current
> >implementation: what happens if a mbuf is received in the af_xdp
> >driver, and freed by the application? How does the xsk buffer is returned?
> 
> It is coordinated by the fill ring. The fill ring is used by the application ( user space)
> to send down addr for the kernel to fill in with Rx packet data.
> So for the free side, we just return it to the mempool, and each time in
> rx_pkt_burst, we would allocate new mbufs and submit corresponding addrs to
> fill ring, that's how we return the xsk buffer to kernel.
> 
> >
> >Using rte_pktmbuf_attach_extbuf() would remove changes in mbuf and
> >mempool, at the price of (maybe) decreasing the performance. But I think
> >there are some places where it can be optimized.
> >
> >I understand my feedback comes late -- as usual :( -- but if you are in
> 
> Sorry for not Cc you in my patch set.
> 
> >a hurry for RC1, maybe we can consider to put the 1st patch only, and
> >add the zero-copy mode in a second patch later. What do you think?
> 
> Sounds a sensible plan, I'll try to exteranl mbuf buffer scheme first.
> 
> 
> Thanks,
> Xiaolong
> 
> >
> >Regards,
> >Olivier
> >
> >

Patch

diff --git a/drivers/net/af_xdp/rte_eth_af_xdp.c b/drivers/net/af_xdp/rte_eth_af_xdp.c
index 47a496ed7..a1fda9212 100644
--- a/drivers/net/af_xdp/rte_eth_af_xdp.c
+++ b/drivers/net/af_xdp/rte_eth_af_xdp.c
@@ -48,7 +48,11 @@  static int af_xdp_logtype;
 
 #define ETH_AF_XDP_FRAME_SIZE		XSK_UMEM__DEFAULT_FRAME_SIZE
 #define ETH_AF_XDP_NUM_BUFFERS		4096
-#define ETH_AF_XDP_DATA_HEADROOM	0
+/* mempool hdrobj size (64 bytes) + sizeof(struct rte_mbuf) (128 bytes) */
+#define ETH_AF_XDP_MBUF_OVERHEAD	192
+/* data start from offset 320 (192 + 128) bytes */
+#define ETH_AF_XDP_DATA_HEADROOM				\
+	(ETH_AF_XDP_MBUF_OVERHEAD + RTE_PKTMBUF_HEADROOM)
 #define ETH_AF_XDP_DFLT_NUM_DESCS	XSK_RING_CONS__DEFAULT_NUM_DESCS
 #define ETH_AF_XDP_DFLT_QUEUE_IDX	0
 
@@ -61,7 +65,7 @@  struct xsk_umem_info {
 	struct xsk_ring_prod fq;
 	struct xsk_ring_cons cq;
 	struct xsk_umem *umem;
-	struct rte_ring *buf_ring;
+	struct rte_mempool *mb_pool;
 	void *buffer;
 };
 
@@ -123,10 +127,30 @@  static struct rte_eth_link pmd_link = {
 	.link_autoneg = ETH_LINK_AUTONEG
 };
 
+static inline struct rte_mbuf *
+addr_to_mbuf(struct xsk_umem_info *umem, uint64_t addr)
+{
+	uint64_t offset = (addr / ETH_AF_XDP_FRAME_SIZE *
+			ETH_AF_XDP_FRAME_SIZE);
+	struct rte_mbuf *mbuf = (struct rte_mbuf *)((uint64_t)umem->buffer +
+				    offset + ETH_AF_XDP_MBUF_OVERHEAD -
+				    sizeof(struct rte_mbuf));
+	mbuf->data_off = addr - offset - ETH_AF_XDP_MBUF_OVERHEAD;
+	return mbuf;
+}
+
+static inline uint64_t
+mbuf_to_addr(struct xsk_umem_info *umem, struct rte_mbuf *mbuf)
+{
+	return (uint64_t)mbuf->buf_addr + mbuf->data_off -
+		(uint64_t)umem->buffer;
+}
+
 static inline int
 reserve_fill_queue(struct xsk_umem_info *umem, int reserve_size)
 {
 	struct xsk_ring_prod *fq = &umem->fq;
+	struct rte_mbuf *mbuf;
 	uint32_t idx;
 	int i, ret;
 
@@ -138,13 +162,15 @@  reserve_fill_queue(struct xsk_umem_info *umem, int reserve_size)
 
 	for (i = 0; i < reserve_size; i++) {
 		__u64 *fq_addr;
-		void *addr = NULL;
-		if (rte_ring_dequeue(umem->buf_ring, &addr)) {
+		uint64_t addr;
+		mbuf = rte_pktmbuf_alloc(umem->mb_pool);
+		if (unlikely(mbuf == NULL)) {
 			i--;
 			break;
 		}
+		addr = mbuf_to_addr(umem, mbuf);
 		fq_addr = xsk_ring_prod__fill_addr(fq, idx++);
-		*fq_addr = (uint64_t)addr;
+		*fq_addr = addr;
 	}
 
 	xsk_ring_prod__submit(fq, i);
@@ -196,7 +222,7 @@  eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 		rx_bytes += len;
 		bufs[count++] = mbufs[i];
 
-		rte_ring_enqueue(umem->buf_ring, (void *)addr);
+		rte_pktmbuf_free(addr_to_mbuf(umem, addr));
 	}
 
 	xsk_ring_cons__release(rx, rcvd);
@@ -219,7 +245,7 @@  static void pull_umem_cq(struct xsk_umem_info *umem, int size)
 	for (i = 0; i < n; i++) {
 		uint64_t addr;
 		addr = *xsk_ring_cons__comp_addr(cq, idx_cq++);
-		rte_ring_enqueue(umem->buf_ring, (void *)addr);
+		rte_pktmbuf_free(addr_to_mbuf(umem, addr));
 	}
 
 	xsk_ring_cons__release(cq, n);
@@ -248,7 +274,7 @@  eth_af_xdp_tx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 	struct pkt_tx_queue *txq = queue;
 	struct xsk_umem_info *umem = txq->pair->umem;
 	struct rte_mbuf *mbuf;
-	void *addrs[ETH_AF_XDP_TX_BATCH_SIZE];
+	struct rte_mbuf *mbuf_to_tx;
 	unsigned long tx_bytes = 0;
 	int i, valid = 0;
 	uint32_t idx_tx;
@@ -257,11 +283,6 @@  eth_af_xdp_tx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 
 	pull_umem_cq(umem, nb_pkts);
 
-	nb_pkts = rte_ring_dequeue_bulk(umem->buf_ring, addrs,
-					nb_pkts, NULL);
-	if (nb_pkts == 0)
-		return 0;
-
 	if (xsk_ring_prod__reserve(&txq->tx, nb_pkts, &idx_tx) != nb_pkts) {
 		kick_tx(txq);
 		return 0;
@@ -275,7 +296,12 @@  eth_af_xdp_tx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 		desc = xsk_ring_prod__tx_desc(&txq->tx, idx_tx + i);
 		mbuf = bufs[i];
 		if (mbuf->pkt_len <= buf_len) {
-			desc->addr = (uint64_t)addrs[valid];
+			mbuf_to_tx = rte_pktmbuf_alloc(umem->mb_pool);
+			if (unlikely(mbuf_to_tx == NULL)) {
+				rte_pktmbuf_free(mbuf);
+				continue;
+			}
+			desc->addr = mbuf_to_addr(umem, mbuf_to_tx);
 			desc->len = mbuf->pkt_len;
 			pkt = xsk_umem__get_data(umem->buffer,
 						 desc->addr);
@@ -291,10 +317,6 @@  eth_af_xdp_tx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 
 	kick_tx(txq);
 
-	if (valid < nb_pkts)
-		rte_ring_enqueue_bulk(umem->buf_ring, &addrs[valid],
-				 nb_pkts - valid, NULL);
-
 	txq->stats.err_pkts += nb_pkts - valid;
 	txq->stats.tx_pkts += valid;
 	txq->stats.tx_bytes += tx_bytes;
@@ -443,16 +465,29 @@  eth_link_update(struct rte_eth_dev *dev __rte_unused,
 
 static void xdp_umem_destroy(struct xsk_umem_info *umem)
 {
-	free(umem->buffer);
-	umem->buffer = NULL;
-
-	rte_ring_free(umem->buf_ring);
-	umem->buf_ring = NULL;
+	rte_mempool_free(umem->mb_pool);
+	umem->mb_pool = NULL;
 
 	rte_free(umem);
 	umem = NULL;
 }
 
+static inline uint64_t get_base_addr(struct rte_mempool *mp)
+{
+	struct rte_mempool_memhdr *memhdr;
+
+	memhdr = STAILQ_FIRST(&mp->mem_list);
+	return (uint64_t)(memhdr->addr);
+}
+
+static inline uint64_t get_len(struct rte_mempool *mp)
+{
+	struct rte_mempool_memhdr *memhdr;
+
+	memhdr = STAILQ_FIRST(&mp->mem_list);
+	return (uint64_t)(memhdr->len);
+}
+
 static struct xsk_umem_info *xdp_umem_configure(void)
 {
 	struct xsk_umem_info *umem;
@@ -461,9 +496,8 @@  static struct xsk_umem_info *xdp_umem_configure(void)
 		.comp_size = ETH_AF_XDP_DFLT_NUM_DESCS,
 		.frame_size = ETH_AF_XDP_FRAME_SIZE,
 		.frame_headroom = ETH_AF_XDP_DATA_HEADROOM };
-	void *bufs = NULL;
+	void *base_addr = NULL;
 	int ret;
-	uint64_t i;
 
 	umem = rte_zmalloc_socket("umem", sizeof(*umem), 0, rte_socket_id());
 	if (umem == NULL) {
@@ -471,26 +505,20 @@  static struct xsk_umem_info *xdp_umem_configure(void)
 		return NULL;
 	}
 
-	umem->buf_ring = rte_ring_create("af_xdp_ring",
-					 ETH_AF_XDP_NUM_BUFFERS,
-					 SOCKET_ID_ANY,
-					 0x0);
-	if (umem->buf_ring == NULL) {
-		AF_XDP_LOG(ERR, "Failed to create rte_ring\n");
+	umem->mb_pool = rte_pktmbuf_pool_create_with_flags("af_xdp_mempool",
+			ETH_AF_XDP_NUM_BUFFERS,
+			250, 0,
+			ETH_AF_XDP_FRAME_SIZE -
+			ETH_AF_XDP_MBUF_OVERHEAD,
+			MEMPOOL_F_NO_SPREAD | MEMPOOL_CHUNK_F_PAGE_ALIGN,
+			SOCKET_ID_ANY);
+	if (umem->mb_pool == NULL || umem->mb_pool->nb_mem_chunks != 1) {
+		AF_XDP_LOG(ERR, "Failed to create mempool\n");
 		goto err;
 	}
+	base_addr = (void *)get_base_addr(umem->mb_pool);
 
-	for (i = 0; i < ETH_AF_XDP_NUM_BUFFERS; i++)
-		rte_ring_enqueue(umem->buf_ring,
-				 (void *)(i * ETH_AF_XDP_FRAME_SIZE +
-					  ETH_AF_XDP_DATA_HEADROOM));
-
-	if (posix_memalign(&bufs, getpagesize(),
-			   ETH_AF_XDP_NUM_BUFFERS * ETH_AF_XDP_FRAME_SIZE)) {
-		AF_XDP_LOG(ERR, "Failed to allocate memory pool.\n");
-		goto err;
-	}
-	ret = xsk_umem__create(&umem->umem, bufs,
+	ret = xsk_umem__create(&umem->umem, base_addr,
 			       ETH_AF_XDP_NUM_BUFFERS * ETH_AF_XDP_FRAME_SIZE,
 			       &umem->fq, &umem->cq,
 			       &usr_config);
@@ -499,7 +527,7 @@  static struct xsk_umem_info *xdp_umem_configure(void)
 		AF_XDP_LOG(ERR, "Failed to create umem");
 		goto err;
 	}
-	umem->buffer = bufs;
+	umem->buffer = base_addr;
 
 	return umem;
 
@@ -903,10 +931,9 @@  rte_pmd_af_xdp_remove(struct rte_vdev_device *dev)
 
 	internals = eth_dev->data->dev_private;
 
-	rte_ring_free(internals->umem->buf_ring);
-	rte_free(internals->umem->buffer);
+	rte_mempool_free(internals->umem->mb_pool);
 	rte_free(internals->umem);
 
 	rte_eth_dev_release_port(eth_dev);